From d1242aae7a43dd1a88c55c6bc3dc9e45384f5242 Mon Sep 17 00:00:00 2001 From: lnk Date: Tue, 11 Mar 2025 21:07:17 +0800 Subject: [PATCH] fix soe and add connect info --- cfg_parse/cfg_parser.cpp | 12 +++++++++ json/create_json.cpp | 23 ++++++++++++++--- json/mms_json_inter.h | 2 +- mms/mms_process.c | 8 +++--- mms/mmscli_log.c | 7 +++-- mms/mmscli_rpt.c | 7 +++++ mms/rdb_client.h | 4 +-- mms/rdb_ext_utils.c | 56 ++++++++++++++++++++++++++++++++++------ 8 files changed, 99 insertions(+), 20 deletions(-) diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 78631c6..41e14f9 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -319,6 +319,9 @@ std::string G_LOG_KEY = "";//key std::string G_MQCONSUMER_TOPIC_LOG = "";//consumer topie std::string G_MQCONSUMER_TAG_LOG = "";//consumer tag std::string G_MQCONSUMER_KEY_LOG = "";//consumer key +std::string G_CONNECT_TOPIC = "";//consumer topie +std::string G_CONNECT_TAG = "";//consumer tag +std::string G_CONNECT_KEY = "";//consumer key int G_TEST_FLAG = 0; int G_TEST_NUM = 0; @@ -917,6 +920,12 @@ void init_config() { G_LOG_TAG = strdup(ba.data()); ba = settings.value("RocketMq/LOGKey", "").toString().toLatin1(); G_LOG_KEY = strdup(ba.data()); + ba = settings.value("RocketMq/CONNECTTopic", "").toString().toLatin1(); + G_CONNECT_TOPIC = strdup(ba.data()); + ba = settings.value("RocketMq/CONNECTTag", "").toString().toLatin1(); + G_CONNECT_TAG = strdup(ba.data()); + ba = settings.value("RocketMq/CONNECTKey", "").toString().toLatin1(); + G_CONNECT_KEY = strdup(ba.data()); //MQ测试 G_TEST_FLAG = settings.value("RocketMq/Testflag", 0).toInt(); G_TEST_NUM = settings.value("RocketMq/Testnum", 0).toInt(); @@ -933,6 +942,9 @@ void init_config() { std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl; std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl; std::cout << "Read G_LOG_KEY:" << G_LOG_KEY << std::endl; + std::cout << "Read G_CONNECT_TOPIC:" << G_CONNECT_TOPIC << std::endl; + std::cout << "Read G_CONNECT_TAG:" << G_CONNECT_TAG << std::endl; + std::cout << "Read G_CONNECT_KEY:" << G_CONNECT_KEY << std::endl; //消费者相关打印 std::cout << "Read G_ROCKETMQ_CONSUMER:" << G_ROCKETMQ_CONSUMER << std::endl; std::cout << "Read G_MQCONSUMER_IPPORT:" << G_MQCONSUMER_IPPORT << std::endl; diff --git a/json/create_json.cpp b/json/create_json.cpp index 22f59dc..dbc8183 100644 --- a/json/create_json.cpp +++ b/json/create_json.cpp @@ -29,6 +29,7 @@ extern std::string WEB_INTEGRITY; extern std::string WEB_COMFLAG; extern std::string WEB_EVENT; extern std::string WEB_FILEDOWNLOAD; +extern std::string G_CONNECT_TOPIC; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// using namespace std; @@ -4003,9 +4004,17 @@ void connectlog_pgsql(char* id,char* datetime,int status) //std::cout << "jsonString: " << jsonString << std::endl;////减少多余的打印 //发送数据到远端 - SendJsonAPI_web(WEB_COMFLAG, "", jsonString,&ptr);//不使用接口lnk20250310 - //使用队列 - //if()//稳态才上传 + //SendJsonAPI_web(WEB_COMFLAG, "", jsonString,&ptr);//不使用接口lnk20250310 + //使用mq + Ckafka_data_t connect_info; + connect_info.strTopic = QString::fromStdString(G_CONNECT_TOPIC); + connect_info.strText = QString::fromStdString(std::string(jsonString)); + + if(g_node_id == STAT_DATA_BASE_NODE_ID){//稳态才上传 + kafka_data_list_mutex.lock(); //加锁 + kafka_data_list.append(connect_info); //添加 kafka发送链表 + kafka_data_list_mutex.unlock(); //解锁 + } //处理返回的数据 @@ -4199,7 +4208,7 @@ QString errorlog_datamatch_pgsql(QString id, QString time, int BASE_MAT_NUM,int #endif //lnk202411-5 暂态数据不再使用kafka发送,改成http接口 int transfer_json_qvvr_data(unsigned int func_type, int monitor_id, -float mag, float dur, long long start_tm, long long end_tm, int dis_kind, //伏值,持续时间,开始时间,结束时间,暂态类型 +double mag, double dur, long long start_tm, long long end_tm, int dis_kind, //伏值,持续时间,开始时间,结束时间,暂态类型 char* uuid_cfg,char* uuid_dat, //两个录波文件 char* mp_id,char* Qvvr_rptname,char* devtype) //监测点号,暂态报告名,设备类型 { @@ -4222,6 +4231,12 @@ char* mp_id,char* Qvvr_rptname,char* devtype) // // 初始化 cJSON 根对象 cJSON* root = cJSON_CreateObject(); // 拼接 JSON 数据 + + //调试用 + //printf("Inside function: &mag = %p, &dur = %p\n", &mag, &dur); + //printf("~~~~~~~send QVVR_PerTime after record is %f~~~~~~~~~~ \n",dur); + //printf("~~~~~~~send QVVR_Amg after record is %f~~~~~~~~~~ \n",mag); + //不需要数据类型,因为是http接口不是数据队列 cJSON_AddStringToObject(root, "monitorId", mp_id); cJSON_AddNumberToObject(root, "amplitude", mag); diff --git a/json/mms_json_inter.h b/json/mms_json_inter.h index c973f46..0920a64 100644 --- a/json/mms_json_inter.h +++ b/json/mms_json_inter.h @@ -123,7 +123,7 @@ QString errorlog_datamatch_pgsql(QString id, QString time, int BASE_MAT_NUM, int extern "C" { #endif /* __cplusplus */ -int transfer_json_qvvr_data(unsigned int func_type,int monitor_id,float mag,float dur,long long start_tm,long long end_tm,int dis_kind, char* uuid_cfg, char* uuid_dat, char* mp_id, char* Qvvr_rptname, char* devtype); +int transfer_json_qvvr_data(unsigned int func_type,int monitor_id,double mag,double dur,long long start_tm,long long end_tm,int dis_kind, char* uuid_cfg, char* uuid_dat, char* mp_id, char* Qvvr_rptname, char* devtype); void processGGIO_start_data_end(char* mp_id, char* fullname, double v,long long time,char* devtype,int monitor_id); //void errorlog_num(CDataValue* pDataValue, double value, int monitorId, double UL, int maxcount); //void errorlog_num_json(int monitorId); diff --git a/mms/mms_process.c b/mms/mms_process.c index 3f46eb7..296fd41 100644 --- a/mms/mms_process.c +++ b/mms/mms_process.c @@ -2063,7 +2063,7 @@ apr_status_t call_cn_wavelist(LD_info_t *LD_info ) if (cfg_result != NULL && dat_result != NULL) { snprintf(cfg_result, cfg_len, "%s-%s", uuid_cfg, filename_cfg); // 拼接字符串并确保不会溢出目标缓冲区 snprintf(dat_result, dat_len, "%s-%s", uuid_dat, filename_dat); // 拼接字符串并确保不会溢出目标缓冲区 - ret3 = transfer_json_qvvr_data(g_node_id, LD_info->line_id, qvvr->QVVR_Amg, qvvr->QVVR_PerTime, start_tm, end_tm, qvvr->QVVR_type, cfg_result, dat_result, LD_info->mp_id, qvvr->QVVR_Rptname, ied_usr->dev_type); + ret3 = transfer_json_qvvr_data(g_node_id, LD_info->line_id, (double)qvvr->QVVR_Amg, (double)qvvr->QVVR_PerTime, start_tm, end_tm, qvvr->QVVR_type, cfg_result, dat_result, LD_info->mp_id, qvvr->QVVR_Rptname, ied_usr->dev_type); } free(cfg_result); // 使用完毕后释放动态分配的内存空间 free(dat_result); // 使用完毕后释放动态分配的内存空间 @@ -2081,7 +2081,9 @@ apr_status_t call_cn_wavelist(LD_info_t *LD_info ) snprintf(dat_result, dat_len, wavepath_dat); // 拼接字符串并确保不会溢出目标缓冲区 ret3 = transfer_json_qvvr_data(g_node_id, //这个参数没有使用 LD_info->line_id, //监测点序号 - qvvr->QVVR_Amg, qvvr->QVVR_PerTime, start_tm, end_tm, qvvr->QVVR_type, //伏值、持续时间、开始时间、结束时间、暂态类型 + (double)qvvr->QVVR_Amg, (double)qvvr->QVVR_PerTime, + qvvr->QVVR_time, //这里不使用文件中的开始时间start_tm,因为会影响写库,使用QVVR的时间QVVR_time//lnk20250311 + end_tm, qvvr->QVVR_type, //伏值、持续时间、开始时间、结束时间、暂态类型 cfg_result, dat_result, //两个文件路径 LD_info->mp_id, qvvr->QVVR_Rptname, ied_usr->dev_type);//监测点号,文件和监测点暂态事件匹配上的暂态报告名,终端类型 } @@ -2097,7 +2099,7 @@ apr_status_t call_cn_wavelist(LD_info_t *LD_info ) if (cfg_result != NULL && dat_result != NULL) { snprintf(cfg_result, cfg_len, "%s-%s", oss_file_fullname_cfg, cfg_only_filename_ret); // 拼接字符串并确保不会溢出目标缓冲区 snprintf(dat_result, dat_len, "%s-%s", oss_file_fullname_dat, dat_only_filename_ret); // 拼接字符串并确保不会溢出目标缓冲区 - ret3 = transfer_json_qvvr_data(g_node_id, LD_info->line_id, qvvr->QVVR_Amg, qvvr->QVVR_PerTime, start_tm, end_tm, qvvr->QVVR_type, cfg_result, dat_result, LD_info->mp_id, qvvr->QVVR_Rptname, ied_usr->dev_type); + ret3 = transfer_json_qvvr_data(g_node_id, LD_info->line_id, (double)qvvr->QVVR_Amg, (double)qvvr->QVVR_PerTime, start_tm, end_tm, qvvr->QVVR_type, cfg_result, dat_result, LD_info->mp_id, qvvr->QVVR_Rptname, ied_usr->dev_type); } free(cfg_result); // 使用完毕后释放动态分配的内存空间 free(dat_result); // 使用完毕后释放动态分配的内存空间 diff --git a/mms/mmscli_log.c b/mms/mmscli_log.c index dd74e08..b162fcb 100644 --- a/mms/mmscli_log.c +++ b/mms/mmscli_log.c @@ -200,6 +200,9 @@ static ST_RET process_jou_entry(loginfo_t *loginfo,apr_time_t t, v = mms_dec_data.data_item[ii].u.data_double; apr_snprintf(mms_ref, sizeof(mms_ref), "%s$%s", do_name, mms_dec_data.data_item[ii].comp_name); + //调试用 + printf("u.data_str: %s (as hex: 0x%X)\n", mms_dec_data.data_item[ii].u.data_str, *((ST_UINT16*)mms_dec_data.data_item[ii].u.data_str)); + if (strstr(mms_ref, "QVVR")) { log_data_type = QVVR_DATA; timeflag = FALSE; @@ -279,7 +282,7 @@ static ST_RET process_jou_entry(loginfo_t *loginfo,apr_time_t t, } } if (readtime == 1 && readquailty == 1) { - if (quality == 0) {//调试,要改回1 + if (0) {//调试,要改回quality == 0 timeflag = TRUE; //调试用lnk20250307 printf("readtime == 1 && readquailty = 1 && quality == 1 continue"); @@ -393,7 +396,7 @@ static ST_RET process_jou_entry(loginfo_t *loginfo,apr_time_t t, printf("quality = 1"); quality = 1; } - if (quality == 0) {//调试用,要改回1 + if (0) {//调试用,要改回quality == 0 printf("quality = 1 continue"); continue; } diff --git a/mms/mmscli_rpt.c b/mms/mmscli_rpt.c index f07d202..9319d3d 100644 --- a/mms/mmscli_rpt.c +++ b/mms/mmscli_rpt.c @@ -2202,6 +2202,9 @@ static ST_RET myLocToTextBs(ST_UCHAR* pSrc, RUNTIME_TYPE* rt, ST_CHAR* text) ST_CHAR* destBuf; ST_UCHAR mask; + //调试用 + //printf("pSrc[0]: 0x%X, pSrc[1]: 0x%X, text: %s\n", pSrc[0], pSrc[1], text); + //printf("%s text_len %d %d pSrc %d %d \n", text, strlen(text), rt->u.p.el_len, pSrc[0], pSrc[1]); text[0] = 0; @@ -2243,6 +2246,10 @@ static ST_RET myLocToTextBs(ST_UCHAR* pSrc, RUNTIME_TYPE* rt, ST_CHAR* text) } } destBuf[i] = 0; + + //调试用 + //printf("pSrc[0]: 0x%X, pSrc[1]: 0x%X, text: %s\n", pSrc[0], pSrc[1], text); + return (SD_SUCCESS); } /************************************************************************/ diff --git a/mms/rdb_client.h b/mms/rdb_client.h index 604e4bf..6c81d56 100644 --- a/mms/rdb_client.h +++ b/mms/rdb_client.h @@ -232,8 +232,8 @@ struct QVVR_t{ int QVVR_start; int QVVR_type; long long QVVR_time; - float QVVR_PerTime; - float QVVR_Amg; + double QVVR_PerTime; + double QVVR_Amg; char QVVR_Rptname[128]; uint32_t timestamp; diff --git a/mms/rdb_ext_utils.c b/mms/rdb_ext_utils.c index 4e5017b..b38ad31 100644 --- a/mms/rdb_ext_utils.c +++ b/mms/rdb_ext_utils.c @@ -516,7 +516,7 @@ void processQVVR_end(LD_info_t* LD_info) ied_t *ied = LD_info->ied; ied_usr_t *ied_usr = GET_IEDEXT_ADDR(ied); int ret; - long long end_tm = (long long)(LD_info->qvvr[LD_info->qvvr_idx].QVVR_PerTime) + LD_info->qvvr[LD_info->qvvr_idx].QVVR_time; //结束时间是持续时间加触发时间,毫秒 + //////////////////////////////////////////////////////////////////////////////////////////////////////////// int find_paired = FALSE; @@ -526,16 +526,41 @@ void processQVVR_end(LD_info_t* LD_info) return; for (i=0;iqvvr_idx) //跳过监测点当前波动位置 + if (i==LD_info->qvvr_idx) //跳过监测点当前波动位置,第一次记录波形后,第一次的第三个事件会让0号点为QVVR_DATA_RECEIVED, + //第二次记录波形,第二次的第一个事件qvvr_idx=1,1号中为QVVR_DATA_RECEIVED,QVVR_start=1,跳过记录, + //第二次的第二个事件qvvr_idx=1,QVVR_start=0,记录到0中,0为QVVR_DATA_PAIRED + //第二次的第三个事件qvvr_idx=2,2号中为QVVR_DATA_RECEIVED,QVVR_start=1,跳过记录 + //第二次录波匹配文件时应该匹配到1号的qvvrtime,但是1没有QVVR_DATA_PAIRED所以对不上,需要修改逻辑 continue; if (LD_info->qvvr[i].used_status != QVVR_DATA_RECEIVED)//跳过没有收到波动的位置 continue; - //其他某位置的波动也没有定义类型/这个位置的波动类型和当前位置的波动类型一致,暂降事件一次会发三个报告 + + //添加时间判断lnk20250311 + if (LD_info->qvvr[LD_info->qvvr_idx].QVVR_time - LD_info->qvvr[i].QVVR_time >110) + continue; + + + //其他某位置的波动也没有定义类型/这个位置的波动类型和当前位置的波动类型一致,暂降事件一次会发三个报告,启动和上值和结束 if ( (LD_info->qvvr[i].QVVR_type==0)||(LD_info->qvvr[i].QVVR_type==LD_info->qvvr[LD_info->qvvr_idx].QVVR_type) ) { + long long end_tm = (long long)(LD_info->qvvr[LD_info->qvvr_idx].QVVR_PerTime*1000) + LD_info->qvvr[i].QVVR_time;//结束时间是持续时间加最初的触发时间,毫秒 //调试用 - printf("\n~~~~~~~this qvvr node type before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_type); + //printf("~~~~~~~this qvvr node type before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_type); + //printf("~~~~~~~this qvvr node QVVR_PerTime before record is %f~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_PerTime); + //printf("~~~~~~~this qvvr node QVVR_Amg before record is %f~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_Amg); + //printf("~~~~~~~this qvvr node QVVR_time before record is %lld~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_time); + //printf("~~~~~~~this qvvr node used_status before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].used_status); + //printf("~~~~~~~this qvvr node QVVR_start before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_start); + //printf("~~~~~~~this qvvr node timestamp before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].timestamp); + + printf("\n~~~~~~~now qvvr node type before record is %d~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].QVVR_type); + printf("~~~~~~~now qvvr node QVVR_PerTime before record is %f~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].QVVR_PerTime); + printf("~~~~~~~now qvvr node QVVR_Amg before record is %f~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].QVVR_Amg); + printf("~~~~~~~now qvvr node QVVR_time before record is %lld~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].QVVR_time); + printf("~~~~~~~now qvvr node used_status before record is %d~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].used_status); + printf("~~~~~~~now qvvr node QVVR_start before record is %d~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].QVVR_start); + printf("~~~~~~~now qvvr node timestamp before record is %d~~~~~~~~~~ \n",LD_info->qvvr[LD_info->qvvr_idx].timestamp); LD_info->qvvr[i].used_status = QVVR_DATA_PAIRED; //匹配上了 LD_info->qvvr[i].QVVR_type = LD_info->qvvr[LD_info->qvvr_idx].QVVR_type; @@ -545,13 +570,28 @@ void processQVVR_end(LD_info_t* LD_info) LD_info->qvvr[LD_info->qvvr_idx].used_status = QVVR_DATA_NOT_USED; //当前这个点位置释放 find_paired = TRUE; - //调试用 - printf("\n~~~~~~~qvvr node %d status is pair~~~~~~~~~~ \n",i); + printf("~~~~~~~this qvvr node QVVR_PerTime after record is %f~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_PerTime); + printf("~~~~~~~this qvvr node QVVR_Amg after record is %f~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_Amg); + printf("~~~~~~~this qvvr node used_status after record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].used_status); + printf("~~~~~~~this qvvr node QVVR_type after record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_type); + printf("~~~~~~~this qvvr node QVVR_time after record is %lld~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_time); + printf("~~~~~~~this qvvr node timestamp before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].timestamp); + printf("~~~~~~~this qvvr node QVVR_start before record is %d~~~~~~~~~~ \n",LD_info->qvvr[i].QVVR_start); - //匹配后再发qvvr + //调试用 + //printf("\n~~~~~~~qvvr node %d status is pair~~~~~~~~~~ \n",i); + //printf("Before calling: &QVVR_Amg = %p, &QVVR_PerTime = %p\n", + //&LD_info->qvvr[LD_info->qvvr_idx].QVVR_Amg, + //&LD_info->qvvr[LD_info->qvvr_idx].QVVR_PerTime); + + //匹配后再发qvvr,起始时间要填暂态触发的时间,就是第一次事件上送时只有时间没有值的那个时间 ret = transfer_json_qvvr_data(g_node_id, //这个参数没有使用 LD_info->line_id, //监测点序号 - LD_info->qvvr[LD_info->qvvr_idx].QVVR_Amg, LD_info->qvvr[LD_info->qvvr_idx].QVVR_PerTime, LD_info->qvvr[LD_info->qvvr_idx].QVVR_time, end_tm, LD_info->qvvr[LD_info->qvvr_idx].QVVR_type, //伏值、持续时间、开始时间、结束时间、暂态类型 + (double)LD_info->qvvr[LD_info->qvvr_idx].QVVR_Amg, + (double)LD_info->qvvr[LD_info->qvvr_idx].QVVR_PerTime, + LD_info->qvvr[i].QVVR_time, + end_tm, + LD_info->qvvr[LD_info->qvvr_idx].QVVR_type, //伏值、持续时间、开始时间、结束时间、暂态类型 "", "", //两个文件路径为空 LD_info->mp_id, LD_info->qvvr[LD_info->qvvr_idx].QVVR_Rptname, ied_usr->dev_type);//监测点号,文件和监测点暂态事件匹配上的暂态报告名,终端类型 if(!ret)//失败