将接口调用改为线程执行,防止超时阻塞数据/状态控制线程

This commit is contained in:
lnk
2026-06-25 13:29:28 +08:00
parent 7fbc24a384
commit 7aec4afed7
4 changed files with 202 additions and 5 deletions

View File

@@ -47,6 +47,14 @@ std::vector<terminal_dev> terminal_devlist;
//台账锁
std::mutex ledgermtx;
///////////////////////////////////异步接口队列
std::mutex g_qvvr_async_mtx;
std::list<QvvrFileDownloadTask> g_qvvr_file_download_list;
std::list<QvvrJsonTask> g_qvvr_json_list;
///////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
extern int g_front_seg_num;
@@ -1699,3 +1707,148 @@ bool get_monitor_name_by_monitor_id(const std::string& monitor_id,
out_monitor_name.clear();
return false;
}
/////////////////////////////////////////////////异步接口
// ★新增:代替直接调用 update_qvvr_file_download()
void enqueue_qvvr_file_download(const std::string& file_path,
const std::string& terminal_id)
{
std::lock_guard<std::mutex> lk(g_qvvr_async_mtx);
QvvrFileDownloadTask task;
task.file_path = file_path;
task.terminal_id = terminal_id;
g_qvvr_file_download_list.push_back(task);
std::cout << "[QVVR_ASYNC] enqueue file_download terminal="
<< terminal_id
<< " file=" << file_path
<< " size=" << g_qvvr_file_download_list.size()
<< std::endl;
}
// ★新增:代替直接调用 transfer_json_qvvr_data()
void enqueue_qvvr_json_update(const std::string& terminal_id,
int line_id,
double amplitude,
double persist_time,
long long trigger_time_ms,
int type,
int phase,
const std::string& wavepath)
{
std::lock_guard<std::mutex> lk(g_qvvr_async_mtx);
QvvrJsonTask task;
task.terminal_id = terminal_id;
task.line_id = line_id;
task.amplitude = amplitude;
task.persist_time = persist_time;
task.trigger_time_ms = trigger_time_ms;
task.type = type;
task.phase = phase;
task.wavepath = wavepath;
g_qvvr_json_list.push_back(task);
std::cout << "[QVVR_ASYNC] enqueue json terminal="
<< terminal_id
<< " line=" << line_id
<< " time=" << trigger_time_ms
<< " size=" << g_qvvr_json_list.size()
<< std::endl;
}
// ★新增FrontThread 中调用,消费异步接口任务
void process_qvvr_async_tasks()
{
// 每轮最多处理几个,避免一个接口卡住太久
const int MAX_FILE_TASK_PER_ROUND = 1;
const int MAX_JSON_TASK_PER_ROUND = 3;
// 1. 处理 update_qvvr_file_download 队列
for (int n = 0; n < MAX_FILE_TASK_PER_ROUND; ++n) {
QvvrFileDownloadTask task;
{
std::lock_guard<std::mutex> lk(g_qvvr_async_mtx);
if (g_qvvr_file_download_list.empty()) {
break;
}
task = g_qvvr_file_download_list.front();
g_qvvr_file_download_list.pop_front();
}
try {
std::cout << "[QVVR_ASYNC] call update_qvvr_file_download terminal="
<< task.terminal_id
<< " file=" << task.file_path
<< std::endl;
update_qvvr_file_download(task.file_path, task.terminal_id);
}
catch (const std::exception& e) {
std::cerr << "[QVVR_ASYNC][ERROR] update_qvvr_file_download exception: "
<< e.what()
<< " terminal=" << task.terminal_id
<< " file=" << task.file_path
<< std::endl;
}
catch (...) {
std::cerr << "[QVVR_ASYNC][ERROR] update_qvvr_file_download unknown exception"
<< " terminal=" << task.terminal_id
<< " file=" << task.file_path
<< std::endl;
}
}
// 2. 处理 transfer_json_qvvr_data 队列
for (int n = 0; n < MAX_JSON_TASK_PER_ROUND; ++n) {
QvvrJsonTask task;
{
std::lock_guard<std::mutex> lk(g_qvvr_async_mtx);
if (g_qvvr_json_list.empty()) {
break;
}
task = g_qvvr_json_list.front();
g_qvvr_json_list.pop_front();
}
try {
std::cout << "[QVVR_ASYNC] call transfer_json_qvvr_data terminal="
<< task.terminal_id
<< " line=" << task.line_id
<< " time=" << task.trigger_time_ms
<< " wavepath=" << task.wavepath
<< std::endl;
transfer_json_qvvr_data(task.terminal_id,
task.line_id,
task.amplitude,
task.persist_time,
task.trigger_time_ms,
task.type,
task.phase,
task.wavepath);
}
catch (const std::exception& e) {
std::cerr << "[QVVR_ASYNC][ERROR] transfer_json_qvvr_data exception: "
<< e.what()
<< " terminal=" << task.terminal_id
<< " line=" << task.line_id
<< std::endl;
}
catch (...) {
std::cerr << "[QVVR_ASYNC][ERROR] transfer_json_qvvr_data unknown exception"
<< " terminal=" << task.terminal_id
<< " line=" << task.line_id
<< std::endl;
}
}
}

View File

@@ -32,6 +32,44 @@ class Front;
#define RECALL_HIS_DATA_BASE_NODE_ID 600
#define RECALL_ALL_DATA_BASE_NODE_ID 700*/
///////////////////////////////////////////////////////////////////////////////////////////异步接口
struct QvvrFileDownloadTask {
std::string file_path;
std::string terminal_id;
};
struct QvvrJsonTask {
std::string terminal_id;
int line_id;
double amplitude;
double persist_time;
long long trigger_time_ms;
int type;
int phase;
std::string wavepath;
};
// ★声明
extern std::mutex g_qvvr_async_mtx;
extern std::list<QvvrFileDownloadTask> g_qvvr_file_download_list;
extern std::list<QvvrJsonTask> g_qvvr_json_list;
extern void process_qvvr_async_tasks();
extern void enqueue_qvvr_file_download(const std::string& file_path,
const std::string& terminal_id);
extern void enqueue_qvvr_json_update(const std::string& terminal_id,
int line_id,
double amplitude,
double persist_time,
long long trigger_time_ms,
int type,
int phase,
const std::string& wavepath);
///////////////////////////////////////////////////////////////////////////////////////////
//单条补招时间结构

View File

@@ -474,6 +474,9 @@ void Front::FrontThread() {
check_recall_event(); // 处理补招事件从list中读取然后直接调用接口,每一条可能都不同测点,每个测点自己做好记录
check_recall_file(); //处理补招文件-稳态和暂态
// ★新增:异步处理 QVVR 接口任务
process_qvvr_async_tasks();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
} catch (const std::exception& e) {

View File

@@ -353,9 +353,10 @@ void process_received_message(string mac, string id,const char* data, size_t len
if(record.nType != 0){
append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
transfer_json_qvvr_data(id,event.head.name,
/*transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
"");
"");*/
enqueue_qvvr_json_update(id, event.head.name, record.fMagntitude, record.fPersisstime, record.triggerTimeMs, record.nType, record.phase, "");
}
//事件主动上送处理完成,不需要通知状态机
@@ -848,7 +849,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
std::cout << "File saved: " << file_path << std::endl;
//lnk20250805文件保存成功调用录波文件上送接口
update_qvvr_file_download(file_path, id);
//update_qvvr_file_download(file_path, id);
enqueue_qvvr_file_download(file_path, id);
}
else {
std::cerr << "Failed to save file: " << file_path
@@ -2461,8 +2463,9 @@ void process_received_message(string mac, string id,const char* data, size_t len
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
//直接发走暂态事件
transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
/*transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");*/
enqueue_qvvr_json_update(id, event.head.name, record.fMagntitude, record.fPersisstime, record.triggerTimeMs, record.nType, record.phase, "");
}
//通知状态机补招暂态事件成功
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));