Files
microser/json/save2json.cpp
2025-02-24 16:45:42 +08:00

2661 lines
81 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: save2json.cpp,v $
* @brief: $IEC 61850 Protocol
*
* @version: $Revision: 1.21 $
* @date: $Date: 2020/10/27 08:51:07 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: save2json.cpp,v 1.21 2020/10/27 08:51:07 lizhongming Exp $
*
*/
using namespace std;
#include <iostream>
#include <stdio.h>
#include <assert.h>
#include <fstream> // std::filebuf
#include <string.h>
#include <sstream>
#include "qdebug.h"
#include <QSettings>
#include <QDateTime>
#include <QDir>
#include <QMap>//CZY 2023-08-17 WW 2023年3月13日17:21:02 增加多ICD支持
#include <apr_uuid.h>
#include <apr_strings.h>
#include "../mms/db_interface.h"
#include "../json/save2json.h"
#include "../json/mms_json_inter.h"
#include "kafka_producer.h"
/*lnk10-11 */
//#include "../include/rocketmq/SimpleProducer.h"
#include "../include/rocketmq/CPushConsumer.h"
#include <vector>
#include "../json/cjson.h" //解json
#include <sstream> //创建xml
#include <fstream> //创建xml
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type);
extern int recall_json_handle(const char* jstr);
extern std::string intToString(int number);
//extern int stringToInt(const char* str, int* result);
int StringToInt(const std::string& str);
extern pthread_mutex_t mtx;//lnk20250115
#ifdef __cplusplus
extern "C" {
#include "../mms/rdb_client.h"
#include "node.h"//lnk20241223
#include "mvl_defs.h"
#include "mms_vvar.h"
#endif /* __cplusplus */
extern unsigned int g_node_id;
extern int g_front_seg_index;
extern char subdir[128];
extern int comtrade_remain_file_num;
extern node_t* g_node; //lnk20241223
extern LD_info_t* find_LD_info_only_from_mp_id(char* mp_id);//lnk20241223
extern void print_terminal(const terminal* tmnl);
#ifdef __cplusplus
}
#endif
#ifndef nullptr
#define nullptr NULL
#endif
extern QMutex kafka_data_list_mutex;
extern QList<Ckafka_data_t> kafka_data_list;
extern QMutex oss_data_list_mutex;
extern QList<oss_data_t> oss_data_list;
extern int FILE_FLAG;
KafkaSendThread myThrd;
//WW 2023-08-22 增加数据库线程和WebSokcet线程
SQLExcuteThread sqlThrd; //Sql执行线程类对象
WebSocketThread socketThrd; //Web Socket线程类对象
WebhttpThread webhttpThrd; //Web http线程类对象 lnk202411
httpThread httpThrd; //Web http线程类对象 lnk202411
//mqtestThread mqtestThrd; //mqtest线程 lnk202412
//mqtestThread mqtestThrd(nullptr); //mqtest线程 lnk202412
mqconsumerThread mqconsumerThrd;//mq消费者线程lnk20241213
OnTimerThread onTimerThrd;//定时线程
extern QMutex Sql_data_list_mutex; //Sql执行语句锁
extern QList<QString> Sql_data_list; //Sql执行语句链表
extern int g_iOTLFlag; //Sql是否执行标志(0-不执行1-执行)
extern int g_iSqlListSize; //Sql执行语句链表允许最大元素个数 注Sql链表中元素超过此个数多出元素需移除
extern int FILE_FLAG;
extern int SEND_FLAG;
extern char* BROKER_LIST;
extern char* TOPIC_STAT;
extern char* TOPIC_PST;
extern char* TOPIC_PLT;
extern char* TOPIC_EVENT;
extern char* TOPIC_ALARM;
extern char* TOPIC_SNG;
extern char* TOPIC_RTDATA;//lnk20241220
extern char* UDS_UPLOAD_URL;
extern char g_onlyIP[255]; //直连某个IP仅仅为方便测试
//WW 2023-08-22 end
//lnk20241216添加mq消费者
extern std::string G_MQCONSUMER_IPPORT;//rocketmq ip+port
extern std::string G_MQCONSUMER_TOPIC_RT;//topie_realtimedata
extern std::string G_MQCONSUMER_TAG_RT;//tag
extern std::string G_MQCONSUMER_KEY_RT;//key
extern std::string G_MQCONSUMER_TOPIC_UD;//topie_update
extern std::string G_MQCONSUMER_TAG_UD;//tag
extern std::string G_MQCONSUMER_KEY_UD;//key
extern std::string G_MQCONSUMER_TOPIC_RC;//topie_recall
extern std::string G_MQCONSUMER_TAG_RC;//tag
extern std::string G_MQCONSUMER_KEY_RC;//key
extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall
extern std::string G_MQCONSUMER_TAG_SET;//tag
extern std::string G_MQCONSUMER_KEY_SET;//key
#define APRTIME_8H (28800000000ULL)
#define APRTIME_1H (3600000000ULL)
///////////////////////////////////////////////////////////////////////////////
const int MAX_LIST_SIZE = 16;
static QMap<int, QList<long long> > real_data_report_map;
static QMap<QString, json_block_data*> json_data_map;//CZY 2023-08-17 ww 2023年3月13日17:23:17扩展Map用于保存各条线路的数据
static QMap<QString, json_block_data*> json_flicker_data_map;//CZY 2023-09-11 展Map用于保存各条线路的闪变数据
static QMap<QString, json_block_data*> json_pst_data_map;//CZY 2023-09-11 展Map用于保存各条线路的闪变数据
int urcbRealDataHasReceived(int dev_index, LD_info_t* LD_info, long long Time)
{
QList<long long>& ts_list = real_data_report_map[LD_info->line_id];
bool bFind = ts_list.contains(Time); //实时数据时间链表
if (bFind == false) {
ts_list.append(Time);
if (ts_list.size() > MAX_LIST_SIZE)
ts_list.removeFirst();
//lnk20241223每收到一次实时数据就检查一下数量
int real_report_count = 0;
real_report_count = get_real_report_count(LD_info);
//调试
std::cout << "real_report_count is" << real_report_count << std::endl;
std::cout << "mp limit is" << LD_info->limit << std::endl;
if(real_report_count >= LD_info->limit){
std::cout << "real_report_count reach limit!!!"<< std::endl;
//生成delete.xml
if (!createXmlFile(dev_index, LD_info->line_id, 0, 0, 0,"delete")) {
std::cerr << "Failed to create delete XML file!!!." << std::endl;
}
}
return 0; //没有重复数据
}
else
return 1; //有重复数据
}
//////////////////////////////////////////////////////////////////////////
void add_comm_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
if (g_node_id == STAT_DATA_BASE_NODE_ID)
com_log_fn += "comm_100_stat.txt";
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID)
com_log_fn += "comm_200_3s.txt";
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID)
com_log_fn += "comm_300_comtrade.txt";
else if (g_node_id == HIS_DATA_BASE_NODE_ID)
com_log_fn += "comm_400_his.txt";
else if (g_node_id == NEW_HIS_DATA_BASE_NODE_ID) {
com_log_fn.append(QString("comm_400_his_%1.txt").arg(g_front_seg_index));
}
else if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID)
com_log_fn += "comm_600_recall.txt";
else if (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {
com_log_fn.append(QString("comm_700_allrecall_%1.txt").arg(g_front_seg_index));
}
else
com_log_fn += "comm_x00_unknown.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
void add_sng_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
com_log_fn += "sng_kafka_json.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
void add_stat_kafka_json_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
com_log_fn += "stat_kafka_json.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
////////////////////////////////////////////////////////////////////////////
/*void TrimLeft(std::string &s)
{
const std::string &space =" \f\n\t\r\v";
s.erase(0, s.find_first_not_of(space));
}
void TrimRight(std::string &s)
{
const std::string &space =" \f\n\t\r\v";
s.erase(s.find_last_not_of(space) + 1);
}
void Trim(std::string &s)
{
const std::string &space =" \f\n\t\r\v";
s.erase(0, s.find_first_not_of(space));
s.erase(s.find_last_not_of(space) + 1);
}
int is_rpt_Time_exact_hour()
{
apr_time_t hour_time_t = g_db_info->TimeID[RPT_IDX]/APRTIME_8H*APRTIME_8H;
if (g_db_info->TimeID[RPT_IDX]==hour_time_t)
return TRUE;
else
return FALSE;
} */
//char uuid_str[APR_UUID_FORMATTED_LENGTH+1];
//int iii;
//for (iii=0;iii<10;iii++) {
// apr_uuid_t uuid;
// apr_uuid_get(&uuid);
// apr_uuid_format(uuid_str,&uuid);
// printf("uuid_str=%s \n",uuid_str);
//}
//////////////////////////////////////////////////////////////////////////
/*新增rocketmq发送数据lnk10-10*/
void my_rocketmq_send(Ckafka_data_t& data)
{
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static std::string cfg_Rt_tp;
static bool init = false;
if (!init) {
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
cfg_Rt_tp = TOPIC_RTDATA;
init = true;
}
std::string key = data.mp_id.toStdString();
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else if (data.strTopic == "RTDATA")//lnk20241220
{
topic = cfg_Rt_tp;
}
else
{
topic = data.strTopic.toStdString();
}
if (g_onlyIP[0] != 0)
{
//单例模式
add_sng_log(data.strText.toAscii().data());
}
rocketmq_producer_send(const_cast<char*>(senddata.c_str()),const_cast<char*>(topic.c_str()));
//printf("\nrocketmq send, monitor_id:[%s] topic:[%s] Success\n", key.c_str(), topic.c_str());
}
void my_kafka_send(Ckafka_data_t& data)
{
#ifdef __GNUC__
static FeKafkaProducer kafkaProducer;
#endif
int retsize = -1;
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static std::string cfg_Sng_tp;
static bool init = false;
if (!init) {
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
cfg_Sng_tp = TOPIC_SNG;
//QString topic_cfg = settings.value("Kafka/topic","").toString();
//printf("!!!!!!!!!kafka producer init Failed(%s)\n", cfg_tp);
cout << cfg_His_tp << endl;
//std::string brokerlist = brl_cfg.toStdString();//"10.240.16.145:6667,10.240.16.146:6667,10.240.16.147:6667,10.240.16.148:6667,10.240.16.149:6667";
std::string brokerlist = BROKER_LIST;
//topic = topic_cfg.toStdString();//"test";
#ifdef __GNUC__
if (kafkaProducer.init(brokerlist)) {
printf("kafka producer init success(%s)\n", brokerlist.c_str());
/*bool ret = kafkaProducer.create_topic(topic);
if(ret)
printf("create topic OK \n");
else
printf("create topic Failed \n");*/
}
else
printf("kafka producer init Failed(%s)\n", brokerlist.c_str());
#endif
init = true;
}
char tmp_str[256];
apr_snprintf(tmp_str, sizeof(tmp_str), "%d", data.monitor_id);
std::string key = std::string(tmp_str);
//std::string key_mp_id = data.mp_id.toStdString();
//key = data.monitor_id;
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
add_stat_kafka_json_log(data.strText.toAscii().data());
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else
{
topic = data.strTopic.toStdString();
}
//QDateTime currentTime = QDateTime::currentDateTime();
//QTime time = currentTime.time();
//if (time >= QTime(23, 30) || time < QTime(01, 00)) {
// // The current time is between 23:00 and 00:30
// add_sng_log(data.strText.toAscii().data());
//}
//add_sng_log(data.strText.toAscii().data());
if (g_onlyIP[0] != 0)
{
//单例模式
//topic = cfg_Evt_tp;
//key = "2606L20071";
//senddata = "{\"DATA_TYPE\":\"02\" , \"Monitor\":\"2606123456\" , \"Value\":{\"TIME\":\"1699576200000\", \"F_S\":{\"A\":{ \"PST\":\"0.000000\",\"FLUC\":\"343917.750000\",\"FLUCF\":\"374275.000000\" }, \"B\":{ \"PST\":\"0.000000\",\"FLUC\":\"222171.156250\",\"FLUCF\":\"369039.000000\" }, \"C\":{ \"PST\":\"0.000000\",\"FLUC\":\"208060.968750\",\"FLUCF\":\"369239.000000\" }}}}";
//senddata = "{\"DATA_TYPE\":\"04\",\"Monitor\":\"2606L20071\",\"Value\":{\"FLAG\":1,\"TIME\":1700193136480,\"VOLTAGE\":{\"MAG\":95.181,\"DUR\":54,\"STARTTIME\":1700193136480,\"ENDTIME\":1700193136634,\"DISKIND\":\"01\",\"WAVEFILE\":\"PQ_PQLD1_000392_20231117_115216_580\",\"PHASIC\":\"B\"}}}";
add_sng_log(data.strText.toAscii().data());
//char* cstr = new char[senddata.length() + 1];
//std::strcpy(cstr, senddata.c_str());
//add_sng_log(cstr);
//delete[] cstr; // 释放内存空间
}
#ifdef __GNUC__
//send data
retsize = kafkaProducer.send(senddata.c_str(), senddata.length(), topic, RdKafka::Topic::PARTITION_UA, &key);
#endif
if (retsize > 0) {
printf("\nkafka send, monitor_id:[%s] topic:[%s] Success,return length %d\n", key.c_str(), topic.c_str(), retsize);
}
else
printf("\nFailed kafka send, monitor_id:[%s] topic:[%s]\n", key.c_str(), topic.c_str());
//printf("\n--------------------------------------\n%s\n--------------------------------------\n",senddata.c_str() ); // WW 2023-08-16
}
void my_datahub_send(Ckafka_data_t& data)
{
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static bool init = false;
if (!init) {
//QString MyKafkaIniFilename = QString("../etc/") + QString("mykafka.ini"); //+QString::fromAscii(subdir)
//QSettings settings(MyKafkaIniFilename, QSettings::IniFormat);
//QString brl_cfg = settings.value("Kafka/brokerlist", "").toString();
//QString topic_his = settings.value("Kafka/HisTopic", "").toString();
//QString topic_plt = settings.value("Kafka/PLTTopic", "").toString();
//QString topic_pst = settings.value("Kafka/PSTTopic", "").toString();
//QString topic_evt = settings.value("Kafka/EventTopic", "").toString();
//QString topic_alm = settings.value("Kafka/AlmTopic", "").toString();
//cfg_His_tp = topic_his.toStdString();
//cfg_PLT_tp = topic_plt.toStdString();
//cfg_PST_tp = topic_pst.toStdString();
//cfg_Evt_tp = topic_evt.toStdString();
//cfg_Alm_tp = topic_alm.toStdString();
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
init = true;
}
std::string key = data.mp_id.toStdString();
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else
{
topic = data.strTopic.toStdString();
}
if (g_onlyIP[0] != 0)
{
//单例模式
add_sng_log(data.strText.toAscii().data());
}
DataHub_Send_Datahub(const_cast<char*>(topic.c_str()), const_cast<char*>(senddata.c_str()));
printf("\ndatahub send, monitor_id:[%s] topic:[%s] Success\n", key.c_str(), topic.c_str());
}
void concatenate_and_separate(char str1[], char str2[], QString* result) {
QString qstr1 = QString(str1);
QString qstr2 = QString(str2);
*result = qstr1 + "-" + qstr2;
}
void KafkaSendThread::run()
{
//线程开始创建生产者lnk20241211
InitializeProducer();
printf("\nKafkaSendThread::run() is called ...... \n\n");
while (1) {
Ckafka_data_t data;
bool data_gotten;
data_gotten = false;
kafka_data_list_mutex.lock();
if (!kafka_data_list.isEmpty()) {
data_gotten = true;
data = kafka_data_list.takeFirst();
}
kafka_data_list_mutex.unlock();
if (data_gotten) {
static uint32_t count = 0;
printf("BEGIN my_kafka_send no.%i -------->>>>>>>>>>>> %s \n", count,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
if (SEND_FLAG == 1) //kafka推送
{
my_kafka_send(data);
}
else if (SEND_FLAG == 2)//datahub推送
{
my_datahub_send(data);
//DataHub_Send_Datahub();
}
else if (SEND_FLAG == 3)//rocketmq推送lnk10-11
{
my_rocketmq_send(data);
}
else //未配置 默认mq推送
{
my_rocketmq_send(data);
}
printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
/*if (data_gotten) {
LD_info_t* LD_info = find_LD_info_only_from_mp_id(data.mp_id.toAscii().data());
ied_t* ied;
ied = find_ied_from_dev_code(LD_info->terminal_code);
ied_usr_t* ied_usr = GET_IEDEXT_ADDR(ied);
int cpuno;
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]);
data.mp_id.clear();
data.mp_id.append(LD_info->mp_id);
static uint32_t count = 0;
printf("BEGIN my_kafka_send no.%i -------->>>>>>>>>>>> %s \n", count,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
my_kafka_send(data);
printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
}*/
//lnk 20241031 不再记录匹配率、合理性、异常
/*
oss_data_t ossdata;
bool oss_data_gotten;
oss_data_gotten = false;
oss_data_list_mutex.lock();
if (!oss_data_list.isEmpty()) {
oss_data_gotten = true;
ossdata = oss_data_list.takeFirst();
}
oss_data_list_mutex.unlock();
if (oss_data_gotten) {
char file_name[256];
memset(file_name, 0, 256);
sprintf(file_name, "%s", ossdata.filename.toAscii().data());
char save_name[256];
memset(save_name, 0, 256);
sprintf(save_name, "%s", ossdata.savename.toAscii().data());
QString uuid_file_name;
std::ofstream file(save_name); // 创建一个输出文件流对象,打开文件 example.txt
if (file.is_open()) { // 判断文件是否成功打开
file << ossdata.data.toAscii().data() << "\n";
file.close(); // 关闭文件
}
else {
cout << "Unable to open file\n" << endl;
}
if (FILE_FLAG == 1) {
PutOSS(file_name, save_name);
char* file;
// 使用strrchr找到最后一个'/'的位置
char* last_slash = strrchr(file_name, '/');
if (last_slash != NULL) {
// 最后一个'/'之后的部分就是文件名
file = last_slash + 1;
}
else {
// 如果没有'/',则整个字符串就是文件名
file = file_name;
}
concatenate_and_separate(file_name, file, &uuid_file_name);
}
else if (FILE_FLAG == 2) {
OBSFile(save_name, file_name, "putObject");
char* file;
// 使用strrchr找到最后一个'/'的位置
char* last_slash = strrchr(file_name, '/');
if (last_slash != NULL) {
// 最后一个'/'之后的部分就是文件名
file = last_slash + 1;
}
else {
// 如果没有'/',则整个字符串就是文件名
file = file_name;
}
concatenate_and_separate(file_name, file, &uuid_file_name);
}
else if (FILE_FLAG == 3) {
char* fileName = (char*)malloc(65 * sizeof(char));
char* uuid = (char*)malloc(65 * sizeof(char));
WebAPI_Uds_Upload(UDS_UPLOAD_URL, save_name, uuid, fileName);
concatenate_and_separate(uuid, fileName, &uuid_file_name);
free(fileName);
free(uuid);
}
else {
}
*/
//lnk 20241031 不再记录匹配率、合理性、异常
/*
if (ossdata.log_name=="comm") {
char tnml_code[128];
memset(tnml_code, 0, 128);
sprintf(tnml_code, "%s", ossdata.id.toAscii().data());
errorlog_pgsql(tnml_code, ossdata.time, uuid_file_name);
}
else if (ossdata.log_name == "reason") {
QString pgsql;
pgsql.append(errorlog_num_pgsql(ossdata.id, ossdata.time, uuid_file_name, ossdata.list_num));
cout << pgsql.toAscii().data() << endl;
}
else if (ossdata.log_name == "match") {
QString pqsql;
pqsql.append(errorlog_datamatch_pgsql(ossdata.id, ossdata.time, ossdata.base_mat_num, ossdata.adv_mat_num, ossdata.base_act_num, ossdata.adv_act_num, uuid_file_name));
cout << pqsql.toAscii().data() << endl;
}
std::remove(save_name);
}
else {
msleep(1);
}*/
} //while(1) {
//线程结束摧毁生产者
ShutdownAndDestroyProducer();//lnk20241211
}
//lnk20241213补招部分///////////////////////////////////////////////////////////////////////////////////////////////
// 提取 'data' 数组并返回为新的 JSON 字符串 (返回 std::string)
std::string extractDataJson(const char* inputJson) {
// 解析输入 JSON 字符串
cJSON* root = cJSON_Parse(inputJson);
if (root == NULL) {
std::cerr << "Error parsing JSON" << std::endl;
return "";
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return "";
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return "";
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return "";
}
// 提取 "data" 部分
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data == NULL || data->type != cJSON_Array) {
std::cerr << "'data' is missing or is not an array" << std::endl;
cJSON_Delete(root);
return "";
}
// 创建新的 JSON 数组对象,只包含 "data" 部分
cJSON* newJson = cJSON_CreateArray(); // 创建一个新的数组
// 将 "data" 数组中的元素逐个添加到新数组中
cJSON* dataItem = NULL;
cJSON_ArrayForEach(dataItem, data) {
cJSON_AddItemToArray(newJson, cJSON_Duplicate(dataItem, 1));
}
// 将新的 JSON 数组转换为字符串
char* newJsonString = cJSON_Print(newJson);
if (newJsonString == NULL) {
std::cerr << "Error printing new JSON" << std::endl;
cJSON_Delete(root);
cJSON_Delete(newJson);
return "";
}
// 转换为 std::string 类型
std::string result(newJsonString);
// 清理内存
free(newJsonString);
cJSON_Delete(root);
cJSON_Delete(newJson);
return result; // 返回 std::string 类型的结果
}
//实时数据部分//////////////////////////////////////////////////////////////////////////////////////////////////////////
// 提取 JSON 消息中的相关字段
bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::string& line, bool& realData, bool& soeData, int& limit)
{
// 解析 JSON 数据
cJSON* root = cJSON_Parse(body.c_str());
if (root == NULL) {
std::cerr << "Failed to parse JSON message." << std::endl;
return false;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return false;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return false;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return false;
}
// 提取字段
cJSON* devSeriesItem = cJSON_GetObjectItem(messageBody, "devSeries");
cJSON* lineItem = cJSON_GetObjectItem(messageBody, "line");
cJSON* realDataItem = cJSON_GetObjectItem(messageBody, "realData");
cJSON* soeDataItem = cJSON_GetObjectItem(messageBody, "soeData");
cJSON* limitItem = cJSON_GetObjectItem(messageBody, "limit");
if (devSeriesItem && lineItem && realDataItem && soeDataItem && limitItem) {
devSeries = devSeriesItem->valuestring;
line = lineItem->valuestring;
realData = realDataItem->valueint;
soeData = soeDataItem->valueint;
limit = limitItem->valueint;
} else {
std::cerr << "Missing expected fields in JSON message." << std::endl;
cJSON_Delete(root);
return false;
}
cJSON_Delete(root); // 清理 JSON 对象
return true;
}
// 构造 XML 内容的函数新建和删除
std::string createnewXmlContent(int devindex, int mpindex, bool realData, bool soeData, int limit)
{
std::ostringstream xmlContent;
xmlContent << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n"
<< "<Trigger3S>\n"
<< " <New>\n"
<< " <Trigger Line=\"" << mpindex << "\" "
<< "RealData=\"" << (realData ? "true" : "false") << "\" "
<< "DevSeries=\"" << devindex << "\" "
<< "Limit=\"" << limit << "\" "
<< "Count=\"0\" "
<< "SOEData=\"" << (soeData ? "true" : "false") << "\"/>\n"
<< " </New>\n"
<< "</Trigger3S>\n";
return xmlContent.str();
}
std::string createdeleteXmlContent(int devindex, int mpindex)
{
std::ostringstream xmlContent;
xmlContent << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n"
<< "<Trigger3S>\n"
<< " <Delete>\n"
<< " <Trigger Line=\"" << mpindex << "\" "
<< "RealData=\"false\" "
<< "DevSeries=\"" << devindex << "\" "
<< "Limit=\"0\" "
<< "Count=\"0\" "
<< "SOEData=\"false\"/>\n"
<< " </Delete>\n"
<< "</Trigger3S>\n";
return xmlContent.str();
}
// 写入 XML 内容到文件的函数
bool writeToFile(const std::string& filePath, const std::string& xmlContent)
{
// 打开文件流以写入 XML 内容
std::ofstream outFile(filePath.c_str()); // 使用 c_str() 转换为 const char*
if (outFile.is_open()) {
outFile << xmlContent; // 写入内容
outFile.close();
std::cout << "XML file created at: " << filePath << std::endl;
return true;
} else {
std::cerr << "Failed to open file for writing: " << filePath << std::endl;
return false;
}
}
// 创建并写入新的 XML 文件的主函数
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type)
{
std::string xmlContent = "";
std::string directory = "";
std::string filePath = "";
if(type == "new"){
// 构造 XML 内容
xmlContent = createnewXmlContent(devindex, mpindex, realData, soeData, limit);
// 设置文件路径
directory = "../etc/trigger3s/";
filePath = directory + "newtrigger.xml";
}
else if(type == "delete"){
// 构造 XML 内容
xmlContent = createdeleteXmlContent(devindex, mpindex);
// 设置文件路径
directory = "../etc/trigger3s/";
filePath = directory + "deletetrigger.xml";
}
else{
std::cerr << "Failed to create xmlfile,type error: " << std::endl;
return false;
}
// 创建目录(如果不存在)
if (system(("mkdir -p " + directory).c_str()) != 0) {
std::cerr << "Failed to create directory: " << directory << std::endl;
return false;
}
// 将 XML 内容写入文件
return writeToFile(filePath, xmlContent);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//lnk20250108进程更新部分
// 用于关闭进程监听的端口
extern int server_socket; //Web Socket服务端实例
void close_listening_socket() {
if (server_socket != -1) {
// 关闭socket
close(server_socket);
std::cout << "Server socket closed successfully!" << std::endl;
server_socket = -1; // 重置 server_socket
} else {
std::cout << "No server socket to close!" << std::endl;
}
}
//用于校验ip格式
bool isValidIP(const std::string &ip) {
std::vector<std::string> parts;
std::stringstream ss(ip);
std::string part;
// 使用 "." 作为分隔符将 IP 地址分割成各部分
while (getline(ss, part, '.')) {
parts.push_back(part);
}
// IP 地址必须有 4 部分
if (parts.size() != 4) {
return false;
}
// 校验每一部分是否为合法的数字且在 0 到 255 之间
for (size_t i = 0; i < parts.size(); ++i) {
// 校验每部分是否为数字
for (size_t j = 0; j < parts[i].size(); ++j) {
if (!isdigit(parts[i][j])) {
return false;
}
}
// 转换为整数并检查是否在有效范围内
int num = atoi(parts[i].c_str());
if (num < 0 || num > 255) {
return false;
}
// 检查是否有前导零(如 01、001 等)
if (parts[i].length() > 1 && parts[i][0] == '0') {
return false;
}
}
return true;
}
//执行脚本控制进程
void execute_bash(string fun,int process_num,string type)
{
// 为 char 数组分配足够的空间
char p_num_str[20];
// 使用 sprintf 转换
std::sprintf(p_num_str, "%d", process_num);
const char* script = "/FeProject/bin/set_process.sh";//使用setsid防止端口占用
const char* param1 = fun.c_str();
const char* param2 = p_num_str;
const char* param3 = type.c_str();
// 构造完整的命令
char command[256];
snprintf(command, sizeof(command), "%s %s %s %s &", script, param1, param2, param3);
std::cout << "command:" << command <<std::endl;
// 执行命令
system(command);
}
//执行脚本控制进程
void execute_bash_debug(string fun,string ip,string type,int proindex)
{
const char* script = "/FeProject/bin/set_debug.sh";//使用setsid防止端口占用
const char* param1 = fun.c_str();
const char* param2 = ip.c_str();
const char* param3 = type.c_str();
// 将 proindex 转换为字符串
char param4[32];
snprintf(param4, sizeof(param4), "%d", proindex);
// 构造完整的命令
char command[256];
snprintf(command, sizeof(command), "%s %s %s %s &", script, param1, param2, param3,param4);
std::cout << "command:" << command <<std::endl;
// 执行命令
system(command);
}
void parse_set(const std::string& json_str) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return ;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return ;
}
// 获取 code 字段
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return;
}
cJSON* index = cJSON_GetObjectItem(messageBody, "index");
if (index == nullptr) {
std::cout << "Missing 'index' in JSON." << std::endl;
cJSON_Delete(root);
return;
}
//判断是不是自己进程号:
int index_value = index->valueint;
//string index_value_str = index->valuestring;
//int index_value = StringToInt(index_value_str);
//进程号为0的进程处理所有控制消息
if (index_value != g_front_seg_index && g_front_seg_index !=0) {
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
}
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl;
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
if (code_str == "set_process") {
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
std::cout << "cfg_stat_data process" << g_front_seg_index <<" handle this msg" << std::endl;
// 解析 set_process
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring;
int processNum = cJSON_GetObjectItem(item, "processNum")->valueint;
std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring;
//校验数据
if((fun == "reset" || fun == "add") &&
(processNum >=1 && processNum < 10) &&
(frontType == "stat" || frontType == "recall" || frontType == "all")){
// 调用执行脚本函数
if(fun == "reset"){
close_listening_socket();
}
execute_bash(fun, processNum, frontType);
std::cout << "!!!!!!!!!!!!!!!! execute mark:" << i << " !!!!!!!!!!!!!!!" <<std::endl;
}
else{
std::cout << "param is not executable" <<std::endl;
}
}
std::cout << "this msg should only execute once" <<std::endl;
}
}
else{
std::cout << "only cfg_stat_data index 1 can control process,this process not handle this msg" << std::endl;
}
}
else if (code_str == "set_debug"){
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
std::cout << "cfg_stat_data process" << g_front_seg_index <<" handle this msg" << std::endl;
// 解析 set_process
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring;
std::string ip = cJSON_GetObjectItem(item, "ip")->valuestring;
std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring;
int proindex = cJSON_GetObjectItem(item, "proindex")->valueint;
//校验数据
if((fun == "start" || fun == "delete") &&
isValidIP(ip) &&
(frontType == "stat" || frontType == "recall" || frontType == "3s" || frontType == "comtrade") &&
(proindex >= 10 && proindex < 100)){ //单连测试用的进程号应该大于10小于100
execute_bash_debug(fun, ip, frontType,proindex);
std::cout << "!!!!!!!!!!!!!!!! execute mark:" << i << " !!!!!!!!!!!!!!!" <<std::endl;
}
else{
std::cout << "param is not executable" <<std::endl;
}
}
std::cout << "this msg should only execute once" <<std::endl;
}
}
else{
std::cout << "only cfg_stat_data index 1 can control process,this process not handle this msg" << std::endl;
}
}
else{
std::cout << "set process code str error" <<std::endl;
}
// 释放 JSON 对象
cJSON_Delete(root);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
//lnk20250103台账更新部分
// 准备更新内容并生成 XML 字符串
// 添加缩进的函数
void add_indent(std::stringstream& stream, int level) {
for (int i = 0; i < level; ++i) {
stream << " "; // 每一级缩进 2 个空格
}
}
std::string prepare_update(const std::string& code_str, const terminal& json_data)
{
std::cout << "prepare update" << std::endl;
std::stringstream xmlStream;
int indentLevel = 0; // 缩进级别
// 根节点
add_indent(xmlStream, indentLevel);
xmlStream << "<ledger_update>" << std::endl;
indentLevel++;
if (code_str == "ledger_modify" || code_str == "add_terminal") {
// 如果是 modify 类型
if (code_str == "ledger_modify") {
add_indent(xmlStream, indentLevel);
xmlStream << "<modify>" << std::endl;
indentLevel++;
}
else {
add_indent(xmlStream, indentLevel);
xmlStream << "<add>" << std::endl;
indentLevel++;
}
// 添加数据部分
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl; // Assuming `addr_str` for IP
add_indent(xmlStream, indentLevel);
xmlStream << "<devType>" << json_data.dev_type << "</devType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalCode>" << json_data.terminal_code << "</terminalCode>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
add_indent(xmlStream, indentLevel);
xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
//lnk20250210
add_indent(xmlStream, indentLevel);
xmlStream << "<processNo>" << json_data.processNo << "</processNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
// monitorData 部分
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
const monitor& monitor = json_data.line[i];
add_indent(xmlStream, indentLevel);
xmlStream << "<monitorData" << (i + 1) << ">" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << monitor.monitor_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<name>" << monitor.monitor_name << "</name>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<lineNo>" << monitor.logical_device_seq << "</lineNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<voltageLevel>" << monitor.voltage_level << "</voltageLevel>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ptType>" << monitor.terminal_connect << "</ptType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminal_code>" << monitor.terminal_code << "</terminal_code>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << monitor.status << "</status>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</monitorData>" << std::endl;
}
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
// 结束 modify 或 add 标签
if (code_str == "ledger_modify") {
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</modify>" << std::endl;
}
else {
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</add>" << std::endl;
}
} else if (code_str == "delete_terminal") {
// 如果是 delete 类型
add_indent(xmlStream, indentLevel);
xmlStream << "<delete>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</delete>" << std::endl;
}
else {
std::cerr << "code_str error" << std::endl;
return "";
}
// 结束根节点
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</ledger_update>" << std::endl;
return xmlStream.str(); // 返回构造的 XML 字符串
}
// 函数将string字符串转换为整数
int StringToInt(const std::string& str) {
std::stringstream ss(str);
int number;
ss >> number; // 从字符串流中读取整数
// 检查是否转换成功
if (ss.fail()) {
std::cerr << "Conversion failed!" << std::endl;
return 0; // 或者你可以选择返回一个标识失败的值,如-1
}
return number;
}
// 解析 JSON 字符串并执行相应操作
void parse_control(const std::string& json_str, const std::string& output_dir) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return ;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return ;
}
// 获取 code 字段
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return;
}
cJSON* index = cJSON_GetObjectItem(messageBody, "index");
if (index == nullptr) {
std::cout << "Missing 'index' in JSON." << std::endl;
cJSON_Delete(root);
return;
}
//判断是不是自己进程号:
int index_value = index->valueint;
//string index_value_str = index->valuestring;
//int index_value = StringToInt(index_value_str);
//进程号为0的进程处理所有台账更新消息
if (index_value != g_front_seg_index && g_front_seg_index !=0) {
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
}
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl;
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
if (code_str == "add_terminal" || code_str == "ledger_modify") {
std::cout << "add or update ledger" <<std::endl;
// 解析 add_terminal 或 ledger_modify
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
terminal json_data;
// 填充 terminal_dev 的数据
cJSON* id = cJSON_GetObjectItem(item, "id"); // terminal_id
if (id && id->type == cJSON_String)
std::strncpy(json_data.terminal_id, id->valuestring, sizeof(json_data.terminal_id) - 1);
else
std::strncpy(json_data.terminal_id, "N/A", sizeof(json_data.terminal_id) - 1);
cJSON* name = cJSON_GetObjectItem(item, "name"); // terminal_code
if (name && name->type == cJSON_String)
std::strncpy(json_data.terminal_code, name->valuestring, sizeof(json_data.terminal_code) - 1);
else
std::strncpy(json_data.terminal_code, "N/A", sizeof(json_data.terminal_code) - 1);
cJSON* org_name = cJSON_GetObjectItem(item, "org_name"); // org_name
if (org_name && org_name->type == cJSON_String)
std::strncpy(json_data.org_name, org_name->valuestring, sizeof(json_data.org_name) - 1);
else
std::strncpy(json_data.org_name, "N/A", sizeof(json_data.org_name) - 1);
cJSON* maint_name = cJSON_GetObjectItem(item, "maint_name"); // maint_name
if (maint_name && maint_name->type == cJSON_String)
std::strncpy(json_data.maint_name, maint_name->valuestring, sizeof(json_data.maint_name) - 1);
else
std::strncpy(json_data.maint_name, "N/A", sizeof(json_data.maint_name) - 1);
cJSON* station_name = cJSON_GetObjectItem(item, "stationName"); // station_name
if (station_name && station_name->type == cJSON_String)
std::strncpy(json_data.station_name, station_name->valuestring, sizeof(json_data.station_name) - 1);
else
std::strncpy(json_data.station_name, "N/A", sizeof(json_data.station_name) - 1);
cJSON* manufacturer = cJSON_GetObjectItem(item, "manufacturer"); // tmnl_factory
if (manufacturer && manufacturer->type == cJSON_String)
std::strncpy(json_data.tmnl_factory, manufacturer->valuestring, sizeof(json_data.tmnl_factory) - 1);
else
std::strncpy(json_data.tmnl_factory, "N/A", sizeof(json_data.tmnl_factory) - 1);
cJSON* status = cJSON_GetObjectItem(item, "status"); // tmnl_status
if (status && status->type == cJSON_String)
std::strncpy(json_data.tmnl_status, status->valuestring, sizeof(json_data.tmnl_status) - 1);
else
std::strncpy(json_data.tmnl_status, "N/A", sizeof(json_data.tmnl_status) - 1);
cJSON* dev_type = cJSON_GetObjectItem(item, "devType"); // dev_type
if (dev_type && dev_type->type == cJSON_String)
std::strncpy(json_data.dev_type, dev_type->valuestring, sizeof(json_data.dev_type) - 1);
else
std::strncpy(json_data.dev_type, "N/A", sizeof(json_data.dev_type) - 1);
cJSON* dev_key = cJSON_GetObjectItem(item, "devKey"); // dev_key
if (dev_key && dev_key->type == cJSON_String)
std::strncpy(json_data.dev_key, dev_key->valuestring, sizeof(json_data.dev_key) - 1);
else
std::strncpy(json_data.dev_key, "N/A", sizeof(json_data.dev_key) - 1);
cJSON* dev_series = cJSON_GetObjectItem(item, "series"); // dev_series
if (dev_series && dev_series->type == cJSON_String)
std::strncpy(json_data.dev_series, dev_series->valuestring, sizeof(json_data.dev_series) - 1);
else
std::strncpy(json_data.dev_series, "N/A", sizeof(json_data.dev_series) - 1);
//lnk20250210台账进程号
cJSON* processNo = cJSON_GetObjectItem(item, "processNo"); // processNo转为字符串
if (processNo && processNo->type == cJSON_Number) snprintf(json_data.processNo, sizeof(json_data.processNo), "%d", processNo->valueint);
else strncpy(json_data.processNo, "N/A", sizeof(json_data.processNo) - 1);
cJSON* ip = cJSON_GetObjectItem(item, "ip"); // addr_str
if (ip && ip->type == cJSON_String)
std::strncpy(json_data.addr_str, ip->valuestring, sizeof(json_data.addr_str) - 1);
else
std::strncpy(json_data.addr_str, "N/A", sizeof(json_data.addr_str) - 1);
cJSON* port = cJSON_GetObjectItem(item, "port"); // port
if (port && port->type == cJSON_String)
std::strncpy(json_data.port, port->valuestring, sizeof(json_data.port) - 1);
else
std::strncpy(json_data.port, "N/A", sizeof(json_data.port) - 1);
cJSON* updateTime = cJSON_GetObjectItem(item, "updateTime"); // timestamp
if (updateTime && updateTime->type == cJSON_String)
std::strncpy(json_data.timestamp, updateTime->valuestring, sizeof(json_data.timestamp) - 1);
else
std::strncpy(json_data.timestamp, "N/A", sizeof(json_data.timestamp) - 1);
// monitorData 解析,填充到 line 数组中
cJSON* monitorData = cJSON_GetObjectItem(item, "monitorData");
if (monitorData != nullptr && monitorData->type == cJSON_Array) {
int monitorData_size = cJSON_GetArraySize(monitorData);
for (int j = 0; j < monitorData_size && j < 10; j++) { // 最多 10 个监测点
cJSON* monitor_item = cJSON_GetArrayItem(monitorData, j);
monitor monitor_data;
cJSON* monitor_id = cJSON_GetObjectItem(monitor_item, "id"); // monitor_id
if (monitor_id && monitor_id->type == cJSON_String)
std::strncpy(monitor_data.monitor_id, monitor_id->valuestring, sizeof(monitor_data.monitor_id) - 1);
else
std::strncpy(monitor_data.monitor_id, "N/A", sizeof(monitor_data.monitor_id) - 1);
cJSON* monitor_name = cJSON_GetObjectItem(monitor_item, "name"); // monitor_name
if (monitor_name && monitor_name->type == cJSON_String)
std::strncpy(monitor_data.monitor_name, monitor_name->valuestring, sizeof(monitor_data.monitor_name) - 1);
else
std::strncpy(monitor_data.monitor_name, "N/A", sizeof(monitor_data.monitor_name) - 1);
cJSON* voltage_level = cJSON_GetObjectItem(monitor_item, "voltageLevel"); // voltage_level
if (voltage_level && voltage_level->type == cJSON_String)
std::strncpy(monitor_data.voltage_level, voltage_level->valuestring, sizeof(monitor_data.voltage_level) - 1);
else
std::strncpy(monitor_data.voltage_level, "N/A", sizeof(monitor_data.voltage_level) - 1);
cJSON* monitor_status = cJSON_GetObjectItem(monitor_item, "status"); // status
if (monitor_status && monitor_status->type == cJSON_String)
std::strncpy(monitor_data.status, monitor_status->valuestring, sizeof(monitor_data.status) - 1);
else
std::strncpy(monitor_data.status, "N/A", sizeof(monitor_data.status) - 1);
cJSON* lineNo = cJSON_GetObjectItem(monitor_item, "lineNo"); // logical_device_seq
if (lineNo && lineNo->type == cJSON_String)
std::strncpy(monitor_data.logical_device_seq, lineNo->valuestring, sizeof(monitor_data.logical_device_seq) - 1);
else
std::strncpy(monitor_data.logical_device_seq, "N/A", sizeof(monitor_data.logical_device_seq) - 1);
cJSON* ptType = cJSON_GetObjectItem(monitor_item, "ptType"); // terminal_connect
if (ptType && ptType->type == cJSON_String)
std::strncpy(monitor_data.terminal_connect, ptType->valuestring, sizeof(monitor_data.terminal_connect) - 1);
else
std::strncpy(monitor_data.terminal_connect, "N/A", sizeof(monitor_data.terminal_connect) - 1);
std::strncpy(monitor_data.timestamp, json_data.timestamp, sizeof(monitor_data.timestamp) - 1);
std::strncpy(monitor_data.terminal_code, json_data.terminal_code, sizeof(monitor_data.terminal_code) - 1);
// 填充到 line 数组
json_data.line[j] = monitor_data;
}
}
print_terminal(&json_data);
// 准备 XML 内容并写入文件
std::string xmlContent = prepare_update(code_str, json_data);
if (xmlContent != "") {
std::cout << "write to xml in /FeProject/etc/ledger_update" <<std::endl;
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int
std::string nodeid_str(nodeid);
std::string frontindex_str = intToString(g_front_seg_index);
std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_" + code_str + ".xml";
writeToFile(file_name, xmlContent);
}
}
}
}
else if (code_str == "delete_terminal") {
std::cout << "delete ledger" <<std::endl;
// 解析 delete_terminal
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
// 只解析 id 字段
cJSON* id = cJSON_GetObjectItem(item, "id");
if (id != nullptr) {
terminal json_data;
std::strncpy(json_data.terminal_id, cJSON_GetObjectItem(item, "id")->valuestring, sizeof(json_data.terminal_id) - 1);
// 准备 XML 内容并写入文件
std::string xmlContent = prepare_update(code_str, json_data);
if(xmlContent != ""){
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int
std::string nodeid_str(nodeid);
std::string frontindex_str = intToString(g_front_seg_index);
std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_delete_terminal.xml";
writeToFile(file_name, xmlContent);
}
}
}
}
}
else{
std::cout << "code_str error" <<std::endl;
}
// 释放 JSON 对象
cJSON_Delete(root);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
int find_dev_index_from_dev_id(std::string dev_id)
{
ied_t* ied = NULL;
int iedno;
ied_usr_t* ied_usr = NULL;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = (ied_usr_t*)ied->usr_ext;
if (ied_usr && strcmp(ied_usr->terminal_id, dev_id.c_str()) == 0) {
return ied_usr->dev_idx;
}
}
return 0;
}
int find_mp_index_from_mp_id(std::string line)
{
LD_info_t* LD_info = NULL;
LD_info = find_LD_info_only_from_mp_id((char*)line.c_str());
if(LD_info == NULL){
return 0;
}
else{
return LD_info->line_id;
}
}
int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg)
{
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
// 处理消息(例如,打印消息内容)
std::cout << "rt data Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理消费数据
std::string devid, line;
bool realData, soeData;
int limit;
// 解析 JSON 数据
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
return E_RECONSUME_LATER;
}
//mq处理实时数据指令查询台账时添加锁
pthread_mutex_lock(&mtx); std::cout << "update ledger xml hold lock !!!!!!!!!!!" << std::endl;
int dev_index = find_dev_index_from_dev_id(devid);
int mp_index = find_mp_index_from_mp_id(line);
pthread_mutex_unlock(&mtx); std::cout << "update ledger xml free lock !!!!!!!!!!!" << std::endl;
if(dev_index == 0 || mp_index == 0){
std::cerr << "dev index or mp index is 0" << std::endl;
return E_RECONSUME_LATER;
}
// 创建 XML 文件
if (!createXmlFile(dev_index, mp_index, realData, soeData, limit,"new")) {
std::cerr << "Failed to create the XML file." << std::endl;
return E_RECONSUME_LATER;
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg)
{
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//处理消费数据
// 处理消息(例如,打印消息内容)
std::cout << "ledger update Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理台账更新消息
std::string updatefilepath = "/home/pq/FeProject/etc/ledgerupdate";
parse_control(body,updatefilepath);
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg)
{
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//处理消费数据
// 处理消息(例如,打印消息内容)
std::cout << "process Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理进程更新消息
parse_set(body);
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg)
{
//调试
std::cout << "myMessageCallbackrecall"<< std::endl;
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
// 处理消息(例如,打印消息内容)
std::cout << "recall Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理消费数据
std::string result = extractDataJson(body); // 使用 std::string 代替 malloc
//调试
std::cout << "extractDataJson:"<< result.c_str() <<std::endl;
if (!result.empty()) {
pthread_mutex_lock(&mtx); std::cout << "recall mq hold lock !!!!!!!!!!!" << std::endl;
recall_json_handle(result.c_str()); // 使用 c_str() 获取 const char* 类型
pthread_mutex_unlock(&mtx); std::cout << "recall mq free lock !!!!!!!!!!!" << std::endl;
}
else{
std::cerr << "recall data is NULL." << std::endl;
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
void mqconsumerThread::run()
{
// 配置消费者参数
std::string consumerName = std::string(subdir) + intToString(g_front_seg_index); // 消费者组ID
std::string nameServer = G_MQCONSUMER_IPPORT; // NameServer地址
// 定义多个主题、标签及其对应的回调函数
std::vector<Subscription> subscriptions;
// 初始化消费者1 //lnk20241230只有实时进程会订阅实时topic不订阅实时topic的进程无法触发实时数据
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID){
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata));
}
// 初始化消费者2 //所有进程都会订阅台账更新topic不同功能进程的台账不能互相影响
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate));
// 初始化消费者3 //lnk20241230只有补招进程会订阅补招topic不订阅补招topic的进程无法触发补招数据
if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID){
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall));
}
// 初始化消费者4 //lnk20250108只有稳态进程1会订阅控制topic不订阅控制topic的进程无法触发进程重置
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset));
}
try {
rocketmq_consumer_receive(consumerName, nameServer, subscriptions);
}
catch (const std::exception& e) {
std::cerr << "Exception during consumerUD setup: " << e.what() << std::endl;
}
// 程序运行中,消费者会通过回调处理消息
// 模拟程序运行
std::cout << "Consumer is running. " << std::endl;
//在主线程调用
//ShutdownAndDestroyConsumer();
}
//CZY 2023-08-23 get double class voltage level, if false will return 0;
double get_voltage_level(char voltage_level_char[]) {
try
{
int n = atoi(voltage_level_char);
switch (n)
{
case 1://交流6V
return 0.006;
case 2://交流12V
return 0.012;
case 3://交流24V
return 0.024;
case 4://交流36V
return 0.036;
case 5://交流48V
return 0.048;
case 6://交流110V
return 0.11;
case 7://交流220V
return 0.22;
case 8://交流380V含400V
return 0.38;
case 9://交流660V
return 0.66;
case 10://交流1000V含1140V
return 1;
case 11://交流600V
return 0.6;
case 12://交流750V
return 0.75;
case 13://交流1500V
return 1.5;
case 14://交流2000V
return 2.0;
case 15://交流2500V
return 2.5;
case 20://交流3kV
return 3;
case 21://交流6kV
return 6;
case 22://交流10kV
return 10;
case 23://交流15.75kV
return 15.75;
case 24://交流20kV
return 20;
case 25://交流35kV
return 35;
case 30://交流66kV
return 66;
case 31://交流72.5kV
return 72.5;
case 32://交流110kV
return 110;
case 33://交流220kV
return 220;
case 34://交流330kV
return 330;
case 35://交流500kV
return 500;
case 36://交流750kV
return 750;
case 37://交流1000kV
return 1000;
case 51://直流6V
return 0.006;
case 52://直流12V
return 0.012;
case 53://直流24V
return 0.024;
case 54://直流36V
return 0.036;
case 55://直流48V
return 0.048;
case 56://直流110V
return 0.11;
case 60://直流220V
return 0.22;
case 70://直流600V
return 0.6;
case 71://直流750V
return 0.75;
case 72://直流1500V
return 1.5;
case 73://直流3000V
return 3.0;
case 76://直流35kV
return 35;
case 77://直流30kV
return 30;
case 78://直流50kV
return 50;
case 80://直流120kV
return 120;
case 81://直流125kV
return 125;
case 82://直流400kV
return 400;
case 83://直流500kV
return 500;
case 84://直流660kV
return 660;
case 85://直流800kV
return 800;
case 86://直流1000kV
return 1000;
case 87://直流200kV
return 200;
case 88://直流320kV。
return 320;
default:
return 0;
break;
}
}
catch (const std::exception&)
{
//error
return 0;
}
}
void try_start_kafka_thread()
{
static int kafka_thread_created = 0;
if (!kafka_thread_created) {
myThrd.start();
kafka_thread_created = 1;
}
}
//lnk20241213
void try_start_mqconsumer_thread()
{
static int mqconsumer_thread_created = 0;
if (!mqconsumer_thread_created) {
mqconsumerThrd.start();
mqconsumer_thread_created = 1;
}
}
/////////////////////////////////////////////////////////////////////////
json_block_data json_blkd;
//void init_json_block_data()
//{
// json_blkd.monitorId = -1;
// json_blkd.func_type = g_node_id;
// //flag 是品质, 异常送1 正常送0
// json_blkd.flag = 0; // //剔除标记1不剔除0剔除默认剔除
// json_blkd.mms_str_map.clear();
//}
//CZY 2023-08-17 WW 2022年12月6日14:09:08 增加多个ICD支持
//json_block_data json_blkd; //json拼接参数类对象,原有的一个数据对象在多ICD下会出现数据错位问题
//解决方案是将此数据放入LDInfo结构中存储保证一条线路一个json拼接参数类对象
void init_json_block_data(char mp_id[], char voltage_level[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
// 将 char[] 转换为 std::string
//QString keyString(mp_id);
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_flicker_data_map.insert(mp_id, pdata);
}
pdata = json_flicker_data_map.value(mp_id);
}
else if (flicker_flag == 0) {
if (!json_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_data_map.insert(mp_id, pdata);
}
pdata = json_data_map.value(mp_id);
}
else if (flicker_flag == 2) {
if (!json_pst_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_pst_data_map.insert(mp_id, pdata);
}
pdata = json_pst_data_map.value(mp_id);
}
if (pdata == NULL)
return;
pdata->monitorId = -1;
QString tmp;
tmp.append(mp_id);
pdata->mp_id = tmp;
pdata->func_type = g_node_id;
//flag 是品质, 异常送1 正常送0
pdata->flag = 0; // //剔除标记1不剔除0剔除默认剔除
pdata->mms_str_map.clear();
pdata->voltage_level = get_voltage_level(voltage_level); //CZY 2023-08-23 add voltage_level
}
//0. json生成开始函数
//int json_block_create_start(int MonitorId )
//{
// try_start_kafka_thread();
//
// init_json_block_data();
// json_blkd.monitorId = MonitorId;
// printf("\n\n---------- json_block_create_start: MonitorId=%d \n",MonitorId);
// return TRUE;
//}
int json_block_create_start(char voltage_level[], char monid_char[], int flicker_flag, char temcode[], int line_id)//WW 2023年3月13日16:38 : 41 多ICD修改
{
try_start_kafka_thread();
//WW 2023-08-22 增加数据库线程
//try_start_sql_thread();//lnk2024118不需要sql线程
//WW end
init_json_block_data(monid_char, voltage_level, flicker_flag);
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
{
pdata->dev_type.append(temcode);
pdata->monitorId = line_id;
if (strlen(monid_char) != 0) {
QString tmp;
tmp.append(monid_char);
pdata->mp_id = tmp;
}
else {
monid_char = "not define";
QString tmp;
tmp.append(monid_char);
pdata->mp_id = tmp;
}
}
printf("\n\n---------- json_block_create_start: mp_id=%s,voltage_level=%s,line_id=%d \n", monid_char, voltage_level, line_id);
return TRUE;
}
//1. json生成开始函数
//int json_block_create_time(int MonitorId , long long Time)
//{
// json_blkd.time = Time;
// printf("\njson_block_create_time: MonitorId=%d,Time=%lld \n",MonitorId,Time);
// return TRUE;
//}
int json_block_create_time(char monid_char[], long long Time, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
pdata->time = Time;
printf("\njson_block_create_time: mp_id=%s,Time=%lld \n", monid_char, Time);
return TRUE;
}
//int json_block_create_flag(int MonitorId , int flag)
//{
// json_blkd.flag = flag;
// printf("\njson_block_create_flag: MonitorId=%d,flag=%d \n",MonitorId,flag);
// return TRUE;
//}
int json_block_create_flag(char monid_char[], int flag, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
pdata->flag = flag;
printf("\njson_block_create_flag: mp_id=%s,flag=%d \n", monid_char, flag);
return TRUE;
}
//2. json生成数据回调函数
//int json_block_create_data(int MonitorId , char* mms_str , double v )
//{
// static int count = 0;
// //WW2023-08-16 去掉log注释
// //printf("#");
// //if ( ((count++ %1000)==0) || (count <2000) )
// // printf("\n%d:json_block_create_data: MonitorId=%d,mms_str=%s,v=%f \n",count,MonitorId,mms_str,v);
//
// json_blkd.mms_str_map.insert(QString::fromAscii(mms_str), v);
// return TRUE;
//}
int json_block_create_data(char monid_char[], char* mms_str, double v, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
static int count = 0;
if (pdata != NULL)
{
pdata->mms_str_map.insert(QString::fromAscii(mms_str), v);
if (strstr(mms_str, "MMXU2$MX$PhV"))
printf("---------- json_block_create_data: mp_id= %s ,mms_str=%s value=%fkV----------\n", monid_char, mms_str, v);
}
return TRUE;
}
//3. json生成结束函数
//int json_block_create_end(int MonitorId )
//{
// printf("\n---------- json_block_create_end: MonitorId=%d \n\n\n",MonitorId);
//
// return transfer_json_block_data(&json_blkd);
//}
//lnk2024-8-16添加接线参数
int json_block_create_end(char v_wiring_type[], char monid_char[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
{
printf("---------- json_block_create_end: mp_id= %s json_flicker_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
{
printf("---------- json_block_create_end: mp_id= %s json_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
{
printf("---------- json_block_create_end: mp_id= %s json_pst_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_pst_data_map.value(monid_char);
}
//int ret = transfer_json_block_data(pdata, DevKind);//CZY 2023-08-17 需要测试
if (pdata->mms_str_map.count() == 0) {
if (flicker_flag == 1) {
json_flicker_data_map.remove(monid_char);
}
else if (flicker_flag == 0)
{
json_data_map.remove(monid_char);
}
else if (flicker_flag == 2)
{
json_pst_data_map.remove(monid_char);
}
printf("---------- json_block_create_end: pdata->mms_str_map.count() == 0 ----------\n");
return 1;
}
//lnk2024-8-16添加接线参数
int ret = transfer_json_block_data(v_wiring_type, pdata);
if (pdata != NULL)
delete pdata;
if (flicker_flag == 1) {
json_flicker_data_map.remove(monid_char);
}
else if (flicker_flag == 0)
{
json_data_map.remove(monid_char);
}
else if (flicker_flag == 2)
{
json_pst_data_map.remove(monid_char);
}
printf("---------- json_block_create_end: MonitorId= %s ----------\n", monid_char);
return ret;
}
//#define STATUS_NORMAL 0 /**< 正常 */
//拼接Kafka Producer发送暂态事件消息 例:
//{"DATA_TYPE":"03", "TIME":"1542960911734", "1268918860":["CommResume"]}
void prcess_monitor_comm_2_json(int monitor_id, int status, long long tm)
{
Ckafka_data_t data;
QString status_str = (status == 0) ? "CommResume" : "CommInterrupt";
try_start_kafka_thread();
data.monitor_id = monitor_id;
data.strTopic = "RTDATASOE";
data.strText = QString("{\"DATA_TYPE\":\"03\", \"TIME\":\"%1\", \"%2\":[\"%3\"]}")
.arg(tm).arg(monitor_id).arg(status_str);
//发生时刻,毫秒 //装置序号 例1268918860
QString str = data.strTopic + " " + data.strText;
printf("prcess_monitor_comm_2_json: %s \n", str.toStdString().c_str());
kafka_data_list_mutex.lock();
kafka_data_list.append(data);
kafka_data_list_mutex.unlock();
}
//////////////////////////////////////////////////////////////////////////////
//int transfer_json_block_data(json_block_data *data)
//{
// Ckafka_data_t kafka_data;
// kafka_data.patition_id = 0;
// kafka_data.strText = QString("Time=%1").arg(data->time);
//
// kafka_data_list_mutex.lock();
// kafka_data_list.append(kafka_data);
// kafka_data_list_mutex.unlock();
// return TRUE;
//}
void clear_old_comtrade_files()
{
if (g_node_id != SOE_COMTRADE_BASE_NODE_ID)
return;
QString full_fn_str;
QString dir_name("../comtrade/");
QDir directory_comtrade(dir_name);
QStringList fileNames = directory_comtrade.entryList(QDir::Files | QDir::NoDotAndDotDot, QDir::Time);
if (fileNames.size() <= comtrade_remain_file_num)
return;
for (QStringList::size_type i = comtrade_remain_file_num; i != fileNames.size(); ++i) {
full_fn_str = dir_name + fileNames.at(i);
QFile::remove(full_fn_str);
}
}
/////////////////////////////////////////////
//using namespace std;
int process_login_verify()
{
int length = 64;
char password[64 + 1];
char* p = NULL;
int count = 0;
char encode_password[256];
//password = "njcnpqs@2018"
const char* passwordConfirm = "1c0e4e104de596846648ba495bd32601";
memset(password, 0, sizeof(password));
printf("Please input password : \n");
p = password;
count = 0;
system("stty -echo");
std::cin.getline(password, 64);
system("stty echo");
//while (((*p = getch()) != 13) && count < length) {
// //putch('*');
// //fflush(stdin);
// p++;
// count++;
//}
password[length] = '\0';
//printf("input typed password : %s \n",password);
MyGetSM4Code(password, (unsigned char*)"epri.sgcc.com.cn", encode_password);
//printf("encode_password : %s ,should be %s \n",encode_password,passwordConfirm);
return (strcmp(encode_password, passwordConfirm));
}
////////////////////////////////////////////
///////////////////////////////////////////
//WW 2023-08-22 增加数据库线程和WebSokcet线程
void SQLExcuteThread::run()
{
//if (THREE_SECS_DATA_BASE_NODE_ID == g_node_id)//3秒数据传输不需要写库
//return;
if (1 != g_iOTLFlag) {
Sql_data_list.clear();
return;
}
static uint32_t connect_state = 0;
static uint32_t sql_count = 0;//2024-04-01
const char* pSql = nullptr;
printf("SqlExcuteThread::run() is called ...... \n\n");
while (1)
{
msleep(1);
if (!Sql_data_list.isEmpty())
{
if (0 == sql_count++ % 300)
{
//db.connected
int rtState = OTLDbconnected();
//int rtState = db.connected;
if (rtState == 0 || connect_state != 0) {
OTLDisconnect();
int ret = OTLConnect();
if (ret != 0 && ret != 32031) {
bool bExit = false;
for (int i = 0; i < 3; i++) {
OTLDisconnect();
ret = OTLConnect();
if (ret != 0 && ret != 32031) {
if (2 == i)
bExit = true;
else
printf(">>>Postgresql reconnect %d times,errorcode= %d \n", i + 1, ret);
}
}
if (bExit) {
printf(">>>Postgresql reconnect 3 times,errorcode= %d,end thread!\n", ret);
sleep(30);
continue;
//return;
}
}
}
}
// printf("(写库)Sql执行语句链表Sql_data_list允许最大元素个数= %d实有元素个数= %d \n", g_iSqlListSize, Sql_data_list.size());
Sql_data_list_mutex.lock();
std::string strSql = Sql_data_list.takeFirst().toStdString();
printf("get one sql \n");
if (strSql.length() < 11)
{
// printf("(写库)Sql执行语句链表Sql_data_list剩余元素个数= %d当前执行Sql= %scontinue下一语句\n", Sql_data_list.count(), strSql.c_str());
continue;
}
pSql = strSql.c_str();
Sql_data_list_mutex.unlock();
//printf("BEGIN my_sql_excute no.%i -------->>>>>>>> %s \n", count, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
/*if (2 == Log_Enable)
printf("(写库)Sql执行语句链表Sql_data_list剩余元素个数= %i当前执行Sql.%i= %s \n", Sql_data_list.count(), count, pSql);*/
printf("write one sql %s \n", pSql);
int rt = write_to_db(pSql);
connect_state = rt;
printf("connect state %d \n", connect_state);
//if (0 == rt)
//{
// if (1 == Log_Enable)
// printf("(写库)Sql执行成功.%i \n", count);
// else
// printf("(写库)Sql执行成功.%iSql= %s \n", count, pSql);
//}
//printf("END my_sql_excute no.%i -------->>>>>>>> %s \n\n", count++, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
}
printf(">>>SqlExcuteThread::run() is end!!!\n");
}
void try_start_sql_thread()
{
static int sql_thread_created = 0;
if (!sql_thread_created) {
//if (2 == Log_Enable)
// printf(">>>即将启动Sql执行线程\n");
sqlThrd.start();
sql_thread_created = 1;
}
}
void try_start_socket_thread()
{
static int socket_thread_created = 0;
if (!socket_thread_created) {
//if (2 == Log_Enable)
// printf(">>>即将启动Web Socket线程\n");
socketThrd.start();
socket_thread_created = 1;
}
}
//lnk20241029
void try_start_web_http_thread()
{
static int webhttp_thread_created = 0;
if (!webhttp_thread_created) {
webhttpThrd.start();
webhttp_thread_created = 1;
}
}
void try_start_http_thread()
{
static int http_thread_created = 0;
if (!http_thread_created) {
httpThrd.start();
http_thread_created = 1;
}
}
//lnk20241202
int try_start_mqtest_thread(int argc, char *argv[])
{
//不使用简单的循环线程而是启动一个app不仅执行循环线程而且可以连接输入
/*static int mqtest_thread_created = 0;
if (!mqtest_thread_created) {
mqtestThrd.start();
mqtest_thread_created = 1;
}*/
QCoreApplication a(argc, argv);
// 创建 QThread 和 Worker 对象
QThread *thread = new QThread();
Worker *worker = new Worker();
// 将 Worker 对象移动到 QThread 中
worker->moveToThread(thread);
// 连接信号和槽
QObject::connect(thread, SIGNAL(started()), worker, SLOT(startServer()));
QObject::connect(worker, SIGNAL(serverError()), thread, SLOT(quit()));
QObject::connect(worker, SIGNAL(serverError()), worker, SLOT(deleteLater()));
QObject::connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
// 启动线程
thread->start();
//std::cout << "start_mqtest"<<std::endl;
// 确保在应用退出时,线程也能正确退出
QObject::connect(&a, SIGNAL(aboutToQuit()), thread, SLOT(quit()));
return a.exec();
}
void try_start_ontimer_thread()
{
static int ontimer_thread_created = 0;
if (!ontimer_thread_created) {
//if (2 == Log_Enable)
// printf(">>>即将启动Web Socket线程\n");
onTimerThrd.start();
ontimer_thread_created = 1;
}
}
//WW 2023-08-22 end
///////////////////////////////////////////
//ZW 2024-01-31 补招数据模式优化
static QMap<QString, int> mvl_type_ctrl_map;//ZW 2024-01-31 用于保存单次获取的模型
static int mvl_type_ctrl_map_size;//计数
//static std::map<int, int> myMap;
//添加doname对应的数据模型
void add_mvl_type_ctrl(char doname[], int ctrl)
{
//printf("\nadd_mvl_type_ctrl: %s\n", doname);
//printf("\nadd_mvl_type_ctrl: %p////%p\n", &ctrl,&copy);
if (!mvl_type_ctrl_map.contains(doname))
{
//MVL_TYPE_CTRL* copy = ctrl;
mvl_type_ctrl_map.insert(doname, ctrl);
}
//printf("\nadd_mvl_type_ctrl: %p\n", &doname);
}
//删除map中所有数据模型
void del_mvl_type_ctrl()
{
for (QMap<QString, int>::iterator it = mvl_type_ctrl_map.begin(); it != mvl_type_ctrl_map.end(); ++it) {
QString key = it.key();
int value = it.value();
mvl_type_id_destroy(value);
}
mvl_type_ctrl_map.clear();
}
int get_mvl_type_ctrl_map_size()
{
return mvl_type_ctrl_map.count();
}
//查找对应doname的数据模型是否存在map中
int sel_mvl_type_ctrl_flag(char doname[])
{
if (mvl_type_ctrl_map.contains(doname))
{
return mvl_type_ctrl_map.value(doname);
}
else
{
return -1;
}
}
//ZW 2024-01-31 end