6 Commits

7 changed files with 1169 additions and 217 deletions

Binary file not shown.

View File

@@ -22,6 +22,8 @@
#include <fnmatch.h> #include <fnmatch.h>
#include <memory> #include <memory>
#include <unordered_set> #include <unordered_set>
#include <regex>
#include <algorithm>
///////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -657,6 +659,9 @@ void init_config() {
MULTIPLE_NODE_FLAG = 0; MULTIPLE_NODE_FLAG = 0;
std::cout << "this is single process" << std::endl; std::cout << "this is single process" << std::endl;
} }
else if(g_front_seg_num > 0 && g_front_seg_index == 0){
std::cout << "this is pqdif process" << std::endl;
}
else{ else{
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出"); DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出");
exit(-1039); exit(-1039);
@@ -1406,7 +1411,7 @@ int recall_json_handle_from_mq(const std::string& body)
// 根据 monitorId 和提取的数字初始化补招记录 // 根据 monitorId 和提取的数字初始化补招记录
init_recall_record_file(guid, terminalId, monId, "", ""); init_recall_record_file(guid, terminalId, monId, "", "");
//根据时间戳单独补招事件 //根据时间戳单独补招事件,暂不使用
if(0)// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——开始 if(0)// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——开始
{ {
std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度 std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度
@@ -1490,34 +1495,43 @@ int recall_json_handle_from_mq(const std::string& body)
} }
if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; } if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; }
// 稳态文件一个 guid + monitor 只生成一条 RecallFile
RecallFile rm_all;
bool has_steady_range = false;
if (stat == 1) {
rm_all.recall_guid = guid;
rm_all.recall_status = 0;
rm_all.STEADY = std::to_string(stat);
rm_all.VOLTAGE = std::to_string(voltage);
rm_all.file_type = RecallFileType::STEADY_FILE;
}
for (const auto& ti : rec["timeInterval"]) { for (const auto& ti : rec["timeInterval"]) {
if (!ti.is_string()) continue; if (!ti.is_string()) continue;
std::string s = ti.get<std::string>(); std::string s = ti.get<std::string>();
std::string::size_type pos = s.find('~'); std::string::size_type pos = s.find('~');
if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; } if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; }
std::string start = s.substr(0, pos); std::string start = s.substr(0, pos);
std::string end = s.substr(pos + 1); std::string end = s.substr(pos + 1);
RecallFile rm_all;
rm_all.recall_guid = guid;
rm_all.recall_status = 0;
rm_all.StartTime = start;
rm_all.EndTime = end;
rm_all.STEADY = std::to_string(stat);
rm_all.VOLTAGE = std::to_string(voltage);
//lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应 //lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应
init_recall_record_file(guid, terminalId, monId, start, end); init_recall_record_file(guid, terminalId, monId, start, end);
if (voltage == 1) { if (voltage == 1) {
std::vector<RecallInfo> recallinfo_list_hour; std::vector<RecallInfo> recallinfo_list_hour;
Get_Recall_Time_Char(start, end, recallinfo_list_hour); Get_Recall_Time_Char(start, end, recallinfo_list_hour); //暂态将整个时间段划分为多个一小时的时间段
for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) { for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) {
const RecallInfo& info = recallinfo_list_hour[i]; const RecallInfo& info = recallinfo_list_hour[i];
RecallMonitor rm; RecallMonitor rm;
rm.recall_guid = guid; rm.recall_guid = guid;
rm.recall_status = 0; rm.recall_status = 0;
rm.StartTime = epoch_to_datetime_str(info.starttime); rm.StartTime = epoch_to_datetime_str(info.starttime); //每个记录是一个小时的时间段,张文暂态事件接口是一小时一条
rm.EndTime = epoch_to_datetime_str(info.endtime); rm.EndTime = epoch_to_datetime_str(info.endtime);
rm.STEADY = std::to_string(stat); rm.STEADY = std::to_string(stat);
rm.VOLTAGE = std::to_string(voltage); rm.VOLTAGE = std::to_string(voltage);
@@ -1525,10 +1539,62 @@ int recall_json_handle_from_mq(const std::string& body)
} }
} }
if (stat == 1) { if (stat == 1) {
lm->recall_list_static.push_back(rm_all); long long beg = parse_time_to_epoch(start); //转为整数时间
long long ed = parse_time_to_epoch(end);
if (beg < 0 || ed < 0 || beg > ed) {
std::cout << "[recall_file] invalid steady time range guid="
<< guid
<< " terminal=" << terminalId
<< " monitor=" << monId
<< " range=" << start << "~" << end
<< std::endl;
continue;
} }
rm_all.recall_ranges.push_back(std::make_pair(beg, ed));//每个时间段都记录到补招稳态时间列表里
if (!has_steady_range) {
rm_all.StartTime = start; //记录多个时间段的第一条开始和结束
rm_all.EndTime = end;
has_steady_range = true;
} else {
long long old_beg = parse_time_to_epoch(rm_all.StartTime);
long long old_end = parse_time_to_epoch(rm_all.EndTime);
if (old_beg < 0 || beg < old_beg) {
rm_all.StartTime = start; //对比多个时间段更新最早和最晚
}
if (old_end < 0 || ed > old_end) {
rm_all.EndTime = end;
}
}
std::cout << "[recall_file] add steady range guid="
<< guid
<< " terminal=" << terminalId
<< " monitor=" << monId
<< " range=" << start << "~" << end
<< " total_ranges=" << rm_all.recall_ranges.size()
<< std::endl;
}
if (stat == 0 && voltage == 0) return 10003; if (stat == 0 && voltage == 0) return 10003;
} }
// 所有 timeInterval 遍历完后,稳态 RecallFile 只入队一次
if (stat == 1 && has_steady_range) { //一次补招指令的处理结束
lm->recall_list_static.push_back(rm_all);//记录稳态补招
std::cout << "[recall_file] push steady recall once guid="
<< guid
<< " terminal=" << terminalId
<< " monitor=" << monId
<< " ranges=" << rm_all.recall_ranges.size()
<< " StartTime=" << rm_all.StartTime
<< " EndTime=" << rm_all.EndTime
<< std::endl;
}
} }
} else { } else {
// 未知 dataType忽略 // 未知 dataType忽略
@@ -4592,71 +4658,153 @@ void check_recall_event() {
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据 //////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ====== static std::string replace_all_copy(std::string s,
static bool extract_epoch_from_filename(const std::string& name, const std::string& from,
long long& out_epoch, const std::string& to)
int logical_device_seq)
{ {
// 拆分 size_t pos = 0;
std::vector<std::string> parts; while ((pos = s.find(from, pos)) != std::string::npos) {
parts.reserve(8); s.replace(pos, from.length(), to);
size_t start = 0, pos; pos += to.length();
while ((pos = name.find('_', start)) != std::string::npos) {
parts.emplace_back(name.substr(start, pos - start));
start = pos + 1;
} }
parts.emplace_back(name.substr(start)); // 最后一段(含扩展名) return s;
if (parts.size() < 4) return false;
// 第二段序号是倒数第4段
const std::string& seq_str = parts[parts.size() - 4];
// 允许前导 0把字符串转 int 后比较
for (char c : seq_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
int seq_val = 0;
try {
seq_val = std::stoi(seq_str);
} catch (...) {
return false;
}
if (seq_val != logical_device_seq) return false;
// 其余与上面相同
const std::string& date_str = parts[parts.size() - 3];
const std::string& time_str = parts[parts.size() - 2];
std::string ms_str = parts.back();
size_t dot = ms_str.find('.');
if (dot != std::string::npos) {
ms_str.erase(dot);
}
if (date_str.size() != 8 || time_str.size() != 6) return false;
for (char c : date_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
for (char c : time_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
for (char c : ms_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
int year = std::stoi(date_str.substr(0, 4));
int month = std::stoi(date_str.substr(4, 2));
int day = std::stoi(date_str.substr(6, 2));
int hour = std::stoi(time_str.substr(0, 2));
int min = std::stoi(time_str.substr(2, 2));
int sec = std::stoi(time_str.substr(4, 2));
// int msec = std::stoi(ms_str);
std::tm tm{}; tm.tm_isdst = -1;
tm.tm_year = year - 1900;
tm.tm_mon = month - 1;
tm.tm_mday = day;
tm.tm_hour = hour;
tm.tm_min = min;
tm.tm_sec = sec;
time_t t = timegm(&tm);
if (t < 0) return false;
out_epoch = static_cast<long long>(t); // 秒级
return true;
} }
static std::string make_day_from_datetime(const std::string& t)
{
// yyyy-MM-dd HH:mm:ss -> yyyyMMdd
if (t.size() < 10) return "";
std::string day = t.substr(0, 10);
day.erase(std::remove(day.begin(), day.end(), '-'), day.end());
return day;
}
static std::string resolve_recall_dir(const std::string& raw_dir,
const RecallFile& rf,
const ledger_monitor& lm)
{
std::string dir = raw_dir;
std::string day = make_day_from_datetime(rf.StartTime);
std::string seq = "1";
try {
seq = std::to_string(std::stoi(lm.logical_device_seq));
} catch (...) {
seq = "1";
}
// DESC 建议优先用你台账里的监测点描述/名称
// 如果 lm.monitor_name 是“监测点1”广东目录就是 /historyFile/监测点1
std::string desc = lm.monitor_name;
if (desc.empty()) {
desc = std::string("Line") + seq;
}
dir = replace_all_copy(dir, "%DAY%", day);
dir = replace_all_copy(dir, "%SEQ%", seq);
dir = replace_all_copy(dir, "%DESC%", desc);
return dir;
}
/////////////////////////////目录解析
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ======
static bool extract_pqdif_range_from_filename(const std::string& fname,
long long& fs,
long long& fe,
int& file_seq,
std::string& file_monitor_id)
{
fs = -1;
fe = -1;
file_seq = -1;
file_monitor_id.clear();
auto to_epoch = [](int y, int mon, int d, int h, int m, int s)->long long {
struct tm t {};
t.tm_year = y - 1900;
t.tm_mon = mon - 1;
t.tm_mday = d;
t.tm_hour = h;
t.tm_min = m;
t.tm_sec = s;
t.tm_isdst = -1;
return static_cast<long long>(mktime(&t));
};
// 广东/上海20230915000000_20230915010000.pqd 开始时间_结束时间
{
std::regex re(R"((\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})_(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
fe = to_epoch(std::stoi(m[7]), std::stoi(m[8]), std::stoi(m[9]),
std::stoi(m[10]), std::stoi(m[11]), std::stoi(m[12]));
return fs >= 0 && fe >= fs;
}
}
// 云南20230915T010000.pqd文件时间按 1 小时窗口估算 时间点
{
std::regex re(R"((\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
fe = fs + 3600;
return fs >= 0;
}
}
// 新疆B8B7FEE6D4792D30-20230915-003000-p.pqd
{
std::regex re(
R"(([^-]+)-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
file_monitor_id = m[1]; // B8B7FEE6D4792D30
fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]),
std::stoi(m[5]), std::stoi(m[6]), std::stoi(m[7]));
fe = fs + 3600;
return fs >= 0;
}
}
// 默认ied_ld_20230915_0030_60.pqd
// 例如PQMonitor_PQM1_20260601_0000_24.pqd
// 第二段 PQM1 最后的数字作为 seq
{
std::regex re(R"([^_]+_([^_]+)_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
std::string ld = m[1]; // 例如 PQM1、PQM2
std::regex seq_re(R"((\d+)$)");
std::smatch seq_m;
if (std::regex_search(ld, seq_m, seq_re)) {
file_seq = std::stoi(seq_m[1]);
}
fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]),
std::stoi(m[5]), std::stoi(m[6]), 0);
int intv_min = std::stoi(m[7]);
fe = fs + intv_min * 60;
return fs >= 0 && intv_min > 0;
}
}
return false;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true // 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true
static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) { static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) {
@@ -4743,14 +4891,41 @@ static void dircache_clear_device(const std::string& dev_id)
g_device_dir_cache.erase(dev_id); g_device_dir_cache.erase(dev_id);
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑 //复制文件
bool move_file_crossfs(const std::string& src, const std::string& dst)
{
if (std::rename(src.c_str(), dst.c_str()) == 0) {
return true;
}
if (errno != EXDEV) {
return false;
}
std::ifstream in(src.c_str(), std::ios::binary);
std::ofstream out(dst.c_str(), std::ios::binary | std::ios::trunc);
if (!in || !out) {
return false;
}
out << in.rdbuf();
out.close();
in.close();
if (std::remove(src.c_str()) != 0) {
return false;
}
return true;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑
// ====== ★修改check_recall_stat —— 加入“两步法”状态机 ====== // ====== ★修改check_recall_stat —— 加入“两步法”状态机 ======
void check_recall_file() { void check_recall_file() {
std::vector<RecallTask> tasks; // 本轮要发送的“每终端一条”(目录请求 或 文件下载 请求) std::vector<RecallTask> tasks; // 本轮要发送的“每终端一条”(目录请求 或 文件下载 请求)
// ★修改开始:新增“待上传动作”容器与两个小工具(局部作用域,函数私有) // “待上传动作”容器与两个小工具(局部作用域,函数私有)
struct PendingUpload { struct PendingUpload {
std::string terminal_id; std::string terminal_id;
unsigned short logical_seq = 0; unsigned short logical_seq = 0;
@@ -4783,6 +4958,12 @@ void check_recall_file() {
}; };
// ★修改结束 // ★修改结束
auto RecallTypeName = [](const RecallFile& rf)->std::string {
if (rf.is_steady_file()) return "稳态文件";
if (rf.is_voltage_file()) return "波形文件";
return "文件";
};
std::vector<PendingUpload> pending_uploads; // ★修改:锁外执行上传与清理 std::vector<PendingUpload> pending_uploads; // ★修改:锁外执行上传与清理
@@ -4809,7 +4990,7 @@ void check_recall_file() {
// ★优先级结束 // ★优先级结束
for (auto& dev : terminal_devlist) { for (auto& dev : terminal_devlist) {
// 仅处理“正在补招/空闲”终端,与你原逻辑一致 // 仅处理“正在补招/空闲”终端
if (dev.busytype != static_cast<int>(DeviceState::READING_STATSFILE) && if (dev.busytype != static_cast<int>(DeviceState::READING_STATSFILE) &&
dev.busytype != static_cast<int>(DeviceState::IDLE)) { dev.busytype != static_cast<int>(DeviceState::IDLE)) {
continue; continue;
@@ -4834,7 +5015,6 @@ void check_recall_file() {
<< " size=" << lm.recall_list_static.size() << " size=" << lm.recall_list_static.size()
<< " first{guid=" << front.recall_guid << " first{guid=" << front.recall_guid
<< ", status=" << front.recall_status << ", status=" << front.recall_status
<< ", direct=" << (front.direct_mode ? 1 : 0)
<< ", phase=" << static_cast<int>(front.phase) << ", phase=" << static_cast<int>(front.phase)
<< ", cur_dir=" << front.cur_dir << ", cur_dir=" << front.cur_dir
<< ", cur_file=" << front.downloading_file << ", cur_file=" << front.downloading_file
@@ -4868,7 +5048,6 @@ void check_recall_file() {
<< " size=" << lm.recall_list_static.size() << " size=" << lm.recall_list_static.size()
<< " first{guid=" << nf.recall_guid << " first{guid=" << nf.recall_guid
<< ", status=" << nf.recall_status << ", status=" << nf.recall_status
<< ", direct=" << (nf.direct_mode ? 1 : 0)
<< ", phase=" << static_cast<int>(nf.phase) << ", phase=" << static_cast<int>(nf.phase)
<< ", cur_dir=" << nf.cur_dir << ", cur_dir=" << nf.cur_dir
<< ", cur_file=" << nf.downloading_file << ", cur_file=" << nf.downloading_file
@@ -4900,13 +5079,13 @@ void check_recall_file() {
//20251218添加记录 //20251218添加记录
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件失败,目标时标:" + " 补招" + RecallTypeName(front) + "失败,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件失败,时间范围:" + " 补招" + RecallTypeName(front) + "失败,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -4924,7 +5103,6 @@ void check_recall_file() {
<< " size=" << lm.recall_list_static.size() << " size=" << lm.recall_list_static.size()
<< " first{guid=" << nf.recall_guid << " first{guid=" << nf.recall_guid
<< ", status=" << nf.recall_status << ", status=" << nf.recall_status
<< ", direct=" << (nf.direct_mode ? 1 : 0)
<< ", phase=" << static_cast<int>(nf.phase) << ", phase=" << static_cast<int>(nf.phase)
<< ", cur_dir=" << nf.cur_dir << ", cur_dir=" << nf.cur_dir
<< ", cur_file=" << nf.downloading_file << ", cur_file=" << nf.downloading_file
@@ -5034,8 +5212,12 @@ void check_recall_file() {
front.file_success.clear(); front.file_success.clear();
// 立即发起第一个目录请求 // 立即发起第一个目录请求
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) { if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index];//从第一个目录开始 front.cur_dir = resolve_recall_dir(
front.active_dirs()[front.cur_dir_index],
front,
lm
);//从第一个目录开始
// ★新增:先查设备级目录缓存 // ★新增:先查设备级目录缓存
std::vector<tag_dir_info> cached; std::vector<tag_dir_info> cached;
@@ -5061,13 +5243,13 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5096,8 +5278,12 @@ void check_recall_file() {
front.required_files.clear(); front.required_files.clear();
front.file_success.clear(); front.file_success.clear();
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) { if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index]; front.cur_dir = resolve_recall_dir(
front.active_dirs()[front.cur_dir_index],
front,
lm
);
// ★新增:先查缓存 // ★新增:先查缓存
std::vector<tag_dir_info> cached; std::vector<tag_dir_info> cached;
@@ -5121,13 +5307,13 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5140,88 +5326,142 @@ void check_recall_file() {
// OK根据起止时间筛选文件 // OK根据起止时间筛选文件
{ {
/*//这部分用于稳态数据文件 long long beg = -1;
long long beg = parse_time_to_epoch(front.StartTime); long long end = -1;
long long end = parse_time_to_epoch(front.EndTime);
if (front.is_steady_file()) {
beg = parse_time_to_epoch(front.StartTime);
end = parse_time_to_epoch(front.EndTime);
//错误判断:如果是直下文件的方式,会给默认的正确的时间范围
if (beg < 0 || end < 0 || beg > end) { if (beg < 0 || end < 0 || beg > end) {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::cout << "[check_recall_stat] time parse ERR, FAIL dev=" << dev.terminal_id std::cout << "[check_recall_file] steady time parse ERR, FAIL dev="
<< dev.terminal_id
<< " monitor=" << lm.monitor_id << " monitor=" << lm.monitor_id
<< " start=" << front.StartTime << " start=" << front.StartTime
<< " end=" << front.EndTime << std::endl; << " end=" << front.EndTime << std::endl;
break;//跳出循环,一个装置一次只能处理一个测点的一个补招记录;如果失败,下个循环会弹出
}*/
//装置消息返回后通知成功的处理: front.phase = RecallPhase::IDLE;
auto it = front.dir_files.find(front.cur_dir);//在map中查找当前目录名对应的目录下的文件名列表 front.downloading_file.clear();
front.download_result = ActionResult::PENDING;
break;
}
}
auto it = front.dir_files.find(front.cur_dir);
if (it != front.dir_files.end()) { if (it != front.dir_files.end()) {
if (front.direct_mode) { if (front.is_voltage_file()) { //暂态波形下载
// 目标时间戳(不含毫秒、形如 yyyyMMdd_HHmmss
const std::string& want_ts = front.target_filetimes; const std::string& want_ts = front.target_filetimes;
for (const auto& ent : it->second) { for (const auto& ent : it->second) {
// 打印目录下的所有条目 std::cout << "[check_recall_file] voltage dir file dev=" << dev.terminal_id
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << " dir=" << front.cur_dir
<< " file=" << ent.name << " file=" << ent.name
<< " flag=" << ent.flag << " flag=" << ent.flag
<< std::endl; << std::endl;
if (ent.flag == 0) continue; // 只要文件,跳过目录 if (ent.flag == 0) continue;
size_t n = ::strnlen(ent.name, sizeof(ent.name)); size_t n = ::strnlen(ent.name, sizeof(ent.name));
std::string fname(ent.name, n); std::string fname(ent.name, n);
// 解析出 key = 监测点号_YYYYMMDD_HHMMSS注意确保你的 make_target_key_from_filename
// 已经改成“去掉毫秒”的版本;若还带毫秒,请先调整该函数)
std::string key; std::string key;
if (!make_target_key_from_filename(fname, key)) { if (!make_target_key_from_filename(fname, key)) {
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id continue;
}
if (has_suffix(key, want_ts)) {
front.download_queue.push_back(front.cur_dir + "/" + fname);
}
}
}
else if (front.is_steady_file()) { //稳态文件下载
for (const auto& ent : it->second) {
if (ent.flag != 1) continue;
size_t n = ::strnlen(ent.name, sizeof(ent.name));
std::string fname(ent.name, n);
long long fs = -1;
long long fe = -1;
int file_seq = -1;
std::string file_monitor_id;
if (!extract_pqdif_range_from_filename(fname, fs, fe, file_seq, file_monitor_id)) {
std::cout << "[check_recall_file] steady skip invalid pqdif filename dev="
<< dev.terminal_id
<< " monitor=" << lm.monitor_id << " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << " dir=" << front.cur_dir
<< " file=" << fname << std::endl;
continue;
}
int cur_seq = -1;
try {
cur_seq = std::stoi(lm.logical_device_seq);
} catch (...) {
cur_seq = -1;
}
// 新疆:用文件名里的 monitor_id 找 seq
if (!file_monitor_id.empty()) {
if (file_monitor_id != lm.monitor_id) {//跳过id不匹配的新疆文件
std::cout << "[check_recall_file] steady skip other xj monitor file dev="
<< dev.terminal_id
<< " cur_monitor=" << lm.monitor_id
<< " file_monitor_id=" << file_monitor_id
<< " file=" << fname << " file=" << fname
<< " key=" << key
<< " ERR: invalid filename format, skip"
<< std::endl; << std::endl;
continue; continue;
} }
// key 形如 MON_YYYYMMDD_HHMMSS目标是只按时间戳匹配 // 到这里说明新疆文件的测点ID就是当前测点
if (has_suffix(key, want_ts)) { file_seq = cur_seq;
//打印放入的文件名 }
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id // 默认:文件名能解析出 PQM 序号
<< " monitor=" << lm.monitor_id else if (file_seq > 0) {
<< " dir=" << front.cur_dir if (cur_seq > 0 && file_seq != cur_seq) {//跳过不同测点的文件
std::cout << "[check_recall_file] steady skip other seq file dev="
<< dev.terminal_id
<< " cur_monitor=" << lm.monitor_id
<< " cur_seq=" << cur_seq
<< " file_seq=" << file_seq
<< " file=" << fname << " file=" << fname
<< " key=" << key
<< " MATCH, add to download queue"
<< std::endl; << std::endl;
continue;
}
}
// 广东/上海/云南file_monitor_id 为空file_seq 也为 -1不过滤
front.download_queue.push_back(front.cur_dir + "/" + fname); // 文件时间段和补招时间段有交集就下载
} bool hit = false;
}
} else { //稳态文件
// ☆原有:按时间窗筛选
long long beg = parse_time_to_epoch(front.StartTime);
long long end = parse_time_to_epoch(front.EndTime);
for (const auto& ent : it->second) { for (const auto& r : front.recall_ranges) {//每个文件的时间段和补招的多个时间段比对,只要有一个交集就算命中
if (ent.flag != 1) continue; // 只要文件 long long rb = r.first;
// 文件名 long long re = r.second;
size_t n = ::strnlen(ent.name, sizeof(ent.name));
std::string fname(ent.name, n);
long long fe = -1; if (fs <= re && fe >= rb) {
int seq = 0; hit = true;
try { seq = std::stoi(lm.logical_device_seq); } catch (...) { seq = 0; } break;
if (!extract_epoch_from_filename(fname, fe, seq)) continue;
if (fe >= beg && fe <= end) {
front.download_queue.push_back(front.cur_dir + "/" + fname);
} }
} }
if (hit) {
std::string remote_file = front.cur_dir + "/" + fname;
if (front.required_files.insert(remote_file).second) {
front.download_queue.push_back(remote_file);
}
}
}
}
else {
front.recall_status = static_cast<int>(RecallStatus::FAILED);
append_recall_record_line(dev.guid, lm.monitor_id,
std::string("监测点:") + lm.monitor_name + " 文件补招类型为空,执行失败");
break;
} }
} }
} }
@@ -5230,8 +5470,12 @@ void check_recall_file() {
// 当前目录无匹配文件 -> 试下一个目录 // 当前目录无匹配文件 -> 试下一个目录
front.cur_dir_index++; front.cur_dir_index++;
front.list_result = ActionResult::PENDING; front.list_result = ActionResult::PENDING;
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) { if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index]; front.cur_dir = resolve_recall_dir(
front.active_dirs()[front.cur_dir_index],
front,
lm
);
// ★新增:先查缓存 // ★新增:先查缓存
std::vector<tag_dir_info> cached; std::vector<tag_dir_info> cached;
@@ -5255,13 +5499,13 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5275,20 +5519,12 @@ void check_recall_file() {
} }
else { else {
// 进入下载阶段 // 进入下载阶段
// required_files 记录远端文件全集,用于和 file_success 对比
front.required_files.clear(); front.required_files.clear();
//for (const auto& p : front.download_queue) front.required_files.insert(p);
//转成本地保存路径 download/<dev.addr_str>/<fname>
front.required_files.clear();
{
const std::string base_dir = std::string("download/") + sanitize(dev.addr_str);
for (const auto& p : front.download_queue) { for (const auto& p : front.download_queue) {
// p 形如 "<cur_dir>/<fname>" —— 提取纯文件名 if (!p.empty()) {
std::string fname = sanitize(extract_filename1(p)); front.required_files.insert(p); // 保留远端路径
if (!fname.empty()) {
front.required_files.insert(base_dir + "/" + fname);
}
} }
} }
@@ -5297,9 +5533,12 @@ void check_recall_file() {
front.phase = RecallPhase::DOWNLOADING; front.phase = RecallPhase::DOWNLOADING;
front.download_result = ActionResult::PENDING; front.download_result = ActionResult::PENDING;
front.downloading_file.clear(); front.downloading_file.clear();
std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << " monitor=" << lm.monitor_id
<< " count=" << front.download_queue.size() << std::endl; << " count=" << front.download_queue.size()
<< " required_remote_count=" << front.required_files.size()
<< std::endl;
} }
} }
@@ -5320,6 +5559,23 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::DONE);//两个文件都下好了标记为成功 front.recall_status = static_cast<int>(RecallStatus::DONE);//两个文件都下好了标记为成功
//稳态文件就不会走下面的逻辑
if (front.is_steady_file()) {
std::string msg_ok = std::string("监测点:") + lm.monitor_name
+ " 补招" + RecallTypeName(front) + "完成,文件数:"
+ std::to_string(front.file_success.size());
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
std::cout << "[check_recall_file] STEADY DONE dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " ok=" << front.file_success.size()
<< " total=" << front.required_files.size()
<< std::endl;
break;
}
//更新事件 //更新事件
// ★修改开始替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)” // ★修改开始替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)”
// 组装完整路径列表 // 组装完整路径列表
@@ -5419,13 +5675,13 @@ void check_recall_file() {
//20251218添加记录 //20251218添加记录
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件下载失败,目标时标:" + " 补招" + RecallTypeName(front) + "下载失败,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件下载失败,时间范围:" + " 补招" + RecallTypeName(front) + "下载失败,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5456,10 +5712,114 @@ void check_recall_file() {
} }
if (front.download_result == ActionResult::OK) { if (front.download_result == ActionResult::OK) {
if (front.is_steady_file()) {
const std::string base_dir = std::string("download/") + sanitize(dev.addr_str);
std::string seq = "1";
try {
int nseq = std::stoi(lm.logical_device_seq);
if (nseq > 0) {
seq = std::to_string(nseq);
} else {
std::cout << "[check_recall_file][WARN] invalid logical_device_seq, use default M1_"
<< " dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " logical_device_seq=" << lm.logical_device_seq
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_");
}
} catch (...) {
std::cout << "[check_recall_file][WARN] parse logical_device_seq failed, use default M1_"
<< " dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " logical_device_seq=" << lm.logical_device_seq
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_");
}
std::string fname = sanitize(extract_filename1(front.downloading_file));
if (!fname.empty()) {
std::string old_local = base_dir + "/" + fname;
std::string new_fname = "M" + seq + "_" + fname;
std::string new_local = base_dir + "/" + new_fname;
// 先把下载目录下文件名改成 M1_xxx.pqd
if (old_local != new_local) {
if (std::rename(old_local.c_str(), new_local.c_str()) != 0) {
std::cout << "[check_recall_file][WARN] rename steady local file failed"
<< " old=" << old_local
<< " new=" << new_local
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件失败");
new_local = old_local; // 重命名失败,后续仍尝试用原文件
new_fname = fname;
} else {
std::cout << "[check_recall_file] rename steady local file ok"
<< " old=" << old_local
<< " new=" << new_local
<< std::endl;
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件成功");
}
}
// 只处理 .pqd 文件
if (new_fname.size() >= 4 &&
new_fname.substr(new_fname.size() - 4) == ".pqd") {
std::string pqdif_dir = std::string("download_pqdif/") + sanitize(dev.addr_str);
// 如果你已有递归建目录函数,用你的函数即可
create_directory_recursive(pqdif_dir.c_str());
std::string tmp_fname = new_fname.substr(0, new_fname.size() - 4) + ".tmp";
std::string local_tmp = base_dir + "/" + tmp_fname;
std::string dst_tmp = pqdif_dir + "/" + tmp_fname;
std::string dst_pqd = pqdif_dir + "/" + new_fname;
// 1. .pqd -> .tmp
if (std::rename(new_local.c_str(), local_tmp.c_str()) != 0) {
std::cout << "[check_recall_file][WARN] rename .pqd to .tmp failed"
<< " old=" << new_local
<< " new=" << local_tmp
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), ".pqd改.tmp失败");
}
// 2. 移动 .tmp 到 download_pqdif
else if (!move_file_crossfs(local_tmp, dst_tmp)) {
std::cout << "[check_recall_file][WARN] move tmp file failed"
<< " old=" << local_tmp
<< " new=" << dst_tmp
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "移动tmp文件失败");
}
// 3. download_pqdif 下 .tmp -> .pqd
else if (std::rename(dst_tmp.c_str(), dst_pqd.c_str()) != 0) {//原子操作,不会影响解析线程扫描到不完整文件
std::cout << "[check_recall_file][WARN] rename .tmp to .pqd failed"
<< " old=" << dst_tmp
<< " new=" << dst_pqd
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), ".tmp改.pqd失败");
} else {
std::cout << "[check_recall_file] move pqd file to pqdif dir ok"
<< " src=" << new_local
<< " dst=" << dst_pqd
<< std::endl;
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "pqd文件移动到解析目录成功");
}
}
}
}
front.file_success.insert(front.downloading_file); front.file_success.insert(front.downloading_file);
std::string msg_ok = std::string("监测点:") + lm.monitor_name std::string msg_ok = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件:" + front.downloading_file + " 补招" + RecallTypeName(front) + ""
+ front.downloading_file
+ " 执行完成"; + " 执行完成";
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok); append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
@@ -5477,7 +5837,8 @@ void check_recall_file() {
} else { } else {
std::string msg_fail = std::string("监测点:") + lm.monitor_name std::string msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件:" + front.downloading_file + " 补招" + RecallTypeName(front) + ""
+ front.downloading_file
+ " 执行失败"; + " 执行失败";
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5509,6 +5870,7 @@ void check_recall_file() {
RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录 RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始 if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始
// 标记为 RUNNING并设置终端忙状态 // 标记为 RUNNING并设置终端忙状态
front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中 front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中
dev.isbusy = 1; //装置由idle标记为忙 dev.isbusy = 1; //装置由idle标记为忙
@@ -5519,9 +5881,13 @@ void check_recall_file() {
// 初始化状态机并发出第一个目录请求 // 初始化状态机并发出第一个目录请求
front.reset_runtime(true);//保留直下文件信息 front.reset_runtime(true);//保留直下文件信息
front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表” front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表”
if (!front.dir_candidates.empty()) {//目录列表非空 if (!front.active_dirs().empty()) {//目录列表非空
front.cur_dir_index = 0; //正在尝试的目录下标 front.cur_dir_index = 0; //正在尝试的目录下标
front.cur_dir = front.dir_candidates[0]; //第一个目录 front.cur_dir = resolve_recall_dir(
front.active_dirs()[0],
front,
lm
); //第一个目录
front.list_result = ActionResult::PENDING; //目录状态:等待返回 front.list_result = ActionResult::PENDING; //目录状态:等待返回
// ★新增:先查缓存 // ★新增:先查缓存
@@ -5550,18 +5916,18 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败 front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
std::cout << "[check_recall_stat] empty dir_candidates, FAIL dev=" << dev.terminal_id std::cout << "[check_recall_stat] empty active_dirs, FAIL dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << std::endl; << " monitor=" << lm.monitor_id << std::endl;
} }
@@ -5700,10 +6066,10 @@ bool enqueue_direct_download(const std::string& dev_id,
rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED); rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED);
rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗 rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗
rf.EndTime = "1970-01-01 00:00:01"; rf.EndTime = "1970-01-01 00:00:01";
//rf.dir_candidates = dir_candidates; // 要检索的目录列表和默认的一致
rf.direct_mode = true; // ★关键:直下文件
rf.target_filetimes=filetime; // ▲单个文件时间入“列表” rf.target_filetimes=filetime; // ▲单个文件时间入“列表”
rf.file_type = RecallFileType::VOLTAGE_FILE; // 直下波形文件
lm_it->recall_list_static.push_back(std::move(rf)); lm_it->recall_list_static.push_back(std::move(rf));
// 若设备空闲,可直接置忙(可选,视你的流程而定) // 若设备空闲,可直接置忙(可选,视你的流程而定)

View File

@@ -68,6 +68,12 @@ enum class ActionResult {
}; };
// ====== ★修改:扩展 RecallFile支持“多目录 + 文件筛选 + 串行下载”的状态机 ====== // ====== ★修改:扩展 RecallFile支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
enum class RecallFileType {
NONE = 0,
STEADY_FILE, // 稳态文件
VOLTAGE_FILE // 暂态直下文件
};
class RecallFile class RecallFile
{ {
public: public:
@@ -75,18 +81,35 @@ public:
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
std::string StartTime; // 数据补招起始时间yyyy-MM-dd HH:mm:ss std::string StartTime; // 数据补招起始时间yyyy-MM-dd HH:mm:ss
std::string EndTime; // 数据补招结束时间yyyy-MM-dd HH:mm:ss std::string EndTime; // 数据补招结束时间yyyy-MM-dd HH:mm:ss
// ===== 业务类型 =====
// STEADY稳态文件补招
// VOLTAGE暂态事件补招
std::string STEADY; // 补招历史统计数据标识 0-不补招1-补招 std::string STEADY; // 补招历史统计数据标识 0-不补招1-补招
std::string VOLTAGE; // 补招暂态事件标识 0-不补招1-补招 std::string VOLTAGE; // 补招暂态事件标识 0-不补招1-补招
// ===== 文件下载类型 =====
RecallFileType file_type = RecallFileType::NONE;
//暂态文件用 //暂态文件用
bool direct_mode = false; // 直下文件开关true 表示不按时间窗,仅按目标文件名 std::string target_filetimes; // 直下文件匹配时间点yyyyMMdd_HHmmss
std::string target_filetimes; // 直下文件匹配时间点yyyyMMdd_HHmmss仅 direct_mode=true 时有效
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填 // ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
std::map<std::string, std::vector<tag_dir_info>> dir_files; std::map<std::string, std::vector<tag_dir_info>> dir_files;
// ★新增:候选目录(可扩展) std::vector<std::string> steady_dir_candidates{
std::vector<std::string> dir_candidates{ "/cf/pqdif", //580绝对真实路径
"/bd0/pqdif", //chemengyu提供包含bd0
"/bd0/pqdif/%DESC%/%DAY%",
"/bd0/pqdif/Line%SEQ%",
"/bd0/historyFile/%DESC%",
"/pqdif", // 默认版本 / 新疆 可能取到其他测点文件
"/pqdif/%DESC%/%DAY%", // 上海pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件
"/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件
"/historyFile/%DESC%" // 广东 不会取到其他测点文件
};
std::vector<std::string> voltage_dir_candidates{
"/cf/COMTRADE", "/cf/COMTRADE",
"/bd0/COMTRADE", "/bd0/COMTRADE",
"/sd0/COMTRADE", "/sd0/COMTRADE",
@@ -102,6 +125,9 @@ public:
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果 ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果 ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
// 稳态文件用:一个 guid 下的多个补招时间段
std::vector<std::pair<long long, long long>> recall_ranges;
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径) // ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
std::list<std::string> download_queue; //一个时间可能对应多个文件 std::list<std::string> download_queue; //一个时间可能对应多个文件
std::string downloading_file; // 当前正在下载的文件(完整路径) std::string downloading_file; // 当前正在下载的文件(完整路径)
@@ -109,8 +135,22 @@ public:
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集 std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
std::unordered_set<std::string> file_success; // 已下载成功的文件集合 std::unordered_set<std::string> file_success; // 已下载成功的文件集合
// 是否稳态文件任务
bool is_steady_file() const {
return file_type == RecallFileType::STEADY_FILE;
}
// 是否暂态直下文件任务
bool is_voltage_file() const {
return file_type == RecallFileType::VOLTAGE_FILE;
}
const std::vector<std::string>& active_dirs() const {
return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates;
}
// ★新增:一个便捷复位 // ★新增:一个便捷复位
void reset_runtime(bool keep_direct = false) void reset_runtime(bool keep_target_filetimes = false)
{ {
phase = RecallPhase::IDLE; phase = RecallPhase::IDLE;
cur_dir_index = 0; cur_dir_index = 0;
@@ -124,9 +164,10 @@ public:
required_files.clear(); required_files.clear();
file_success.clear(); file_success.clear();
// 注意file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响
// ★新增:按需保留直下文件开关和目标名 // ★新增:按需保留直下文件开关和目标名
if (!keep_direct) { if (!keep_target_filetimes) {
direct_mode = false;
target_filetimes.clear(); // ▲列表清空 target_filetimes.clear(); // ▲列表清空
} }
} }

View File

@@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号
extern std::string FRONT_INST; extern std::string FRONT_INST;
extern bool PQD_FLAG;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数 /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
template<typename T, typename... Args> template<typename T, typename... Args>
@@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) {
try { try {
g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_index = std::stoi(val.substr(0, pos));
g_front_seg_num = std::stoi(val.substr(pos + 1)); g_front_seg_num = std::stoi(val.substr(pos + 1));
if (g_front_seg_index == 0) {
PQD_FLAG = true;
}
} catch (...) { } catch (...) {
std::cerr << "Invalid -s format." << std::endl; std::cerr << "Invalid -s format." << std::endl;
} }
@@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) {
try { try {
g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_index = std::stoi(val.substr(0, pos));
g_front_seg_num = std::stoi(val.substr(pos + 1)); g_front_seg_num = std::stoi(val.substr(pos + 1));
if (g_front_seg_index == 0) {
PQD_FLAG = true;
}
} catch (...) { } catch (...) {
std::cerr << "Invalid -s format." << std::endl; std::cerr << "Invalid -s format." << std::endl;
} }
@@ -223,13 +235,13 @@ std::string get_parent_directory() {
//解析模板文件 //解析模板文件
//Set_xml_nodeinfo(); //Set_xml_nodeinfo();
StartFrontThread(); //开启主线程
StartMQConsumerThread(); //开启消费者线程
StartMQProducerThread(); //开启生产者线程 StartMQProducerThread(); //开启生产者线程
if(!PQD_FLAG){
StartFrontThread(); //开启主线程
StartMQConsumerThread(); //开启消费者线程
StartTimerThread(); //开启定时线程 StartTimerThread(); //开启定时线程
}
//启动worker 根据启动标志启动 //启动worker 根据启动标志启动
if(G_TEST_FLAG){ if(G_TEST_FLAG){

View File

@@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
<< ", VOLTAGE=" << rf.VOLTAGE << ", VOLTAGE=" << rf.VOLTAGE
<< "\n"; << "\n";
// ★新增:直下模式与目标时间列表 // ★新增:文件补招类型与目标信息
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false") os << "\r\x1B[K |-- file_type="
<< ", target_filetimes(" << rf.target_filetimes << ")\n"; << (rf.is_steady_file() ? "STEADY_FILE" :
{ rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE");
os << "\r\x1B[K |.. " << rf.target_filetimes << "\n";
if (rf.is_voltage_file()) {
os << ", target_filetimes=" << rf.target_filetimes;
} }
else if (rf.is_steady_file()) {
os << ", time_range=" << rf.StartTime
<< " ~ " << rf.EndTime;
}
os << "\n";
// ★新增:状态机运行态 // ★新增:状态机运行态
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase) os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
@@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
<< ", download_result=" << resultStr(rf.download_result) << "\n"; << ", download_result=" << resultStr(rf.download_result) << "\n";
// ★新增:候选目录 // ★新增:候选目录
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n"; os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n";
{ {
size_t c = 0; size_t c = 0;
for (const auto& d : rf.dir_candidates) { for (const auto& d : rf.active_dirs()) {
if (c++ >= MAX_ITEMS) break; if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << d << "\n"; os << "\r\x1B[K |-- " << d << "\n";
} }
if (rf.dir_candidates.size() > MAX_ITEMS) { if (rf.active_dirs().size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n"; os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n";
} }
} }

View File

@@ -350,11 +350,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl; << ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件 //lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
if(record.nType != 0){
append_qvvr_event(id,event.head.name, append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase); record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
transfer_json_qvvr_data(id,event.head.name, transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase, record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
""); "");
}
//事件主动上送处理完成,不需要通知状态机 //事件主动上送处理完成,不需要通知状态机
} }
@@ -2449,12 +2451,14 @@ void process_received_message(string mac, string id,const char* data, size_t len
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl; << ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
//记录补招上来的暂态事件 //记录补招上来的暂态事件
if(record.nType != 0){
append_qvvr_event(id,event.head.name, append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase); record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
//直接发走暂态事件 //直接发走暂态事件
transfer_json_qvvr_data(id,event.head.name, transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,""); record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
}
//通知状态机补招暂态事件成功 //通知状态机补招暂态事件成功
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG)); on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));

View File

@@ -20,6 +20,7 @@
#include <cstdlib> #include <cstdlib>
#include <limits> #include <limits>
#include <utility> #include <utility>
#include <new>
// PQDIF 解析库 // PQDIF 解析库
#include "pqdif/PQDIF.h" #include "pqdif/PQDIF.h"
@@ -28,6 +29,17 @@
#include "pqdif/include/pqdif_lg.h" #include "pqdif/include/pqdif_lg.h"
#include "pqdif_semantic_ids.h" #include "pqdif_semantic_ids.h"
#include "cloudfront/code/log4.h" //lnk20260526
extern void enqueue_stat_pq(const std::string& max_base64Str,
const std::string& min_base64Str,
const std::string& avg_base64Str,
const std::string& cp95_base64Str,
time_t data_time,
const std::string& mac,
short cid);
extern std::string extract_filename1(const std::string& path);
namespace fs = std::experimental::filesystem; namespace fs = std::experimental::filesystem;
namespace { namespace {
@@ -37,7 +49,11 @@ namespace {
constexpr int kMaxPqdifFilesPerScan = 1; constexpr int kMaxPqdifFilesPerScan = 1;
constexpr size_t kParsedCacheLimit = 128; constexpr size_t kParsedCacheLimit = 128;
const char* kPqdRootDir = "download"; // 大文件流式阈值:估算展开点数超过该值时,不再构造 expanded_stat_points
// 而是按通道逐步聚合成时间桶并直接组装 Base64避免单文件中间对象占用过大内存。
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;
const char* kPqdRootDir = "download_pqdif";
const char* kDoneRootDir = "download_done"; const char* kDoneRootDir = "download_done";
const char* kFailRootDir = "download_fail"; const char* kFailRootDir = "download_fail";
@@ -5072,6 +5088,250 @@ namespace {
return stat_select_best_metric_sources(out); return stat_select_best_metric_sources(out);
} }
size_t stat_estimate_candidate_point_count_for_streaming(const PqdifLogicalFile& lf)
{
size_t total = 0;
for (const auto& obs : lf.observations)
{
for (const auto& ch : obs.channel_instances)
{
if (!pqdif_sem::IsQuantityTypeValueLog(ch.quantity_type_id.value))
continue;
for (const auto& si : ch.series_instances)
{
if (pqdif_sem::IsValueTypeTime(si.value_type_id.value))
continue;
const PqdifSeriesInstance* resolved = stat_resolve_shared_series(obs, si);
if (resolved == nullptr || resolved->values.count <= 0)
continue;
total += static_cast<size_t>(resolved->values.count);
if (total > kPqdifLargeFileStreamingPointThreshold)
return total;
}
}
}
return total;
}
struct StatStreamingSourceBucket
{
StatMetricId metric_id = StatMetricId::Unknown;
StatMetricSourceKey key;
StatMetricSourceStats stats;
std::map<time_t, AggregatedStatValues> values_by_time;
std::map<time_t, std::string> time_text_by_time;
};
void stat_streaming_set_value(AggregatedStatValues& agg, const ExpandedStatPoint& p)
{
if (agg.source_observation_index < 0)
{
agg.source_observation_index = p.observation_index;
agg.source_channel_instance_index = p.channel_instance_index;
agg.source_series_instance_index = p.series_instance_index;
agg.source_channel_name = p.channel_name;
agg.quality = StatMetricQuality::Normal;
agg.quality_reason = "ok";
}
// 同一来源同一时间同一种统计值如果重复,保留首次写入,避免一次性覆盖造成结果不稳定。
if (stat_has_value_kind(agg, p.stat_kind))
return;
agg.source_series_instance_index = p.series_instance_index;
switch (p.stat_kind)
{
case StatValueKind::Min:
agg.has_min = true;
agg.min_value = p.value;
break;
case StatValueKind::Max:
agg.has_max = true;
agg.max_value = p.value;
break;
case StatValueKind::Avg:
agg.has_avg = true;
agg.avg_value = p.value;
break;
case StatValueKind::P95:
agg.has_p95 = true;
agg.p95_value = p.value;
break;
default:
break;
}
}
void stat_streaming_add_point(
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>>& by_metric,
const ExpandedStatPoint& p)
{
if (p.metric_id == StatMetricId::Unknown || p.stat_kind == StatValueKind::Unknown)
return;
const StatMetricSourceKey key = stat_make_source_key(p);
StatStreamingSourceBucket& src = by_metric[p.metric_id][key];
if (src.metric_id == StatMetricId::Unknown)
{
src.metric_id = p.metric_id;
src.key = key;
}
src.stats.add(p);
AggregatedStatValues& agg = src.values_by_time[p.timestamp];
stat_streaming_set_value(agg, p);
if (src.time_text_by_time.find(p.timestamp) == src.time_text_by_time.end())
src.time_text_by_time[p.timestamp] = p.timestamp_text;
}
std::vector<TimeAggregatedStatBucket> stat_build_aggregated_buckets_streaming(
const PqdifLogicalFile& lf,
ParsedConnectionKind connection_kind,
int& selected_observation_index,
std::string& selected_observation_name,
size_t& out_raw_point_count,
size_t& out_selected_metric_count,
size_t& out_selected_dynamic_metric_count)
{
selected_observation_index = -1;
selected_observation_name.clear();
out_raw_point_count = 0;
out_selected_metric_count = 0;
out_selected_dynamic_metric_count = 0;
const PqdifObservationRecord* primary = stat_select_primary_statistical_observation(lf);
if (primary != nullptr)
{
selected_observation_index = primary->observation_index;
selected_observation_name = primary->observation_name;
}
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>> by_metric;
// 大文件模式:遍历全部 observations但每次只临时展开一个通道随后立刻压缩到
// metric/source/timestamp/kind 聚合结构中,不保留全量 ExpandedStatPoint。
for (const auto& obs : lf.observations)
{
for (const auto& ch : obs.channel_instances)
{
std::vector<ExpandedStatPoint> points = stat_expand_channel_points(lf, connection_kind, obs, ch);
out_raw_point_count += points.size();
for (const auto& p : points)
stat_streaming_add_point(by_metric, p);
}
}
std::map<time_t, TimeAggregatedStatBucket> global_buckets;
size_t duplicate_metric_count = 0;
for (auto& metric_pair : by_metric)
{
const StatMetricId metric_id = metric_pair.first;
auto& sources = metric_pair.second;
if (sources.empty())
continue;
bool has_selected = false;
StatMetricSourceKey best_key;
int best_score = std::numeric_limits<int>::min();
for (auto& src_pair : sources)
{
int score = stat_metric_source_score(src_pair.second.stats);
// 大文件流式模式会遍历全部 observations。为尽量保持原逻辑普通核心指标在分数接近时
// 优先使用主统计 observation谐波等只存在于其他 observation 时仍能正常选中。
if (selected_observation_index >= 0 && src_pair.first.observation_index == selected_observation_index)
score += 25;
if (!has_selected || score > best_score)
{
has_selected = true;
best_score = score;
best_key = src_pair.first;
}
}
if (!has_selected)
continue;
++out_selected_metric_count;
if (stat_is_dynamic_metric(metric_id))
++out_selected_dynamic_metric_count;
if (sources.size() > 1)
++duplicate_metric_count;
StatStreamingSourceBucket& selected_src = sources[best_key];
for (auto& time_pair : selected_src.values_by_time)
{
TimeAggregatedStatBucket& bucket = global_buckets[time_pair.first];
if (bucket.timestamp == 0)
{
bucket.timestamp = time_pair.first;
const auto tit = selected_src.time_text_by_time.find(time_pair.first);
bucket.timestamp_text = (tit == selected_src.time_text_by_time.end()) ? format_time_text(time_pair.first) : tit->second;
}
bucket.metrics[metric_id] = time_pair.second;
}
}
std::vector<TimeAggregatedStatBucket> out;
out.reserve(global_buckets.size());
for (auto& kv : global_buckets)
out.push_back(std::move(kv.second));
if (pqdif_log_enabled(PqdifLogLevel::Core))
{
std::cout << "========== PQDIF LARGE FILE STREAMING SUMMARY ==========" << std::endl;
std::cout << "raw_points=" << out_raw_point_count
<< ", candidate_metric_count=" << by_metric.size()
<< ", selected_metric_count=" << out_selected_metric_count
<< ", selected_dynamic_metrics=" << out_selected_dynamic_metric_count << "/" << stat_all_dynamic_metric_order().size()
<< ", duplicate_metric_count=" << duplicate_metric_count
<< ", buckets=" << out.size()
<< ", selected_observation_index=" << selected_observation_index
<< ", selected_observation_name=" << selected_observation_name
<< std::endl;
std::cout << "mode=streaming_no_expanded_points; detail=per-channel expand then immediate aggregate" << std::endl;
std::cout << "========================================================" << std::endl;
}
return out;
}
void stat_print_aggregated_metric_line(StatMetricId metric_id, const AggregatedStatValues* agg);
void dump_grouped_bucket_streaming_preview(const ParsedPqdifFile& parsed_file)
{
std::cout << "========== GROUPED STAT STREAMING CORE SUMMARY ==========" << std::endl;
std::cout << "connection_kind=" << stat_connection_kind_name(parsed_file.connection_kind)
<< ", selected_observation_index=" << parsed_file.selected_observation_index
<< ", selected_observation_name=" << parsed_file.selected_observation_name
<< ", expanded_points=SKIPPED_BY_STREAMING"
<< ", buckets=" << parsed_file.aggregated_stat_buckets.size()
<< ", core_metric_slots=" << stat_core_metric_print_order().size()
<< ", dynamic_spectrum_slots=" << stat_all_dynamic_metric_order().size()
<< std::endl;
const size_t bucket_limit = std::min<size_t>(parsed_file.aggregated_stat_buckets.size(), 3);
for (size_t i = 0; i < bucket_limit; ++i)
{
const auto& b = parsed_file.aggregated_stat_buckets[i];
std::cout << " [BUCKET " << i << "] time=" << b.timestamp_text
<< ", metric_count_present=" << b.metrics.size()
<< std::endl;
for (const auto metric_id : stat_core_metric_print_order())
{
const auto it = b.metrics.find(metric_id);
if (it != b.metrics.end())
stat_print_aggregated_metric_line(metric_id, &it->second);
}
}
std::cout << "=========================================================" << std::endl;
}
bool pqdif_probe_text_looks_like_flicker(const std::string& text) bool pqdif_probe_text_looks_like_flicker(const std::string& text)
{ {
const std::string key = normalize_key(text); const std::string key = normalize_key(text);
@@ -7496,9 +7756,41 @@ namespace {
<< ", total_base64_chars=" << file_batch.total_base64_chars << ", total_base64_chars=" << file_batch.total_base64_chars
<< std::endl; << std::endl;
// 按你的要求,入队前完整打印保存对象内部结构:文件批次 -> 时间点 -> Max/Min/Avg/P95 子记录 -> Base64 内容 // 完整 Base64 内容日志非常长,尤其大文件会明显放大内存和 I/O 压力
// 日志会很长;确认完成后可以临时注释掉这一行,或改成 if (pqdif_log_enabled(PqdifLogLevel::Info)) 包裹 // 现在仅在 Debug/Trace 级别打印完整对象Core/Info 只打印摘要和前几条子记录
//pqdif_dump_stat_base64_file_batch_full(file_batch); if (pqdif_log_enabled(PqdifLogLevel::Debug))
{
pqdif_dump_stat_base64_file_batch_full(file_batch);
}
else
{
std::cout << " [BASE64 FILE BATCH COMPACT] file=" << file_batch.pqdif_file_path
<< ", time_points=" << file_batch.time_point_count
<< ", records=" << file_batch.total_record_count
<< ", total_float_count=" << file_batch.total_float_count
<< ", total_placeholder_count=" << file_batch.total_placeholder_count
<< ", total_base64_chars=" << file_batch.total_base64_chars
<< std::endl;
size_t shown = 0;
for (const auto& tp : file_batch.time_points)
{
for (const auto& rec : tp.records)
{
if (shown >= 4)
break;
std::cout << " [BASE64 SAVED SAMPLE] time=" << rec.timestamp_text
<< ", kind=" << rec.value_kind_name
<< ", floats=" << rec.float_count
<< ", placeholders=" << rec.placeholder_count
<< ", base64_len=" << rec.base64_payload.size()
<< std::endl;
++shown;
}
if (shown >= 4)
break;
}
}
push_pqdif_stat_base64_file_batch(std::move(file_batch)); push_pqdif_stat_base64_file_batch(std::move(file_batch));
@@ -7548,6 +7840,54 @@ namespace {
// 后续指标判断需要先区分星型 / 角型两套规则。 // 后续指标判断需要先区分星型 / 角型两套规则。
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file); parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
// 2) 根据估算展开点数决定是否启用“大文件流式模式”。
// 普通文件仍走原 expanded_stat_points -> grouped buckets 流程;
// 大文件则直接按通道聚合为 buckets不再保存全量 expanded_stat_points。
const size_t estimated_candidate_points =
stat_estimate_candidate_point_count_for_streaming(parsed_file.logical_file);
const bool use_large_file_streaming =
estimated_candidate_points > kPqdifLargeFileStreamingPointThreshold;
if (use_large_file_streaming)
{
std::cout << "[PQDIF][MEMORY][STREAMING] large file detected, use streaming bucket/base64 build. "
<< "estimated_candidate_points=" << estimated_candidate_points
<< ", threshold=" << kPqdifLargeFileStreamingPointThreshold
<< std::endl;
size_t raw_point_count = 0;
size_t selected_metric_count = 0;
size_t selected_dynamic_metric_count = 0;
parsed_file.aggregated_stat_buckets = stat_build_aggregated_buckets_streaming(
parsed_file.logical_file,
parsed_file.connection_kind,
parsed_file.selected_observation_index,
parsed_file.selected_observation_name,
raw_point_count,
selected_metric_count,
selected_dynamic_metric_count);
if (pqdif_is_trace_log_enabled())
dump_semantic_probe(parsed_file);
dump_grouped_bucket_streaming_preview(parsed_file);
// 3) 大文件仍完整生成 Base64 文件批次,只是跳过 expanded_stat_points 中间缓存。
pqdif_build_and_queue_base64_records(parsed_file);
// 大文件模式不再把完整 ParsedPqdifFile 放入解析缓存,避免 logical_file + buckets 在内存中长期驻留。
// 后续业务请从 PqdifStatBase64FileBatch 队列读取最终 Base64 结果。
std::cout << "[PQDIF][MEMORY][STREAMING] skip parsed_file cache for large file, "
<< "base64 batch has been queued. file=" << file_path.string()
<< std::endl;
std::cout << "[PQDIF] processed ok(streaming): " << file_path.string()
<< ", cache_size=" << GetParsedPqdifCacheSize()
<< std::endl;
return true;
}
// 2) 只筛选“统计类 observation”并对其展开目标统计指标。 // 2) 只筛选“统计类 observation”并对其展开目标统计指标。
// 当前阶段不处理全部 observation避免不同 observation 混入同一时间聚合结果。 // 当前阶段不处理全部 observation避免不同 observation 混入同一时间聚合结果。
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation( parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
@@ -7655,14 +7995,50 @@ namespace {
<< ", max_per_scan=" << kMaxPqdifFilesPerScan << ", max_per_scan=" << kMaxPqdifFilesPerScan
<< std::endl; << std::endl;
const bool ok = process_single_pqdif_file(item.path, item.mac); bool ok = false;
bool caught_exception = false;
std::string exception_text;
try
{
ok = process_single_pqdif_file(item.path, item.mac);
}
catch (const std::bad_alloc& ex)
{
ok = false;
caught_exception = true;
exception_text = ex.what();
std::cout << "[PQDIF][OOM] std::bad_alloc while processing file="
<< item.path.string()
<< ", what=" << exception_text
<< std::endl;
}
catch (const std::exception& ex)
{
ok = false;
caught_exception = true;
exception_text = ex.what();
std::cout << "[PQDIF][ERROR] exception while processing file="
<< item.path.string()
<< ", what=" << exception_text
<< std::endl;
}
catch (...)
{
ok = false;
caught_exception = true;
exception_text = "unknown";
std::cout << "[PQDIF][ERROR] unknown exception while processing file="
<< item.path.string()
<< std::endl;
}
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir); const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
const fs::path dst = target_root / item.mac / item.path.filename(); const fs::path dst = target_root / item.mac / item.path.filename();
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。 // 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
// 解析成功download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd // 解析成功download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
// 解析失败download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd // 解析失败或解析过程中出现异常download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
// //
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件, // 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。 // 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
@@ -7672,6 +8048,13 @@ namespace {
<< item.path.string() << " -> " << dst.string() << item.path.string() << " -> " << dst.string()
<< std::endl; << std::endl;
} }
else if (caught_exception)
{
std::cout << "[PQDIF] exception file moved to fail dir: "
<< dst.string()
<< ", reason=" << exception_text
<< std::endl;
}
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit); cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit); cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
@@ -7877,6 +8260,96 @@ void ClearReadyPqdifStatBase64Queue()
g_pqdif_stat_base64_ready_queue.clear(); g_pqdif_stat_base64_ready_queue.clear();
} }
static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容
StatValueKind kind,
std::string& out)
{
for (const auto& r : tp.records) {
if (r.value_kind == kind) {
out = r.base64_payload;
return !out.empty();
}
}
return false;
}
static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path,
short& point_name)
{
point_name = 0;
std::cout << "[extract_monitor_seq] begin path="
<< path << std::endl;
// 取纯文件名,例如:
// download/192.168.1.10/M1_xxx.pqd
// -> M1_xxx.pqd
std::string fname = extract_filename1(path);
std::cout << "[extract_monitor_seq] filename="
<< fname << std::endl;
if (fname.size() < 3) {
std::cout << "[extract_monitor_seq] filename too short"
<< std::endl;
return false;
}
if (fname[0] != 'M' && fname[0] != 'm') {
std::cout << "[extract_monitor_seq] filename not start with M/m"
<< std::endl;
return false;
}
size_t pos = fname.find('_');
std::cout << "[extract_monitor_seq] underscore pos="
<< pos << std::endl;
if (pos == std::string::npos || pos <= 1) {
std::cout << "[extract_monitor_seq] invalid underscore position"
<< std::endl;
return false;
}
// M1_xxx -> 1
std::string seq_str = fname.substr(1, pos - 1);
std::cout << "[extract_monitor_seq] seq_str="
<< seq_str << std::endl;
for (char c : seq_str) {
if (!std::isdigit(static_cast<unsigned char>(c))) {
std::cout << "[extract_monitor_seq] non-digit char="
<< c << std::endl;
return false;
}
}
try {
point_name = static_cast<short>(std::stoi(seq_str));
std::cout << "[extract_monitor_seq] success point_name="
<< point_name << std::endl;
return point_name > 0;
}
catch (const std::exception& e) {
std::cout << "[extract_monitor_seq] exception="
<< e.what() << std::endl;
point_name = 0;
return false;
}
catch (...) {
std::cout << "[extract_monitor_seq] unknown exception"
<< std::endl;
point_name = 0;
return false;
}
}
void RunPqdifScanLoop() void RunPqdifScanLoop()
{ {
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
@@ -7910,6 +8383,54 @@ void RunPqdifScanLoop()
if (PopReadyPqdifStatBase64FileBatch(batch)) { if (PopReadyPqdifStatBase64FileBatch(batch)) {
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果 // batch 就是一个 PQDIF 文件完整的 Base64 组装结果
// 在此处处理上送逻辑 // 在此处处理上送逻辑
const std::string& mac = batch.mac;
short point_name = 0;
if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) {
std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file="
<< batch.pqdif_file_path << std::endl;
continue;
}
for (const auto& tp : batch.time_points) {
std::string max_base64;
std::string min_base64;
std::string avg_base64;
std::string p95_base64;
bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64);
bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64);
bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64);
bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64);
if (!has_max || !has_min || !has_avg || !has_p95) {
std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file="
<< batch.pqdif_file_path
<< " time=" << tp.timestamp_text
<< " has_max=" << has_max
<< " has_min=" << has_min
<< " has_avg=" << has_avg
<< " has_p95=" << has_p95
<< std::endl;
continue;
}
enqueue_stat_pq(max_base64,
min_base64,
avg_base64,
p95_base64,
tp.timestamp,
mac,
point_name);
std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file="
<< batch.pqdif_file_path
<< " time=" << tp.timestamp_text
<< " mac=" << mac
<< " point=" << point_name
<< std::endl;
}
} }
} }
catch (const std::exception& ex) catch (const std::exception& ex)