/* * Description: Simple Producer demo */ #include // 用于 std::ifstream #include // 用于 std::stringstream #include #include #include #include #include "../mms/db_interface.h" #include "../include/rocketmq/CProducer.h" #include "../include/rocketmq/CMessage.h" #include "../include/rocketmq/CSendResult.h" #include "../include/rocketmq/SimpleProducer.h" //测试300数据用lnk20241202 #include #include //测试300数据用 //lnk20241209添加队列选择 #include "../include/rocketmq/MQSelector.h" #include "../include/rocketmq/MQMessageQueue.h" //#include #include #include //lnk20241209添加队列选择 #include // 引入提供的消费者接口头文件 #include "../include/rocketmq/CPushConsumer.h" #include "../include/rocketmq/CCommon.h" #include "../include/rocketmq/CMessageExt.h" #include #include // 用于互斥锁(在 C++98 中没有 std::mutex) #include // for std::pair using namespace std; extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port extern std::string G_ROCKETMQ_TOPIC;//topie extern std::string G_ROCKETMQ_TAG;//tag extern std::string G_ROCKETMQ_KEY;//key #ifdef __cplusplus extern "C" { #endif extern std::string G_MQCONSUMER_TOPIC_SET; // C++ 中的全局变量声明 #ifdef __cplusplus } #endif extern int QUEUENUM; extern int g_front_seg_index; extern char subdir[128]; //////////////////////////////////////////////////////////////////////////////////////////////////////////// //消费者连接秘钥 extern std::string G_MQCONSUMER_ACCESSKEY; extern std::string G_MQCONSUMER_SECRETKEY; extern std::string G_MQCONSUMER_CHANNEL; // 前向声明 RocketMQConsumer 类 class RocketMQConsumer; // 全局映射:CPushConsumer* -> RocketMQConsumer* std::map g_consumerMap;// pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER; class RocketMQConsumer { public: // 构造函数:初始化消费者并启动 RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId); // 禁用拷贝和赋值 RocketMQConsumer(const RocketMQConsumer&) {} RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; } // 订阅主题和标签,并注册回调 void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback); // 启动消费者 void start(); //修改消费模式 void setConsumerMessageModel(const std::string& topic); // 析构函数:关闭并销毁消费者 ~RocketMQConsumer(); private: CPushConsumer* consumer_; // C 接口消费者指针 //MessageCallBack messageCallback_; // 函数指针用于回调 std::map, MessageCallBack> callbacks_; // 订阅到回调的映射 // 静态消息处理回调 static int messageHandler(CPushConsumer* consumer, CMessageExt* msg); // 实例消息处理函数 int handleMessage(CMessageExt* msg); }; // 构造函数实现 RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId) : consumer_(NULL)//, messageCallback_(NULL) { // 创建消费者 consumer_ = CreatePushConsumer(consumerName.c_str()); if (consumer_ == NULL) { std::cout << "error CreatePushConsumer"<< std::endl; throw std::runtime_error("Failed to create push consumer."); } SetPushConsumerSessionCredentials(consumer_,G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(),G_MQCONSUMER_CHANNEL.c_str()); // 设置 NameServer 地址 if (SetPushConsumerNameServerAddress(consumer_, nameServer.c_str()) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting nameServer"<< std::endl; throw std::runtime_error("Failed to set NameServer address."); } // 设置消费者组ID if (SetPushConsumerGroupID(consumer_, groupId.c_str()) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting groupId"<< std::endl; throw std::runtime_error("Failed to set Consumer Group ID."); } /* // 设置消费模式为广播模式 if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting messagemodel"<< std::endl; } */ //调试用 std::string consumerlog = "./mqconsumer/" + consumerName +".log"; if ( (SetPushConsumerLogPath(consumer_,consumerlog.c_str()) || SetPushConsumerLogFileNumAndSize(consumer_,10,100) || SetPushConsumerLogLevel(consumer_,E_LOG_LEVEL_DEBUG) ) != 0) {//记录消费日志 DestroyPushConsumer(consumer_); std::cout << "error setting logpath"<< std::endl; } std::cout << "logpath:" << consumerlog << std::endl; // 注册消息回调 if (RegisterMessageCallback(consumer_, RocketMQConsumer::messageHandler) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting Callback"<< std::endl; throw std::runtime_error("Failed to register message callback."); } // 将消费者实例添加到全局映射 pthread_mutex_lock(&g_consumerMapMutex); g_consumerMap[consumer_] = this; pthread_mutex_unlock(&g_consumerMapMutex); std::cout << "RocketMQ Consumer initialized and started." << std::endl; } // 启动消费者 void RocketMQConsumer::start() { if (StartPushConsumer(consumer_) != 0) { pthread_mutex_lock(&g_consumerMapMutex); g_consumerMap.erase(consumer_); pthread_mutex_unlock(&g_consumerMapMutex); DestroyPushConsumer(consumer_); throw std::runtime_error("Failed to start push consumer."); } else{ std::cout << "RocketMQ Consumer started." << std::endl; } } void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback) { if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) { throw std::runtime_error("Failed to subscribe to topic/tag."); } std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl; // 使用 std::pair 作为键 std::pair key(topic, tag); callbacks_[key] = callback; } // 静态消息处理回调实现 int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg) { RocketMQConsumer* instance = NULL; //调试用 std::cout << "messagehandler" << std::endl; // 查找对应的消费者实例 pthread_mutex_lock(&g_consumerMapMutex); std::map::iterator it = g_consumerMap.find(consumer); if (it != g_consumerMap.end()) { instance = it->second; } pthread_mutex_unlock(&g_consumerMapMutex); if (instance) { return instance->handleMessage(msg); } else { std::cerr << "Consumer instance not found for callback." << std::endl; return E_RECONSUME_LATER; // 默认返回重试状态 } } int RocketMQConsumer::handleMessage(CMessageExt* msg) { // 检查 msg 和 consumer_ 是否为 NULL if (!msg || !consumer_) { std::cerr << "Received null message or consumer." << std::endl; return E_RECONSUME_LATER; } // 获取消息的主题和标签 std::string topic = GetMessageTopic(msg); // 假设存在此函数 std::string tag = GetMessageTags(msg); // 假设存在此函数 // 打印调试信息 std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl; // 使用 std::pair 作为键 std::pair key(topic, tag); // 查找对应的回调函数 std::map, MessageCallBack>::iterator it = callbacks_.find(key); if (it != callbacks_.end()) { // 调用对应的回调函数 //调试 std::cout << "callback Handling message " <second(consumer_, msg); } else { //调试 std::cout << "there is no callback " <& subscriptions) // 接收多个订阅 { if (g_consumer == NULL) { std::cout << "create new consumer!" << std::endl; try { g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组,不同进程(不同的消费者)同时消费topic的同一条消息 for (size_t i = 0; i < subscriptions.size(); ++i) { g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式 g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag, subscriptions[i].callback); } g_consumer->start(); } catch (const std::exception& e) { std::cerr << "Failed to initialize consumer: " << e.what() << std::endl; throw; // 重新抛出异常 } } } // 关闭并销毁消费者函数 void ShutdownAndDestroyConsumer() { if (g_consumer != NULL) { delete g_consumer; g_consumer = NULL; } } // 消费消息的接口函数 void rocketmq_consumer_receive( const std::string& consumerName, const std::string& nameServer, const std::vector& subscriptions) // 接收多个订阅 { if (g_consumer == NULL) { try { //InitializeConsumer(consumerName, nameServer, topic, tag, callback);//初始化后,mq库内部来完成消息的获取 InitializeConsumer(consumerName, nameServer, subscriptions); // 初始化后,MQ库内部开始获取消息 } catch (...) { std::cerr << "Cannot consume message because consumer initialization failed." << std::endl; return; } } // 消费逻辑已通过回调处理,无需额外操作 } ///////////////////////////////////////////////////////////////////////////////////////////////////////////// //封装生产者类 #if 1 // 全局或静态变量,用于维护当前队列索引 static int currentQueueId = 0; // 队列选择器回调函数:轮询选择队列 ID int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) { if (queueNum == 0) { throw std::runtime_error("No available queues"); } int queueId = currentQueueId % queueNum; currentQueueId++; if (currentQueueId >= 1024 - queueNum) { currentQueueId = 0; } return queueId; } // 封装生产者的类 class RocketMQProducer { public: RocketMQProducer(const std::string& producerName, const std::string& nameServer) : producer_(NULL) { // 创建生产者 producer_ = CreateProducer(producerName.c_str()); if (producer_ == NULL) { throw std::runtime_error("Failed to create producer."); } // 设置 nameserver 地址 SetProducerNameServerAddress(producer_, nameServer.c_str()); // 启动生产者 StartProducer(producer_); std::cout << "rocketmq_Producer initialized and started." << std::endl; } // 禁用拷贝和赋值 RocketMQProducer(const RocketMQProducer&) = delete; RocketMQProducer& operator=(const RocketMQProducer&) = delete; // 发送消息 void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) { CSendResult result; CMessage* msg = NULL; try { // 创建消息并设置属性 msg = CreateMessage(topic); if (msg == NULL) { throw std::runtime_error("Failed to create message."); } SetMessageTags(msg, tags.c_str()); SetMessageKeys(msg, keys.c_str()); SetMessageBody(msg, strbody); // 假设队列数量和 Broker 名称是固定的 int queueNum = QUEUENUM; // 配置的队列数量,例如5 // 发送消息 int sendResult = SendMessageOnewayOrderly( producer_, msg, RoundRobinSelector, // 队列选择器回调函数 &queueNum // 传递给选择器的额外参数(队列数量) ); if (sendResult == 0) { // 假设返回 0 表示成功 std::cout << "Message sent successfully." << std::endl; } else { std::cout << "Failed to send message." << std::endl; } // 销毁消息 DestroyMessage(msg); msg = NULL; // 防止重复销毁 } catch (const std::runtime_error& e) { std::cerr << "Runtime error during message sending: " << e.what() << std::endl; // 如果消息已经创建,确保销毁它 if (msg != NULL) { DestroyMessage(msg); } // 可以在这里添加更多的错误处理逻辑,比如重试、记录日志等 } catch (const std::exception& e) { std::cerr << "Exception during message sending: " << e.what() << std::endl; if (msg != NULL) { DestroyMessage(msg); } } catch (...) { std::cerr << "Unknown error during message sending." << std::endl; if (msg != NULL) { DestroyMessage(msg); } } } // 析构函数中关闭并销毁生产者 ~RocketMQProducer() { if (producer_) { ShutdownProducer(producer_); DestroyProducer(producer_); std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl; } } private: CProducer* producer_; }; // 全局生产者实例 RocketMQProducer* g_producer = NULL; // 初始化生产者(在程序启动时调用一次) void InitializeProducer() { if (g_producer == NULL) { try { g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT); } catch (const std::exception& e) { std::cerr << "Failed to initialize producer: " << e.what() << std::endl; // 根据需求决定是否终止程序或采取其他措施 throw; // 重新抛出异常 } } } // 关闭并销毁生产者(在程序结束时调用一次) void ShutdownAndDestroyProducer() { if (g_producer != NULL) { delete g_producer; g_producer = NULL; } } // 发送消息的接口函数 void rocketmq_producer_send(const char* strbody, const char* topic) { if (g_producer == NULL) { try { InitializeProducer(); } catch (...) { std::cerr << "Cannot send message because producer initialization failed." << std::endl; return; } } // 假设 tags 和 keys 是固定的,可以根据需要修改 std::string tags = G_ROCKETMQ_TAG; std::string keys = G_ROCKETMQ_KEY; try { g_producer->sendMessage(strbody, topic, tags, keys); } catch (const std::exception& e) { std::cerr << "Failed to send message: " << e.what() << std::endl; // 处理发送失败的情况,例如记录日志或重试 } } #endif //lnk20241209队列轮询 #if 0 // 全局或静态变量,用于维护当前队列索引 static int currentQueueId = 0; // 队列选择器回调函数:轮询选择队列 ID int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) { if (queueNum == 0) { throw std::runtime_error("No available queues"); } // 选择当前队列 ID,并更新索引 int queueId = currentQueueId % queueNum; currentQueueId++; // 防止溢出,重置索引 if (currentQueueId >= 1024 - queueNum) { currentQueueId = 0; } return queueId; } void StartSendMessage_queue(CProducer* producer, const char* strbody, const char* topic) { CSendResult result; // 创建消息并设置一些属性 CMessage* msg = CreateMessage(topic); SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); SetMessageBody(msg, strbody); // 动态获取队列数量和 Broker 名称 int queueNum = QUEUENUM; // 假设这个是配置的队列数量,比如5 // 使用 SendMessageOnewayOrderly 发送消息,传递回调函数 int sendResult = SendMessageOnewayOrderly( producer, msg, RoundRobinSelector, // 传递符合 QueueSelectorCallback 签名的函数 &queueNum // 传递给选择器的额外参数(队列数量) ); if (sendResult == 0) { // 假设返回 0 表示成功 std::cout << "Message sent successfully: " << std::endl; } else { std::cout << "Failed to send message: " << std::endl; } // 销毁消息 DestroyMessage(msg); } #endif ////////////////////////////////////////////////////////////////////////////////////////////////////////// // producer_send0测试用 void StartSendMessage(CProducer* producer) { CSendResult result; // create message and set some values for it CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); for (int i = 0; i < 10; i++) { // construct different body string strMessageBody = "this is body number"; SetMessageBody(msg, strMessageBody.c_str()); // send message SendMessageSync(producer, msg, &result); cout << "send message[" << i << "], result status:" << result.sendStatus << ", msgBody:" << strMessageBody << endl; usleep(1000); } // destroy message DestroyMessage(msg); } //producer_send 测试用 void StartSendMessage(CProducer* producer,const char* strbody) { CSendResult result; // create message and set some values for it CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); SetMessageBody(msg, strbody); // send message SendMessageSync(producer, msg, &result); cout << "result status:" << result.sendStatus << ", msgBody:" << strbody << endl; // destroy message DestroyMessage(msg); } //测试用 固定消息体 void producer_send0() { cout << "Producer Initializing....." << endl; // create producer and set some values for it CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str()); SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str()); // start producer StartProducer(producer); cout << "Producer start....." << endl; // send message StartSendMessage(producer); // shutdown producer ShutdownProducer(producer); // destroy producer DestroyProducer(producer); cout << "Producer Shutdown!" << endl; } //测试用 可控制消息体 void producer_send(const char* strbody) { cout << "Producer Initializing....." << endl; // create producer and set some values for it CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str()); SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str()); // start producer StartProducer(producer); cout << "Producer start....." << endl; // send message StartSendMessage(producer, strbody); // shutdown producer ShutdownProducer(producer); // destroy producer DestroyProducer(producer); cout << "Producer Shutdown!" << endl; } /////////////////////////////////////////////////////////////////////////////////////////////////////////// #if 0 void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic) { CSendResult result; // create message and set some values for it CMessage* msg = CreateMessage(topic); //多前置区分消息tag,一个进程 SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); SetMessageBody(msg, strbody); // send message SendMessageSync(producer, msg, &result); //cout << "rocketmq_result status:" << result.sendStatus << ", msgBody:" << strbody << endl; // destroy message DestroyMessage(msg); } void rocketmq_producer_send(const char* strbody,const char* topic) { cout << "rocketmq_Producer Initializing....." << endl; // create producer and set some values for it CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str()); if (producer == NULL) { std::cerr << "Failed to create producer." << std::endl; return; } // nameserver SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str()); // start producer StartProducer(producer); cout << "rocketmq_Producer start....." << endl; // send message //rocketmq_StartSendMessage(producer,strbody,topic); //根据队列发消息 StartSendMessage_queue(producer,strbody,topic); // shutdown producer ShutdownProducer(producer); // destroy producer DestroyProducer(producer); cout << "rocketmq_Producer Shutdown!" << endl; } #endif extern "C" { extern std::string G_MQCONSUMER_TOPIC_RT; void rocketmq_test_rt() { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT); std::ifstream file("rt.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } extern std::string G_MQCONSUMER_TOPIC_UD; void rocketmq_test_ud()//用来测试台账更新 { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD); std::ifstream file("ud.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } void rocketmq_test_set()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET); std::ifstream file("set.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } void rocketmq_test_only()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET); std::ifstream file("set_debug.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } extern std::string G_MQCONSUMER_TOPIC_RC; void rocketmq_test_rc() { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC); std::ifstream file("rc.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } } std::string to_string(long long value) { std::stringstream ss; ss << value; return ss.str(); } void rocketmq_test_300(int mpnum,int front_index) { Ckafka_data_t data; data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC); data.mp_id = "0"; // 读取文件内容 std::ifstream file("long_string.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); std::string file_contents = buffer.str(); // 获取文件内容 std::string base_strText = file_contents; // 获取当前时间作为开始时间 std::time_t t = std::time(NULL);//获取当前的系统时间(自 1970 年 1 月 1 日以来的秒数,通常称为 UNIX 时间戳) std::tm* time_info = std::localtime(&t);//将 std::time_t(表示当前的 UNIX 时间戳)转换为本地时间(std::tm 结构) time_info->tm_sec = 0; // 清零秒位 //time_info->tm_msec = 0; // 清零毫秒位(如果需要更精确,使用高精度时间) // 获取当前的时间戳(秒) std::time_t base_time_t = std::mktime(time_info);//将 std::tm 结构(本地时间)转换回 std::time_t(时间戳) // 计算每条消息的时间戳,精确到分钟,毫秒和秒清零 long long current_time_ms = static_cast(base_time_t) * 1000; // 每分钟递增,单位毫秒 // 设定总的消息数量 int total_messages = mpnum; // 循环发送 300 条消息 for (int i = 0; i < total_messages; ++i) { // 修改 Monitor 值 data.monitor_id = front_index *10000 + 1 + i; data.mp_id = QString::number(data.monitor_id); std::string modified_time = to_string(current_time_ms); // 时间转换为整数类型(Unix时间戳) // 替换消息中的 Monitor 和 TIME 字段(只匹配字段名,不匹配具体数值) std::string modified_strText = base_strText; // 替换 Monitor 字段 size_t monitor_pos = modified_strText.find("\"Monitor\""); if (monitor_pos != std::string::npos) { size_t colon_pos = modified_strText.find(":", monitor_pos); size_t quote_pos = modified_strText.find("\"", colon_pos); size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, to_string(data.monitor_id)); } } // 替换 TIME 字段 size_t time_pos = modified_strText.find("\"TIME\""); if (time_pos != std::string::npos) { size_t colon_pos = modified_strText.find(":", time_pos); size_t quote_pos = colon_pos; size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); } } // 更新数据 data.strText = QString::fromStdString(modified_strText); // 发送数据 my_rocketmq_send(data); // 输出调试信息 std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_id << " and TIME " << modified_time << std::endl; // 等待下一条消息的发送(固定为1分钟) //QThread::sleep(60); // 每次发送间隔1分钟 } std::cout << "Finished sending " << total_messages << " messages." << std::endl; }