优化日志,而且台账修改时无论什么状态都先关闭连接
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <iostream>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include "../mms/db_interface.h"
|
||||
|
||||
@@ -371,7 +372,7 @@ void rocketmq_consumer_receive(
|
||||
static int currentQueueId = 0;
|
||||
|
||||
// 队列选择器回调函数:轮询选择队列 ID
|
||||
int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
||||
/*int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
||||
if (queueNum == 0) {
|
||||
throw std::runtime_error("No available queues");
|
||||
}
|
||||
@@ -381,6 +382,24 @@ int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
||||
currentQueueId = 0;
|
||||
}
|
||||
return queueId;
|
||||
}*/
|
||||
int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
||||
static std::atomic<int> currentQueueId(0);
|
||||
|
||||
if (queueNum <= 0) {
|
||||
std::cout << "[MQ][SELECTOR_FAIL] queueNum=" << queueNum << std::endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int id = currentQueueId.fetch_add(1, std::memory_order_relaxed);
|
||||
int queueId = id % queueNum;
|
||||
|
||||
std::cout << "[MQ][SELECTOR] queueNum=" << queueNum
|
||||
<< ", current=" << id
|
||||
<< ", selected=" << queueId
|
||||
<< std::endl;
|
||||
|
||||
return queueId;
|
||||
}
|
||||
|
||||
// 封装生产者的类
|
||||
@@ -464,9 +483,20 @@ public:
|
||||
);
|
||||
|
||||
if (sendResult == 0) { // 假设返回 0 表示成功
|
||||
std::cout << "Message sent successfully.topic:" << topic <<std::endl;
|
||||
std::cout << "[MQ][SEND_OK]"
|
||||
<< " topic=" << (topic ? topic : "")
|
||||
<< ", tags=" << tags
|
||||
<< ", keys=" << keys
|
||||
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
|
||||
<< std::endl;
|
||||
} else {
|
||||
std::cout << "Failed to send message." << std::endl;
|
||||
std::cout << "[MQ][SEND_FAIL]"
|
||||
<< " ret=" << sendResult
|
||||
<< ", topic=" << (topic ? topic : "")
|
||||
<< ", tags=" << tags
|
||||
<< ", keys=" << keys
|
||||
<< std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
|
||||
}
|
||||
|
||||
// 销毁消息
|
||||
@@ -559,7 +589,7 @@ void rocketmq_producer_send(const char* strbody, const char* topic)
|
||||
catch (const std::exception& e) {
|
||||
std::cerr << "Failed to send message: " << e.what() << std::endl;
|
||||
// 处理发送失败的情况,例如记录日志或重试
|
||||
DIY_ERRORLOG_CODE("process",LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user