log optimize

This commit is contained in:
lnk
2025-05-30 15:40:20 +08:00
parent 8d3ec192e8
commit a33418113a
6 changed files with 106 additions and 52 deletions

View File

@@ -880,12 +880,12 @@ void execute_bash_debug(string fun,string ip,string type,int proindex)
system(command);
}
void parse_set(const std::string& json_str) {
int parse_set(const std::string& json_str) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return;
return 1;
}
// 提取 "messageBody" 部分
@@ -893,7 +893,7 @@ void parse_set(const std::string& json_str) {
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return ;
return 1;
}
// 解析 messageBody 中的 JSON 字符串
@@ -901,14 +901,14 @@ void parse_set(const std::string& json_str) {
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return ;
return 1;
}
// 获取 guid 字段
@@ -916,7 +916,7 @@ void parse_set(const std::string& json_str) {
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
// 根据 guid 字段回复消息
@@ -927,7 +927,7 @@ void parse_set(const std::string& json_str) {
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
// 根据 code 字段值执行不同的解析逻辑
@@ -937,7 +937,7 @@ void parse_set(const std::string& json_str) {
if (processNo == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
//判断是不是自己进程号:
@@ -947,7 +947,7 @@ void parse_set(const std::string& json_str) {
if (funtion == nullptr) {
std::cout << "Missing 'fun' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string fun = funtion->valuestring;
@@ -956,7 +956,7 @@ void parse_set(const std::string& json_str) {
if (front == nullptr) {
std::cout << "Missing 'frontType' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string frontType = front->valuestring;
@@ -964,12 +964,13 @@ void parse_set(const std::string& json_str) {
if (index_value != g_front_seg_index && g_front_seg_index != 0) {
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
return 0;
}
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl;
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_SET.c_str());
if (code_str == "set_process") {
@@ -977,7 +978,7 @@ void parse_set(const std::string& json_str) {
if (num == nullptr) {
std::cout << "Missing 'processNum' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
int processNum = num->valueint;
@@ -995,6 +996,8 @@ void parse_set(const std::string& json_str) {
}
execute_bash(fun, processNum, frontType);
DIY_WARNLOG("process","【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
//脚本在3秒后执行
//回复消息
send_reply_to_kafka(guid,"1","收到重置进程指令,重启所有进程!");
@@ -1015,6 +1018,7 @@ void parse_set(const std::string& json_str) {
send_reply_to_kafka(guid,"1","收到删除进程指令,这个进程将会重启 ");
//上送日志
DIY_WARNLOG("process","【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
apr_sleep(apr_time_from_sec(10));
::_exit(-1039); //进程退出
@@ -1032,7 +1036,7 @@ void parse_set(const std::string& json_str) {
if (onlyip == nullptr) {
std::cout << "Missing 'ip' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string ip = onlyip->valuestring;
@@ -1041,7 +1045,7 @@ void parse_set(const std::string& json_str) {
if (index_item == nullptr) {
std::cout << "Missing 'proindex' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
int proindex = index_item->valueint;
@@ -1054,6 +1058,8 @@ void parse_set(const std::string& json_str) {
(frontType == "stat" || frontType == "recall" || frontType == "realTime" || frontType == "comtrade") &&
(proindex >= 10 && proindex < 100)){ //单连测试用的进程号应该大于10小于100
execute_bash_debug(fun, ip, frontType,proindex);
DIY_WARNLOG("process","【WARN】前置的%s%d号进程执行指令:%s,start开启单连进程,delete杀死单连进程",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
}
else{
std::cout << "param is not executable" <<std::endl;
@@ -1071,6 +1077,7 @@ void parse_set(const std::string& json_str) {
// 释放 JSON 对象
cJSON_Delete(root);
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1265,12 +1272,12 @@ int StringToInt(const std::string& str) {
}
// 解析 JSON 字符串并执行相应操作
void parse_log(const std::string& json_str) {
int parse_log(const std::string& json_str) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return;
return 1;
}
// 提取 "messageBody" 部分
@@ -1278,7 +1285,7 @@ void parse_log(const std::string& json_str) {
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return ;
return 1;
}
// 解析 messageBody 中的 JSON 字符串
@@ -1286,14 +1293,14 @@ void parse_log(const std::string& json_str) {
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return ;
return 1;
}
// 获取 guid 字段
@@ -1301,7 +1308,7 @@ void parse_log(const std::string& json_str) {
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
// 根据 guid 字段回复消息
@@ -1312,7 +1319,7 @@ void parse_log(const std::string& json_str) {
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
// 根据 code 字段值执行不同的解析逻辑
@@ -1323,7 +1330,7 @@ void parse_log(const std::string& json_str) {
if (process == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
//判断是不是自己进程号:
@@ -1334,7 +1341,7 @@ void parse_log(const std::string& json_str) {
if (idstr == nullptr) {
std::cout << "Missing 'id' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string id = idstr->valuestring;
@@ -1344,7 +1351,7 @@ void parse_log(const std::string& json_str) {
if (levelstr == nullptr) {
std::cout << "Missing 'level' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string level = levelstr->valuestring;
@@ -1354,7 +1361,7 @@ void parse_log(const std::string& json_str) {
if (gradestr == nullptr) {
std::cout << "Missing 'grade' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string grade = gradestr->valuestring;
@@ -1364,7 +1371,7 @@ void parse_log(const std::string& json_str) {
if (logtypestr == nullptr) {
std::cout << "Missing 'logtype' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string logtype = logtypestr->valuestring;
@@ -1374,7 +1381,7 @@ void parse_log(const std::string& json_str) {
if (frontTypestr == nullptr) {
std::cout << "Missing 'frontType' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
std::string frontType = frontTypestr->valuestring;
@@ -1382,15 +1389,17 @@ void parse_log(const std::string& json_str) {
if (processNo != g_front_seg_index) {
std::cout << "msg index:"<< processNo <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
return 0;
}
if (frontType != subdir) {
std::cout << "msg frontType:"<< frontType <<"doesnt match self frontType:" << subdir << std::endl;
cJSON_Delete(root);
return;
return 0;
}
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理日志上送消息",get_front_msg_from_subdir(), g_front_seg_index);
//进程号和匹配上
std::cout << "msg index:"<< processNo <<" self index:" << g_front_seg_index << std::endl;
std::cout << "msg frontType:"<< frontType <<" self frontType:" << subdir << std::endl;
@@ -1419,15 +1428,16 @@ void parse_log(const std::string& json_str) {
// 释放 JSON 对象
cJSON_Delete(root);
return 0;
}
// 台账更新不区分功能
void parse_control(const std::string& json_str, const std::string& output_dir) {
int parse_control(const std::string& json_str, const std::string& output_dir) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return;
return 1;
}
// 提取 "messageBody" 部分
@@ -1435,7 +1445,7 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return ;
return 1;
}
// 解析 messageBody 中的 JSON 字符串
@@ -1443,14 +1453,14 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return ;
return 1;
}
// 获取 code 字段
@@ -1458,7 +1468,7 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
// 根据 code 字段值执行不同的解析逻辑
@@ -1469,7 +1479,7 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
if (process == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
//判断是不是自己进程号:
@@ -1480,7 +1490,7 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
return;
return 1;
}
// 根据 guid 字段回复消息
@@ -1490,12 +1500,15 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
if (process_No != g_front_seg_index && g_front_seg_index !=0) {
std::cout << "msg index:"<< process_No <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
return 0;
}
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< process_No <<" self index:" << g_front_seg_index << std::endl;
//记录日志
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_UD.c_str());
//匹配后响应收到台账更新消息
//除了回复收到消息,执行结束后还要回复结果
send_reply_to_kafka(guid,"1","收到台账更新指令");
@@ -1709,6 +1722,8 @@ void parse_control(const std::string& json_str, const std::string& output_dir) {
// 释放 JSON 对象
cJSON_Delete(root);
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1759,6 +1774,8 @@ int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg)
return E_RECONSUME_LATER;
}
else{
//记录日志
DIY_INFOLOG("process","【NORMAL】前置消费topic:%s_%s的实时触发消息",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str());
// 处理消息(例如,打印消息内容)
std::cout << "rt data Callback received message: " << body << std::endl;
@@ -1776,19 +1793,18 @@ int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg)
// 解析 JSON 数据
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
//记录日志
DIY_ERRORLOG("process","【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str());
return E_RECONSUME_LATER;
}
//记录日志
//mq处理实时数据指令查询台账时添加锁
pthread_mutex_lock(&mtx); std::cout << "update ledger xml hold lock !!!!!!!!!!!" << std::endl;
pthread_mutex_lock(&mtx); std::cout << "rtdata hold lock !!!!!!!!!!!" << std::endl;
int dev_index = find_dev_index_from_dev_id(devid);
int mp_index = find_mp_index_from_mp_id(line);
pthread_mutex_unlock(&mtx); std::cout << "update ledger xml free lock !!!!!!!!!!!" << std::endl;
pthread_mutex_unlock(&mtx); std::cout << "rtdata free lock !!!!!!!!!!!" << std::endl;
if(dev_index == 0 || mp_index == 0){
std::cerr << "dev index or mp index is 0" << std::endl;
@@ -1796,6 +1812,7 @@ int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg)
}
// 创建 XML 文件
if (!createXmlFile(dev_index, mp_index, realData, soeData, limit,"new")) {
DIY_ERRORLOG("process","【ERROR】前置无法创建实时数据触发文件");
std::cerr << "Failed to create the XML file." << std::endl;
return E_RECONSUME_LATER;
}
@@ -1823,6 +1840,7 @@ int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg)
return E_RECONSUME_LATER;
}
else{
//处理消费数据
std::cout << "ledger update Callback received message: " << body << std::endl;
if (key) {
@@ -1834,7 +1852,9 @@ int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg)
//处理台账更新消息
std::string updatefilepath = "/home/pq/FeProject/etc/ledgerupdate";
parse_control(body,updatefilepath);
if(parse_control(body,updatefilepath)){
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_UD.c_str());
}
}
@@ -1868,7 +1888,9 @@ int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg)
}
//处理进程更新消息
parse_set(body);
if(parse_set(body)){
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_SET.c_str());
}
}
@@ -1902,7 +1924,9 @@ int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg)
}
//处理进程更新消息
parse_log(body);
if(parse_log(body)){
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_LOG.c_str());
}
}
@@ -1950,6 +1974,7 @@ int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg)
}
else{
std::cerr << "recall data is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RC.c_str());
}
}