11 Commits

21 changed files with 6019 additions and 531 deletions

View File

@@ -29,10 +29,14 @@
#include "../rocketmq/DefaultMQPushConsumer.h" #include "../rocketmq/DefaultMQPushConsumer.h"
#include "../rocketmq/ConsumeType.h" #include "../rocketmq/ConsumeType.h"
#include "../rocketmq/MQMessageListener.h"
#include "../rocketmq/MQMessageExt.h"
#include "../rocketmq/SessionCredentials.h"
// 引入提供的消费者接口头文件 // 引入提供的消费者接口头文件
#include "../rocketmq/CPushConsumer.h" //#include "../rocketmq/CPushConsumer.h"
#include "../rocketmq/CCommon.h" //#include "../rocketmq/CCommon.h"
#include "../rocketmq/CMessageExt.h" //#include "../rocketmq/CMessageExt.h"
#include <map> #include <map>
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex #include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex
#include <utility> // for std::pair #include <utility> // for std::pair
@@ -41,11 +45,15 @@
using namespace std; using namespace std;
static std::once_flag g_consumer_once;
static std::once_flag g_producer_once;
extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer
extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port
extern std::string G_ROCKETMQ_TOPIC;//topie
extern std::string G_ROCKETMQ_TAG;//tag extern std::string G_ROCKETMQ_TOPIC_TEST;//topie
extern std::string G_ROCKETMQ_KEY;//key extern std::string G_ROCKETMQ_TAG_TEST;//tag
extern std::string G_ROCKETMQ_KEY_TEST;//key
extern std::string G_MQCONSUMER_TOPIC_LOG; extern std::string G_MQCONSUMER_TOPIC_LOG;
extern std::string G_MQCONSUMER_TOPIC_SET; extern std::string G_MQCONSUMER_TOPIC_SET;
@@ -53,6 +61,8 @@ extern std::string G_MQCONSUMER_TOPIC_RC;
extern std::string G_MQCONSUMER_TOPIC_UD; extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TOPIC_RT; extern std::string G_MQCONSUMER_TOPIC_RT;
extern std::string G_MQCONSUMER_TOPIC_TEST;
extern std::string FRONT_INST; extern std::string FRONT_INST;
extern bool DEBUGOPEN; extern bool DEBUGOPEN;
@@ -79,20 +89,40 @@ extern std::string G_MQCONSUMER_CHANNEL;
class RocketMQConsumer; class RocketMQConsumer;
// 全局映射CPushConsumer* -> RocketMQConsumer* // 全局映射CPushConsumer* -> RocketMQConsumer*
std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;// //std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER; //pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
////////////////////////////////
int64_t G_APP_START_MS = []() -> int64_t {
using namespace std::chrono;
return duration_cast<milliseconds>(
system_clock::now().time_since_epoch()
).count();
}();
int64_t G_START_SKEW_MS = 1000;
bool should_process_after_start(const rocketmq::MQMessageExt& msg)
{
const int64_t born_ts = static_cast<int64_t>(msg.getBornTimestamp());
return born_ts >= (G_APP_START_MS - G_START_SKEW_MS);
}
///////////////////////////////
class InternalListener;
class RocketMQConsumer { class RocketMQConsumer {
public: public:
// 构造函数:初始化消费者并启动 // 构造函数:初始化消费者并启动
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId); RocketMQConsumer(const std::string& consumerName, const std::string& nameServer);
// 禁用拷贝和赋值 // 禁用拷贝和赋值
RocketMQConsumer(const RocketMQConsumer&) {} RocketMQConsumer(const RocketMQConsumer&) = delete;
RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; } RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;
// 订阅主题和标签,并注册回调 // 订阅主题和标签,并注册回调
void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback); void subscribe(const std::string& topic,
const std::string& tag,
MessageCallBack callback);
// 启动消费者 // 启动消费者
void start(); void start();
@@ -100,23 +130,47 @@ public:
//修改消费模式 //修改消费模式
void setConsumerMessageModel(const std::string& topic); void setConsumerMessageModel(const std::string& topic);
rocketmq::ConsumeStatus handleMessage(const rocketmq::MQMessageExt& msg);
// 析构函数:关闭并销毁消费者 // 析构函数:关闭并销毁消费者
~RocketMQConsumer(); ~RocketMQConsumer();
private: private:
CPushConsumer* consumer_; // C 接口消费者指针 //CPushConsumer* consumer_; // C 接口消费者指针
//MessageCallBack messageCallback_; // 函数指针用于回调 //MessageCallBack messageCallback_; // 函数指针用于回调
rocketmq::DefaultMQPushConsumer consumer_;
InternalListener* listener_;
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射 std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射
// 静态消息处理回调 /*// 静态消息处理回调
static int messageHandler(CPushConsumer* consumer, CMessageExt* msg); static int messageHandler(CPushConsumer* consumer, CMessageExt* msg);
// 实例消息处理函数 // 实例消息处理函数
int handleMessage(CMessageExt* msg); int handleMessage(CMessageExt* msg);*/
};
// 构造函数实现 };
RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId) class InternalListener : public rocketmq::MessageListenerConcurrently {
public:
explicit InternalListener(RocketMQConsumer* owner)
: owner_(owner) {}
rocketmq::ConsumeStatus consumeMessage(
const std::vector<rocketmq::MQMessageExt>& msgs) override
{
for (size_t i = 0; i < msgs.size(); ++i) {
rocketmq::ConsumeStatus ret = owner_->handleMessage(msgs[i]);
if (ret != rocketmq::CONSUME_SUCCESS) {
return ret;
}
}
return rocketmq::CONSUME_SUCCESS;
}
private:
RocketMQConsumer* owner_;
};
// 构造函数实现C
/*RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
: consumer_(NULL)//, messageCallback_(NULL) : consumer_(NULL)//, messageCallback_(NULL)
{ {
// 创建消费者 // 创建消费者
@@ -163,35 +217,75 @@ RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::s
pthread_mutex_unlock(&g_consumerMapMutex); pthread_mutex_unlock(&g_consumerMapMutex);
std::cout << "RocketMQ Consumer initialized and started." << std::endl; std::cout << "RocketMQ Consumer initialized and started." << std::endl;
}*/
//构造函数实现C++
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
const std::string& nameServer)
: consumer_(consumerGroup),
listener_(NULL)
{
consumer_.setNamesrvAddr(nameServer);
consumer_.setSessionCredentials(
G_MQCONSUMER_ACCESSKEY,
G_MQCONSUMER_SECRETKEY,
G_MQCONSUMER_CHANNEL
);
// 限制消费线程池,防止 ConsumeTP 爆炸
consumer_.setConsumeThreadCount(4);
listener_ = new InternalListener(this);
} }
// 启动消费者 // 启动消费者
void RocketMQConsumer::start() void RocketMQConsumer::start()
{ {
if (StartPushConsumer(consumer_) != 0) { static bool started = false;
pthread_mutex_lock(&g_consumerMapMutex); if (started) {
g_consumerMap.erase(consumer_); std::cout << "Consumer already started" << std::endl;
pthread_mutex_unlock(&g_consumerMapMutex); return;
DestroyPushConsumer(consumer_);
throw std::runtime_error("Failed to start push consumer.");
}
else{
std::cout << "RocketMQ Consumer started." << std::endl;
} }
consumer_.registerMessageListener(listener_);
consumer_.start();
started = true;
} }
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback) void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
{ {
if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) { /*if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
throw std::runtime_error("Failed to subscribe to topic/tag."); throw std::runtime_error("Failed to subscribe to topic/tag.");
} }*/
consumer_.subscribe(topic, tag);
//调试用
std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl; std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键 // 使用 std::pair 作为键
std::pair<std::string, std::string> key(topic, tag); std::pair<std::string, std::string> mapKey(topic, tag);
callbacks_[key] = callback; callbacks_[mapKey] = callback;
}
//冀北添加,其他地方也通用
// 如果带 MQ_INST_xxx% 前缀再注册一个短topic
size_t pos = topic.find('%');
if (pos != std::string::npos)
{
std::string shortTopic = topic.substr(pos + 1);
callbacks_[std::make_pair(shortTopic, tag)] = callback;
std::cout << "[CALLBACK_ALIAS] "
<< topic
<< " -> "
<< shortTopic
<< ", tag=" << tag
<< std::endl;
}
}
/*
// 静态消息处理回调实现 // 静态消息处理回调实现
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg) int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
{ {
@@ -212,68 +306,66 @@ int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
return instance->handleMessage(msg); return instance->handleMessage(msg);
} else { } else {
std::cerr << "Consumer instance not found for callback." << std::endl; std::cerr << "Consumer instance not found for callback." << std::endl;
return E_RECONSUME_LATER; // 默认返回重试状态 //return E_RECONSUME_LATER; // 默认返回重试状态
return rocketmq::RECONSUME_LATER;
} }
} }
*/
int RocketMQConsumer::handleMessage(CMessageExt* msg) /*int RocketMQConsumer::handleMessage(CMessageExt* msg)
{ {
// 检查 msg 和 consumer_ 是否为 NULL // 检查 msg 和 consumer_ 是否为 NULL
if (!msg || !consumer_) { if (!msg || !consumer_) {
std::cerr << "Received null message or consumer." << std::endl; std::cerr << "Received null message or consumer." << std::endl;
return E_RECONSUME_LATER; //return E_RECONSUME_LATER;
} return rocketmq::RECONSUME_LATER;
}*/
// 获取消息的主题和标签 // 获取消息的主题和标签
std::string topic = GetMessageTopic(msg); // 假设存在此函数 //std::string topic = GetMessageTopic(msg); // 假设存在此函数
std::string tag = GetMessageTags(msg); // 假设存在此函数 //std::string tag = GetMessageTags(msg); // 假设存在此函数
// 打印调试信息 rocketmq::ConsumeStatus RocketMQConsumer::handleMessage( const rocketmq::MQMessageExt& msg) {
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl; std::string tag = msg.getTags();
std::string topic = msg.getTopic();
// 使用 std::pair 作为键 // 打印调试信息
std::pair<std::string, std::string> key(topic, tag); std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键
// 查找对应的回调函数 std::pair<std::string, std::string> key(topic, tag);
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key); // 查找对应的回调函数
if (it != callbacks_.end()) { std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
// 调用对应的回调函数 if (it != callbacks_.end())
{ // 调用对应的回调函数
//调试 //调试
std::cout << "callback Handling message " <<std::endl; std::cout << "callback Handling message " <<std::endl;
//return it->second(consumer_, msg);
return it->second(consumer_, msg); return it->second(msg);
}
} else { else {
//调试
//调试 std::cout << "there is no callback " <<std::endl;
std::cout << "there is no callback " <<std::endl; // 如果没有找到对应的回调,执行默认处理
//const char* body = GetMessageBody(msg);
// 如果没有找到对应的回调,执行默认处理 //const char* msgKey = GetMessageKeys(msg);
const char* body = GetMessageBody(msg); std::string body = msg.getBody();
const char* msgKey = GetMessageKeys(msg); std::string msgKey = msg.getKeys();
if (!body.empty()) {
if (body) { std::cout << "Received message body: " << body << std::endl;
std::cout << "Received message body: " << body << std::endl; } else {
} else { std::cout << "Received message with empty body." << std::endl;
std::cout << "Received message with empty body." << std::endl; } if (!msgKey.empty()) {
} std::cout << "Message Key: " << msgKey << std::endl;
} else {
if (msgKey) { std::cout << "Message Key: N/A" << std::endl;
std::cout << "Message Key: " << msgKey << std::endl; } //return E_CONSUME_SUCCESS;
} else { return rocketmq::CONSUME_SUCCESS;
std::cout << "Message Key: N/A" << std::endl; }
}
return E_CONSUME_SUCCESS;
}
} }
// 析构函数实现 // 析构函数实现
RocketMQConsumer::~RocketMQConsumer() RocketMQConsumer::~RocketMQConsumer()
{ {
if (consumer_) { /*if (consumer_) {
// 关闭消费者 // 关闭消费者
ShutdownPushConsumer(consumer_); ShutdownPushConsumer(consumer_);
@@ -287,7 +379,18 @@ RocketMQConsumer::~RocketMQConsumer()
consumer_ = NULL; consumer_ = NULL;
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl; std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
}*/
try {
consumer_.shutdown();
} catch (...) {
} }
sleep(1); // 等内部线程退出
delete listener_;
listener_ = NULL;
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
} }
// 在 RocketMQConsumer 类中新增函数用来设置消费模式 // 在 RocketMQConsumer 类中新增函数用来设置消费模式
@@ -302,11 +405,13 @@ void RocketMQConsumer::setConsumerMessageModel(const std::string& topic)
} }
} else*/ { } else*/ {
// 默认设置为广播消费模式 // 默认设置为广播消费模式
if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) { /*if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl; std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl;
} else { } else {
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl; std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
} }*/
consumer_.setMessageModel(rocketmq::BROADCASTING);
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
} }
} }
@@ -322,11 +427,12 @@ void InitializeConsumer(
if (g_consumer == NULL) { if (g_consumer == NULL) {
std::cout << "create new consumer!" << std::endl; std::cout << "create new consumer!" << std::endl;
try { try {
g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组不同进程不同的消费者同时消费topic的同一条消息 //g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组不同进程不同的消费者同时消费topic的同一条消息
g_consumer = new RocketMQConsumer(consumerName, nameServer);
for (size_t i = 0; i < subscriptions.size(); ++i) { for (size_t i = 0; i < subscriptions.size(); ++i) {
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式 g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag, subscriptions[i].callback); g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag,subscriptions[i].callback);
} }
g_consumer->start(); g_consumer->start();
@@ -354,16 +460,13 @@ void rocketmq_consumer_receive(
const std::string& nameServer, const std::string& nameServer,
const std::vector<Subscription>& subscriptions) // 接收多个订阅 const std::vector<Subscription>& subscriptions) // 接收多个订阅
{ {
if (g_consumer == NULL) { std::call_once(g_consumer_once, [&](){
try { try {
//InitializeConsumer(consumerName, nameServer, topic, tag, callback);//初始化后mq库内部来完成消息的获取 InitializeConsumer(consumerName, nameServer, subscriptions);
InitializeConsumer(consumerName, nameServer, subscriptions); // 初始化后MQ库内部开始获取消息 } catch (...) {
}
catch (...) {
std::cerr << "Cannot consume message because consumer initialization failed." << std::endl; std::cerr << "Cannot consume message because consumer initialization failed." << std::endl;
return;
} }
} });
} }
///////////////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -407,24 +510,38 @@ int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
class RocketMQProducer { class RocketMQProducer {
public: public:
RocketMQProducer(const std::string& producerName, const std::string& nameServer) RocketMQProducer(const std::string& producerName, const std::string& nameServer)
: producer_(NULL) : producer_(producerName)
{ {
// 创建生产者 // 创建生产者
producer_ = CreateProducer(producerName.c_str()); /*producer_ = CreateProducer(producerName.c_str());
if (producer_ == NULL) { if (producer_ == NULL) {
throw std::runtime_error("Failed to create producer."); throw std::runtime_error("Failed to create producer.");
} }*/
// 设置日志
producer_.setLogLevel(rocketmq::eLOG_LEVEL_ERROR);
producer_.setLogFileSizeAndNum(5, 50);
// 设置 nameserver 地址 // 设置 nameserver 地址
SetProducerNameServerAddress(producer_, nameServer.c_str()); //SetProducerNameServerAddress(producer_, nameServer.c_str());
producer_.setNamesrvAddr(nameServer);
//lnk20260417设置数据上送消息体最大值默认4M调整为1M避免过大消息导致发送失败 //lnk20260417设置数据上送消息体最大值默认4M调整为1M避免过大消息导致发送失败
SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB //SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
producer_.setMaxMessageSize(1024 * 1024);
SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), ""); producer_.setSendMsgTimeout(3000);//添加超时防阻塞
//SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
producer_.setSessionCredentials(
G_MQCONSUMER_ACCESSKEY,
G_MQCONSUMER_SECRETKEY,
""
);
// 启动生产者 // 启动生产者
StartProducer(producer_); //StartProducer(producer_);
producer_.start();
std::cout << "rocketmq_Producer initialized and started." << std::endl; std::cout << "rocketmq_Producer initialized and started." << std::endl;
} }
@@ -433,7 +550,7 @@ public:
RocketMQProducer(const RocketMQProducer&) = delete; RocketMQProducer(const RocketMQProducer&) = delete;
RocketMQProducer& operator=(const RocketMQProducer&) = delete; RocketMQProducer& operator=(const RocketMQProducer&) = delete;
void printSendResult(const CSendResult& result) { /*void printSendResult(const CSendResult& result) {
std::cout << "SendResult:" << std::endl; std::cout << "SendResult:" << std::endl;
std::cout << " Status: "; std::cout << " Status: ";
switch (result.sendStatus) { switch (result.sendStatus) {
@@ -457,10 +574,10 @@ public:
std::cout << " MsgID : " << result.msgId << std::endl; std::cout << " MsgID : " << result.msgId << std::endl;
std::cout << " Offset: " << result.offset << std::endl; std::cout << " Offset: " << result.offset << std::endl;
} }*/
// 发送消息 // 发送消息
void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) { /* void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
if (DEBUGOPEN) { if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL") std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL")
@@ -585,7 +702,7 @@ public:
<< std::endl;*/ << std::endl;*/
// 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题 // 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题
if (sendResult == 0) { // 假设返回 0 表示成功 /*if (sendResult == 0) { // 假设返回 0 表示成功
std::cout << "[MQ][SEND_OK]" std::cout << "[MQ][SEND_OK]"
<< " topic=" << (topic ? topic : "") << " topic=" << (topic ? topic : "")
<< ", tags=" << tags << ", tags=" << tags
@@ -630,20 +747,97 @@ public:
DestroyMessage(msg); DestroyMessage(msg);
} }
} }
}*/
void sendMessage(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys)
{
try {
if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << topic
<< ", tags: " << tags
<< ", keys: " << keys
<< ", body_len=" << body.size()
<< std::endl;
size_t n = std::min((size_t)200, body.size());
std::cout << "[MQ][BODY_HEAD] "
<< body.substr(0, n)
<< std::endl;
if (body.size() > n) {
std::cout << "[MQ][BODY_TAIL] "
<< body.substr(body.size() - n, n)
<< std::endl;
}
std::cout << "[MQ][HEX_HEAD] ";
for (size_t i = 0; i < std::min((size_t)100, body.size()); ++i) {
printf("%02X ", (unsigned char)body[i]);
}
printf("\n");
}
rocketmq::MQMessage msg(topic, tags, keys, body);
rocketmq::SendResult result = producer_.send(msg);
std::cout << "[MQ][SEND_OK]"
<< " topic=" << topic
<< ", tags=" << tags
<< ", keys=" << keys
<< ", msgId=" << result.getMsgId()
<< ", status=" << result.getSendStatus()
<< ", body_len=" << body.size()
<< std::endl;
}
catch (const rocketmq::MQClientException& e) {
std::cerr << "[MQ][SEND_FAIL] MQClientException: "
<< e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,mq客户端错误,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
catch (const std::exception& e) {
std::cerr << "[MQ][SEND_FAIL] exception: "
<< e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,mq发送错误,发送请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
catch (...) {
std::cerr << "[MQ][SEND_FAIL] unknown exception" << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,未知错误,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
} }
// 析构函数中关闭并销毁生产者 // 析构函数中关闭并销毁生产者
~RocketMQProducer() { ~RocketMQProducer() {
if (producer_) { /*if (producer_) {
ShutdownProducer(producer_); ShutdownProducer(producer_);
DestroyProducer(producer_); DestroyProducer(producer_);
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl; std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
}*/
try {
producer_.shutdown();
} }
catch (...) {
}
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
} }
private: private:
CProducer* producer_; //CProducer* producer_;
rocketmq::DefaultMQProducer producer_;
}; };
// 全局生产者实例 // 全局生产者实例
@@ -654,7 +848,7 @@ void InitializeProducer()
{ {
if (g_producer == NULL) { if (g_producer == NULL) {
try { try {
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT); g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);//生产者名称和NameServer地址
} }
catch (const std::exception& e) { catch (const std::exception& e) {
std::cerr << "Failed to initialize producer: " << e.what() << std::endl; std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
@@ -674,43 +868,37 @@ void ShutdownAndDestroyProducer()
} }
// 发送消息的接口函数 // 发送消息的接口函数
void rocketmq_producer_send(const char* strbody, const char* topic) void rocketmq_producer_send(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys)
{ {
if (g_producer == NULL) { std::call_once(g_producer_once, [&](){
try { InitializeProducer();
InitializeProducer(); });
}
catch (...) {
std::cerr << "Cannot send message because producer initialization failed." << std::endl;
return;
}
}
// 假设 tags 和 keys 是固定的,可以根据需要修改
std::string tags = G_ROCKETMQ_TAG;
std::string keys = G_ROCKETMQ_KEY;
try { try {
g_producer->sendMessage(strbody, topic, tags, keys); g_producer->sendMessage(body, topic, tags, keys);
} } catch (const std::exception& e) {
catch (const std::exception& e) {
std::cerr << "Failed to send message: " << e.what() << std::endl; std::cerr << "Failed to send message: " << e.what() << std::endl;
// 处理发送失败的情况,例如记录日志或重试 DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index); "【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
} }
} }
#endif #endif
////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////
/*
// producer_send0测试用 // producer_send0测试用
void StartSendMessage(CProducer* producer) void StartSendMessage(CProducer* producer)
{ {
CSendResult result; CSendResult result;
// create message and set some values for it // create message and set some values for it
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
for (int i = 0; i < 10; i++) for (int i = 0; i < 10; i++)
{ {
@@ -734,9 +922,9 @@ void StartSendMessage(CProducer* producer,const char* strbody)
CSendResult result; CSendResult result;
// create message and set some values for it // create message and set some values for it
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
SetMessageBody(msg, strbody); SetMessageBody(msg, strbody);
// send message // send message
@@ -786,7 +974,7 @@ void producer_send(const char* strbody)
DestroyProducer(producer); DestroyProducer(producer);
cout << "Producer Shutdown!" << endl; cout << "Producer Shutdown!" << endl;
} }
*/
/////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////
extern "C" { extern "C" {
@@ -795,13 +983,13 @@ void rocketmq_test_rt()
{ {
Ckafka_data_t data; Ckafka_data_t data;
data.monitor_id = 123123; data.monitor_id = 123123;
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT); data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT);
std::ifstream file("rt.txt"); // 文件中存储长字符串 std::ifstream file("rt.txt"); // 文件中存储长字符串
std::stringstream buffer; std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容 buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str()); data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123; data.mp_id = QString::number(123456);
my_rocketmq_send(data); my_rocketmq_send(data);
} }
//extern std::string G_MQCONSUMER_TOPIC_UD; //extern std::string G_MQCONSUMER_TOPIC_UD;
@@ -809,13 +997,13 @@ void rocketmq_test_ud()//用来测试台账更新
{ {
Ckafka_data_t data; Ckafka_data_t data;
data.monitor_id = 123123; data.monitor_id = 123123;
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD); data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD);
std::ifstream file("ud.txt"); // 文件中存储长字符串 std::ifstream file("ud.txt"); // 文件中存储长字符串
std::stringstream buffer; std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容 buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str()); data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123; data.mp_id = QString::number(123456);
my_rocketmq_send(data); my_rocketmq_send(data);
} }
@@ -823,13 +1011,13 @@ void rocketmq_test_set()//用来测试进程控制脚本
{ {
Ckafka_data_t data; Ckafka_data_t data;
data.monitor_id = 123123; data.monitor_id = 123123;
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET);
std::ifstream file("set.txt"); // 文件中存储长字符串 std::ifstream file("set.txt"); // 文件中存储长字符串
std::stringstream buffer; std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容 buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str()); data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123; data.mp_id = QString::number(123456);
my_rocketmq_send(data); my_rocketmq_send(data);
} }
@@ -837,13 +1025,13 @@ void rocketmq_test_only()//用来测试进程控制脚本
{ {
Ckafka_data_t data; Ckafka_data_t data;
data.monitor_id = 123123; data.monitor_id = 123123;
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_TEST);
std::ifstream file("set_debug.txt"); // 文件中存储长字符串 std::ifstream file("test.txt"); // 文件中存储长字符串
std::stringstream buffer; std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容 buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str()); data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123; data.mp_id = QString::number(123456);
my_rocketmq_send(data); my_rocketmq_send(data);
} }
@@ -852,13 +1040,13 @@ void rocketmq_test_rc()
{ {
Ckafka_data_t data; Ckafka_data_t data;
data.monitor_id = 123123; data.monitor_id = 123123;
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC); data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC);
std::ifstream file("rc.txt"); // 文件中存储长字符串 std::ifstream file("rc.txt"); // 文件中存储长字符串
std::stringstream buffer; std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容 buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str()); data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123; data.mp_id = QString::number(123456);
my_rocketmq_send(data); my_rocketmq_send(data);
} }
@@ -867,13 +1055,13 @@ void rocketmq_test_log()
{ {
Ckafka_data_t data; Ckafka_data_t data;
data.monitor_id = 123123; data.monitor_id = 123123;
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG); data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG);
std::ifstream file("log_test.txt"); // 文件中存储长字符串 std::ifstream file("log_test.txt"); // 文件中存储长字符串
std::stringstream buffer; std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容 buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str()); data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123; data.mp_id = QString::number(123456);
my_rocketmq_send(data); my_rocketmq_send(data);
} }

View File

@@ -280,9 +280,9 @@ extern int g_front_seg_num;
//生产者 //生产者
std::string G_ROCKETMQ_PRODUCER = "";//rocketmq producer std::string G_ROCKETMQ_PRODUCER = "";//rocketmq producer
std::string G_ROCKETMQ_IPPORT = "";//rocketmq ip+port std::string G_ROCKETMQ_IPPORT = "";//rocketmq ip+port
std::string G_ROCKETMQ_TOPIC = "";//topie std::string G_ROCKETMQ_TOPIC_TEST = "";//topie
std::string G_ROCKETMQ_TAG = "";//tag std::string G_ROCKETMQ_TAG_TEST = "";//tag
std::string G_ROCKETMQ_KEY = "";//key std::string G_ROCKETMQ_KEY_TEST = "";//key
int QUEUENUM = 0; int QUEUENUM = 0;
std::string BROKERNAME = ""; std::string BROKERNAME = "";
//消费者 //消费者
@@ -321,12 +321,22 @@ std::string Heart_Beat_Key = "";
std::string Topic_Reply_Topic = ""; std::string Topic_Reply_Topic = "";
std::string Topic_Reply_Tag = ""; std::string Topic_Reply_Tag = "";
std::string Topic_Reply_Key = ""; std::string Topic_Reply_Key = "";
//数据追踪
std::string DATA_TRACE_TOPIC = "";
std::string DATA_TRACE_TAG = "";
std::string DATA_TRACE_KEY = "";
//lnk20260310添加文件管理的topic和tag //lnk20260310添加文件管理的topic和tag
std::string G_MQCONSUMER_TOPIC_FILE = "";//consumer topie std::string G_MQCONSUMER_TOPIC_FILE = "";//consumer topie
std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag
std::string G_MQCONSUMER_KEY_FILE = "";//consumer key std::string G_MQCONSUMER_KEY_FILE = "";//consumer key
std::string G_REPLY_TOPIC_FILE = "";//consumer topie
std::string G_REPLY_TAG_FILE = "";//consumer tag
std::string G_REPLY_KEY_FILE = "";//consumer key
std::string G_MQCONSUMER_TOPIC_TEST = "";
int G_TEST_FLAG = 0; int G_TEST_FLAG = 0;
int G_TEST_NUM = 0; int G_TEST_NUM = 0;
int G_TEST_TYPE = 0; int G_TEST_TYPE = 0;
@@ -644,12 +654,12 @@ void init_config() {
G_ROCKETMQ_PRODUCER = strdup(ba.data()); G_ROCKETMQ_PRODUCER = strdup(ba.data());
ba = settings.value("RocketMq/Ipport", "").toString().toLatin1(); ba = settings.value("RocketMq/Ipport", "").toString().toLatin1();
G_ROCKETMQ_IPPORT = strdup(ba.data()); G_ROCKETMQ_IPPORT = strdup(ba.data());
ba = settings.value("RocketMq/Topic", "").toString().toLatin1(); ba = settings.value("RocketMq/TESTTopic", "").toString().toLatin1();
G_ROCKETMQ_TOPIC = strdup(ba.data()); G_ROCKETMQ_TOPIC_TEST = strdup(ba.data());
ba = settings.value("RocketMq/Tag", "").toString().toLatin1(); ba = settings.value("RocketMq/TESTTag", "").toString().toLatin1();
G_ROCKETMQ_TAG = strdup(ba.data()); G_ROCKETMQ_TAG_TEST = strdup(ba.data());
ba = settings.value("RocketMq/Key", "").toString().toLatin1(); ba = settings.value("RocketMq/TESTKey", "").toString().toLatin1();
G_ROCKETMQ_KEY = strdup(ba.data()); G_ROCKETMQ_KEY_TEST = strdup(ba.data());
QUEUENUM = settings.value("RocketMq/Queuenum", 0).toInt(); QUEUENUM = settings.value("RocketMq/Queuenum", 0).toInt();
//心跳 //心跳
@@ -667,6 +677,13 @@ void init_config() {
ba = settings.value("RocketMq/Topic_Reply_Key", "").toString().toLatin1(); ba = settings.value("RocketMq/Topic_Reply_Key", "").toString().toLatin1();
Topic_Reply_Key = strdup(ba.data()); Topic_Reply_Key = strdup(ba.data());
//数据追踪
ba = settings.value("RocketMq/TraceTopic", "").toString().toLatin1();
DATA_TRACE_TOPIC = strdup(ba.data());
ba = settings.value("RocketMq/TraceTag", "").toString().toLatin1();
DATA_TRACE_TAG = strdup(ba.data());
ba = settings.value("RocketMq/TraceKey", "").toString().toLatin1();
DATA_TRACE_KEY = strdup(ba.data());
//消费者 //消费者
ba = settings.value("RocketMq/consumer", "").toString().toLatin1(); ba = settings.value("RocketMq/consumer", "").toString().toLatin1();
@@ -724,12 +741,21 @@ void init_config() {
G_CONNECT_KEY = strdup(ba.data()); G_CONNECT_KEY = strdup(ba.data());
//lnk20260310添加文件管理的topic和tag //lnk20260310添加文件管理的topic和tag
ba = settings.value("RocketMq/ConsumerTopicFile", "").toString().toLatin1(); ba = settings.value("RocketMq/ConsumerTopicFILE", "").toString().toLatin1();
G_MQCONSUMER_TOPIC_FILE = strdup(ba.data()); G_MQCONSUMER_TOPIC_FILE = strdup(ba.data());
ba = settings.value("RocketMq/ConsumerTagFile", "").toString().toLatin1(); ba = settings.value("RocketMq/ConsumerTagFILE", "").toString().toLatin1();
G_MQCONSUMER_TAG_FILE = strdup(ba.data()); G_MQCONSUMER_TAG_FILE = strdup(ba.data());
ba = settings.value("RocketMq/ConsumerKeyFile", "").toString().toLatin1(); ba = settings.value("RocketMq/ConsumerKeyFILE", "").toString().toLatin1();
G_MQCONSUMER_KEY_FILE = strdup(ba.data()); G_MQCONSUMER_KEY_FILE = strdup(ba.data());
ba = settings.value("RocketMq/ReplyTopicFILE", "").toString().toLatin1();
G_REPLY_TOPIC_FILE = strdup(ba.data());
ba = settings.value("RocketMq/ReplyTagFILE", "").toString().toLatin1();
G_REPLY_TAG_FILE = strdup(ba.data());
ba = settings.value("RocketMq/ReplyKeyFILE", "").toString().toLatin1();
G_REPLY_KEY_FILE = strdup(ba.data());
ba = settings.value("RocketMq/ConsumerTopicTEST", "").toString().toLatin1();
G_MQCONSUMER_TOPIC_TEST = strdup(ba.data());
//MQ测试 //MQ测试
@@ -747,9 +773,9 @@ void init_config() {
//生产者相关打印 //生产者相关打印
std::cout << "Read G_ROCKETMQ_PRODUCER:" << G_ROCKETMQ_PRODUCER << std::endl; std::cout << "Read G_ROCKETMQ_PRODUCER:" << G_ROCKETMQ_PRODUCER << std::endl;
std::cout << "Read G_ROCKETMQ_IPPORT:" << G_ROCKETMQ_IPPORT << std::endl; std::cout << "Read G_ROCKETMQ_IPPORT:" << G_ROCKETMQ_IPPORT << std::endl;
std::cout << "Read G_ROCKETMQ_TOPIC:" << G_ROCKETMQ_TOPIC << std::endl; std::cout << "Read G_ROCKETMQ_TOPIC_TEST:" << G_ROCKETMQ_TOPIC_TEST << std::endl;
std::cout << "Read G_ROCKETMQ_TAG:" << G_ROCKETMQ_TAG << std::endl; std::cout << "Read G_ROCKETMQ_TAG_TEST:" << G_ROCKETMQ_TAG_TEST << std::endl;
std::cout << "Read G_ROCKETMQ_KEY:" << G_ROCKETMQ_KEY << std::endl; std::cout << "Read G_ROCKETMQ_KEY_TEST:" << G_ROCKETMQ_KEY_TEST << std::endl;
std::cout << "Read QUEUENUM:" << QUEUENUM << std::endl; std::cout << "Read QUEUENUM:" << QUEUENUM << std::endl;
std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl; std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl;
std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl; std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl;
@@ -1512,8 +1538,8 @@ int parse_rpt_log_ini()
} }
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) { for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {//遍历所有测点但是某些测点可能为空台账初始化时ied->cpucount取的是测点号
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);//测点号从1开始cpuno从0开始所以ied_usr->LD_info[cpuno]对应测点号为cpuno+1注意台账初始化时ied_usr->LD_info数组的大小是按照测点总数分配的所以有些测点号可能没有对应的LD_info添加判断防止崩溃
//char str[256]; //256大小 //char str[256]; //256大小
char* tmp = Get_IED(ied_usr->dev_type); char* tmp = Get_IED(ied_usr->dev_type);
if(tmp == NULL){std::cerr << "front read ied config error!" << std::endl;continue;} if(tmp == NULL){std::cerr << "front read ied config error!" << std::endl;continue;}
@@ -1527,10 +1553,21 @@ int parse_rpt_log_ini()
//调试用 //调试用
printf("%s使用内存地址 LD_name[%d]: %p\n", ied_usr->terminal_id, cpuno, (void*)ied_usr->LD_info[cpuno].LD_name); printf("%s使用内存地址 LD_name[%d]: %p\n", ied_usr->terminal_id, cpuno, (void*)ied_usr->LD_info[cpuno].LD_name);
//添加判断有的监测点没有cpuno为2直接申请了LD_info[1]没申请LD_info[0] //添加判断,比如有的监测点没有cpuno为2直接申请了LD_info[1]没申请LD_info[0]
if(ied_usr->LD_info[cpuno].LD_name == NULL){ if(ied_usr->LD_info[cpuno].LD_name == NULL){ //空说明台账初始化没申请空间
printf("this ld_info didn't palloc space ,maybe this ledger has problem!"); printf("this ld_info didn't palloc space ,maybe this ledger has problem!");
DIY_ERRORLOG_CODE("process",0,LOG_CODE_RPTINIT,"【ERROR】终端%s的监测点序号为%d的监测点无法初始化报告,这个装置的台账存在缺失,请检查装置台账的监测点总数和各监测点的序号",ied_usr->terminal_id,cpuno + 1); DIY_ERRORLOG_CODE("process",0,LOG_CODE_RPTINIT,"【ERROR】终端%s的监测点序号为%d的监测点无法初始化报告,这个装置的台账存在缺失,请检查装置台账的监测点总数和各监测点的序号",ied_usr->terminal_id,cpuno + 1);
//添加默认值防止崩溃20260601
LD_info->rptcount = 0;
LD_info->logcount = 0;
LD_info->cpuno = 0;
LD_info->LD_name = NULL;
LD_info->rptinfo = NULL;
LD_info->loginfo = NULL;
delete[] tmp;//释放内存
continue;//跳过防止崩溃 continue;//跳过防止崩溃
} }
@@ -1544,7 +1581,19 @@ int parse_rpt_log_ini()
apr_snprintf(buf, sizeof(buf), "%s", rpt_cfg_strlists[type]->at(i).toAscii().constData()); apr_snprintf(buf, sizeof(buf), "%s", rpt_cfg_strlists[type]->at(i).toAscii().constData());
fill_rptctrl_by_cfg(LD_info, i, buf); //fill_rptctrl_by_cfg(LD_info, i, buf);
int ret = fill_rptctrl_by_cfg(LD_info, i, buf);
if (ret != 0) {
printf("[RPT_INIT][ERROR] fill_rptctrl_by_cfg failed dev=%s cpu=%d rpt=%d cfg=%s\n",
ied_usr->terminal_id, cpuno, i, buf);
DIY_ERRORLOG_CODE(ied_usr->terminal_id,1,LOG_CODE_RPTINIT,"【ERROR】终端%s的监测点序号为%d的监测点的第%d个报告配置解析失败请检查报告配置格式是否正确",ied_usr->terminal_id, cpuno + 1, i + 1);
if (LD_info->rptinfo && LD_info->rptinfo[i]) {
LD_info->rptinfo[i]->rptID = NULL;
LD_info->rptinfo[i]->LD_info = NULL;
}
}
} }
@@ -2996,6 +3045,10 @@ void Cout_account_information() {
add_comm_log(const_cast<char*>(text.toLocal8Bit().constData())); add_comm_log(const_cast<char*>(text.toLocal8Bit().constData()));
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) { for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
if (LD_info->cpuno == 0 || LD_info->LD_name == NULL)
continue;
QString text2;//待组装的pgsql语句 QString text2;//待组装的pgsql语句
text2.append(QString("mp_id: \"%1\" terminal_code:\"%2\" ").arg(LD_info->mp_id).arg(LD_info->terminal_code)); text2.append(QString("mp_id: \"%1\" terminal_code:\"%2\" ").arg(LD_info->mp_id).arg(LD_info->terminal_code));
add_comm_log(const_cast<char*>(text2.toLocal8Bit().constData())); add_comm_log(const_cast<char*>(text2.toLocal8Bit().constData()));
@@ -4189,6 +4242,18 @@ int terminal_ledger_web(QMap<QString, terminal_dev*>* terminal_dev_map,
return 0; // 确保函数有返回值 return 0; // 确保函数有返回值
} }
static void init_oper_type_cache(ied_usr_t *ied_usr)
{
if (ied_usr == NULL)
return;
ied_usr->oper_type_cache.inited = SD_FALSE;
ied_usr->oper_type_cache.ledrs_oper_type_id = -1;
ied_usr->oper_type_cache.reboot_oper_type_id = -1;
ied_usr->oper_type_cache.reset_oper_type_id = -1;
}
int parse_device_cfg_web() int parse_device_cfg_web()
{ {
std::cout << "parse_device_cfg_web" << endl; std::cout << "parse_device_cfg_web" << endl;
@@ -4422,6 +4487,10 @@ int parse_device_cfg_web()
apr_snprintf(ied_usr->dev_key, sizeof(ied_usr->dev_key), "%s", "");//DEV_Key apr_snprintf(ied_usr->dev_key, sizeof(ied_usr->dev_key), "%s", "");//DEV_Key
cout << "defalut dev_key:" << ied_usr->dev_key << endl; cout << "defalut dev_key:" << ied_usr->dev_key << endl;
} }
//lnk20260512
init_oper_type_cache(ied_usr);
//lnk20260304 //lnk20260304
ied_usr->log_level = log_level;//日志等级 ied_usr->log_level = log_level;//日志等级
cout << "ied_usr->log_level:" << ied_usr->log_level << endl; cout << "ied_usr->log_level:" << ied_usr->log_level << endl;
@@ -4495,7 +4564,7 @@ int parse_device_cfg_web()
//otl_datetime timestamp; //otl_datetime timestamp;
int monitor_log_level = 1;//监测点日志等级 int monitor_log_level = 1;//监测点日志等级
//for (int j = 0; j < 10; ++j) { // 假设最多有10个监测点 //for (int j = 0; j < 10; ++j) { // 假设最多有10个监测点
for (int j = 0; value->line[j].monitor_id[0] != '\0'; ++j){ for (int j = 0; value->line[j].monitor_id[0] != '\0'; ++j){//只看有测点号的监测点,避免访问无效数据
ledger_monitor& monitor = value->line[j]; ledger_monitor& monitor = value->line[j];
// 检查监测点 ID 是否为空以避免访问无效数据 // 检查监测点 ID 是否为空以避免访问无效数据
/*if (monitor.monitor_id[0] != '\0') { /*if (monitor.monitor_id[0] != '\0') {
@@ -4517,27 +4586,33 @@ int parse_device_cfg_web()
strncpy(monitor_status, monitor.status, sizeof(monitor_status) - 1);//添加监测点状态 strncpy(monitor_status, monitor.status, sizeof(monitor_status) - 1);//添加监测点状态
monitor_log_level = monitor.log_level;//监测点日志等级 monitor_log_level = monitor.log_level;//监测点日志等级
//监测点台账处理 //监测点台账处理
count_real_monitor++; count_real_monitor++;//初始为0进入一次+1有效监测点排号从1开始
memset(&line_info, 0, sizeof(line_info)); memset(&line_info, 0, sizeof(line_info));//有效测点的初始化
line_info.line_id = count_real_monitor; //监测点排号
line_info.line_id = count_real_monitor; //监测点排号,前置本地使用
cout << "line_id:" << line_info.line_id << endl; cout << "line_id:" << line_info.line_id << endl;
strcpy(line_info.mp_id, monitor_id);
strcpy(line_info.mp_id, monitor_id);//测点id
cout << "mp_id:" << line_info.mp_id << endl; cout << "mp_id:" << line_info.mp_id << endl;
strcpy(line_info.terminal_code, terminal_code); //从上级获取的终端号 strcpy(line_info.terminal_code, terminal_code); //从上级获取的终端号
cout << "terminal_code:" << line_info.terminal_code << endl; cout << "terminal_code:" << line_info.terminal_code << endl;
if (isCharPtrEmpty(logical_device_seq)) { if (isCharPtrEmpty(logical_device_seq)) {
line_info.cpuno = 1; //默认监测点实例号1 line_info.cpuno = 1; //默认监测点实例号1
cout << "logical_device_seq:is null,set cpuno:"<< line_info.cpuno << endl; cout << "logical_device_seq:is null,set cpuno:"<< line_info.cpuno << endl;
} }
else { else {
line_info.cpuno = std::atoi(logical_device_seq); line_info.cpuno = std::atoi(logical_device_seq); //台账实例号,即台账的测点号
cout << "logical_device_seq:"<< line_info.cpuno << endl; cout << "logical_device_seq:"<< line_info.cpuno << endl;
} }
//cout << "cpuno:" << line_info.cpuno << endl; //cout << "cpuno:" << line_info.cpuno << endl;
strcpy(line_info.voltage_level, voltage_level); strcpy(line_info.voltage_level, voltage_level);//电压等级
cout << "voltage_level:" << line_info.voltage_level << endl; cout << "voltage_level:" << line_info.voltage_level << endl;
strcpy(line_info.v_wiring_type, terminal_connect);
strcpy(line_info.v_wiring_type, terminal_connect);//接线方式
cout << "v_wiring_type:" << line_info.v_wiring_type << endl; cout << "v_wiring_type:" << line_info.v_wiring_type << endl;
//lnk2024-8-14记录接线标志 //lnk2024-8-14记录接线标志
if (strcmp(line_info.v_wiring_type, "0") != 0) if (strcmp(line_info.v_wiring_type, "0") != 0)
{ {
@@ -4545,7 +4620,7 @@ int parse_device_cfg_web()
cout << "monitor_id" << monitor_id << "v_wiring_type:" << line_info.v_wiring_type << "is delta wiring:" << isdelta_flag << endl; cout << "monitor_id" << monitor_id << "v_wiring_type:" << line_info.v_wiring_type << "is delta wiring:" << isdelta_flag << endl;
DIY_WARNLOG_CODE("process",0,LOG_CODE_LEDGER,"【WARN】前置连接的监测点 %s 是角形接线,对应终端为%s 终端类型是%s",line_info.mp_id,ied_usr->terminal_id,ied_usr->dev_type); DIY_WARNLOG_CODE("process",0,LOG_CODE_LEDGER,"【WARN】前置连接的监测点 %s 是角形接线,对应终端为%s 终端类型是%s",line_info.mp_id,ied_usr->terminal_id,ied_usr->dev_type);
} }
strcpy(line_info.monitor_status, monitor_status); strcpy(line_info.monitor_status, monitor_status);//测点状态
cout << "monitor_status:" << line_info.monitor_status << endl; cout << "monitor_status:" << line_info.monitor_status << endl;
//// 构造struct tm对象 //// 构造struct tm对象
struct tm timeinfo; struct tm timeinfo;
@@ -4556,27 +4631,32 @@ int parse_device_cfg_web()
timeinfo.tm_min = timestamp.minute; timeinfo.tm_min = timestamp.minute;
timeinfo.tm_sec = timestamp.second; timeinfo.tm_sec = timestamp.second;
time_t time = std::mktime(&timeinfo); time_t time = std::mktime(&timeinfo);
line_info.time = static_cast<long long>(time); line_info.time = static_cast<long long>(time);//测点更新时间
cout << "time:" << line_info.time << endl; cout << "time:" << line_info.time << endl;
strcpy(line_info.name, monitor_name);
strcpy(line_info.name, monitor_name);//测点名
cout << "name:" << line_info.name << endl; cout << "name:" << line_info.name << endl;
line_info.read_flag = ENABLE; //监测点有效 line_info.read_flag = ENABLE; //监测点有效
line_info.log_level = monitor_log_level; //监测点日志等级 line_info.log_level = monitor_log_level; //监测点日志等级
cout << "log_level_monitor:" << line_info.log_level << endl; cout << "log_level_monitor:" << line_info.log_level << endl;
//ied = find_ied_from_dev_code(line_info.terminal_code); //不需要再找上级终端了,已经在终端里了 //ied = find_ied_from_dev_code(line_info.terminal_code); //不需要再找上级终端了,已经在终端里了
if (ied && ied->usr_ext && line_info.cpuno && (static_cast<int>(line_info.cpuno) < 10)) { if (ied && ied->usr_ext && line_info.cpuno && (static_cast<int>(line_info.cpuno) < 10)) {//台账测点号有效
char str[256]; //256大小 char str[256]; //256大小
byte_t cpuno = line_info.cpuno; byte_t cpuno = line_info.cpuno; //台账的测点号
cout << "cpuno:" << (int)line_info.cpuno << endl; cout << "cpuno:" << (int)line_info.cpuno << endl;
cout << "index cpuno:" << cpuno-1 << endl; cout << "index cpuno:" << cpuno-1 << endl;
ied_usr = (ied_usr_t*)ied->usr_ext; ied_usr = (ied_usr_t*)ied->usr_ext;
ied_usr->LD_info[cpuno - 1] = line_info; //cpuno默认是1 ied_usr->LD_info[cpuno - 1] = line_info; //cpuno默认是1
ied_usr->LD_info[cpuno - 1].ied = ied; ied_usr->LD_info[cpuno - 1].ied = ied; //父级指针绑定
apr_snprintf(str, sizeof(str), "PQMonitorPQM%d", cpuno);//将监测点逻辑号转为PQMonitorPQM+逻辑号 apr_snprintf(str, sizeof(str), "PQMonitorPQM%d", cpuno);//将监测点逻辑号转为PQMonitorPQM+逻辑号
//lnk20250208不使用apr_pstrdup后续直接复用 //lnk20250208不使用apr_pstrdup后续直接复用
//ied_usr->LD_info[cpuno - 1].LD_name = apr_pstrdup(g_init_pool, str);//将 str 中的格式化字符串复制到内存池 g_init_pool 中。ied_usr->LD_info[cpuno - 1].LD_name 存储了这个字符串的副本LD_name 现在是 PQMonitorPQM{cpuno} 的形式。 //ied_usr->LD_info[cpuno - 1].LD_name = apr_pstrdup(g_init_pool, str);//将 str 中的格式化字符串复制到内存池 g_init_pool 中。ied_usr->LD_info[cpuno - 1].LD_name 存储了这个字符串的副本LD_name 现在是 PQMonitorPQM{cpuno} 的形式。
// 从 g_init_pool 内存池中分配固定 256 字节的内存 // 从 g_init_pool 内存池中分配固定 256 字节的内存
ied_usr->LD_info[cpuno - 1].LD_name = (char *)apr_palloc(g_init_pool, 256); ied_usr->LD_info[cpuno - 1].LD_name = (char *)apr_palloc(g_init_pool, 256);//根据台账中的测点号申请内存
//调试用,申请的地址 //调试用,申请的地址
printf("%s分配内存地址 LD_name[%d]: %p\n", ied_usr->terminal_id, cpuno - 1, (void*)ied_usr->LD_info[cpuno - 1].LD_name); printf("%s分配内存地址 LD_name[%d]: %p\n", ied_usr->terminal_id, cpuno - 1, (void*)ied_usr->LD_info[cpuno - 1].LD_name);
// 清空内存,防止残留数据 // 清空内存,防止残留数据
@@ -4587,21 +4667,21 @@ int parse_device_cfg_web()
//这里申请的空间基于ied的数量挂载到ied上 //这里申请的空间基于ied的数量挂载到ied上
ied_usr->LD_info[cpuno - 1].ht_fcd = apr_hash_make(g_init_pool); //这两行代码分别为 ied_usr->LD_info[cpuno - 1] 的两个成员ht_fcd 和 ht_full_fcda创建了空的哈希表。apr_hash_make(g_init_pool) 会在 g_init_pool 内存池中为这两个哈希表分配内存空间 ied_usr->LD_info[cpuno - 1].ht_fcd = apr_hash_make(g_init_pool); //这两行代码分别为 ied_usr->LD_info[cpuno - 1] 的两个成员ht_fcd 和 ht_full_fcda创建了空的哈希表。apr_hash_make(g_init_pool) 会在 g_init_pool 内存池中为这两个哈希表分配内存空间
ied_usr->LD_info[cpuno - 1].ht_full_fcda = apr_hash_make(g_init_pool);//它们的 key 值和 value 在后续的代码中可能会被填充 ied_usr->LD_info[cpuno - 1].ht_full_fcda = apr_hash_make(g_init_pool);//它们的 key 值和 value 在后续的代码中可能会被填充
ied_usr->LD_info[cpuno - 1].rptcount = 0; ied_usr->LD_info[cpuno - 1].rptcount = 0;//有效测点报告数为初始为0
cout << "rptcount:" << ied_usr->LD_info[cpuno - 1].rptcount << endl; cout << "rptcount:" << ied_usr->LD_info[cpuno - 1].rptcount << endl;
if (cpuno > ied->cpucount) { if (cpuno > ied->cpucount) {//测点号大于测点数
ied->cpucount = cpuno; ied->cpucount = cpuno;//取测点号,后续遍历测点时需要跳过无效测点
} }
} }
} }
} }
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
if (count_real < count_cfg){ if (count_real < count_cfg){//读取计数器和台账数量对比,如果读取的数量小于台账数量,说明有些台账读取失败了,按照读取的数量来设置进程的台账数量,避免后续访问读取失败的台账导致崩溃
g_node->n_clients = count_real; g_node->n_clients = count_real;
} }
if (count_cfg != count_real){ if (count_cfg != count_real){//应该相等,如果不相等说明有些台账读取失败了,按照读取的数量来设置进程的台账数量,避免后续访问读取失败的台账导致崩溃
return APR_EBADF; return APR_EBADF;
} }
@@ -5301,7 +5381,7 @@ std::string base64_encode(const std::string& in) {
return out; // 返回编码后的字符串 return out; // 返回编码后的字符串
} }
void handleUploadResponse(const std::string& response, char* wavepath) { void handleUploadResponse(const std::string& response,const std::string& localPath, char* wavepath, int type) {
// 解析 JSON 响应 // 解析 JSON 响应
cJSON* json_data = cJSON_Parse(response.c_str()); cJSON* json_data = cJSON_Parse(response.c_str());
@@ -5349,11 +5429,43 @@ void handleUploadResponse(const std::string& response, char* wavepath) {
} }
// 拷贝到 wavepath // 拷贝到 wavepath
strcpy(wavepath, nameWithoutExt.c_str()); if (type == 1) {
strcpy(wavepath, nameWithoutExt.c_str());
}
else {
strcpy(wavepath, name.c_str());
}
std::cout << "wavepath: " << wavepath << std::endl; std::cout << "wavepath: " << wavepath << std::endl;
DIY_INFOLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【NORMAL】前置上传文件成功,远端文件名:%s",wavepath); DIY_INFOLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【NORMAL】前置上传文件成功,远端文件名:%s",wavepath);
// =========================
// 上传成功后删除本地文件
// =========================
if (remove(localPath.c_str()) == 0)
{
std::cout << "Delete local file success: "
<< localPath << std::endl;
DIY_INFOLOG_CODE("process",0,
LOG_CODE_TRANSIENT_COMM,
"【NORMAL】删除本地文件成功:%s",
localPath.c_str());
}
else
{
std::cout << "Delete local file failed: "
<< localPath
<< " errno=" << errno
<< " err=" << strerror(errno)
<< std::endl;
DIY_ERRORLOG_CODE("process",0,
LOG_CODE_TRANSIENT_COMM,
"【ERROR】删除本地文件失败:%s errno=%d err=%s",
localPath.c_str(),
errno,
strerror(errno));
}
} }
} else { } else {
std::cerr << "Error: Missing expected fields in JSON response." << std::endl; std::cerr << "Error: Missing expected fields in JSON response." << std::endl;
@@ -5423,7 +5535,7 @@ void handleUploadResponse(const std::string& response, char* wavepath) {
}*/ }*/
//这是dataform发送方式 //这是dataform发送方式
void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath) { void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath,int type) {
// 初始化 curl // 初始化 curl
CURL* curl = curl_easy_init(); CURL* curl = curl_easy_init();
if (curl) { if (curl) {
@@ -5477,7 +5589,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath); DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath);
} else { } else {
std::cout << "http web success, response: " << resPost0 << std::endl; std::cout << "http web success, response: " << resPost0 << std::endl;
handleUploadResponse(resPost0, wavepath); // 处理响应 handleUploadResponse(resPost0, localpath, wavepath, type); // 处理响应
} }
// 清理 // 清理
@@ -5492,7 +5604,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c
void SOEFileWeb(char* localpath,char* cloudpath, char* wavepath) void SOEFileWeb(char* localpath,char* cloudpath, char* wavepath)
{ {
//示例ip更换为实际ip即可 //示例ip更换为实际ip即可
SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath); SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath,1);
} }
void SOEFileWeb_test() void SOEFileWeb_test()
@@ -5545,9 +5657,9 @@ int DownloadFileWeb(const std::string& strUrl,
std::string fullUrl = strUrl; std::string fullUrl = strUrl;
if (fullUrl.find('?') == std::string::npos) if (fullUrl.find('?') == std::string::npos)
fullUrl += "?path="; fullUrl += "?filePath=";
else else
fullUrl += "&path="; fullUrl += "&filePath=";
fullUrl += encodedPath; fullUrl += encodedPath;
curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str()); curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
@@ -5569,15 +5681,26 @@ int DownloadFileWeb(const std::string& strUrl,
if (res != CURLE_OK) if (res != CURLE_OK)
{ {
std::cerr << "DownloadFileWeb failed: " << curl_easy_strerror(res) << std::endl; std::cerr << "DownloadFileWeb failed: " << curl_easy_strerror(res) << std::endl;
remove(localpath);
return -1; return -1;
} }
if (http_code != 200) if (http_code != 200)
{ {
std::cerr << "DownloadFileWeb http code: " << http_code << std::endl; std::cerr << "DownloadFileWeb http code: " << http_code << std::endl;
remove(localpath);
return -1; return -1;
} }
if (chmod(localpath, 0777) != 0)
{
std::cerr << "chmod 777 failed: " << localpath << std::endl;
}
else
{
std::cout << "chmod 777 success: " << localpath << std::endl;
}
return 0; return 0;
} }
@@ -5711,6 +5834,9 @@ int update_one_terminal_ledger(terminal* update, int i,ied_t* ied,int terminal_i
chnl_usr->m_state = CHANNEL_DISCONNECTED; chnl_usr->m_state = CHANNEL_DISCONNECTED;
chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1); chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1);
//lnk20260512
init_oper_type_cache(ied_usr);
// 将 monitorData 中的数据写入到 LD_info 中 // 将 monitorData 中的数据写入到 LD_info 中
int count_real_monitor = 0; //遍历监测点台账的计数器 int count_real_monitor = 0; //遍历监测点台账的计数器
int j; int j;
@@ -6092,10 +6218,22 @@ int parse_rpt_log_ini_one(ied_t* ied)
//调试 //调试
printf("%s使用内存地址 LD_name[%d]: %p\n", ied_usr->terminal_id, cpuno, (void*)ied_usr->LD_info[cpuno].LD_name); printf("%s使用内存地址 LD_name[%d]: %p\n", ied_usr->terminal_id, cpuno, (void*)ied_usr->LD_info[cpuno].LD_name);
//添加判断有的监测点没有cpuno为2直接申请了LD_info[2-1]没申请LD_info[0] //添加判断有的监测点没有cpuno为2直接申请了LD_info[2-1]没申请LD_info[0]
if(ied_usr->LD_info[cpuno].LD_name == NULL){ if(ied_usr->LD_info[cpuno].LD_name == NULL){
printf("this ld_info didn't palloc space ,maybe this ledger has problem!"); printf("this ld_info didn't palloc space ,maybe this ledger has problem!");
DIY_ERRORLOG_CODE("process",0,LOG_CODE_RPTINIT,"【ERROR】终端%s的监测点序号为%d的监测点无法初始化报告,这个装置的台账存在缺失,请检查装置台账的监测点总数和各监测点的序号",ied_usr->terminal_id,cpuno + 1); DIY_ERRORLOG_CODE("process",0,LOG_CODE_RPTINIT,"【ERROR】终端%s的监测点序号为%d的监测点无法初始化报告,这个装置的台账存在缺失,请检查装置台账的监测点总数和各监测点的序号",ied_usr->terminal_id,cpuno + 1);
//添加默认值防止崩溃20260601
LD_info->rptcount = 0;
LD_info->logcount = 0;
LD_info->cpuno = 0;
LD_info->LD_name = NULL;
LD_info->rptinfo = NULL;
LD_info->loginfo = NULL;
delete[] tmp;//Get_IED中分配了内存使用后删除
continue;//跳过防止崩溃 continue;//跳过防止崩溃
} }
@@ -6114,7 +6252,18 @@ int parse_rpt_log_ini_one(ied_t* ied)
init_rptctrl_by_count(LD_info, rpt_cfg_strlists[type]->size()); init_rptctrl_by_count(LD_info, rpt_cfg_strlists[type]->size());
for (int i = 0; i < rpt_cfg_strlists[type]->size(); ++i) { for (int i = 0; i < rpt_cfg_strlists[type]->size(); ++i) {
apr_snprintf(buf, sizeof(buf), "%s", rpt_cfg_strlists[type]->at(i).toAscii().constData()); apr_snprintf(buf, sizeof(buf), "%s", rpt_cfg_strlists[type]->at(i).toAscii().constData());
fill_rptctrl_by_cfg(LD_info, i, buf); //fill_rptctrl_by_cfg(LD_info, i, buf);
int ret = fill_rptctrl_by_cfg(LD_info, i, buf);
if (ret != 0) {
printf("[RPT_INIT][ERROR] fill_rptctrl_by_cfg failed dev=%s cpu=%d rpt=%d cfg=%s\n",
ied_usr->terminal_id, cpuno, i, buf);
DIY_ERRORLOG_CODE(ied_usr->terminal_id,1,LOG_CODE_RPTINIT,"【ERROR】终端%s的监测点序号为%d的监测点的第%d个报告配置解析失败请检查报告配置格式是否正确",ied_usr->terminal_id, cpuno + 1, i + 1);
if (LD_info->rptinfo && LD_info->rptinfo[i]) {
LD_info->rptinfo[i]->rptID = NULL;
LD_info->rptinfo[i]->LD_info = NULL;
}
}
} }
//初始化监测点的日志控制 //初始化监测点的日志控制
init_logctrl_by_count(LD_info, log_cfg_strlists[type]->size()); init_logctrl_by_count(LD_info, log_cfg_strlists[type]->size());
@@ -6528,7 +6677,7 @@ bool shouldSkipTerminal(const char* terminal_id) {
void rocketmq_test_300(int mpnum,int front_index,int type) { void rocketmq_test_300(int mpnum,int front_index,int type) {
Ckafka_data_t data; Ckafka_data_t data;
data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC); data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC_TEST);
data.mp_id = "0"; data.mp_id = "0";
// 读取文件内容 // 读取文件内容
@@ -7167,6 +7316,7 @@ void send_reply_to_kafka(const std::string& guid, const std::string& step, const
// 封装 Kafka 消息 // 封装 Kafka 消息
Ckafka_data_t connect_info; Ckafka_data_t connect_info;
connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic); connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
connect_info.mp_id = QString::fromStdString(guid);//guid作为key
connect_info.strText = QString::fromStdString(jsonString); connect_info.strText = QString::fromStdString(jsonString);
// 加入发送队列(带互斥锁保护) // 加入发送队列(带互斥锁保护)
@@ -7195,6 +7345,7 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step
// 封装 Kafka 消息 // 封装 Kafka 消息
Ckafka_data_t connect_info; Ckafka_data_t connect_info;
connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic); connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
connect_info.mp_id = QString::fromStdString(guid);//guid作为key
connect_info.strText = QString::fromStdString(jsonString); connect_info.strText = QString::fromStdString(jsonString);
// 加入发送队列(带互斥锁保护) // 加入发送队列(带互斥锁保护)
@@ -7204,20 +7355,25 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step
} }
void send_heartbeat_to_kafka(const std::string& status) { void send_heartbeat_to_kafka(const std::string& status) {
std::string front_type = get_front_type_from_subdir();
// 构造 JSON 字符串 // 构造 JSON 字符串
std::ostringstream oss; std::ostringstream oss;
oss << "{" oss << "{"
<< "\"nodeId\":\"" << FRONT_INST << "\"," << "\"nodeId\":\"" << FRONT_INST << "\","
<< "\"frontType\":\"" << get_front_type_from_subdir() << "\"," << "\"frontType\":\"" << front_type << "\","
<< "\"processNo\":\"" << g_front_seg_index << "\"," << "\"processNo\":\"" << g_front_seg_index << "\","
<< "\"status\":\"" << status << "\"" << "\"status\":\"" << status << "\""
<< "}"; << "}";
std::string jsonString = oss.str(); std::string jsonString = oss.str();
std::string mpid_str = std::to_string(g_node_id) + "_" + std::to_string(g_front_seg_index);
// 封装 Kafka 消息 // 封装 Kafka 消息
Ckafka_data_t connect_info; Ckafka_data_t connect_info;
connect_info.strTopic = QString::fromStdString(Heart_Beat_Topic); connect_info.strTopic = QString::fromStdString(Heart_Beat_Topic);
connect_info.mp_id = QString::fromStdString(mpid_str);
connect_info.strText = QString::fromStdString(jsonString); connect_info.strText = QString::fromStdString(jsonString);
// 加入发送队列(带互斥锁保护) // 加入发送队列(带互斥锁保护)

View File

@@ -360,21 +360,25 @@ protected:
final_msg = suppressed_oss.str(); final_msg = suppressed_oss.str();
} }
std::string business_id = extract_logger_id(logger_name);
std::string front_type = get_front_type_from_subdir();
std::ostringstream oss; std::ostringstream oss;
oss << "{\"processNo\":\"" << intToString(g_front_seg_index) oss << "{\"processNo\":\"" << intToString(g_front_seg_index)
<< "\",\"nodeId\":\"" << FRONT_INST << "\",\"nodeId\":\"" << escape_json(FRONT_INST)
<< "\",\"businessId\":\"" << extract_logger_id(logger_name) << "\",\"businessId\":\"" << escape_json(business_id)
<< "\",\"level\":\"" << level_str << "\",\"level\":\"" << escape_json(level_str)
<< "\",\"grade\":\"" << get_level_str(level) << "\",\"grade\":\"" << escape_json(get_level_str(level))
<< "\",\"logtype\":\"" << safe_logtype << "\",\"logtype\":\"" << safe_logtype
<< "\",\"frontType\":\"" << get_front_type_from_subdir() << "\",\"frontType\":\"" << escape_json(front_type)
<< "\",\"code\":" << code << "\",\"code\":" << code
<< ",\"log\":\"" << escape_json(final_msg) << "\"}"; << ",\"log\":\"" << escape_json(final_msg) << "\"}";
Ckafka_data_t connect_info; Ckafka_data_t connect_info;
connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC); connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC);
connect_info.strText = QString::fromStdString(oss.str()); connect_info.mp_id = QString::fromStdString(business_id);
connect_info.strText = QString::fromStdString(oss.str());
kafka_data_list_mutex.lock(); kafka_data_list_mutex.lock();
kafka_data_list.append(connect_info); kafka_data_list.append(connect_info);
kafka_data_list_mutex.unlock(); kafka_data_list_mutex.unlock();

View File

@@ -224,8 +224,19 @@ extern "C" {
#include <sys/time.h> #include <sys/time.h>
#include <sys/resource.h> #include <sys/resource.h>
#endif #endif
/*
#define max(a,b) (((a) > (b)) ? (a) : (b)) #define max(a,b) (((a) > (b)) ? (a) : (b))
#define min(a,b) (((a) < (b)) ? (a) : (b)) #define min(a,b) (((a) < (b)) ? (a) : (b))
*/
#ifndef __cplusplus
#ifndef max
#define max(a,b) (((a) > (b)) ? (a) : (b))
#endif
#ifndef min
#define min(a,b) (((a) < (b)) ? (a) : (b))
#endif
#endif
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include <sys/types.h> #include <sys/types.h>

View File

@@ -1,21 +1,34 @@
#ifdef __cplusplus #ifdef __cplusplus
#include "../json/mms_json_inter.h" #include "../json/mms_json_inter.h"
#include "../rocketmq/CProducer.h" //#include "../rocketmq/CProducer.h"
#include "../rocketmq/CMessage.h" //#include "../rocketmq/CMessage.h"
#include "../rocketmq/CSendResult.h" //#include "../rocketmq/CSendResult.h"
//#include "../rocketmq/CPushConsumer.h"
#include "../rocketmq/CPushConsumer.h" #include "../rocketmq/DefaultMQProducer.h"
#include "../rocketmq/MQMessage.h"
#include "../rocketmq/SendResult.h"
#include "../rocketmq/SessionCredentials.h"
#include "../rocketmq/MQMessageExt.h"
#include "../rocketmq/ConsumeType.h"
#include "../rocketmq/MQMessageListener.h"
#include <vector> #include <vector>
#include <iostream>
#include <string>
using namespace rocketmq;
/*添加测试函数lnk10-10*/ /*添加测试函数lnk10-10*/
void producer_send0(); //void producer_send0();
void StartSendMessage(CProducer* producer,const char* strbody); //void StartSendMessage(CProducer* producer,const char* strbody);
void producer_send(const char* strbody); //void producer_send(const char* strbody);
void rocketmq_producer_send(const char* strbody,const char* topic); //void rocketmq_producer_send(const char* strbody,const char* topic);
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic); //void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
void rocketmq_producer_send(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys);
extern "C" { extern "C" {
void rocketmq_test_rt(); void rocketmq_test_rt();
void rocketmq_test_ud(); void rocketmq_test_ud();
@@ -32,24 +45,29 @@ extern void my_rocketmq_send(Ckafka_data_t& data);
void InitializeProducer(); void InitializeProducer();
void ShutdownAndDestroyProducer(); void ShutdownAndDestroyProducer();
//////////////////////////////////////////////////////消费者 //////////////////////////////////////////////////////消费者
void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key); typedef ConsumeStatus (*MessageCallBack)(
void ShutdownAndDestroyConsumer(); const MQMessageExt& msg
);
struct Subscription { struct Subscription {
std::string topic; std::string topic;
std::string tag; std::string tag;
MessageCallBack callback; MessageCallBack callback;
Subscription(const std::string& t, const std::string& tg, MessageCallBack cb) Subscription(const std::string& t,
const std::string& tg,
MessageCallBack cb)
: topic(t), tag(tg), callback(cb) {} : topic(t), tag(tg), callback(cb) {}
}; };
//void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
void InitializeConsumer(const std::string& consumerName,
const std::string& nameServer,
const std::vector<Subscription>& subscriptions);
void ShutdownAndDestroyConsumer();
void rocketmq_consumer_receive( void rocketmq_consumer_receive(
const std::string& consumerName, const std::string& consumerName,
const std::string& nameServer, const std::string& nameServer,
//const std::string& topic,
//const std::string& tag,
//MessageCallBack callback);
const std::vector<Subscription>& subscriptions); const std::vector<Subscription>& subscriptions);
////////////////////////////////////////////////////// //////////////////////////////////////////////////////

View File

@@ -49,7 +49,7 @@
#ifndef HIBYTE #ifndef HIBYTE
#define HIBYTE(w) ((byte_t)((uint16_t)(w) >> 8)) #define HIBYTE(w) ((byte_t)((uint16_t)(w) >> 8))
#endif #endif
/*
#ifndef max #ifndef max
#define max(a,b) (((a) > (b)) ? (a) : (b)) #define max(a,b) (((a) > (b)) ? (a) : (b))
#endif #endif
@@ -57,7 +57,18 @@
#ifndef min #ifndef min
#define min(a,b) (((a) < (b)) ? (a) : (b)) #define min(a,b) (((a) < (b)) ? (a) : (b))
#endif #endif
*/
#ifndef __cplusplus
#ifndef max
#define max(a,b) (((a) > (b)) ? (a) : (b))
#endif
#ifndef min
#define min(a,b) (((a) < (b)) ? (a) : (b))
#endif
#endif
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif

1297
json/PQSMsg.cpp Normal file

File diff suppressed because it is too large Load Diff

2548
json/PQSMsg.h Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -44,6 +44,9 @@ extern std::string WEB_COMFLAG;
extern std::string WEB_EVENT; extern std::string WEB_EVENT;
extern std::string WEB_FILEDOWNLOAD; extern std::string WEB_FILEDOWNLOAD;
extern std::string G_CONNECT_TOPIC; extern std::string G_CONNECT_TOPIC;
extern std::string DATA_TRACE_TOPIC;//lnk20260609数据追踪topic
extern int RECALL_ONLY_FLAG;
//lnk20250115添加台账锁 //lnk20250115添加台账锁
extern pthread_mutex_t mtx; extern pthread_mutex_t mtx;
@@ -164,6 +167,7 @@ public:
QString WavePhasicB; QString WavePhasicB;
QString WavePhasicC; QString WavePhasicC;
QString TypeOfData; //闪变和统计是否合并 0-分开 1-合并 QString TypeOfData; //闪变和统计是否合并 0-分开 1-合并
QString IEDControl; //例LD0 lnk2026-5-13
QString UnitOfTimeUnit; //暂态事件持续事件单位0 - 毫秒 1 - 秒 lnk20260127 QString UnitOfTimeUnit; //暂态事件持续事件单位0 - 毫秒 1 - 秒 lnk20260127
QString ValueOfTimeUnit; //上送值的时间UTC-UTC时间 beijing-北京时间 QString ValueOfTimeUnit; //上送值的时间UTC-UTC时间 beijing-北京时间
QString WaveTimeFlag; //录波文件的时间UTC-UTC时间 beijing-北京时间 QString WaveTimeFlag; //录波文件的时间UTC-UTC时间 beijing-北京时间
@@ -287,24 +291,36 @@ static void trace_hit_and_decrement(const QString& mp_id)
} }
//追踪61850原始数据 //追踪61850原始数据
static QString build_mms_multiline_text(const json_block_data* data) static QString build_mms_json_object(const json_block_data* data)
{ {
QStringList lines; QString json = "{";
QMapIterator<QString, double> it(data->mms_str_map); QMapIterator<QString, double> it(data->mms_str_map);
bool first = true;
while (it.hasNext()) { while (it.hasNext()) {
it.next(); it.next();
lines << QString("%1 = %2").arg(it.key()).arg(it.value(), 0, 'g', 15);
if (!first)
json += ",";
json += QString("\"%1\":%2")
.arg(escape_json_string(it.key()))
.arg(QString::number(it.value(), 'g', 15));
first = false;
} }
return lines.join("\n"); json += "}";
return json;
} }
static QString build_trace_json(const json_block_data* data) static QString build_trace_json(const json_block_data* data)
{ {
if (!data) return "{}"; if (!data) return "{}";
QString mms_text = build_mms_multiline_text(data); QString mms_json = build_mms_json_object(data);
QString json; QString json;
json += "{"; json += "{";
@@ -313,7 +329,7 @@ static QString build_trace_json(const json_block_data* data)
json += QString("\"data_time\":%1,").arg(QString::number((qlonglong)data->time)); json += QString("\"data_time\":%1,").arg(QString::number((qlonglong)data->time));
json += QString("\"voltage_level\":\"%1\",").arg(QString::number(data->voltage_level, 'f', 6)); json += QString("\"voltage_level\":\"%1\",").arg(QString::number(data->voltage_level, 'f', 6));
json += QString("\"dev_type\":\"%1\",").arg(escape_json_string(data->dev_type)); json += QString("\"dev_type\":\"%1\",").arg(escape_json_string(data->dev_type));
json += QString("\"mms_text\":\"%1\"").arg(escape_json_string(mms_text)); json += QString("\"mms_json\":%1").arg(mms_json);
json += "}"; json += "}";
return json; return json;
@@ -331,7 +347,7 @@ static void send_trace_if_needed(json_block_data* pdata)
Ckafka_data_t KafkaData; Ckafka_data_t KafkaData;
KafkaData.monitor_id = pdata->monitorId; KafkaData.monitor_id = pdata->monitorId;
KafkaData.mp_id = pdata->mp_id; KafkaData.mp_id = pdata->mp_id;
KafkaData.strTopic = "DATA_TRACE_TOPIC"; KafkaData.strTopic = QString::fromStdString(DATA_TRACE_TOPIC);
KafkaData.strText = jsonText; KafkaData.strText = jsonText;
kafka_data_list_mutex.lock(); kafka_data_list_mutex.lock();
@@ -382,6 +398,7 @@ bool get_xml_config_by_dev_type(const char* dev_type, XmlConfigC* out_cfg) {
strncpy(out_cfg->TypeOfData, cfg.TypeOfData.toUtf8().constData(), sizeof(out_cfg->TypeOfData) - 1); out_cfg->TypeOfData[sizeof(out_cfg->TypeOfData) - 1] = '\0';//lnk20260127 strncpy(out_cfg->TypeOfData, cfg.TypeOfData.toUtf8().constData(), sizeof(out_cfg->TypeOfData) - 1); out_cfg->TypeOfData[sizeof(out_cfg->TypeOfData) - 1] = '\0';//lnk20260127
strncpy(out_cfg->ValueOfTimeUnit, cfg.ValueOfTimeUnit.toUtf8().constData(),sizeof(out_cfg->ValueOfTimeUnit) - 1); strncpy(out_cfg->ValueOfTimeUnit, cfg.ValueOfTimeUnit.toUtf8().constData(),sizeof(out_cfg->ValueOfTimeUnit) - 1);
strncpy(out_cfg->WaveTimeFlag, cfg.WaveTimeFlag.toUtf8().constData(), sizeof(out_cfg->WaveTimeFlag) - 1); strncpy(out_cfg->WaveTimeFlag, cfg.WaveTimeFlag.toUtf8().constData(), sizeof(out_cfg->WaveTimeFlag) - 1);
strncpy(out_cfg->IEDControl, cfg.IEDControl.toUtf8().constData(), sizeof(out_cfg->IEDControl) - 1);//lnk2026-5-13
strncpy(out_cfg->IEDname, cfg.IEDname.toUtf8().constData(), sizeof(out_cfg->IEDname) - 1); strncpy(out_cfg->IEDname, cfg.IEDname.toUtf8().constData(), sizeof(out_cfg->IEDname) - 1);
strncpy(out_cfg->LDevicePrefix, cfg.LDevicePrefix.toUtf8().constData(), sizeof(out_cfg->LDevicePrefix) - 1); strncpy(out_cfg->LDevicePrefix, cfg.LDevicePrefix.toUtf8().constData(), sizeof(out_cfg->LDevicePrefix) - 1);
@@ -1034,6 +1051,10 @@ bool ParseXMLConfig2(int xml_flag, XmlConfig *cfg, list<CTopic*> *ctopiclist,QSt
cfg->WaveTimeFlag.append(e.attribute("WaveTimeFlag")); cfg->WaveTimeFlag.append(e.attribute("WaveTimeFlag"));
} }
if ("IEDControl" == strTag)
{
cfg->IEDControl.append(e.attribute("name"));
}
if ("IED" == strTag) if ("IED" == strTag)
{ {
cfg->IEDname.append(e.attribute("name")); cfg->IEDname.append(e.attribute("name"));
@@ -1196,6 +1217,9 @@ int transfer_json_block_data(char v_wiring_type[], json_block_data *data) //json
print_mms_str_map(data); print_mms_str_map(data);
} }
//数据追踪上送
send_trace_if_needed(data);
list<CTopic*> ctopic_list; list<CTopic*> ctopic_list;
////lnk2024-8-15 区分星型,角型接线 ////lnk2024-8-15 区分星型,角型接线
@@ -1253,6 +1277,7 @@ int transfer_json_block_data(char v_wiring_type[], json_block_data *data) //json
printf("TypeOfData = '%s'\n", cfg1.TypeOfData); printf("TypeOfData = '%s'\n", cfg1.TypeOfData);
printf("ValueOfTimeUnit = '%s'\n", cfg1.ValueOfTimeUnit); printf("ValueOfTimeUnit = '%s'\n", cfg1.ValueOfTimeUnit);
printf("WaveTimeFlag = '%s'\n", cfg1.WaveTimeFlag); printf("WaveTimeFlag = '%s'\n", cfg1.WaveTimeFlag);
printf("IEDControl = '%s'\n", cfg1.IEDControl);
printf("IEDname = '%s'\n", cfg1.IEDname); printf("IEDname = '%s'\n", cfg1.IEDname);
printf("LDevicePrefix = '%s'\n", cfg1.LDevicePrefix); printf("LDevicePrefix = '%s'\n", cfg1.LDevicePrefix);
printf("=====================================\n"); printf("=====================================\n");
@@ -2854,7 +2879,7 @@ void processGGIO_start_data_end(char* mp_id,char* fullname,double v,long long ti
printf("transfer json ggio data: %s==%s \n", KafkaData.strText.toStdString().c_str(), fullname); printf("transfer json ggio data: %s==%s \n", KafkaData.strText.toStdString().c_str(), fullname);
kafka_data_list_mutex.lock(); //加锁 kafka_data_list_mutex.lock(); //加锁
kafka_data_list.append(KafkaData); //添加 kafka发送链表 //kafka_data_list.append(KafkaData); //添加 kafka发送链表
kafka_data_list_mutex.unlock(); //解锁 kafka_data_list_mutex.unlock(); //解锁
} }
@@ -3100,9 +3125,9 @@ char* Get_IED(char* devtype)
} }
QString ied; QString ied;
ied.append(xmlinfo_list[type]->xmlcfg.IEDname); // 从配置文件读取的终端名 ied.append(xmlinfo_list[type]->xmlcfg.IEDname); // 从配置文件读取的终端名PQ
ied.append(xmlinfo_list[type]->xmlcfg.LDevicePrefix); //// 从配置文件读取的监测点名 ied.append(xmlinfo_list[type]->xmlcfg.LDevicePrefix); //// 从配置文件读取的监测点名PQM
ied.append("%d"); ied.append("%d");//最后构成PQPQM1
QByteArray byteArray = ied.toLocal8Bit(); QByteArray byteArray = ied.toLocal8Bit();
@@ -3271,14 +3296,13 @@ void connectlog_pgsql(char* id,char* datetime,int status)
return; return;
} }
//使用mq //使用mq
Ckafka_data_t connect_info; Ckafka_data_t connect_info;
connect_info.strTopic = QString::fromStdString(G_CONNECT_TOPIC); connect_info.strTopic = QString::fromStdString(G_CONNECT_TOPIC);
connect_info.mp_id = QString::fromLocal8Bit(id);//这里填装置id后续作为key
connect_info.strText = QString::fromStdString(std::string(jsonString)); connect_info.strText = QString::fromStdString(std::string(jsonString));
if(g_node_id == STAT_DATA_BASE_NODE_ID){//稳态才上传 if((g_node_id == STAT_DATA_BASE_NODE_ID && RECALL_ONLY_FLAG == 0) || (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID && RECALL_ONLY_FLAG == 1)){//稳态或者补招才上传
kafka_data_list_mutex.lock(); //加锁 kafka_data_list_mutex.lock(); //加锁
kafka_data_list.append(connect_info); //添加 kafka发送链表 kafka_data_list.append(connect_info); //添加 kafka发送链表
kafka_data_list_mutex.unlock(); //解锁 kafka_data_list_mutex.unlock(); //解锁

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

View File

@@ -113,12 +113,12 @@ extern LOG_TLS int g_log_code_tls; // 声明为 TLS 变量,定义见 log4.cpp
int __old_code__ = g_log_code_tls; \ int __old_code__ = g_log_code_tls; \
g_log_code_tls = (int)(CODE_INT); \ g_log_code_tls = (int)(CODE_INT); \
\ \
char __msg_buf__[256]; \ char __msg_buf__[512]; \
format_log_msg(__msg_buf__, sizeof(__msg_buf__), __VA_ARGS__); \ format_log_msg(__msg_buf__, sizeof(__msg_buf__), __VA_ARGS__); \
\ \
const char* __key_raw__ = (KEY); \ const char* __key_raw__ = (KEY); \
\ \
char __key_buf__[256]; \ char __key_buf__[512]; \
switch ((int)(KEY_TYPE)) { \ switch ((int)(KEY_TYPE)) { \
case 0: \ case 0: \
snprintf(__key_buf__, sizeof(__key_buf__), "process"); \ snprintf(__key_buf__, sizeof(__key_buf__), "process"); \

View File

@@ -165,6 +165,7 @@ typedef struct {
char TypeOfData[64]; char TypeOfData[64];
char ValueOfTimeUnit[64]; char ValueOfTimeUnit[64];
char WaveTimeFlag[64]; char WaveTimeFlag[64];
char IEDControl[64];
char IEDname[64]; char IEDname[64];
char LDevicePrefix[64]; char LDevicePrefix[64];
} XmlConfigC; } XmlConfigC;
@@ -183,6 +184,8 @@ typedef struct file_dir_req_t
char devid[128]; char devid[128];
int type; int type;
char path[256]; char path[256];
char remote_path[256];
time_t create_time; time_t create_time;
} file_dir_req_t; } file_dir_req_t;

View File

@@ -32,7 +32,7 @@ extern apr_pool_t* g_cfg_pool;
extern apr_pool_t* g_init_pool; extern apr_pool_t* g_init_pool;
extern int g_DevFlag; //日志配置中读取的参数暂无特定使用lnk20250121 extern int g_DevFlag; //日志配置中读取的参数暂无特定使用lnk20250121
extern bool DEBUGOPEN;//调试开关,控制是否输出调试日志,默认关闭
extern int IED_COUNT; extern int IED_COUNT;
extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量标志是否只运行补招程序 extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量标志是否只运行补招程序
@@ -132,10 +132,25 @@ void CloseIECReports(chnl_usr_t *chnl_usr)
ied_usr = GET_IEDEXT_ADDR(ied); ied_usr = GET_IEDEXT_ADDR(ied);
for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) { for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
//添加保护lnk20260602
if (LD_info->cpuno == 0 ||
LD_info->LD_name == NULL ||
LD_info->rptcount <= 0 ||
LD_info->rptinfo == NULL)
{
continue;
}
for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) { for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) {
char rpt_inst_name[65]; char rpt_inst_name[65];
rptinfo = LD_info->rptinfo[rpt_no]; rptinfo = LD_info->rptinfo[rpt_no];
if ( ! rptinfo->rpt_registered )
//添加保护lnk20260602
if (rptinfo == NULL)
continue;
if ( !rptinfo->rpt_registered )
continue; continue;
if ( rptinfo->chnl_id != chnl_usr->chnl_id) if ( rptinfo->chnl_id != chnl_usr->chnl_id)
continue; continue;
@@ -260,6 +275,18 @@ void IECReport_tryGI(chnl_usr_t *chnl_usr,rptinfo_t *rptinfo)
//增加处理根据稳态,或暂态功能等,决定 报告是否需要注册、取消注册或 不做任何处理 //增加处理根据稳态,或暂态功能等,决定 报告是否需要注册、取消注册或 不做任何处理
int judge_rpt_next_should_do(rptinfo_t *rptinfo) int judge_rpt_next_should_do(rptinfo_t *rptinfo)
{ {
if (rptinfo == NULL) {
printf("[RPT][ERROR] rptinfo is NULL\n");
return SHOULD_DO_NOTHING;
}
if (rptinfo->LD_info == NULL) {
printf("[RPT][ERROR] rptinfo->LD_info is NULL, rptID=%s\n",
rptinfo->rptID ? rptinfo->rptID : "NULL");
return SHOULD_DO_NOTHING;
}
int should_register_state = 1; //各功能默认注册 int should_register_state = 1; //各功能默认注册
int is_real_report = (rptinfo->report_PQ_type & REPORT_TYPE_REAL);//报告控制中包含的类型 int is_real_report = (rptinfo->report_PQ_type & REPORT_TYPE_REAL);//报告控制中包含的类型
int is_soe_report = (rptinfo->report_PQ_type & REPORT_TYPE_SOE); int is_soe_report = (rptinfo->report_PQ_type & REPORT_TYPE_SOE);
@@ -302,7 +329,23 @@ void ChannelCheckIECReports(chnl_usr_t *chnl_usr)
{ {
LD_info = &(ied_usr->LD_info[cpuno]); //遍历监测点 LD_info = &(ied_usr->LD_info[cpuno]); //遍历监测点
if (LD_info->cpuno==0){ //添加保护,防止台账中测点号不连续导致的崩溃
if (LD_info->cpuno == 0 ||
LD_info->LD_name == NULL ||
LD_info->rptcount <= 0 ||
LD_info->rptinfo == NULL)
{
printf("[RPT][SKIP] invalid LD_info cpu=%d cpuno=%d LD_name=%p rptcount=%d rptinfo=%p\n",
cpuno,
LD_info->cpuno,
LD_info->LD_name,
LD_info->rptcount,
LD_info->rptinfo);
continue;
}
//未初始化的测点告警没有意义直接访问id还可能崩溃
/*if (LD_info->cpuno==0){
// 仅在还没达到5次上限时打印 // 仅在还没达到5次上限时打印
if (!LD_info->has_logged_regist) { if (!LD_info->has_logged_regist) {
LD_info->registcount++; LD_info->registcount++;
@@ -319,13 +362,39 @@ void ChannelCheckIECReports(chnl_usr_t *chnl_usr)
} }
} }
continue; continue;
} }*/
for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) { //遍历报告(映射文件中读取的报告控制) for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) { //遍历报告(映射文件中读取的报告控制)
rptinfo = LD_info->rptinfo[rpt_no] ; rptinfo = LD_info->rptinfo[rpt_no] ;
//检查是否需要注册或取消注册报告,或不做任何处理
printf("[RPT][CHECK] ip=%s cpu=%d rpt_no=%d rptcount=%d LD_name=%s rptinfo=%p\n",
chnl_usr->ip_str,
cpuno,
rpt_no,
LD_info->rptcount,
LD_info->LD_name ? LD_info->LD_name : "NULL",
rptinfo);
if (rptinfo == NULL) {
printf("[RPT][ERROR] rptinfo NULL, skip\n");
continue;
}
if (rptinfo->LD_info == NULL) {
printf("[RPT][ERROR] rptinfo->LD_info NULL, rptID=%s, skip\n",
rptinfo->rptID ? rptinfo->rptID : "NULL");
continue;
}
if (rptinfo->rptID == NULL || rptinfo->rptID[0] == '\0') {
printf("[RPT][ERROR] rptID empty, skip\n");
continue;
}
//添加保护
if (judge_rpt_next_should_do(rptinfo)==SHOULD_DO_NOTHING)//检查是否触发 if (judge_rpt_next_should_do(rptinfo)==SHOULD_DO_NOTHING)//检查是否触发
{ {
//DIY_DEBUGLOG_CODE(LD_info->mp_id,2,LOG_CODE_REPORT,"【DEBUG】监测点:%s - id:%s不注册报告", LD_info->name,LD_info->mp_id); //DIY_DEBUGLOG_CODE(LD_info->mp_id,2,LOG_CODE_REPORT,"【DEBUG】监测点:%s - id:%s不注册报告", LD_info->name,LD_info->mp_id);
@@ -496,8 +565,17 @@ void ChannelCheckIECLogs(chnl_usr_t *chnl_usr)
for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) { for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
//添加保护,防止台账中测点号不连续导致的崩溃
if (LD_info->cpuno == 0 ||
LD_info->LD_name == NULL ||
LD_info->logcount <= 0 ||
LD_info->loginfo == NULL)
{
continue;
}
//日志控制块缺失 //日志控制块缺失
if (LD_info->logcount<=0){ /*if (LD_info->logcount<=0){
// 仅在还没达到5次上限时打印 // 仅在还没达到5次上限时打印
if (!LD_info->has_logged_regist) { if (!LD_info->has_logged_regist) {
LD_info->registcount++; LD_info->registcount++;
@@ -515,10 +593,14 @@ void ChannelCheckIECLogs(chnl_usr_t *chnl_usr)
} }
continue; continue;
} }*/
loginfo = LD_info->loginfo[0] ; loginfo = LD_info->loginfo[0] ;
//添加保护,防止台账中测点号不连续导致的崩溃
if (loginfo == NULL || loginfo->LD_info == NULL)
continue;
apr_sleep(apr_time_from_sec(1) / 10); apr_sleep(apr_time_from_sec(1) / 10);
Check_Recall_Config(LD_info->mp_id);//尝试获取xml结构 Check_Recall_Config(LD_info->mp_id);//尝试获取xml结构
@@ -1531,6 +1613,8 @@ void CheckAllConnectedChannel()
if(chnl_usr->m_state == CHANNEL_CONNECTED) if(chnl_usr->m_state == CHANNEL_CONNECTED)
{ {
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
InitLedrsOperTypeForChannel(chnl_usr);//写特殊控制的初始化
if(DEBUGOPEN)printf("[FILEDIR] enter HandleFileDirReqForChannel");
HandleFileDirReqForChannel(chnl_usr);//文件目录请求 HandleFileDirReqForChannel(chnl_usr);//文件目录请求
} }
@@ -2065,6 +2149,8 @@ apr_status_t call_cn_wavelist(LD_info_t *LD_info )
printf(">>>>>>>> extract_timestamp_from_cfg_file success \n"); printf(">>>>>>>> extract_timestamp_from_cfg_file success \n");
qvvr = find_qvvr_by_trig_tm(LD_info,trig_tm); //根据文件的触发时间查找检测点记录的匹配上的暂态事件 qvvr = find_qvvr_by_trig_tm(LD_info,trig_tm); //根据文件的触发时间查找检测点记录的匹配上的暂态事件
if (qvvr) { if (qvvr) {
DIY_INFOLOG_CODE(LD_info->mp_id,2,LOG_CODE_COMTRADE_FILE,"【NORMAL】监测点:%s - id:%s波形文件匹配暂态事件成功", LD_info->name,LD_info->mp_id);
char* uuid_cfg = (char*)malloc(65 * sizeof(char));//上传文件后获取到的路径 char* uuid_cfg = (char*)malloc(65 * sizeof(char));//上传文件后获取到的路径
char* uuid_dat = (char*)malloc(65 * sizeof(char)); char* uuid_dat = (char*)malloc(65 * sizeof(char));
char* filename_cfg = (char*)malloc(100 * sizeof(char));//上传文件后获取到的文件名 char* filename_cfg = (char*)malloc(100 * sizeof(char));//上传文件后获取到的文件名
@@ -2290,6 +2376,9 @@ apr_status_t call_cn_wavelist(LD_info_t *LD_info )
} }
} }
else {
DIY_INFOLOG_CODE(LD_info->mp_id,2,LOG_CODE_COMTRADE_FILE,"【INFO】监测点:%s - id:%s的录波文件没有任何暂态事件可以匹配录波号段%d", LD_info->name,LD_info->mp_id,LD_info->FltNum[i]);
}
} }
} }

View File

@@ -80,6 +80,9 @@
#include <ctype.h> //lnk20241119 #include <ctype.h> //lnk20241119
#include "../cfg_parse/custom_printf.h"//lnk20250225 #include "../cfg_parse/custom_printf.h"//lnk20250225
#include "../log4cplus/log4.h"
extern uint32_t g_node_id; extern uint32_t g_node_id;
extern char subdir[128]; extern char subdir[128];
unsigned int g_no_auth = 0; unsigned int g_no_auth = 0;
@@ -132,6 +135,8 @@ IDENT_RESP_INFO identify_response_info =
/************************************************************************/ /************************************************************************/
extern pt61850app_t *g_pt61850app;
extern TP0_CONN *tp0_conn_arr; /* ptr to array of "max_num_conns" structs */ extern TP0_CONN *tp0_conn_arr; /* ptr to array of "max_num_conns" structs */
static ST_VOID disc_ind_fun (MVL_NET_INFO *cc, ST_INT discType); static ST_VOID disc_ind_fun (MVL_NET_INFO *cc, ST_INT discType);
@@ -269,12 +274,25 @@ MY_CONTROL_INFO my_control_info;
ST_INT mms_var_type_id_create (MVL_NET_INFO *clientNetInfo, ST_INT scope, ST_INT mms_var_type_id_create (MVL_NET_INFO *clientNetInfo, ST_INT scope,
ST_CHAR *dom_name, ST_CHAR *var_name, int iTimeOut) ST_CHAR *dom_name, ST_CHAR *var_name, int iTimeOut)
{ {
MVL_REQ_PEND *reqCtrl; //MVL_REQ_PEND *reqCtrl;
//reqCtrl 必须初始化为 NULL防止 mvla_getvar 失败后释放野指针
MVL_REQ_PEND *reqCtrl = NULL;
GETVAR_REQ_INFO getvar_req; GETVAR_REQ_INFO getvar_req;
ST_INT type_id = -1; /* start with invalid type id */ ST_INT type_id = -1; /* start with invalid type id */
ST_RET ret; ST_RET ret;
//参数合法性检查
if (clientNetInfo == NULL || dom_name == NULL || var_name == NULL)
{
printf("[GETVAR] invalid arg netInfo=%p dom=%p var=%p\n",
clientNetInfo, dom_name, var_name);
return -1;
}
//结构体清零,避免未初始化字段导致异常
memset(&getvar_req, 0, sizeof(getvar_req));
/* Get the type of this "Oper" attribute & create type. */ /* Get the type of this "Oper" attribute & create type. */
/* Would be more efficient to do this just once before this function.*/ /* Would be more efficient to do this just once before this function.*/
getvar_req.req_tag = GETVAR_NAME; getvar_req.req_tag = GETVAR_NAME;
@@ -283,19 +301,53 @@ ST_RET ret;
if (scope == DOM_SPEC) if (scope == DOM_SPEC)
getvar_req.name.domain_id= dom_name; getvar_req.name.domain_id= dom_name;
getvar_req.name.obj_name.vmd_spec = var_name; getvar_req.name.obj_name.vmd_spec = var_name;
//增加调试打印,确认崩溃点
printf("[GETVAR] start dom=%s var=%s\n", dom_name, var_name);
ret = mvla_getvar (clientNetInfo, &getvar_req, &reqCtrl); ret = mvla_getvar (clientNetInfo, &getvar_req, &reqCtrl);
if (ret == SD_SUCCESS)
//打印 mvla_getvar 返回值和 reqCtrl
printf("[GETVAR] mvla_getvar ret=0x%X reqCtrl=%p\n", ret, reqCtrl);
if (ret == SD_SUCCESS){
ret = waitReqDone (reqCtrl, iTimeOut); ret = waitReqDone (reqCtrl, iTimeOut);
if (ret != SD_SUCCESS) //打印 waitReqDone 返回值
printf("[GETVAR] wait ret=0x%X\n", ret);
}
if (ret != SD_SUCCESS){
echo_warn2 ("Error getting type of variable '%s' in domain '%s'\n", var_name, dom_name); echo_warn2 ("Error getting type of variable '%s' in domain '%s'\n", var_name, dom_name);
}
else else
{ {
/* Don't care about name so pass NULL. */ /* Don't care about name so pass NULL. */
type_id = mvl_type_id_create (NULL, reqCtrl->u.getvar.resp_info->type_spec.data, //type_id = mvl_type_id_create (NULL, reqCtrl->u.getvar.resp_info->type_spec.data,
reqCtrl->u.getvar.resp_info->type_spec.len); // reqCtrl->u.getvar.resp_info->type_spec.len);
//严格检查 resp_info/type_spec避免空指针崩溃
if (ret == SD_SUCCESS &&
reqCtrl != NULL &&
reqCtrl->u.getvar.resp_info != NULL &&
reqCtrl->u.getvar.resp_info->type_spec.data != NULL &&
reqCtrl->u.getvar.resp_info->type_spec.len > 0)
{
type_id = mvl_type_id_create(
NULL,
reqCtrl->u.getvar.resp_info->type_spec.data,
reqCtrl->u.getvar.resp_info->type_spec.len);
printf("[GETVAR] create type_id=%d len=%d\n",
type_id,
reqCtrl->u.getvar.resp_info->type_spec.len);
}
else
{
printf("[GETVAR] failed dom=%s var=%s ret=0x%X\n",
dom_name, var_name, ret);
}
} }
mvl_free_req_ctrl (reqCtrl); /* Done with request struct */ //只有 reqCtrl 非空才释放
if (reqCtrl != NULL)mvl_free_req_ctrl (reqCtrl); /* Done with request struct */
return (type_id); return (type_id);
} }
@@ -1683,20 +1735,20 @@ static ST_VOID *my_realloc_err (ST_VOID *old, ST_UINT size)
} }
///////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////
//#define MAX_FILE_HANDLE_NUM (256) #define MAX_FILE_HANDLE_NUM (256)
//static FILE *fp_arr[MAX_FILE_HANDLE_NUM]; static FILE *fp_arr[MAX_FILE_HANDLE_NUM];
//static ST_INT32 cur_handle = 0; static ST_INT32 cur_handle = 0;
//ST_INT32 set_file_pointer( FILE *fp) ST_INT32 set_file_pointer( FILE *fp)
//{ {
// ST_INT32 the_handle = cur_handle; ST_INT32 the_handle = cur_handle;
// fp_arr[cur_handle++] = fp; fp_arr[cur_handle++] = fp;
// cur_handle %= MAX_FILE_HANDLE_NUM; cur_handle %= MAX_FILE_HANDLE_NUM;
// return the_handle; return the_handle;
//} }
//FILE* get_file_pointer(ST_INT32 handle) FILE* get_file_pointer(ST_INT32 handle)
//{ {
// return fp_arr[handle]; return fp_arr[handle];
//} }
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
#if (MMS_FOPEN_EN & RESP_EN) #if (MMS_FOPEN_EN & RESP_EN)
@@ -1705,39 +1757,39 @@ static ST_VOID *my_realloc_err (ST_VOID *old, ST_UINT size)
/************************************************************************/ /************************************************************************/
ST_VOID u_mvl_fopen_ind (MVL_IND_PEND *indCtrl) ST_VOID u_mvl_fopen_ind (MVL_IND_PEND *indCtrl)
{ {
//FILE *fp; FILE *fp;
//FOPEN_RESP_INFO resp_info; FOPEN_RESP_INFO resp_info;
//struct stat stat_buf; struct stat stat_buf;
//fp = fopen (indCtrl->u.fopen.filename, "rb"); /* CRITICAL: use "b" flag for binary transfer*/ fp = fopen (indCtrl->u.fopen.filename, "rb"); /* CRITICAL: use "b" flag for binary transfer*/
//if (fp == NULL) if (fp == NULL)
// { {
// _mplas_err_resp (indCtrl,11,6); /* File-access denied */ _mplas_err_resp (indCtrl,11,6); /* File-access denied */
// return; return;
// } }
//if (fseek (fp, indCtrl->u.fopen.init_pos, SEEK_SET)) if (fseek (fp, indCtrl->u.fopen.init_pos, SEEK_SET))
// { {
// _mplas_err_resp (indCtrl,11,5); /* Position invalid */ _mplas_err_resp (indCtrl,11,5); /* Position invalid */
// return; return;
// } }
///* WARNING: this only works if (FILE *) is a 32-bit pointer. */ /* WARNING: this only works if (FILE *) is a 32-bit pointer. */
//resp_info.frsmid = set_file_pointer(fp); //(ST_INT32) fp; resp_info.frsmid = set_file_pointer(fp); //(ST_INT32) fp;
//if (fstat (fileno (fp), &stat_buf)) if (fstat (fileno (fp), &stat_buf))
// { /* Can't get file size or time */ { /* Can't get file size or time */
// _mplas_err_resp (indCtrl,11,0); /* File Problem, Other */ _mplas_err_resp (indCtrl,11,0); /* File Problem, Other */
// return; return;
// } }
//else else
// { {
// resp_info.ent.fsize = stat_buf.st_size; resp_info.ent.fsize = stat_buf.st_size;
// resp_info.ent.mtimpres = SD_TRUE; resp_info.ent.mtimpres = SD_TRUE;
// resp_info.ent.mtime = stat_buf.st_mtime; resp_info.ent.mtime = stat_buf.st_mtime;
// } }
//indCtrl->u.fopen.resp_info = &resp_info; indCtrl->u.fopen.resp_info = &resp_info;
//mplas_fopen_resp (indCtrl); mplas_fopen_resp (indCtrl);
} }
#endif /* MMS_FOPEN_EN & RESP_EN */ #endif /* MMS_FOPEN_EN & RESP_EN */
@@ -1748,31 +1800,31 @@ ST_VOID u_mvl_fopen_ind (MVL_IND_PEND *indCtrl)
/************************************************************************/ /************************************************************************/
ST_VOID u_mvl_fread_ind (MVL_IND_PEND *indCtrl) ST_VOID u_mvl_fread_ind (MVL_IND_PEND *indCtrl)
{ {
//FILE *fp; FILE *fp;
//ST_UCHAR *tmp_buf; ST_UCHAR *tmp_buf;
//MVLAS_FREAD_CTRL *fread_ctrl = &indCtrl->u.fread; MVLAS_FREAD_CTRL *fread_ctrl = &indCtrl->u.fread;
//FREAD_RESP_INFO resp_info; FREAD_RESP_INFO resp_info;
//fp = get_file_pointer(fread_ctrl->req_info->frsmid);// (FILE *) fread_ctrl->req_info->frsmid; fp = get_file_pointer(fread_ctrl->req_info->frsmid);// (FILE *) fread_ctrl->req_info->frsmid;
///* Do NOT read more than "max_size". */ /* Do NOT read more than "max_size". */
//tmp_buf = (ST_UCHAR *) chk_malloc (fread_ctrl->max_size); tmp_buf = (ST_UCHAR *) chk_malloc (fread_ctrl->max_size);
//resp_info.fd_len = fread (tmp_buf, 1, fread_ctrl->max_size, fp); resp_info.fd_len = fread (tmp_buf, 1, fread_ctrl->max_size, fp);
//if (resp_info.fd_len == 0 && ferror (fp)) if (resp_info.fd_len == 0 && ferror (fp))
// { {
// _mplas_err_resp (indCtrl, 3, 0); _mplas_err_resp (indCtrl, 3, 0);
// return; return;
// } }
//resp_info.filedata = tmp_buf; resp_info.filedata = tmp_buf;
//if (resp_info.fd_len == fread_ctrl->max_size) if (resp_info.fd_len == fread_ctrl->max_size)
// resp_info.more_follows = SD_TRUE; resp_info.more_follows = SD_TRUE;
//else else
// resp_info.more_follows = SD_FALSE; resp_info.more_follows = SD_FALSE;
//fread_ctrl->resp_info = &resp_info; fread_ctrl->resp_info = &resp_info;
//mplas_fread_resp (indCtrl); mplas_fread_resp (indCtrl);
//chk_free (tmp_buf); /* Temporary buffer */ chk_free (tmp_buf); /* Temporary buffer */
} }
#endif /* #if (MMS_FREAD_EN & RESP_EN) */ #endif /* #if (MMS_FREAD_EN & RESP_EN) */
@@ -1782,15 +1834,15 @@ ST_VOID u_mvl_fread_ind (MVL_IND_PEND *indCtrl)
/************************************************************************/ /************************************************************************/
ST_VOID u_mvl_fclose_ind (MVL_IND_PEND *indCtrl) ST_VOID u_mvl_fclose_ind (MVL_IND_PEND *indCtrl)
{ {
//FILE *fp; FILE *fp;
//MVLAS_FCLOSE_CTRL *fclose_ctrl = &indCtrl->u.fclose; MVLAS_FCLOSE_CTRL *fclose_ctrl = &indCtrl->u.fclose;
//fp = get_file_pointer(fclose_ctrl->req_info->frsmid);//(FILE *) fclose_ctrl->req_info->frsmid; fp = get_file_pointer(fclose_ctrl->req_info->frsmid);//(FILE *) fclose_ctrl->req_info->frsmid;
//if (fclose (fp)) if (fclose (fp))
// _mplas_err_resp (indCtrl, 11, 0); /* File problem, other */ _mplas_err_resp (indCtrl, 11, 0); /* File problem, other */
//else else
// mplas_fclose_resp (indCtrl); mplas_fclose_resp (indCtrl);
} }
#endif /* #if (MMS_FCLOSE_EN & RESP_EN) */ #endif /* #if (MMS_FCLOSE_EN & RESP_EN) */
@@ -1945,6 +1997,295 @@ MVL_REQ_PEND *reqCtrl;
return (ret); return (ret);
} }
//lnk20260508添加重启装置函数
int BuildResetDomName(ied_usr_t *ied_usr, char *domName, size_t domNameSize)
{
if (ied_usr == NULL || domName == NULL || domNameSize == 0)
return -1;
domName[0] = '\0';
XmlConfigC cfg1;
memset(&cfg1, 0, sizeof(cfg1));
if (get_xml_config_by_dev_type(ied_usr->dev_type, &cfg1))
{
printf("========== XmlConfigC dump ==========\n");
printf("IEDControl = '%s'\n", cfg1.IEDControl);
printf("IEDname = '%s'\n", cfg1.IEDname);
printf("LDevicePrefix = '%s'\n", cfg1.LDevicePrefix);
printf("=====================================\n");
if (cfg1.IEDControl[0] != '\0')
{
snprintf(domName, domNameSize, "%s", cfg1.IEDControl);
printf("[RESET] use cfg1.IEDControl=%s\n", domName);
return 0;
}
}
if (ied_usr->LD_info && ied_usr->LD_info[0].LD_name)
{
snprintf(domName, domNameSize, "%s", ied_usr->LD_info[0].LD_name);
int len = strlen(domName);
if (len > 0 && isdigit(domName[len - 1]))
domName[len - 1] = '0';
printf("[RESET] use LD_name domName=%s\n", domName);
DIY_WARNLOG_CODE(ied_usr->terminal_id, 1, LOG_CODE_FILE_CONTROL,
"【WARN】未取到 IEDControl 信息,使用 LD_name=%s terminal_id=%s",
domName, ied_usr->terminal_id);
return -2;
}
snprintf(domName, domNameSize, "%s", "PQMonitorPQM0");
printf("[RESET] use default domName=%s\n", domName);
DIY_ERRORLOG_CODE(ied_usr->terminal_id, 1, LOG_CODE_FILE_CONTROL,
"【ERROR】未取到 LD 信息,使用默认 domName=%s terminal_id=%s",
domName, ied_usr->terminal_id);
return -1;
}
static ST_INT ledrs_var_type_create(MVL_NET_INFO* net_info,
OBJECT_NAME* varObj,
ST_INT timeOut)
{
MVL_REQ_PEND* reqCtrl;
GETVAR_REQ_INFO getvar_req;
VAR_ACC_TSPEC* type_spec;
ST_INT type_id = -1;
ST_RET ret;
memset(&getvar_req, 0, sizeof(getvar_req));
getvar_req.req_tag = GETVAR_NAME;
getvar_req.name = *varObj;
ret = mvla_getvar(net_info, &getvar_req, &reqCtrl);
if (ret == SD_SUCCESS)
ret = waitReqDone(reqCtrl, timeOut);
if (ret == SD_SUCCESS)
{
type_spec = &reqCtrl->u.getvar.resp_info->type_spec;
type_id = mvl_type_id_create(NULL,
type_spec->data,
type_spec->len);
}
if (reqCtrl)
mvl_free_req_ctrl(reqCtrl);
return type_id;
}
static ST_INT create_oper_type_id(MVL_NET_INFO *net_info,
ST_CHAR *domName,
const char *ctlName,
ST_INT timeOut)
{
OBJECT_NAME obj;
ST_CHAR varName[MAX_IDENT_LEN + 1];
if (net_info == NULL || domName == NULL || ctlName == NULL)
return -1;
memset(&obj, 0, sizeof(obj));
apr_snprintf(varName,
sizeof(varName),
"LLN0$%s",
ctlName);
obj.object_tag = DOM_SPEC;
obj.domain_id = domName;
obj.obj_name.vmd_spec = varName;
printf("[CTRL_INIT] create type dom=%s var=%s\n",
domName, varName);
return ledrs_var_type_create(net_info, &obj, timeOut);
}
void InitLedrsOperTypeForChannel(chnl_usr_t *chnl_usr)
{
printf("[CTRL_INIT] enter\n");
if (chnl_usr == NULL || chnl_usr->chnl == NULL ||
chnl_usr->chnl->ied == NULL || chnl_usr->net_info == NULL)
{
printf("[CTRL_INIT] invalid chnl_usr\n");
return;
}
ied_t *ied = chnl_usr->chnl->ied;
ied_usr_t *ied_usr = GET_IEDEXT_ADDR(ied);
if (ied_usr == NULL || ied_usr->LD_info == NULL)
{
printf("[CTRL_INIT] invalid ied_usr or LD_info\n");
return;
}
printf("[CTRL_INIT] current inited=%d ledrs=%d reboot=%d reset=%d\n",
(int)ied_usr->oper_type_cache.inited,
ied_usr->oper_type_cache.ledrs_oper_type_id,
ied_usr->oper_type_cache.reboot_oper_type_id,
ied_usr->oper_type_cache.reset_oper_type_id);
if (ied_usr->oper_type_cache.inited == SD_TRUE)
{
printf("[CTRL_INIT] already inited\n");
return;
}
char domName[256] = {0};
BuildResetDomName(ied_usr,
domName,
sizeof(domName));
printf("[CTRL_INIT] final dom=%s\n", domName);
ied_usr->oper_type_cache.ledrs_oper_type_id =
create_oper_type_id(chnl_usr->net_info,
domName,
"CO$LEDRs$Oper",
g_pt61850app->mmsOpTimeout);
printf("[CTRL_INIT] LEDRs type_id=%d\n",
ied_usr->oper_type_cache.ledrs_oper_type_id);
ied_usr->oper_type_cache.reboot_oper_type_id =
create_oper_type_id(chnl_usr->net_info,
domName,
"CO$Reboot$Oper",
g_pt61850app->mmsOpTimeout);
printf("[CTRL_INIT] Reboot type_id=%d\n",
ied_usr->oper_type_cache.reboot_oper_type_id);
ied_usr->oper_type_cache.reset_oper_type_id =
create_oper_type_id(chnl_usr->net_info,
domName,
"ST$Mod$stVal",
g_pt61850app->mmsOpTimeout);
printf("[CTRL_INIT] Reset type_id=%d\n",
ied_usr->oper_type_cache.reset_oper_type_id);
/* 无论成功失败,都不再重复初始化 */
ied_usr->oper_type_cache.inited = SD_TRUE;
printf("[CTRL_INIT] finish inited=%d\n",
(int)ied_usr->oper_type_cache.inited);
}
ST_RET write_common_oper(chnl_usr_t *chnl_usr,
ST_CHAR *domName,
const char *ctlName,
ST_INT oper_type_id,
ST_INT timeOut)
{
if (chnl_usr == NULL ||
chnl_usr->net_info == NULL ||
domName == NULL ||
ctlName == NULL ||
oper_type_id < 0)
{
printf("[OPER_WRITE] invalid param\n");
return SD_FAILURE;
}
ST_CHAR varName[MAX_IDENT_LEN + 1];
apr_snprintf(varName,
sizeof(varName),
"LLN0$%s",
ctlName);
Control_Oper_t oper;
memset(&oper, 0, sizeof(oper));
oper.ctlVal = SD_TRUE;
oper.origin.orCat = 3;
oper.origin.orIdent.len = 0;
oper.ctlNum = 1;
u_get_current_utc_time(&oper.T);
oper.Test = SD_FALSE;
oper.Check[0] = 0x00;
oper.Check[1] = 0x00;
if ((int)sizeof(Control_Oper_t) !=
mvl_type_ctrl[oper_type_id].data_size)
{
printf("[OPER_WRITE] SIZE MISMATCH ctl=%s local=%d runtime=%d\n",
ctlName,
(int)sizeof(Control_Oper_t),
mvl_type_ctrl[oper_type_id].data_size);
return SD_FAILURE;
}
printf("[OPER_WRITE] dom=%s var=%s type_id=%d\n",
domName, varName, oper_type_id);
return mms_named_var_write(chnl_usr->net_info,
varName,
DOM_SPEC,
domName,
oper_type_id,
(ST_CHAR *)&oper,
timeOut);
}
ST_RET mms_conclude_disconnect(MVL_NET_INFO *net_info, ST_INT timeOut)
{
MVL_REQ_PEND *reqCtrl = NULL;
ST_RET ret;
if (net_info == NULL)
return SD_FAILURE;
printf("[RESET] before mvl_concl\n");
ret = mvl_concl(net_info, &reqCtrl);
printf("[RESET] after mvl_concl ret=0x%X reqCtrl=%p\n",
ret, reqCtrl);
if (ret == SD_SUCCESS && reqCtrl != NULL)
{
ret = waitReqDone(reqCtrl, timeOut);
printf("[RESET] conclude wait ret=0x%X\n", ret);
}
if (reqCtrl != NULL)
mvl_free_req_ctrl(reqCtrl);
return ret;
}
ST_RET write_mod_stval(chnl_usr_t *chnl_usr,
ST_CHAR *domName,
ST_INT timeOut)
{
ST_CHAR varName[MAX_IDENT_LEN + 1];
ST_INT16 value = 88;
apr_snprintf(varName, sizeof(varName), "LLN0$ST$Mod$stVal");
return mms_named_var_write(chnl_usr->net_info,
varName,
DOM_SPEC,
domName,
14, //int
(ST_CHAR *)&value,
timeOut);
}
/************************************************************************/ /************************************************************************/
/* init_log_cfg */ /* init_log_cfg */

View File

@@ -135,6 +135,15 @@ int fill_rptctrl_by_cfg(LD_info_t* LD_info,int rptno,char *buf)
assert( (rptno+1) <= LD_info->rptcount ) ; assert( (rptno+1) <= LD_info->rptcount ) ;
rptinfo = LD_info->rptinfo[rptno]; rptinfo = LD_info->rptinfo[rptno];
if (rptinfo == NULL) {
printf("[RPT_INIT][ERROR] rptinfo NULL, rptno=%d\n", rptno);
return 1;
}
rptinfo->LD_info = LD_info;
if(!(str = strtok(buf,","))) if(!(str = strtok(buf,",")))
return 1; return 1;
@@ -262,6 +271,13 @@ int fill_logctrl_by_cfg(LD_info_t* LD_info,int logno,char *buf,char* devtype)
assert( (logno+1) <= LD_info->logcount ) ; assert( (logno+1) <= LD_info->logcount ) ;
loginfo = LD_info->loginfo[logno]; loginfo = LD_info->loginfo[logno];
if (loginfo == NULL) {
printf("[LOG_INIT][ERROR] loginfo NULL, logno=%d\n", logno);
return 1;
}
loginfo->LD_info = LD_info;
if(!(str = strtok(buf,","))) if(!(str = strtok(buf,",")))
return 1; return 1;
tmp_str = apr_pstrdup(g_init_pool,str); tmp_str = apr_pstrdup(g_init_pool,str);

View File

@@ -320,6 +320,17 @@ struct LD_info_t{
//录波 //录波
}; };
//装置控制初始化
typedef struct
{
ST_BOOLEAN inited;
ST_INT ledrs_oper_type_id;
ST_INT reboot_oper_type_id;
ST_INT reset_oper_type_id;
} MMS_OPER_TYPE_CACHE;
struct ied_usr_t{ struct ied_usr_t{
LD_info_t *LD_info; /**< LD数组 */ LD_info_t *LD_info; /**< LD数组 */
int dev_idx; /**< 设备序号 */ int dev_idx; /**< 设备序号 */
@@ -346,6 +357,8 @@ struct ied_usr_t{
bool lastconnectstat;//lnk20250704 bool lastconnectstat;//lnk20250704
bool has_logged_disconnect;//lnk20250704 bool has_logged_disconnect;//lnk20250704
MMS_OPER_TYPE_CACHE oper_type_cache;
}; };
@@ -533,6 +546,35 @@ int parse_file_names_by_fltnum(int fltnum, char* domname, char** filenames, int
QVVR_t* find_qvvr_by_trig_tm(LD_info_t* LD_info,long long trig_tm); QVVR_t* find_qvvr_by_trig_tm(LD_info_t* LD_info,long long trig_tm);
void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr); void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr);
void InitLedrsOperTypeForChannel(chnl_usr_t *chnl_usr);
//lnk20250508添加重启装置函数
//根据抓包显示oper的data结构有6个item
typedef struct
{
ST_BOOLEAN ctlVal;
struct
{
ST_INT16 orCat;
struct
{
ST_INT16 len;
ST_UINT8 data[64];
} orIdent;
} origin;
ST_UINT32 ctlNum;
MMS_UTC_TIME T;
ST_BOOLEAN Test;
ST_UCHAR Check[2];
} Control_Oper_t;
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////

View File

@@ -145,12 +145,26 @@ RCB_INFO* FindRcbInfo(MVL_NET_INFO *net_info,ST_CHAR *dom_name, ST_CHAR *rcb_nam
for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) { for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
if (!LD_info->LD_name) //if (!LD_info->LD_name)
continue; // continue;
//添加保护lnk20260602
if (LD_info->cpuno == 0 ||
LD_info->LD_name == NULL ||
LD_info->rptcount <= 0 ||
LD_info->rptinfo == NULL)
{
continue;
}
if ( strcmp(LD_info->LD_name,dom_name)!=0 ) if ( strcmp(LD_info->LD_name,dom_name)!=0 )
continue; continue;
for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) { for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) {
rptinfo = LD_info->rptinfo[rpt_no]; rptinfo = LD_info->rptinfo[rpt_no];
//添加保护lnk20260602
if (rptinfo == NULL || rptinfo->rptID == NULL)
continue;
get_rpt_inst_name(rptinfo,rpt_inst_name); get_rpt_inst_name(rptinfo,rpt_inst_name);
if ( strcmp(rpt_inst_name,rcb_name)==0 ) { if ( strcmp(rpt_inst_name,rcb_name)==0 ) {
if (rptinfo->rpt_registered) if (rptinfo->rpt_registered)
@@ -181,8 +195,22 @@ rptinfo_t* find_rptinfo_from_net_rpt_info_name(MVL_NET_INFO *net_info, RCB_INFO
for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) { for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
//添加保护lnk20260602
if (LD_info->cpuno == 0 ||
LD_info->LD_name == NULL ||
LD_info->rptcount <= 0 ||
LD_info->rptinfo == NULL)
{
continue;
}
for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) { for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) {
rptinfo = LD_info->rptinfo[rpt_no]; rptinfo = LD_info->rptinfo[rpt_no];
//添加保护lnk20260602
if (rptinfo == NULL || rptinfo->rptID == NULL || rcb_info == NULL || rcb_info->RptID == NULL)
continue;
printf("%d rptinfo %s,rcbinfo %s ", rpt_no, rptinfo->rptID, rcb_info->RptID); printf("%d rptinfo %s,rcbinfo %s ", rpt_no, rptinfo->rptID, rcb_info->RptID);
if (strcmp(rcb_info->RptID,rptinfo->rptID)==0)//WW 修改为匹配字符串 if (strcmp(rcb_info->RptID,rptinfo->rptID)==0)//WW 修改为匹配字符串
return rptinfo; return rptinfo;
@@ -211,8 +239,21 @@ rptinfo_t* find_rptinfo_from_net_rcb_info(MVL_NET_INFO *net_info,RCB_INFO *rcb_i
for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) { for(cpuno=0 ; cpuno<ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
//添加保护lnk20260602
if (LD_info->cpuno == 0 ||
LD_info->LD_name == NULL ||
LD_info->rptcount <= 0 ||
LD_info->rptinfo == NULL)
{
continue;
}
for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) { for(rpt_no=0 ; rpt_no<LD_info->rptcount; rpt_no++) {
rptinfo = LD_info->rptinfo[rpt_no]; rptinfo = LD_info->rptinfo[rpt_no];
if (rptinfo == NULL)
continue;
if (rcb_info==rptinfo->m_rcb_info) if (rcb_info==rptinfo->m_rcb_info)
return rptinfo; return rptinfo;
} }
@@ -221,7 +262,7 @@ rptinfo_t* find_rptinfo_from_net_rcb_info(MVL_NET_INFO *net_info,RCB_INFO *rcb_i
} }
//////////////////////////////////////// ////////////////////////////////////////
//WW 2023-08-29 注释 end //WW 2023-08-29 注释 end
void get_rpt_inst_name(rptinfo_t *rptinfo, char * rpt_inst_name ) /*void get_rpt_inst_name(rptinfo_t *rptinfo, char * rpt_inst_name )
{ {
strcpy(rpt_inst_name,rptinfo->rptID); strcpy(rpt_inst_name,rptinfo->rptID);
if (rptinfo->instanceNeedSuffix) { if (rptinfo->instanceNeedSuffix) {
@@ -229,7 +270,56 @@ void get_rpt_inst_name(rptinfo_t *rptinfo, char * rpt_inst_name )
apr_snprintf(rpt_suffix_str,sizeof(rpt_suffix_str),"%02d",rptinfo->m_curRptSuffix); apr_snprintf(rpt_suffix_str,sizeof(rpt_suffix_str),"%02d",rptinfo->m_curRptSuffix);
strcat(rpt_inst_name,rpt_suffix_str); strcat(rpt_inst_name,rpt_suffix_str);
} }
} } */
void get_rpt_inst_name(rptinfo_t *rptinfo, char *rpt_inst_name)
{
if (rpt_inst_name == NULL)
{
printf("get_rpt_inst_name: rpt_inst_name is NULL\n");
return;
}
rpt_inst_name[0] = '\0';
if (rptinfo == NULL)
{
printf("get_rpt_inst_name: rptinfo is NULL\n");
strcpy(rpt_inst_name, "NULL_RPT");
return;
}
if (rptinfo->rptID == NULL)
{
printf("get_rpt_inst_name: rptID is NULL\n");
strcpy(rpt_inst_name, "NULL_RPT");
return;
}
printf("[RPT] rptinfo=%p rptID=%s suffix=%d needSuffix=%d\n",
rptinfo,
rptinfo->rptID,
rptinfo->m_curRptSuffix,
rptinfo->instanceNeedSuffix);
// 原代码
strncpy(rpt_inst_name, rptinfo->rptID, 64);
rpt_inst_name[64] = '\0';
if (rptinfo->instanceNeedSuffix)
{
char rpt_suffix_str[8] = {0};
apr_snprintf(rpt_suffix_str,
sizeof(rpt_suffix_str),
"%02d",
rptinfo->m_curRptSuffix);
// 防止越界
strncat(rpt_inst_name,
rpt_suffix_str,
64 - strlen(rpt_inst_name));
}
}
void strip_file_name_tail_to_ms(char *fileName) void strip_file_name_tail_to_ms(char *fileName)
@@ -359,12 +449,20 @@ LD_info_t* find_LD_info_only_from_line_id(int line_id)
LD_info_t* find_LD_info_from_mp_id(ied_t* ied, char* mp_id) LD_info_t* find_LD_info_from_mp_id(ied_t* ied, char* mp_id)
{ {
if (mp_id == NULL)
return NULL;
LD_info_t* LD_info = NULL; LD_info_t* LD_info = NULL;
ied_usr_t* ied_usr = GET_IEDEXT_ADDR(ied); ied_usr_t* ied_usr = GET_IEDEXT_ADDR(ied);
int cpuno; int cpuno;
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) { for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]); LD_info = &(ied_usr->LD_info[cpuno]);
if (LD_info->cpuno == 0 || LD_info->LD_name == NULL)
continue;
if (LD_info && strcmp(LD_info->mp_id, mp_id) == 0) if (LD_info && strcmp(LD_info->mp_id, mp_id) == 0)
return LD_info; return LD_info;
} }

View File

@@ -74,7 +74,7 @@ FileFlag=4
FrontInst=884d132ac3a01225fcacc8c10da07d09 FrontInst=884d132ac3a01225fcacc8c10da07d09
FrontIP=192.168.1.167 FrontIP=192.168.1.167
SendFlag=3 SendFlag=3
RecallOnlyFlag= RecallOnlyFlag=0
[Ledger] [Ledger]
TerminalStatus="[0]" TerminalStatus="[0]"
@@ -117,37 +117,43 @@ WriteUrl=
[RocketMq] [RocketMq]
producer=Group_producer producer=Group_producer
Ipport=192.168.1.68:9876 Ipport=192.168.1.68:9876
Topic=TEST_Topic TESTTopic=TEST_Topic
Tag=Test_Tag TESTTag=884d132ac3a01225fcacc8c10da07d09
Key=Test_Keys TESTKey=Test_Keys
Queuenum=4 Queuenum=4
Testflag=1 Testflag=1
Testnum=100 Testnum=0
Testtype=1 Testtype=0
TestPort=11000 TestPort=11000
TestList= TestList=
consumer=Group_consumer consumer=Group_consumer
ConsumerIpport=192.168.1.68:9876 ConsumerIpport=192.168.1.68:9876
ConsumerTopicRT=ask_real_data_topic ConsumerTopicRT=ask_real_data_topic
ConsumerTagRT=Test_Tag ConsumerTagRT=884d132ac3a01225fcacc8c10da07d09
ConsumerKeyRT=Test_Keys ConsumerKeyRT=Test_Keys
ConsumerAccessKey=rmqroot ConsumerAccessKey=rmqroot
ConsumerSecretKey=001@#njcnmq ConsumerSecretKey=001@#njcnmq
ConsumerChannel= ConsumerChannel=
ConsumerTopicUD=control_Topic ConsumerTopicUD=control_Topic
ConsumerTagUD=Test_Tag ConsumerTagUD=884d132ac3a01225fcacc8c10da07d09
ConsumerKeyUD=Test_Keys ConsumerKeyUD=Test_Keys
ConsumerTopicRC=recall_Topic ConsumerTopicRC=recall_Topic
ConsumerTagRC=Test_Tag ConsumerTagRC=884d132ac3a01225fcacc8c10da07d09
ConsumerKeyRC=Test_Keys ConsumerKeyRC=Test_Keys
ConsumerTopicSET=process_Topic ConsumerTopicSET=process_Topic
ConsumerTagSET=Test_Tag ConsumerTagSET=884d132ac3a01225fcacc8c10da07d09
ConsumerKeySET=Test_Keys ConsumerKeySET=Test_Keys
ConsumerTopicLOG=ask_log_Topic ConsumerTopicLOG=ask_log_Topic
ConsumerTagLOG=Test_Tag ConsumerTagLOG=884d132ac3a01225fcacc8c10da07d09
ConsumerKeyLOG=Test_Keys ConsumerKeyLOG=Test_Keys
ConsumerTopicFILE=File_Topic
ConsumerTagFILE=884d132ac3a01225fcacc8c10da07d09
ConsumerKeyFILE=Test_Keys
ConsumerTopicTEST=File_Topic
LOGTopic=log_Topic LOGTopic=log_Topic
LOGTag=Test_Tag LOGTag=Test_Tag
LOGKey=Test_Keys LOGKey=Test_Keys

View File

@@ -1,21 +1,34 @@
#ifdef __cplusplus #ifdef __cplusplus
#include "../json/mms_json_inter.h" #include "../json/mms_json_inter.h"
#include "../rocketmq/CProducer.h" //#include "../rocketmq/CProducer.h"
#include "../rocketmq/CMessage.h" //#include "../rocketmq/CMessage.h"
#include "../rocketmq/CSendResult.h" //#include "../rocketmq/CSendResult.h"
//#include "../rocketmq/CPushConsumer.h"
#include "../rocketmq/CPushConsumer.h" #include "../rocketmq/DefaultMQProducer.h"
#include "../rocketmq/MQMessage.h"
#include "../rocketmq/SendResult.h"
#include "../rocketmq/SessionCredentials.h"
#include "../rocketmq/MQMessageExt.h"
#include "../rocketmq/ConsumeType.h"
#include "../rocketmq/MQMessageListener.h"
#include <vector> #include <vector>
#include <iostream> #include <iostream>
#include <string>
using namespace rocketmq;
/*添加测试函数lnk10-10*/ /*添加测试函数lnk10-10*/
void producer_send0(); //void producer_send0();
void StartSendMessage(CProducer* producer,const char* strbody); //void StartSendMessage(CProducer* producer,const char* strbody);
void producer_send(const char* strbody); //void producer_send(const char* strbody);
void rocketmq_producer_send(const char* strbody,const char* topic); //void rocketmq_producer_send(const char* strbody,const char* topic);
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic); //void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
void rocketmq_producer_send(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys);
extern "C" { extern "C" {
void rocketmq_test_rt(); void rocketmq_test_rt();
void rocketmq_test_ud(); void rocketmq_test_ud();
@@ -32,17 +45,25 @@ extern void my_rocketmq_send(Ckafka_data_t& data);
void InitializeProducer(); void InitializeProducer();
void ShutdownAndDestroyProducer(); void ShutdownAndDestroyProducer();
//////////////////////////////////////////////////////消费者 //////////////////////////////////////////////////////消费者
void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key); typedef ConsumeStatus (*MessageCallBack)(
void ShutdownAndDestroyConsumer(); const MQMessageExt& msg
);
struct Subscription { struct Subscription {
std::string topic; std::string topic;
std::string tag; std::string tag;
MessageCallBack callback; MessageCallBack callback;
Subscription(const std::string& t, const std::string& tg, MessageCallBack cb) Subscription(const std::string& t,
: topic(t), tag(tg), callback(cb) {std::cout << "Subscription topic: " << topic << std::endl;} const std::string& tg,
MessageCallBack cb)
: topic(t), tag(tg), callback(cb) {}
}; };
//void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
void InitializeConsumer(const std::string& consumerName,
const std::string& nameServer,
const std::vector<Subscription>& subscriptions);
void ShutdownAndDestroyConsumer();
void rocketmq_consumer_receive( void rocketmq_consumer_receive(
const std::string& consumerName, const std::string& consumerName,