diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 909c02a..6e74cce 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -312,6 +312,12 @@ std::string G_MQCONSUMER_KEY_RC = "";//consumer key std::string G_MQCONSUMER_TOPIC_SET = "";//consumer topie std::string G_MQCONSUMER_TAG_SET = "";//consumer tag std::string G_MQCONSUMER_KEY_SET = "";//consumer key +std::string G_LOG_TOPIC = "";//topie +std::string G_LOG_TAG = "";//tag +std::string G_LOG_KEY = "";//key +std::string G_MQCONSUMER_TOPIC_LOG = "";//consumer topie +std::string G_MQCONSUMER_TAG_LOG = "";//consumer tag +std::string G_MQCONSUMER_KEY_LOG = "";//consumer key int G_TEST_FLAG = 0; int G_TEST_NUM = 0; @@ -896,6 +902,19 @@ void init_config() { G_MQCONSUMER_TAG_SET = strdup(ba.data()); ba = settings.value("RocketMq/ConsumerKeySET", "").toString().toLatin1(); G_MQCONSUMER_KEY_SET = strdup(ba.data()); + + ba = settings.value("RocketMq/ConsumerTopicLOG", "").toString().toLatin1(); + G_MQCONSUMER_TOPIC_LOG = strdup(ba.data()); + ba = settings.value("RocketMq/ConsumerTagLOG", "").toString().toLatin1(); + G_MQCONSUMER_TAG_LOG = strdup(ba.data()); + ba = settings.value("RocketMq/ConsumerKeyLOG", "").toString().toLatin1(); + G_MQCONSUMER_KEY_LOG = strdup(ba.data()); + ba = settings.value("RocketMq/LOGTopic", "").toString().toLatin1(); + G_LOG_TOPIC = strdup(ba.data()); + ba = settings.value("RocketMq/LOGTag", "").toString().toLatin1(); + G_LOG_TAG = strdup(ba.data()); + ba = settings.value("RocketMq/LOGKey", "").toString().toLatin1(); + G_LOG_KEY = strdup(ba.data()); //MQ测试 G_TEST_FLAG = settings.value("RocketMq/Testflag", 0).toInt(); G_TEST_NUM = settings.value("RocketMq/Testnum", 0).toInt(); @@ -909,6 +928,9 @@ void init_config() { std::cout << "Read G_ROCKETMQ_TAG:" << G_ROCKETMQ_TAG << std::endl; std::cout << "Read G_ROCKETMQ_KEY:" << G_ROCKETMQ_KEY << std::endl; std::cout << "Read QUEUENUM:" << QUEUENUM << std::endl; + std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl; + std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl; + std::cout << "Read G_LOG_KEY:" << G_LOG_KEY << std::endl; //消费者相关打印 std::cout << "Read G_ROCKETMQ_CONSUMER:" << G_ROCKETMQ_CONSUMER << std::endl; std::cout << "Read G_MQCONSUMER_IPPORT:" << G_MQCONSUMER_IPPORT << std::endl; @@ -927,6 +949,9 @@ void init_config() { std::cout << "Read G_MQCONSUMER_TOPIC_SET:" << G_MQCONSUMER_TOPIC_SET << std::endl; std::cout << "Read G_MQCONSUMER_TAG_SET:" << G_MQCONSUMER_TAG_SET << std::endl; std::cout << "Read G_MQCONSUMER_KEY_SET:" << G_MQCONSUMER_KEY_SET << std::endl; + std::cout << "Read G_MQCONSUMER_TOPIC_LOG:" << G_MQCONSUMER_TOPIC_LOG << std::endl; + std::cout << "Read G_MQCONSUMER_TAG_LOG:" << G_MQCONSUMER_TAG_LOG << std::endl; + std::cout << "Read G_MQCONSUMER_KEY_LOG:" << G_MQCONSUMER_KEY_LOG << std::endl; //Mq测试相关打印 std::cout << "Read G_TEST_FLAG:" << G_TEST_FLAG << std::endl; std::cout << "Read G_TEST_NUM:" << G_TEST_NUM << std::endl; @@ -15137,6 +15162,115 @@ void rocketmq_test_300(int mpnum,int front_index) { std::cout << "Finished sending " << total_messages << " messages." << std::endl; } +///////////////////////////////////////////////////////////////////////////////lnk实时日志部分20250205 +// 全局列表和开关 +std::list errorList; +std::list warnList; +std::list normalList; + +// 定义全局互斥锁 +pthread_mutex_t errorListMutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t warnListMutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t normalListMutex = PTHREAD_MUTEX_INITIALIZER; + +bool errorOutputEnabled = false; +bool warnOutputEnabled = false; +bool normalOutputEnabled = false; + +// 自定义的流缓冲区 +class RedirectStreamBuf : public std::streambuf { + public: + RedirectStreamBuf(std::list& targetList, pthread_mutex_t& targetMutex) + : targetList(targetList), targetMutex(targetMutex) {} + + protected: + virtual int_type overflow(int_type ch) override { + if (ch != EOF) { + char c = static_cast(ch); + + // 加锁保护 targetList + pthread_mutex_lock(&targetMutex); + targetList.push_back(std::string(1, c)); + pthread_mutex_unlock(&targetMutex); + } + return ch; + } + + private: + std::list& targetList; + pthread_mutex_t& targetMutex; +}; + +// 自定义错误输出 +void redirectErrorOutput(bool enabled) { + errorOutputEnabled = enabled; + if (enabled) { + static RedirectStreamBuf errorBuf(errorList, errorListMutex); + std::cerr.rdbuf(&errorBuf); + } else { + std::cerr.rdbuf(nullptr); // 恢复到标准错误流 + } +} + +// 自定义告警输出 +void redirectWarnOutput(bool enabled) { + warnOutputEnabled = enabled; + if (enabled) { + static RedirectStreamBuf warnBuf(warnList, warnListMutex); + std::clog.rdbuf(&warnBuf); + } else { + std::clog.rdbuf(nullptr); // 恢复到标准告警流 + } +} + +// 自定义普通输出 +void redirectNormalOutput(bool enabled) { + normalOutputEnabled = enabled; + if (enabled) { + static RedirectStreamBuf normalBuf(normalList, normalListMutex); + std::cout.rdbuf(&normalBuf); + } else { + std::cout.rdbuf(nullptr); // 恢复到标准输出流 + } +} +// 自定义 printf 输出 +int customPrintf(const char* format, ...) { + va_list args; + va_start(args, format); + char buffer[1024]; + vsnprintf(buffer, sizeof(buffer), format, args); + va_end(args); + + // 如果所有开关都没开启,使用原生 printf 输出 + if (!errorOutputEnabled && !warnOutputEnabled && !normalOutputEnabled) { + vprintf(format, args); + return 0; // 返回值为已打印字符数 + } + + // 根据启用的开关将输出添加到对应的列表中 + if (errorOutputEnabled) { + pthread_mutex_lock(&errorListMutex); // 锁定 errorList + errorList.push_back(buffer); + pthread_mutex_unlock(&errorListMutex); // 解锁 errorList + } + + if (warnOutputEnabled) { + pthread_mutex_lock(&warnListMutex); // 锁定 warnList + warnList.push_back(buffer); + pthread_mutex_unlock(&warnListMutex); // 解锁 warnList + } + + if (normalOutputEnabled) { + pthread_mutex_lock(&normalListMutex); // 锁定 normalList + normalList.push_back(buffer); + pthread_mutex_unlock(&normalListMutex); // 解锁 normalList + } + + return 0; // 返回值为已打印字符数 +} + +/////////////////////////////////////////////////////////////////////////////// + diff --git a/cfg_parse/custom_printf.h b/cfg_parse/custom_printf.h new file mode 100644 index 0000000..26862e0 --- /dev/null +++ b/cfg_parse/custom_printf.h @@ -0,0 +1,26 @@ +// custom_printf.h +#ifndef CUSTOM_PRINTF_H +#define CUSTOM_PRINTF_H + +#include +#include +#include +#include + +// 鍋囪杩欎簺鏄綘绠$悊杈撳嚭鐨勫垪琛 +extern std::list errorList; +extern std::list warnList; +extern std::list normalList; + +// 寮鍏 +extern bool errorOutputEnabled; +extern bool warnOutputEnabled; +extern bool normalOutputEnabled; + +// 鑷畾涔夌殑 printf 鍑芥暟 +int customPrintf(const char* format, ...); + +// 浣跨敤瀹忓皢 printf 鏇挎崲涓 customPrintf +#define printf customPrintf + +#endif // CUSTOM_PRINTF_H diff --git a/json/save2json.cpp b/json/save2json.cpp index e037837..1d37414 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -118,6 +118,9 @@ extern std::string G_MQCONSUMER_KEY_RC;//key extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall extern std::string G_MQCONSUMER_TAG_SET;//tag extern std::string G_MQCONSUMER_KEY_SET;//key +extern std::string G_MQCONSUMER_TOPIC_LOGSET;//topie_log +extern std::string G_MQCONSUMER_TAG_LOGSET;//tag +extern std::string G_MQCONSUMER_KEY_LOGSET;//key #define APRTIME_8H (28800000000ULL) #define APRTIME_1H (3600000000ULL) @@ -127,11 +130,8 @@ const int MAX_LIST_SIZE = 16; static QMap > real_data_report_map; static QMap json_data_map;//CZY 2023-08-17 ww 2023年3月13日17:23:17扩展Map,用于保存各条线路的数据 static QMap json_flicker_data_map;//CZY 2023-09-11 展Map,用于保存各条线路的闪变数据 - static QMap json_pst_data_map;//CZY 2023-09-11 展Map,用于保存各条线路的闪变数据 - - int urcbRealDataHasReceived(int dev_index, LD_info_t* LD_info, long long Time) { QList& ts_list = real_data_report_map[LD_info->line_id]; @@ -584,6 +584,21 @@ void KafkaSendThread::run() printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); } + + //lnk20250225添加日志上送 + Ckafka_data_t log_send; + log_send.strTopic + bool log_gotten; + + log_gotten = false; + pthread_mutex_lock(&targetMutex); + if (!kafka_data_list.isEmpty()) { + data_gotten = true; + log_send = kafka_data_list.takeFirst(); + } + kafka_data_list_mutex.unlock(); + + if (log_gotten && ) { /*if (data_gotten) { LD_info_t* LD_info = find_LD_info_only_from_mp_id(data.mp_id.toAscii().data()); @@ -1358,6 +1373,131 @@ int StringToInt(const std::string& str) { return number; } +// 解析 JSON 字符串并执行相应操作 +void parse_log(const std::string& json_str, const std::string& output_dir) { + // 解析 JSON 字符串 + cJSON* root = cJSON_Parse(json_str.c_str()); + if (root == nullptr) { + std::cout << "Error parsing JSON." << std::endl; + return; + } + + // 提取 "messageBody" 部分 + cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody"); + if (messageJson == NULL || messageJson->type != cJSON_String) { + std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; + cJSON_Delete(root); + return ; + } + + // 解析 messageBody 中的 JSON 字符串 + const char* messageBodyStr = messageJson->valuestring; + if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { + std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; + cJSON_Delete(root); + return; + } + + cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 + if (messageBody == NULL) { + std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; + cJSON_Delete(root); + return ; + } + + // 获取 code 字段 + cJSON* code = cJSON_GetObjectItem(messageBody, "code"); + if (code == nullptr) { + std::cout << "Missing 'code' in JSON." << std::endl; + cJSON_Delete(root); + return; + } + + cJSON* index = cJSON_GetObjectItem(messageBody, "index"); + if (index == nullptr) { + std::cout << "Missing 'index' in JSON." << std::endl; + cJSON_Delete(root); + return; + } + + //判断是不是自己进程号: + int index_value = index->valueint; + //string index_value_str = index->valuestring; + //int index_value = StringToInt(index_value_str); + + + //进程号为0的进程处理所有台账更新消息 + if (index_value != g_front_seg_index) { + std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl; + cJSON_Delete(root); + return; + } + + //进程号匹配上 + std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl; + + // 根据 code 字段值执行不同的解析逻辑 + std::string code_str = code->valuestring; + + if (code_str == "set_log") { + + // 解析 set_process + cJSON* data = cJSON_GetObjectItem(messageBody, "data"); + if (data != nullptr && data->type == cJSON_Array) { + int data_size = cJSON_GetArraySize(data); + for (int i = 0; i < data_size; i++) { + cJSON* item = cJSON_GetArrayItem(data, i); + std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring; + std::string level = cJSON_GetObjectItem(item, "level")->valuestring; + std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring; + //校验数据 + if(frontType == subdir){ + if(fun == "open"){ + if (code_str == "ERROR"){ + // 启用错误输出 + redirectErrorOutput(true); + } + else if (code_str == "WARN"){ + // 启用告警输出 + redirectWarnOutput(true); + } + else if (code_str == "NORMAL"){ + // 启用普通输出 + redirectNormalOutput(true); + } + else{ + std::cout << "code_str error" <//lnk20241022 +#include "../cfgparse/custom_printf.h"//lnk20250225 #define LOG_IDX (0) #define RPT_IDX (1) diff --git a/mms/rdb_client.c b/mms/rdb_client.c index dc2c858..0e062c5 100644 --- a/mms/rdb_client.c +++ b/mms/rdb_client.c @@ -401,7 +401,7 @@ static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data) /* Maintenance the clients request */ while (1) { - pthread_mutex_lock(&mtx); printf("work hold lock !!!!!!!!!!!"); + //pthread_mutex_lock(&mtx); printf("work hold lock !!!!!!!!!!!"); //调试用 printf("check error4 !!!!!!!!!!!!!!\n"); @@ -421,6 +421,8 @@ static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data) //check_recall_config();//补召进程读取补召消息 create_recall_xml();//生成待补招xml文件 + pthread_mutex_lock(&mtx); printf("work hold lock !!!!!!!!!!!"); + check_ledger_update();//lnk20250113读取台账更新,触发台账更新 pthread_mutex_unlock(&mtx); printf("work free lock !!!!!!!!!!!");