修改mq为C++库并固定生产者和消费者

This commit is contained in:
lnk
2026-04-28 11:44:40 +08:00
parent b87da0f454
commit b1d8440e6a
9 changed files with 636 additions and 225 deletions

View File

@@ -29,10 +29,14 @@
#include "../rocketmq/DefaultMQPushConsumer.h"
#include "../rocketmq/ConsumeType.h"
#include "../rocketmq/MQMessageListener.h"
#include "../rocketmq/MQMessageExt.h"
#include "../rocketmq/SessionCredentials.h"
// 引入提供的消费者接口头文件
#include "../rocketmq/CPushConsumer.h"
#include "../rocketmq/CCommon.h"
#include "../rocketmq/CMessageExt.h"
//#include "../rocketmq/CPushConsumer.h"
//#include "../rocketmq/CCommon.h"
//#include "../rocketmq/CMessageExt.h"
#include <map>
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex
#include <utility> // for std::pair
@@ -79,20 +83,40 @@ extern std::string G_MQCONSUMER_CHANNEL;
class RocketMQConsumer;
// 全局映射CPushConsumer* -> RocketMQConsumer*
std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
//std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
//pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
////////////////////////////////
int64_t G_APP_START_MS = []() -> int64_t {
using namespace std::chrono;
return duration_cast<milliseconds>(
system_clock::now().time_since_epoch()
).count();
}();
int64_t G_START_SKEW_MS = 1000;
bool should_process_after_start(const rocketmq::MQMessageExt& msg)
{
const int64_t born_ts = static_cast<int64_t>(msg.getBornTimestamp());
return born_ts >= (G_APP_START_MS - G_START_SKEW_MS);
}
///////////////////////////////
class InternalListener;
class RocketMQConsumer {
public:
// 构造函数:初始化消费者并启动
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId);
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer);
// 禁用拷贝和赋值
RocketMQConsumer(const RocketMQConsumer&) {}
//RocketMQConsumer(const RocketMQConsumer&) {}
RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; }
// 订阅主题和标签,并注册回调
void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback);
void subscribe(const std::string& topic,
const std::string& tag,
MessageCallBack callback);
// 启动消费者
void start();
@@ -100,23 +124,47 @@ public:
//修改消费模式
void setConsumerMessageModel(const std::string& topic);
rocketmq::ConsumeStatus handleMessage(const rocketmq::MQMessageExt& msg);
// 析构函数:关闭并销毁消费者
~RocketMQConsumer();
private:
CPushConsumer* consumer_; // C 接口消费者指针
//CPushConsumer* consumer_; // C 接口消费者指针
//MessageCallBack messageCallback_; // 函数指针用于回调
rocketmq::DefaultMQPushConsumer consumer_;
InternalListener* listener_;
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射
// 静态消息处理回调
/*// 静态消息处理回调
static int messageHandler(CPushConsumer* consumer, CMessageExt* msg);
// 实例消息处理函数
int handleMessage(CMessageExt* msg);
};
int handleMessage(CMessageExt* msg);*/
// 构造函数实现
RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
};
class InternalListener : public rocketmq::MessageListenerConcurrently {
public:
explicit InternalListener(RocketMQConsumer* owner)
: owner_(owner) {}
rocketmq::ConsumeStatus consumeMessage(
const std::vector<rocketmq::MQMessageExt>& msgs) override
{
for (size_t i = 0; i < msgs.size(); ++i) {
rocketmq::ConsumeStatus ret = owner_->handleMessage(msgs[i]);
if (ret != rocketmq::CONSUME_SUCCESS) {
return ret;
}
}
return rocketmq::CONSUME_SUCCESS;
}
private:
RocketMQConsumer* owner_;
};
// 构造函数实现C
/*RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
: consumer_(NULL)//, messageCallback_(NULL)
{
// 创建消费者
@@ -163,12 +211,29 @@ RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::s
pthread_mutex_unlock(&g_consumerMapMutex);
std::cout << "RocketMQ Consumer initialized and started." << std::endl;
}*/
//构造函数实现C++
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
const std::string& nameServer)
: consumer_(consumerGroup),
listener_(NULL)
{
consumer_.setNamesrvAddr(nameServer);
consumer_.setSessionCredentials(
G_MQCONSUMER_ACCESSKEY,
G_MQCONSUMER_SECRETKEY,
G_MQCONSUMER_CHANNEL
);
listener_ = new InternalListener(this);
}
// 启动消费者
void RocketMQConsumer::start()
{
if (StartPushConsumer(consumer_) != 0) {
/*if (StartPushConsumer(consumer_) != 0) {
pthread_mutex_lock(&g_consumerMapMutex);
g_consumerMap.erase(consumer_);
pthread_mutex_unlock(&g_consumerMapMutex);
@@ -177,21 +242,26 @@ void RocketMQConsumer::start()
}
else{
std::cout << "RocketMQ Consumer started." << std::endl;
}
}*/
consumer_.registerMessageListener(listener_);
consumer_.start();
}
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
{
if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
/*if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
throw std::runtime_error("Failed to subscribe to topic/tag.");
}
}*/
consumer_.subscribe(topic, tag);
//调试用
std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键
std::pair<std::string, std::string> key(topic, tag);
callbacks_[key] = callback;
std::pair<std::string, std::string> mapKey(topic, tag);
callbacks_[mapKey] = callback;
}
/*
// 静态消息处理回调实现
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
{
@@ -212,68 +282,66 @@ int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
return instance->handleMessage(msg);
} else {
std::cerr << "Consumer instance not found for callback." << std::endl;
return E_RECONSUME_LATER; // 默认返回重试状态
//return E_RECONSUME_LATER; // 默认返回重试状态
return rocketmq::RECONSUME_LATER;
}
}
int RocketMQConsumer::handleMessage(CMessageExt* msg)
*/
/*int RocketMQConsumer::handleMessage(CMessageExt* msg)
{
// 检查 msg 和 consumer_ 是否为 NULL
if (!msg || !consumer_) {
std::cerr << "Received null message or consumer." << std::endl;
return E_RECONSUME_LATER;
}
//return E_RECONSUME_LATER;
return rocketmq::RECONSUME_LATER;
}*/
// 获取消息的主题和标签
std::string topic = GetMessageTopic(msg); // 假设存在此函数
std::string tag = GetMessageTags(msg); // 假设存在此函数
//std::string topic = GetMessageTopic(msg); // 假设存在此函数
//std::string tag = GetMessageTags(msg); // 假设存在此函数
// 打印调试信息
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键
std::pair<std::string, std::string> key(topic, tag);
// 查找对应的回调函数
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
if (it != callbacks_.end()) {
// 调用对应的回调函数
//调试
std::cout << "callback Handling message " <<std::endl;
return it->second(consumer_, msg);
} else {
//调试
std::cout << "there is no callback " <<std::endl;
// 如果没有找到对应的回调,执行默认处理
const char* body = GetMessageBody(msg);
const char* msgKey = GetMessageKeys(msg);
if (body) {
std::cout << "Received message body: " << body << std::endl;
} else {
std::cout << "Received message with empty body." << std::endl;
}
if (msgKey) {
std::cout << "Message Key: " << msgKey << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
return E_CONSUME_SUCCESS;
}
rocketmq::ConsumeStatus RocketMQConsumer::handleMessage( const rocketmq::MQMessageExt& msg) {
std::string tag = msg.getTags();
std::string topic = msg.getTopic();
// 打印调试信息
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键
std::pair<std::string, std::string> key(topic, tag);
// 查找对应的回调函数
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
if (it != callbacks_.end())
{ // 调用对应的回调函数
//调试
std::cout << "callback Handling message " <<std::endl;
//return it->second(consumer_, msg);
return it->second(msg);
}
else {
//调试
std::cout << "there is no callback " <<std::endl;
// 如果没有找到对应的回调,执行默认处理
//const char* body = GetMessageBody(msg);
//const char* msgKey = GetMessageKeys(msg);
std::string body = msg.getBody();
std::string msgKey = msg.getKeys();
if (!body.empty()) {
std::cout << "Received message body: " << body << std::endl;
} else {
std::cout << "Received message with empty body." << std::endl;
} if (!msgKey.empty()) {
std::cout << "Message Key: " << msgKey << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
} //return E_CONSUME_SUCCESS;
return rocketmq::CONSUME_SUCCESS;
}
}
// 析构函数实现
RocketMQConsumer::~RocketMQConsumer()
{
if (consumer_) {
/*if (consumer_) {
// 关闭消费者
ShutdownPushConsumer(consumer_);
@@ -287,7 +355,16 @@ RocketMQConsumer::~RocketMQConsumer()
consumer_ = NULL;
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
}*/
try {
consumer_.shutdown();
} catch (...) {
}
delete listener_;
listener_ = NULL;
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
}
// 在 RocketMQConsumer 类中新增函数用来设置消费模式
@@ -302,11 +379,13 @@ void RocketMQConsumer::setConsumerMessageModel(const std::string& topic)
}
} else*/ {
// 默认设置为广播消费模式
if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
/*if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl;
} else {
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
}
}*/
consumer_.setMessageModel(rocketmq::BROADCASTING);
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
}
}
@@ -322,11 +401,12 @@ void InitializeConsumer(
if (g_consumer == NULL) {
std::cout << "create new consumer!" << std::endl;
try {
g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组不同进程不同的消费者同时消费topic的同一条消息
//g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组不同进程不同的消费者同时消费topic的同一条消息
g_consumer = new RocketMQConsumer(consumerName, nameServer);
for (size_t i = 0; i < subscriptions.size(); ++i) {
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag, subscriptions[i].callback);
g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag,subscriptions[i].callback);
}
g_consumer->start();
@@ -407,24 +487,36 @@ int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
class RocketMQProducer {
public:
RocketMQProducer(const std::string& producerName, const std::string& nameServer)
: producer_(NULL)
: producer_(producerName)
{
// 创建生产者
producer_ = CreateProducer(producerName.c_str());
/*producer_ = CreateProducer(producerName.c_str());
if (producer_ == NULL) {
throw std::runtime_error("Failed to create producer.");
}
}*/
// 设置日志
producer_.setLogLevel(rocketmq::eLOG_LEVEL_ERROR);
producer_.setLogFileSizeAndNum(5, 50);
// 设置 nameserver 地址
SetProducerNameServerAddress(producer_, nameServer.c_str());
//SetProducerNameServerAddress(producer_, nameServer.c_str());
producer_.setNamesrvAddr(nameServer);
//lnk20260417设置数据上送消息体最大值默认4M调整为1M避免过大消息导致发送失败
SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
//SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
producer_.setMaxMessageSize(1024 * 1024);
SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
//SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
producer_.setSessionCredentials(
G_MQCONSUMER_ACCESSKEY,
G_MQCONSUMER_SECRETKEY,
""
);
// 启动生产者
StartProducer(producer_);
//StartProducer(producer_);
producer_.start();
std::cout << "rocketmq_Producer initialized and started." << std::endl;
}
@@ -433,7 +525,7 @@ public:
RocketMQProducer(const RocketMQProducer&) = delete;
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
void printSendResult(const CSendResult& result) {
/*void printSendResult(const CSendResult& result) {
std::cout << "SendResult:" << std::endl;
std::cout << " Status: ";
switch (result.sendStatus) {
@@ -457,10 +549,10 @@ public:
std::cout << " MsgID : " << result.msgId << std::endl;
std::cout << " Offset: " << result.offset << std::endl;
}
}*/
// 发送消息
void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
/* void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL")
@@ -585,7 +677,7 @@ public:
<< std::endl;*/
// 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题
if (sendResult == 0) { // 假设返回 0 表示成功
/*if (sendResult == 0) { // 假设返回 0 表示成功
std::cout << "[MQ][SEND_OK]"
<< " topic=" << (topic ? topic : "")
<< ", tags=" << tags
@@ -630,20 +722,97 @@ public:
DestroyMessage(msg);
}
}
}*/
void sendMessage(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys)
{
try {
if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << topic
<< ", tags: " << tags
<< ", keys: " << keys
<< ", body_len=" << body.size()
<< std::endl;
size_t n = std::min((size_t)200, body.size());
std::cout << "[MQ][BODY_HEAD] "
<< body.substr(0, n)
<< std::endl;
if (body.size() > n) {
std::cout << "[MQ][BODY_TAIL] "
<< body.substr(body.size() - n, n)
<< std::endl;
}
std::cout << "[MQ][HEX_HEAD] ";
for (size_t i = 0; i < std::min((size_t)100, body.size()); ++i) {
printf("%02X ", (unsigned char)body[i]);
}
printf("\n");
}
rocketmq::MQMessage msg(topic, tags, keys, body);
rocketmq::SendResult result = producer_.send(msg);
std::cout << "[MQ][SEND_OK]"
<< " topic=" << topic
<< ", tags=" << tags
<< ", keys=" << keys
<< ", msgId=" << result.getMsgId()
<< ", status=" << result.getSendStatus()
<< ", body_len=" << body.size()
<< std::endl;
}
catch (const rocketmq::MQClientException& e) {
std::cerr << "[MQ][SEND_FAIL] MQClientException: "
<< e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,mq客户端错误,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
catch (const std::exception& e) {
std::cerr << "[MQ][SEND_FAIL] exception: "
<< e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,mq发送错误,发送请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
catch (...) {
std::cerr << "[MQ][SEND_FAIL] unknown exception" << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,未知错误,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
}
// 析构函数中关闭并销毁生产者
~RocketMQProducer() {
if (producer_) {
/*if (producer_) {
ShutdownProducer(producer_);
DestroyProducer(producer_);
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
}*/
try {
producer_.shutdown();
}
catch (...) {
}
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
}
private:
CProducer* producer_;
//CProducer* producer_;
rocketmq::DefaultMQProducer producer_;
};
// 全局生产者实例
@@ -654,7 +823,7 @@ void InitializeProducer()
{
if (g_producer == NULL) {
try {
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);//生产者名称和NameServer地址
}
catch (const std::exception& e) {
std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
@@ -674,34 +843,33 @@ void ShutdownAndDestroyProducer()
}
// 发送消息的接口函数
void rocketmq_producer_send(const char* strbody, const char* topic)
void rocketmq_producer_send(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys)
{
if (g_producer == NULL) {
try {
InitializeProducer();
}
catch (...) {
} catch (...) {
std::cerr << "Cannot send message because producer initialization failed." << std::endl;
return;
}
}
// 假设 tags 和 keys 是固定的,可以根据需要修改
std::string tags = G_ROCKETMQ_TAG;
std::string keys = G_ROCKETMQ_KEY;
try {
g_producer->sendMessage(strbody, topic, tags, keys);
}
catch (const std::exception& e) {
g_producer->sendMessage(body, topic, tags, keys);
} catch (const std::exception& e) {
std::cerr << "Failed to send message: " << e.what() << std::endl;
// 处理发送失败的情况,例如记录日志或重试
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
}
#endif
//////////////////////////////////////////////////////////////////////////////////////////////////////////
/*
// producer_send0测试用
void StartSendMessage(CProducer* producer)
{
@@ -786,7 +954,7 @@ void producer_send(const char* strbody)
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
}
*/
///////////////////////////////////////////////////////////////////////////////////////////////////////////
extern "C" {