delete useless code
This commit is contained in:
@@ -1,6 +1,3 @@
|
||||
/*
|
||||
* Description: Simple Producer demo
|
||||
*/
|
||||
#include <fstream> // 用于 std::ifstream
|
||||
#include <sstream> // 用于 std::stringstream
|
||||
#include <unistd.h>
|
||||
@@ -12,7 +9,6 @@
|
||||
#include "../include/rocketmq/CProducer.h"
|
||||
#include "../include/rocketmq/CMessage.h"
|
||||
#include "../include/rocketmq/CSendResult.h"
|
||||
|
||||
#include "../include/rocketmq/SimpleProducer.h"
|
||||
|
||||
//测试300数据用lnk20241202
|
||||
@@ -135,13 +131,7 @@ RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::s
|
||||
std::cout << "error setting groupId"<< std::endl;
|
||||
throw std::runtime_error("Failed to set Consumer Group ID.");
|
||||
}
|
||||
/*
|
||||
// 设置消费模式为广播模式
|
||||
if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
|
||||
DestroyPushConsumer(consumer_);
|
||||
std::cout << "error setting messagemodel"<< std::endl;
|
||||
}
|
||||
*/
|
||||
|
||||
//调试用
|
||||
std::string consumerlog = "./mqconsumer/" + consumerName +".log";
|
||||
if ( (SetPushConsumerLogPath(consumer_,consumerlog.c_str()) || SetPushConsumerLogFileNumAndSize(consumer_,10,100) || SetPushConsumerLogLevel(consumer_,E_LOG_LEVEL_DEBUG) ) != 0) {//记录消费日志
|
||||
@@ -365,7 +355,6 @@ void rocketmq_consumer_receive(
|
||||
}
|
||||
}
|
||||
|
||||
// 消费逻辑已通过回调处理,无需额外操作
|
||||
}
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
//封装生产者类
|
||||
@@ -465,19 +454,6 @@ public:
|
||||
RoundRobinSelector, // 队列选择器回调函数
|
||||
&queueNum // 传递给选择器的额外参数(队列数量)
|
||||
);
|
||||
|
||||
//调试用
|
||||
/*int sendResult = SendMessageSync(
|
||||
producer_,
|
||||
msg,
|
||||
&result
|
||||
);
|
||||
int ret = SendMessageSync(producer_, msg, &result);
|
||||
if (ret == 0) {
|
||||
printSendResult(result);
|
||||
} else {
|
||||
std::cerr << "SendMessageSync failed with ret=" << ret << std::endl;
|
||||
}*/
|
||||
|
||||
if (sendResult == 0) { // 假设返回 0 表示成功
|
||||
std::cout << "Message sent successfully.topic:" << topic <<std::endl;
|
||||
@@ -579,56 +555,6 @@ void rocketmq_producer_send(const char* strbody, const char* topic)
|
||||
}
|
||||
#endif
|
||||
|
||||
//lnk20241209队列轮询
|
||||
#if 0
|
||||
// 全局或静态变量,用于维护当前队列索引
|
||||
static int currentQueueId = 0;
|
||||
|
||||
// 队列选择器回调函数:轮询选择队列 ID
|
||||
int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
||||
if (queueNum == 0) {
|
||||
throw std::runtime_error("No available queues");
|
||||
}
|
||||
// 选择当前队列 ID,并更新索引
|
||||
int queueId = currentQueueId % queueNum;
|
||||
currentQueueId++;
|
||||
// 防止溢出,重置索引
|
||||
if (currentQueueId >= 1024 - queueNum) {
|
||||
currentQueueId = 0;
|
||||
}
|
||||
return queueId;
|
||||
}
|
||||
void StartSendMessage_queue(CProducer* producer, const char* strbody, const char* topic)
|
||||
{
|
||||
CSendResult result;
|
||||
|
||||
// 创建消息并设置一些属性
|
||||
CMessage* msg = CreateMessage(topic);
|
||||
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str());
|
||||
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str());
|
||||
SetMessageBody(msg, strbody);
|
||||
|
||||
// 动态获取队列数量和 Broker 名称
|
||||
int queueNum = QUEUENUM; // 假设这个是配置的队列数量,比如5
|
||||
|
||||
// 使用 SendMessageOnewayOrderly 发送消息,传递回调函数
|
||||
int sendResult = SendMessageOnewayOrderly(
|
||||
producer,
|
||||
msg,
|
||||
RoundRobinSelector, // 传递符合 QueueSelectorCallback 签名的函数
|
||||
&queueNum // 传递给选择器的额外参数(队列数量)
|
||||
);
|
||||
|
||||
if (sendResult == 0) { // 假设返回 0 表示成功
|
||||
std::cout << "Message sent successfully: " << std::endl;
|
||||
} else {
|
||||
std::cout << "Failed to send message: " << std::endl;
|
||||
}
|
||||
|
||||
// 销毁消息
|
||||
DestroyMessage(msg);
|
||||
}
|
||||
#endif
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// producer_send0测试用
|
||||
void StartSendMessage(CProducer* producer)
|
||||
@@ -716,60 +642,6 @@ void producer_send(const char* strbody)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
#if 0
|
||||
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic)
|
||||
{
|
||||
CSendResult result;
|
||||
|
||||
// create message and set some values for it
|
||||
CMessage* msg = CreateMessage(topic);
|
||||
|
||||
//多前置区分消息tag,一个进程
|
||||
|
||||
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str());
|
||||
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str());
|
||||
|
||||
SetMessageBody(msg, strbody);
|
||||
// send message
|
||||
SendMessageSync(producer, msg, &result);
|
||||
|
||||
//cout << "rocketmq_result status:" << result.sendStatus << ", msgBody:" << strbody << endl;
|
||||
|
||||
// destroy message
|
||||
DestroyMessage(msg);
|
||||
}
|
||||
|
||||
void rocketmq_producer_send(const char* strbody,const char* topic)
|
||||
{
|
||||
cout << "rocketmq_Producer Initializing....." << endl;
|
||||
|
||||
// create producer and set some values for it
|
||||
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
|
||||
if (producer == NULL) {
|
||||
std::cerr << "Failed to create producer." << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
// nameserver
|
||||
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
|
||||
|
||||
// start producer
|
||||
StartProducer(producer);
|
||||
|
||||
cout << "rocketmq_Producer start....." << endl;
|
||||
// send message
|
||||
//rocketmq_StartSendMessage(producer,strbody,topic);
|
||||
//根据队列发消息
|
||||
StartSendMessage_queue(producer,strbody,topic);
|
||||
// shutdown producer
|
||||
ShutdownProducer(producer);
|
||||
// destroy producer
|
||||
DestroyProducer(producer);
|
||||
cout << "rocketmq_Producer Shutdown!" << endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
extern "C" {
|
||||
extern std::string G_MQCONSUMER_TOPIC_RT;
|
||||
@@ -862,84 +734,3 @@ void rocketmq_test_log()
|
||||
}
|
||||
|
||||
|
||||
|
||||
#if 0
|
||||
void rocketmq_test_300(int mpnum,int front_index) {
|
||||
Ckafka_data_t data;
|
||||
data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC);
|
||||
data.mp_id = "0";
|
||||
|
||||
// 读取文件内容
|
||||
std::ifstream file("long_string.txt"); // 文件中存储长字符串
|
||||
std::stringstream buffer;
|
||||
buffer << file.rdbuf();
|
||||
std::string file_contents = buffer.str(); // 获取文件内容
|
||||
std::string base_strText = file_contents;
|
||||
|
||||
// 获取当前时间作为开始时间
|
||||
std::time_t t = std::time(NULL);//获取当前的系统时间(自 1970 年 1 月 1 日以来的秒数,通常称为 UNIX 时间戳)
|
||||
std::tm* time_info = std::localtime(&t);//将 std::time_t(表示当前的 UNIX 时间戳)转换为本地时间(std::tm 结构)
|
||||
time_info->tm_sec = 0; // 清零秒位
|
||||
//time_info->tm_msec = 0; // 清零毫秒位(如果需要更精确,使用高精度时间)
|
||||
|
||||
// 获取当前的时间戳(秒)
|
||||
std::time_t base_time_t = std::mktime(time_info);//将 std::tm 结构(本地时间)转换回 std::time_t(时间戳)
|
||||
|
||||
// 计算每条消息的时间戳,精确到分钟,毫秒和秒清零
|
||||
long long current_time_ms = static_cast<long long>(base_time_t) * 1000; // 每分钟递增,单位毫秒
|
||||
|
||||
// 设定总的消息数量
|
||||
int total_messages = mpnum;
|
||||
|
||||
// 循环发送 300 条消息
|
||||
for (int i = 0; i < total_messages; ++i) {
|
||||
// 修改 Monitor 值
|
||||
data.monitor_id = front_index *10000 + 1 + i;
|
||||
|
||||
data.mp_id = QString::number(data.monitor_id);
|
||||
|
||||
std::string modified_time = my_to_string(current_time_ms); // 时间转换为整数类型(Unix时间戳)
|
||||
|
||||
// 替换消息中的 Monitor 和 TIME 字段(只匹配字段名,不匹配具体数值)
|
||||
std::string modified_strText = base_strText;
|
||||
|
||||
// 替换 Monitor 字段
|
||||
size_t monitor_pos = modified_strText.find("\"Monitor\"");
|
||||
if (monitor_pos != std::string::npos) {
|
||||
size_t colon_pos = modified_strText.find(":", monitor_pos);
|
||||
size_t quote_pos = modified_strText.find("\"", colon_pos);
|
||||
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
|
||||
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
|
||||
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, my_to_string(data.monitor_id));
|
||||
}
|
||||
}
|
||||
|
||||
// 替换 TIME 字段
|
||||
size_t time_pos = modified_strText.find("\"TIME\"");
|
||||
if (time_pos != std::string::npos) {
|
||||
size_t colon_pos = modified_strText.find(":", time_pos);
|
||||
size_t quote_pos = colon_pos;
|
||||
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
|
||||
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
|
||||
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
|
||||
}
|
||||
}
|
||||
|
||||
// 更新数据
|
||||
data.strText = QString::fromStdString(modified_strText);
|
||||
|
||||
// 发送数据
|
||||
my_rocketmq_send(data);
|
||||
|
||||
// 输出调试信息
|
||||
std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_id << " and TIME " << modified_time << std::endl;
|
||||
|
||||
// 等待下一条消息的发送(固定为1分钟)
|
||||
//QThread::sleep(60); // 每次发送间隔1分钟
|
||||
}
|
||||
|
||||
std::cout << "Finished sending " << total_messages << " messages." << std::endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user