the heartbeat reply ledgerupdate is ok

This commit is contained in:
lnk
2025-05-12 16:43:42 +08:00
parent 8a2e6ea537
commit a6685ca801
9 changed files with 171 additions and 192 deletions

View File

@@ -73,6 +73,8 @@ extern "C" {
extern int INITFLAG;
extern std::string FRONT_INST;
extern QMutex kafka_data_list_mutex;
extern QList<Ckafka_data_t> kafka_data_list;
@@ -525,87 +527,6 @@ void KafkaSendThread::run()
printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
/*//lnk20250225添加日志上送
Ckafka_data_t log_send;
log_send.strTopic = QString::fromStdString(G_LOG_TOPIC);
bool log_gotten;
log_gotten = false;
if(!showinshellflag){
if (debugOutputEnabled) {
// 如果 normalOutputEnabled 和 warnOutputEnabled 都为 0且 errorOutputEnabled 为 1取 errorList 输出
// 处理 errorList 的输出
pthread_mutex_lock(&debugListMutex);
if (!debugList.empty()) {
log_gotten = true;
log_send.strText = QString::fromStdString(debugList.front());//请确保list正确
// 检查是否为空字符串(去掉空白字符后是否为空)
if (log_send.strText.trimmed().isEmpty()) {
debugList.pop_front(); // 直接丢弃这条日志
log_gotten = false; // 标记没有获取到有效日志
} else {
debugList.pop_front(); // 只有非空白字符串才会真正返回
}
}
pthread_mutex_unlock(&debugListMutex);
}
else if (normalOutputEnabled) {
// 如果 normalOutputEnabled 为 1优先从 normalList 获取输出
// 处理 normalList 的输出
pthread_mutex_lock(&normalListMutex);
if (!normalList.empty()) {
log_gotten = true;
log_send.strText = QString::fromStdString(normalList.front());
// 检查是否为空字符串(去掉空白字符后是否为空)
if (log_send.strText.trimmed().isEmpty()) {
normalList.pop_front(); // 直接丢弃这条日志
log_gotten = false; // 标记没有获取到有效日志
} else {
normalList.pop_front(); // 只有非空白字符串才会真正返回
}
}
pthread_mutex_unlock(&normalListMutex);
} else if (warnOutputEnabled) {
// 如果 normalOutputEnabled 为 0且 warnOutputEnabled 为 1优先从 warnList 获取输出
// 处理 warnList 的输出
pthread_mutex_lock(&warnListMutex);
if (!warnList.empty()) {
log_gotten = true;
log_send.strText = QString::fromStdString(warnList.front());
// 检查是否为空字符串(去掉空白字符后是否为空)
if (log_send.strText.trimmed().isEmpty()) {
warnList.pop_front(); // 直接丢弃这条日志
log_gotten = false; // 标记没有获取到有效日志
} else {
warnList.pop_front(); // 只有非空白字符串才会真正返回
}
}
pthread_mutex_unlock(&warnListMutex);
} else if (errorOutputEnabled) {
// 如果 normalOutputEnabled 和 warnOutputEnabled 都为 0且 errorOutputEnabled 为 1取 errorList 输出
// 处理 errorList 的输出
pthread_mutex_lock(&errorListMutex);
if (!errorList.empty()) {
log_gotten = true;
log_send.strText = QString::fromStdString(errorList.front());
// 检查是否为空字符串(去掉空白字符后是否为空)
if (log_send.strText.trimmed().isEmpty()) {
errorList.pop_front(); // 直接丢弃这条日志
log_gotten = false; // 标记没有获取到有效日志
} else {
errorList.pop_front(); // 只有非空白字符串才会真正返回
}
}
pthread_mutex_unlock(&errorListMutex);
}
}
if (log_gotten) {
static uint32_t count = 0;
my_rocketmq_send(log_send);
}*/
QThread::msleep(10); // 避免 CPU 空转lnk20250326
@@ -649,15 +570,17 @@ std::string extractDataJson(const char* inputJson) {
return "";
}
/*//添加guid
//添加guid
// 提取 "guid" 部分
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == NULL || data->type != cJSON_String) {
if (guidstr == NULL || guidstr->type != cJSON_String) {
std::cerr << "'guid' is missing or is not an array" << std::endl;
cJSON_Delete(root);
return "";
}
//guid回复*/
//guid回复
std::string guid = guidstr->valuestring;
send_reply_to_kafka(guid,"1","收到补招指令");
// 提取 "data" 部分
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
@@ -737,8 +660,9 @@ bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::st
cJSON* limitItem = cJSON_GetObjectItem(messageBody, "limit");
//添加guid
//cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
//if(guidstr)std::string guid = guidstr->valuestring;
std::string guid;
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if(guidstr)guid = guidstr->valuestring;
if (devSeriesItem && lineItem && realDataItem && soeDataItem && limitItem) {
devSeries = devSeriesItem->valuestring;
@@ -748,7 +672,8 @@ bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::st
limit = limitItem->valueint;
//回复消息
//guid
//执行结果直接看实时数据不需要再回复1是收到消息
send_reply_to_kafka(guid,"1","收到三秒数据指令");
} else {
std::cerr << "Missing expected fields in JSON message." << std::endl;
@@ -1031,8 +956,7 @@ void parse_set(const std::string& json_str) {
std::string frontType = front->valuestring;
//进程号为0的进程处理所有控制消息
if (index_value != g_front_seg_index && g_front_seg_index !=0) {
if (index_value != g_front_seg_index) {
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
@@ -1068,6 +992,7 @@ void parse_set(const std::string& json_str) {
//脚本在3秒后执行
//回复消息
send_reply_to_kafka(guid,"1","收到重置进程指令,重启所有进程!");
//上送日志
@@ -1081,6 +1006,11 @@ void parse_set(const std::string& json_str) {
//等待一会后退出进程
MVL_LOG_ACSE0("MYLOG: recive delete msg, so exit to restart ");
//回复消息
send_reply_to_kafka(guid,"1","收到删除进程指令,这个进程将会重启 ");
//上送日志
apr_sleep(apr_time_from_sec(10));
exit(-1039);
}
@@ -1444,7 +1374,6 @@ void parse_log(const std::string& json_str) {
std::string frontType = frontTypestr->valuestring;
//进程号为0的进程处理所有台账更新消息
if (processNo != g_front_seg_index) {
std::cout << "msg index:"<< processNo <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
@@ -1457,10 +1386,13 @@ void parse_log(const std::string& json_str) {
return;
}
//进程号匹配上
//进程号匹配上
std::cout << "msg index:"<< processNo <<" self index:" << g_front_seg_index << std::endl;
std::cout << "msg frontType:"<< frontType <<" self frontType:" << subdir << std::endl;
//回复消息
send_reply_to_kafka(guid,"1","收到实时日志指令");
if (code_str == "set_log") {
//校验数据
if((level == "terminal" || level == "measurepoint") &&
@@ -1560,7 +1492,8 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
std::cout << "msg index:"<< process_No <<" self index:" << g_front_seg_index << std::endl;
//匹配后响应收到台账更新消息
//guid
//除了回复收到消息,执行结束后还要回复结果
send_reply_to_kafka(guid,"1","收到台账更新指令");
if (code_str == "add_terminal" || code_str == "ledger_modify") {
@@ -2032,24 +1965,22 @@ void mqconsumerThread::run()
std::vector<Subscription> subscriptions;
// 初始化消费者1 //lnk20241230只有实时进程会订阅实时topic不订阅实时topic的进程无法触发实时数据
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID){
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata));
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata));
}
// 初始化消费者2 //所有进程都会订阅台账更新topic不同功能进程的台账不能互相影响
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate));
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate));
// 初始化消费者3 //lnk20241230只有补招进程会订阅补招topic不订阅补招topic的进程无法触发补招数据
if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID){
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall));
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall));
}
// 初始化消费者4 //lnk20250108只有稳态进程1会订阅控制topic不订阅控制topic的进程无法触发进程重置
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset));
}
// 初始化消费者4 //lnk20250108只有稳态进程1会控制reset
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset));
// 初始化消费者5 //所有进程都会订阅日志上送topic不同功能进程的日志上送不能互相影响
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog));
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog));
try {
rocketmq_consumer_receive(consumerName, nameServer, subscriptions);