recall stat

This commit is contained in:
lnk
2026-05-28 14:42:32 +08:00
parent 2161629fe0
commit 671fc6702e
5 changed files with 636 additions and 205 deletions

View File

@@ -22,6 +22,8 @@
#include <fnmatch.h> #include <fnmatch.h>
#include <memory> #include <memory>
#include <unordered_set> #include <unordered_set>
#include <regex>
#include <algorithm>
///////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -657,6 +659,9 @@ void init_config() {
MULTIPLE_NODE_FLAG = 0; MULTIPLE_NODE_FLAG = 0;
std::cout << "this is single process" << std::endl; std::cout << "this is single process" << std::endl;
} }
else if(g_front_seg_num > 0 && g_front_seg_index == 0){
std::cout << "this is pqdif process" << std::endl;
}
else{ else{
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出"); DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出");
exit(-1039); exit(-1039);
@@ -1406,7 +1411,7 @@ int recall_json_handle_from_mq(const std::string& body)
// 根据 monitorId 和提取的数字初始化补招记录 // 根据 monitorId 和提取的数字初始化补招记录
init_recall_record_file(guid, terminalId, monId, "", ""); init_recall_record_file(guid, terminalId, monId, "", "");
//根据时间戳单独补招事件 //根据时间戳单独补招事件,暂不使用
if(0)// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——开始 if(0)// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——开始
{ {
std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度 std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度
@@ -1490,34 +1495,43 @@ int recall_json_handle_from_mq(const std::string& body)
} }
if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; } if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; }
// 稳态文件一个 guid + monitor 只生成一条 RecallFile
RecallFile rm_all;
bool has_steady_range = false;
if (stat == 1) {
rm_all.recall_guid = guid;
rm_all.recall_status = 0;
rm_all.STEADY = std::to_string(stat);
rm_all.VOLTAGE = std::to_string(voltage);
rm_all.file_type = RecallFileType::STEADY_FILE;
}
for (const auto& ti : rec["timeInterval"]) { for (const auto& ti : rec["timeInterval"]) {
if (!ti.is_string()) continue; if (!ti.is_string()) continue;
std::string s = ti.get<std::string>(); std::string s = ti.get<std::string>();
std::string::size_type pos = s.find('~'); std::string::size_type pos = s.find('~');
if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; } if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; }
std::string start = s.substr(0, pos); std::string start = s.substr(0, pos);
std::string end = s.substr(pos + 1); std::string end = s.substr(pos + 1);
RecallFile rm_all;
rm_all.recall_guid = guid;
rm_all.recall_status = 0;
rm_all.StartTime = start;
rm_all.EndTime = end;
rm_all.STEADY = std::to_string(stat);
rm_all.VOLTAGE = std::to_string(voltage);
//lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应 //lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应
init_recall_record_file(guid, terminalId, monId, start, end); init_recall_record_file(guid, terminalId, monId, start, end);
if (voltage == 1) { if (voltage == 1) {
std::vector<RecallInfo> recallinfo_list_hour; std::vector<RecallInfo> recallinfo_list_hour;
Get_Recall_Time_Char(start, end, recallinfo_list_hour); Get_Recall_Time_Char(start, end, recallinfo_list_hour); //暂态将整个时间段划分为多个一小时的时间段
for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) { for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) {
const RecallInfo& info = recallinfo_list_hour[i]; const RecallInfo& info = recallinfo_list_hour[i];
RecallMonitor rm; RecallMonitor rm;
rm.recall_guid = guid; rm.recall_guid = guid;
rm.recall_status = 0; rm.recall_status = 0;
rm.StartTime = epoch_to_datetime_str(info.starttime); rm.StartTime = epoch_to_datetime_str(info.starttime); //每个记录是一个小时的时间段,张文暂态事件接口是一小时一条
rm.EndTime = epoch_to_datetime_str(info.endtime); rm.EndTime = epoch_to_datetime_str(info.endtime);
rm.STEADY = std::to_string(stat); rm.STEADY = std::to_string(stat);
rm.VOLTAGE = std::to_string(voltage); rm.VOLTAGE = std::to_string(voltage);
@@ -1525,10 +1539,62 @@ int recall_json_handle_from_mq(const std::string& body)
} }
} }
if (stat == 1) { if (stat == 1) {
lm->recall_list_static.push_back(rm_all); long long beg = parse_time_to_epoch(start); //转为整数时间
long long ed = parse_time_to_epoch(end);
if (beg < 0 || ed < 0 || beg > ed) {
std::cout << "[recall_file] invalid steady time range guid="
<< guid
<< " terminal=" << terminalId
<< " monitor=" << monId
<< " range=" << start << "~" << end
<< std::endl;
continue;
}
rm_all.recall_ranges.push_back(std::make_pair(beg, ed));//每个时间段都记录到补招稳态时间列表里
if (!has_steady_range) {
rm_all.StartTime = start; //记录多个时间段的第一条开始和结束
rm_all.EndTime = end;
has_steady_range = true;
} else {
long long old_beg = parse_time_to_epoch(rm_all.StartTime);
long long old_end = parse_time_to_epoch(rm_all.EndTime);
if (old_beg < 0 || beg < old_beg) {
rm_all.StartTime = start; //对比多个时间段更新最早和最晚
}
if (old_end < 0 || ed > old_end) {
rm_all.EndTime = end;
}
}
std::cout << "[recall_file] add steady range guid="
<< guid
<< " terminal=" << terminalId
<< " monitor=" << monId
<< " range=" << start << "~" << end
<< " total_ranges=" << rm_all.recall_ranges.size()
<< std::endl;
} }
if (stat == 0 && voltage == 0) return 10003; if (stat == 0 && voltage == 0) return 10003;
} }
// 所有 timeInterval 遍历完后,稳态 RecallFile 只入队一次
if (stat == 1 && has_steady_range) { //一次补招指令的处理结束
lm->recall_list_static.push_back(rm_all);//记录稳态补招
std::cout << "[recall_file] push steady recall once guid="
<< guid
<< " terminal=" << terminalId
<< " monitor=" << monId
<< " ranges=" << rm_all.recall_ranges.size()
<< " StartTime=" << rm_all.StartTime
<< " EndTime=" << rm_all.EndTime
<< std::endl;
}
} }
} else { } else {
// 未知 dataType忽略 // 未知 dataType忽略
@@ -4592,71 +4658,133 @@ void check_recall_event() {
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据 //////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ====== static std::string replace_all_copy(std::string s,
static bool extract_epoch_from_filename(const std::string& name, const std::string& from,
long long& out_epoch, const std::string& to)
int logical_device_seq)
{ {
// 拆分 size_t pos = 0;
std::vector<std::string> parts; while ((pos = s.find(from, pos)) != std::string::npos) {
parts.reserve(8); s.replace(pos, from.length(), to);
size_t start = 0, pos; pos += to.length();
while ((pos = name.find('_', start)) != std::string::npos) {
parts.emplace_back(name.substr(start, pos - start));
start = pos + 1;
} }
parts.emplace_back(name.substr(start)); // 最后一段(含扩展名) return s;
if (parts.size() < 4) return false;
// 第二段序号是倒数第4段
const std::string& seq_str = parts[parts.size() - 4];
// 允许前导 0把字符串转 int 后比较
for (char c : seq_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
int seq_val = 0;
try {
seq_val = std::stoi(seq_str);
} catch (...) {
return false;
}
if (seq_val != logical_device_seq) return false;
// 其余与上面相同
const std::string& date_str = parts[parts.size() - 3];
const std::string& time_str = parts[parts.size() - 2];
std::string ms_str = parts.back();
size_t dot = ms_str.find('.');
if (dot != std::string::npos) {
ms_str.erase(dot);
}
if (date_str.size() != 8 || time_str.size() != 6) return false;
for (char c : date_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
for (char c : time_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
for (char c : ms_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
int year = std::stoi(date_str.substr(0, 4));
int month = std::stoi(date_str.substr(4, 2));
int day = std::stoi(date_str.substr(6, 2));
int hour = std::stoi(time_str.substr(0, 2));
int min = std::stoi(time_str.substr(2, 2));
int sec = std::stoi(time_str.substr(4, 2));
// int msec = std::stoi(ms_str);
std::tm tm{}; tm.tm_isdst = -1;
tm.tm_year = year - 1900;
tm.tm_mon = month - 1;
tm.tm_mday = day;
tm.tm_hour = hour;
tm.tm_min = min;
tm.tm_sec = sec;
time_t t = timegm(&tm);
if (t < 0) return false;
out_epoch = static_cast<long long>(t); // 秒级
return true;
} }
static std::string make_day_from_datetime(const std::string& t)
{
// yyyy-MM-dd HH:mm:ss -> yyyyMMdd
if (t.size() < 10) return "";
std::string day = t.substr(0, 10);
day.erase(std::remove(day.begin(), day.end(), '-'), day.end());
return day;
}
static std::string resolve_recall_dir(const std::string& raw_dir,
const RecallFile& rf,
const ledger_monitor& lm)
{
std::string dir = raw_dir;
std::string day = make_day_from_datetime(rf.StartTime);
std::string seq = "1";
try {
seq = std::to_string(std::stoi(lm.logical_device_seq));
} catch (...) {
seq = "1";
}
// DESC 建议优先用你台账里的监测点描述/名称
// 如果 lm.monitor_name 是“监测点1”广东目录就是 /historyFile/监测点1
std::string desc = lm.monitor_name;
if (desc.empty()) {
desc = std::string("Line") + seq;
}
dir = replace_all_copy(dir, "%DAY%", day);
dir = replace_all_copy(dir, "%SEQ%", seq);
dir = replace_all_copy(dir, "%DESC%", desc);
return dir;
}
/////////////////////////////目录解析
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ======
static bool extract_pqdif_range_from_filename(const std::string& fname,
long long& fs,
long long& fe)
{
fs = -1;
fe = -1;
auto to_epoch = [](int y, int mon, int d, int h, int m, int s)->long long {
struct tm t {};
t.tm_year = y - 1900;
t.tm_mon = mon - 1;
t.tm_mday = d;
t.tm_hour = h;
t.tm_min = m;
t.tm_sec = s;
t.tm_isdst = -1;
return static_cast<long long>(mktime(&t));
};
// 广东/上海20230915000000_20230915010000.pqd 开始时间_结束时间
{
std::regex re(R"((\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})_(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
fe = to_epoch(std::stoi(m[7]), std::stoi(m[8]), std::stoi(m[9]),
std::stoi(m[10]), std::stoi(m[11]), std::stoi(m[12]));
return fs >= 0 && fe >= fs;
}
}
// 云南20230915T010000.pqd文件时间按 1 小时窗口估算 时间点
{
std::regex re(R"((\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
fe = fs + 3600;
return fs >= 0;
}
}
// 新疆B8B7FEE6D4792D30-20230915-003000-p.pqd按 1 小时窗口估算 设备ID-时间点-p.pqd
{
std::regex re(R"([^-]+-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
fe = fs + 3600;
return fs >= 0;
}
}
// 默认ied_ld_20230915_0030_60.pqd最后的 60 是间隔分钟 设备名_逻辑设备_开始日期_开始时间_间隔分钟
{
std::regex re(R"(.*_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), 0);
int intv_min = std::stoi(m[6]);
fe = fs + intv_min * 60;
return fs >= 0 && intv_min > 0;
}
}
return false;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true // 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true
static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) { static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) {
@@ -4744,7 +4872,6 @@ static void dircache_clear_device(const std::string& dev_id)
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑 //////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑
// ====== ★修改check_recall_stat —— 加入“两步法”状态机 ====== // ====== ★修改check_recall_stat —— 加入“两步法”状态机 ======
void check_recall_file() { void check_recall_file() {
@@ -4783,6 +4910,12 @@ void check_recall_file() {
}; };
// ★修改结束 // ★修改结束
auto RecallTypeName = [](const RecallFile& rf)->std::string {
if (rf.is_steady_file()) return "稳态文件";
if (rf.is_voltage_file()) return "波形文件";
return "文件";
};
std::vector<PendingUpload> pending_uploads; // ★修改:锁外执行上传与清理 std::vector<PendingUpload> pending_uploads; // ★修改:锁外执行上传与清理
@@ -4809,7 +4942,7 @@ void check_recall_file() {
// ★优先级结束 // ★优先级结束
for (auto& dev : terminal_devlist) { for (auto& dev : terminal_devlist) {
// 仅处理“正在补招/空闲”终端,与你原逻辑一致 // 仅处理“正在补招/空闲”终端
if (dev.busytype != static_cast<int>(DeviceState::READING_STATSFILE) && if (dev.busytype != static_cast<int>(DeviceState::READING_STATSFILE) &&
dev.busytype != static_cast<int>(DeviceState::IDLE)) { dev.busytype != static_cast<int>(DeviceState::IDLE)) {
continue; continue;
@@ -4834,7 +4967,6 @@ void check_recall_file() {
<< " size=" << lm.recall_list_static.size() << " size=" << lm.recall_list_static.size()
<< " first{guid=" << front.recall_guid << " first{guid=" << front.recall_guid
<< ", status=" << front.recall_status << ", status=" << front.recall_status
<< ", direct=" << (front.direct_mode ? 1 : 0)
<< ", phase=" << static_cast<int>(front.phase) << ", phase=" << static_cast<int>(front.phase)
<< ", cur_dir=" << front.cur_dir << ", cur_dir=" << front.cur_dir
<< ", cur_file=" << front.downloading_file << ", cur_file=" << front.downloading_file
@@ -4868,7 +5000,6 @@ void check_recall_file() {
<< " size=" << lm.recall_list_static.size() << " size=" << lm.recall_list_static.size()
<< " first{guid=" << nf.recall_guid << " first{guid=" << nf.recall_guid
<< ", status=" << nf.recall_status << ", status=" << nf.recall_status
<< ", direct=" << (nf.direct_mode ? 1 : 0)
<< ", phase=" << static_cast<int>(nf.phase) << ", phase=" << static_cast<int>(nf.phase)
<< ", cur_dir=" << nf.cur_dir << ", cur_dir=" << nf.cur_dir
<< ", cur_file=" << nf.downloading_file << ", cur_file=" << nf.downloading_file
@@ -4900,13 +5031,13 @@ void check_recall_file() {
//20251218添加记录 //20251218添加记录
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件失败,目标时标:" + " 补招" + RecallTypeName(front) + "失败,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件失败,时间范围:" + " 补招" + RecallTypeName(front) + "失败,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -4924,7 +5055,6 @@ void check_recall_file() {
<< " size=" << lm.recall_list_static.size() << " size=" << lm.recall_list_static.size()
<< " first{guid=" << nf.recall_guid << " first{guid=" << nf.recall_guid
<< ", status=" << nf.recall_status << ", status=" << nf.recall_status
<< ", direct=" << (nf.direct_mode ? 1 : 0)
<< ", phase=" << static_cast<int>(nf.phase) << ", phase=" << static_cast<int>(nf.phase)
<< ", cur_dir=" << nf.cur_dir << ", cur_dir=" << nf.cur_dir
<< ", cur_file=" << nf.downloading_file << ", cur_file=" << nf.downloading_file
@@ -5034,8 +5164,12 @@ void check_recall_file() {
front.file_success.clear(); front.file_success.clear();
// 立即发起第一个目录请求 // 立即发起第一个目录请求
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) { if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index];//从第一个目录开始 front.cur_dir = resolve_recall_dir(
front.active_dirs()[front.cur_dir_index],
front,
lm
);//从第一个目录开始
// ★新增:先查设备级目录缓存 // ★新增:先查设备级目录缓存
std::vector<tag_dir_info> cached; std::vector<tag_dir_info> cached;
@@ -5061,16 +5195,16 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
std::cout << "[check_recall_stat] no dir candidates, FAIL dev=" << dev.terminal_id std::cout << "[check_recall_stat] no dir candidates, FAIL dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << std::endl; << " monitor=" << lm.monitor_id << std::endl;
@@ -5096,8 +5230,12 @@ void check_recall_file() {
front.required_files.clear(); front.required_files.clear();
front.file_success.clear(); front.file_success.clear();
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) { if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index]; front.cur_dir = resolve_recall_dir(
front.active_dirs()[front.cur_dir_index],
front,
lm
);
// ★新增:先查缓存 // ★新增:先查缓存
std::vector<tag_dir_info> cached; std::vector<tag_dir_info> cached;
@@ -5121,14 +5259,14 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5140,89 +5278,104 @@ void check_recall_file() {
// OK根据起止时间筛选文件 // OK根据起止时间筛选文件
{ {
/*//这部分用于稳态数据文件 long long beg = -1;
long long beg = parse_time_to_epoch(front.StartTime); long long end = -1;
long long end = parse_time_to_epoch(front.EndTime);
//错误判断:如果是直下文件的方式,会给默认的正确的时间范围 if (front.is_steady_file()) {
if (beg < 0 || end < 0 || beg > end) { beg = parse_time_to_epoch(front.StartTime);
front.recall_status = static_cast<int>(RecallStatus::FAILED); end = parse_time_to_epoch(front.EndTime);
std::cout << "[check_recall_stat] time parse ERR, FAIL dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " start=" << front.StartTime
<< " end=" << front.EndTime << std::endl;
break;//跳出循环,一个装置一次只能处理一个测点的一个补招记录;如果失败,下个循环会弹出
}*/
//装置消息返回后通知成功的处理: if (beg < 0 || end < 0 || beg > end) {
auto it = front.dir_files.find(front.cur_dir);//在map中查找当前目录名对应的目录下的文件名列表 front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::cout << "[check_recall_file] steady time parse ERR, FAIL dev="
<< dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " start=" << front.StartTime
<< " end=" << front.EndTime << std::endl;
front.phase = RecallPhase::IDLE;
front.downloading_file.clear();
front.download_result = ActionResult::PENDING;
break;
}
}
auto it = front.dir_files.find(front.cur_dir);
if (it != front.dir_files.end()) { if (it != front.dir_files.end()) {
if (front.direct_mode) { if (front.is_voltage_file()) { //暂态波形下载
// 目标时间戳(不含毫秒、形如 yyyyMMdd_HHmmss
const std::string& want_ts = front.target_filetimes; const std::string& want_ts = front.target_filetimes;
for (const auto& ent : it->second) { for (const auto& ent : it->second) {
// 打印目录下的所有条目 std::cout << "[check_recall_file] voltage dir file dev=" << dev.terminal_id
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << " dir=" << front.cur_dir
<< " file=" << ent.name << " file=" << ent.name
<< " flag=" << ent.flag << " flag=" << ent.flag
<< std::endl; << std::endl;
if (ent.flag == 0) continue; // 只要文件,跳过目录 if (ent.flag == 0) continue;
size_t n = ::strnlen(ent.name, sizeof(ent.name)); size_t n = ::strnlen(ent.name, sizeof(ent.name));
std::string fname(ent.name, n); std::string fname(ent.name, n);
// 解析出 key = 监测点号_YYYYMMDD_HHMMSS注意确保你的 make_target_key_from_filename
// 已经改成“去掉毫秒”的版本;若还带毫秒,请先调整该函数)
std::string key; std::string key;
if (!make_target_key_from_filename(fname, key)) { if (!make_target_key_from_filename(fname, key)) {
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir
<< " file=" << fname
<< " key=" << key
<< " ERR: invalid filename format, skip"
<< std::endl;
continue; continue;
} }
// key 形如 MON_YYYYMMDD_HHMMSS目标是只按时间戳匹配 if (has_suffix(key, want_ts)) {
if (has_suffix(key, want_ts)) {
//打印放入的文件名
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir
<< " file=" << fname
<< " key=" << key
<< " MATCH, add to download queue"
<< std::endl;
front.download_queue.push_back(front.cur_dir + "/" + fname); front.download_queue.push_back(front.cur_dir + "/" + fname);
} }
} }
} else { //稳态文件 }
// ☆原有:按时间窗筛选 else if (front.is_steady_file()) { //稳态文件下载
long long beg = parse_time_to_epoch(front.StartTime);
long long end = parse_time_to_epoch(front.EndTime);
for (const auto& ent : it->second) { for (const auto& ent : it->second) {
if (ent.flag != 1) continue; // 只要文件 if (ent.flag != 1) continue;
// 文件名
size_t n = ::strnlen(ent.name, sizeof(ent.name)); size_t n = ::strnlen(ent.name, sizeof(ent.name));
std::string fname(ent.name, n); std::string fname(ent.name, n);
long long fs = -1;
long long fe = -1; long long fe = -1;
int seq = 0;
try { seq = std::stoi(lm.logical_device_seq); } catch (...) { seq = 0; } if (!extract_pqdif_range_from_filename(fname, fs, fe)) {
if (!extract_epoch_from_filename(fname, fe, seq)) continue; std::cout << "[check_recall_file] steady skip invalid pqdif filename dev="
if (fe >= beg && fe <= end) { << dev.terminal_id
front.download_queue.push_back(front.cur_dir + "/" + fname); << " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir
<< " file=" << fname << std::endl;
continue;
}
// 文件时间段和补招时间段有交集就下载
bool hit = false;
for (const auto& r : front.recall_ranges) {//每个文件的时间段和补招的多个时间段比对,只要有一个交集就算命中
long long rb = r.first;
long long re = r.second;
if (fs <= re && fe >= rb) {
hit = true;
break;
}
}
if (hit) {
std::string remote_file = front.cur_dir + "/" + fname;
if (front.required_files.insert(remote_file).second) {
front.download_queue.push_back(remote_file);
}
} }
} }
} }
else {
front.recall_status = static_cast<int>(RecallStatus::FAILED);
append_recall_record_line(dev.guid, lm.monitor_id,
std::string("监测点:") + lm.monitor_name + " 文件补招类型为空,执行失败");
break;
}
} }
} }
@@ -5230,8 +5383,12 @@ void check_recall_file() {
// 当前目录无匹配文件 -> 试下一个目录 // 当前目录无匹配文件 -> 试下一个目录
front.cur_dir_index++; front.cur_dir_index++;
front.list_result = ActionResult::PENDING; front.list_result = ActionResult::PENDING;
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) { if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index]; front.cur_dir = resolve_recall_dir(
front.active_dirs()[front.cur_dir_index],
front,
lm
);
// ★新增:先查缓存 // ★新增:先查缓存
std::vector<tag_dir_info> cached; std::vector<tag_dir_info> cached;
@@ -5255,13 +5412,13 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5275,20 +5432,12 @@ void check_recall_file() {
} }
else { else {
// 进入下载阶段 // 进入下载阶段
// required_files 记录远端文件全集,用于和 file_success 对比
front.required_files.clear(); front.required_files.clear();
//for (const auto& p : front.download_queue) front.required_files.insert(p);
//转成本地保存路径 download/<dev.addr_str>/<fname> for (const auto& p : front.download_queue) {
front.required_files.clear(); if (!p.empty()) {
{ front.required_files.insert(p); // 保留远端路径
const std::string base_dir = std::string("download/") + sanitize(dev.addr_str);
for (const auto& p : front.download_queue) {
// p 形如 "<cur_dir>/<fname>" —— 提取纯文件名
std::string fname = sanitize(extract_filename1(p));
if (!fname.empty()) {
front.required_files.insert(base_dir + "/" + fname);
}
} }
} }
@@ -5297,9 +5446,12 @@ void check_recall_file() {
front.phase = RecallPhase::DOWNLOADING; front.phase = RecallPhase::DOWNLOADING;
front.download_result = ActionResult::PENDING; front.download_result = ActionResult::PENDING;
front.downloading_file.clear(); front.downloading_file.clear();
std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << " monitor=" << lm.monitor_id
<< " count=" << front.download_queue.size() << std::endl; << " count=" << front.download_queue.size()
<< " required_remote_count=" << front.required_files.size()
<< std::endl;
} }
} }
@@ -5320,6 +5472,23 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::DONE);//两个文件都下好了标记为成功 front.recall_status = static_cast<int>(RecallStatus::DONE);//两个文件都下好了标记为成功
//稳态文件就不会走下面的逻辑
if (front.is_steady_file()) {
std::string msg_ok = std::string("监测点:") + lm.monitor_name
+ " 补招" + RecallTypeName(front) + "完成,文件数:"
+ std::to_string(front.file_success.size());
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
std::cout << "[check_recall_file] STEADY DONE dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " ok=" << front.file_success.size()
<< " total=" << front.required_files.size()
<< std::endl;
break;
}
//更新事件 //更新事件
// ★修改开始替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)” // ★修改开始替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)”
// 组装完整路径列表 // 组装完整路径列表
@@ -5419,13 +5588,13 @@ void check_recall_file() {
//20251218添加记录 //20251218添加记录
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件下载失败,目标时标:" + " 补招" + RecallTypeName(front) + "下载失败,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件下载失败,时间范围:" + " 补招" + RecallTypeName(front) + "下载失败,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5456,10 +5625,60 @@ void check_recall_file() {
} }
if (front.download_result == ActionResult::OK) { if (front.download_result == ActionResult::OK) {
if (front.is_steady_file()) {
const std::string base_dir = std::string("download/") + sanitize(dev.addr_str);
std::string seq = "1";
try {
int nseq = std::stoi(lm.logical_device_seq);
if (nseq > 0) {
seq = std::to_string(nseq);
} else {
std::cout << "[check_recall_file][WARN] invalid logical_device_seq, use default M1_"
<< " dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " logical_device_seq=" << lm.logical_device_seq
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_");
}
} catch (...) {
std::cout << "[check_recall_file][WARN] parse logical_device_seq failed, use default M1_"
<< " dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " logical_device_seq=" << lm.logical_device_seq
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_");
}
std::string fname = sanitize(extract_filename1(front.downloading_file));
if (!fname.empty()) {
std::string old_local = base_dir + "/" + fname;
std::string new_local = base_dir + "/M" + seq + "_" + fname;
if (old_local != new_local) {
if (std::rename(old_local.c_str(), new_local.c_str()) != 0) { //修改下载好的文件名为 M1_开头的格式方便后续处理如果失败不影响结果只是文件名不规范
std::cout << "[check_recall_file][WARN] rename steady local file failed"
<< " old=" << old_local
<< " new=" << new_local
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件失败");
} else {
std::cout << "[check_recall_file] rename steady local file ok"
<< " old=" << old_local
<< " new=" << new_local
<< std::endl;
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件成功");
}
}
}
}
front.file_success.insert(front.downloading_file); front.file_success.insert(front.downloading_file);
std::string msg_ok = std::string("监测点:") + lm.monitor_name std::string msg_ok = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件:" + front.downloading_file + " 补招" + RecallTypeName(front) + ""
+ front.downloading_file
+ " 执行完成"; + " 执行完成";
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok); append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
@@ -5477,7 +5696,8 @@ void check_recall_file() {
} else { } else {
std::string msg_fail = std::string("监测点:") + lm.monitor_name std::string msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件:" + front.downloading_file + " 补招" + RecallTypeName(front) + ""
+ front.downloading_file
+ " 执行失败"; + " 执行失败";
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
@@ -5509,6 +5729,7 @@ void check_recall_file() {
RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录 RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始 if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始
// 标记为 RUNNING并设置终端忙状态 // 标记为 RUNNING并设置终端忙状态
front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中 front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中
dev.isbusy = 1; //装置由idle标记为忙 dev.isbusy = 1; //装置由idle标记为忙
@@ -5519,9 +5740,13 @@ void check_recall_file() {
// 初始化状态机并发出第一个目录请求 // 初始化状态机并发出第一个目录请求
front.reset_runtime(true);//保留直下文件信息 front.reset_runtime(true);//保留直下文件信息
front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表” front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表”
if (!front.dir_candidates.empty()) {//目录列表非空 if (!front.active_dirs().empty()) {//目录列表非空
front.cur_dir_index = 0; //正在尝试的目录下标 front.cur_dir_index = 0; //正在尝试的目录下标
front.cur_dir = front.dir_candidates[0]; //第一个目录 front.cur_dir = resolve_recall_dir(
front.active_dirs()[0],
front,
lm
); //第一个目录
front.list_result = ActionResult::PENDING; //目录状态:等待返回 front.list_result = ActionResult::PENDING; //目录状态:等待返回
// ★新增:先查缓存 // ★新增:先查缓存
@@ -5550,18 +5775,18 @@ void check_recall_file() {
front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败 front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败
std::string msg_fail; std::string msg_fail;
if (front.direct_mode) { if (front.is_voltage_file()) {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,目标时标:" + " 补招" + RecallTypeName(front) + "未找到,目标时标:"
+ front.target_filetimes; + front.target_filetimes;
} else { } else {
msg_fail = std::string("监测点:") + lm.monitor_name msg_fail = std::string("监测点:") + lm.monitor_name
+ " 补招波形文件未找到,时间范围:" + " 补招" + RecallTypeName(front) + "未找到,时间范围:"
+ front.StartTime + " ~ " + front.EndTime; + front.StartTime + " ~ " + front.EndTime;
} }
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
std::cout << "[check_recall_stat] empty dir_candidates, FAIL dev=" << dev.terminal_id std::cout << "[check_recall_stat] empty active_dirs, FAIL dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << std::endl; << " monitor=" << lm.monitor_id << std::endl;
} }
@@ -5700,10 +5925,10 @@ bool enqueue_direct_download(const std::string& dev_id,
rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED); rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED);
rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗 rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗
rf.EndTime = "1970-01-01 00:00:01"; rf.EndTime = "1970-01-01 00:00:01";
//rf.dir_candidates = dir_candidates; // 要检索的目录列表和默认的一致
rf.direct_mode = true; // ★关键:直下文件
rf.target_filetimes=filetime; // ▲单个文件时间入“列表” rf.target_filetimes=filetime; // ▲单个文件时间入“列表”
rf.file_type = RecallFileType::VOLTAGE_FILE; // 直下波形文件
lm_it->recall_list_static.push_back(std::move(rf)); lm_it->recall_list_static.push_back(std::move(rf));
// 若设备空闲,可直接置忙(可选,视你的流程而定) // 若设备空闲,可直接置忙(可选,视你的流程而定)

View File

@@ -68,6 +68,12 @@ enum class ActionResult {
}; };
// ====== ★修改:扩展 RecallFile支持“多目录 + 文件筛选 + 串行下载”的状态机 ====== // ====== ★修改:扩展 RecallFile支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
enum class RecallFileType {
NONE = 0,
STEADY_FILE, // 稳态文件
VOLTAGE_FILE // 暂态直下文件
};
class RecallFile class RecallFile
{ {
public: public:
@@ -75,18 +81,31 @@ public:
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
std::string StartTime; // 数据补招起始时间yyyy-MM-dd HH:mm:ss std::string StartTime; // 数据补招起始时间yyyy-MM-dd HH:mm:ss
std::string EndTime; // 数据补招结束时间yyyy-MM-dd HH:mm:ss std::string EndTime; // 数据补招结束时间yyyy-MM-dd HH:mm:ss
// ===== 业务类型 =====
// STEADY稳态文件补招
// VOLTAGE暂态事件补招
std::string STEADY; // 补招历史统计数据标识 0-不补招1-补招 std::string STEADY; // 补招历史统计数据标识 0-不补招1-补招
std::string VOLTAGE; // 补招暂态事件标识 0-不补招1-补招 std::string VOLTAGE; // 补招暂态事件标识 0-不补招1-补招
// ===== 文件下载类型 =====
RecallFileType file_type = RecallFileType::NONE;
//暂态文件用 //暂态文件用
bool direct_mode = false; // 直下文件开关true 表示不按时间窗,仅按目标文件名 std::string target_filetimes; // 直下文件匹配时间点yyyyMMdd_HHmmss
std::string target_filetimes; // 直下文件匹配时间点yyyyMMdd_HHmmss仅 direct_mode=true 时有效
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填 // ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
std::map<std::string, std::vector<tag_dir_info>> dir_files; std::map<std::string, std::vector<tag_dir_info>> dir_files;
// ★新增:候选目录(可扩展) std::vector<std::string> steady_dir_candidates{
std::vector<std::string> dir_candidates{ "/pqdif", // 默认版本 / 新疆
"/pqdif/%DAY%", // 上海:日期子目录
"/pqdif/%DESC%/%DAY%", // 上海pqdif_dir_cfg 或描述目录 + 日期
"/pqdif/Line%SEQ%", // 云南
"/historyFile/%DESC%" // 广东
};
std::vector<std::string> voltage_dir_candidates{
"/cf/COMTRADE", "/cf/COMTRADE",
"/bd0/COMTRADE", "/bd0/COMTRADE",
"/sd0/COMTRADE", "/sd0/COMTRADE",
@@ -102,6 +121,9 @@ public:
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果 ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果 ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
// 稳态文件用:一个 guid 下的多个补招时间段
std::vector<std::pair<long long, long long>> recall_ranges;
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径) // ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
std::list<std::string> download_queue; //一个时间可能对应多个文件 std::list<std::string> download_queue; //一个时间可能对应多个文件
std::string downloading_file; // 当前正在下载的文件(完整路径) std::string downloading_file; // 当前正在下载的文件(完整路径)
@@ -109,8 +131,22 @@ public:
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集 std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
std::unordered_set<std::string> file_success; // 已下载成功的文件集合 std::unordered_set<std::string> file_success; // 已下载成功的文件集合
// 是否稳态文件任务
bool is_steady_file() const {
return file_type == RecallFileType::STEADY_FILE;
}
// 是否暂态直下文件任务
bool is_voltage_file() const {
return file_type == RecallFileType::VOLTAGE_FILE;
}
const std::vector<std::string>& active_dirs() const {
return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates;
}
// ★新增:一个便捷复位 // ★新增:一个便捷复位
void reset_runtime(bool keep_direct = false) void reset_runtime(bool keep_target_filetimes = false)
{ {
phase = RecallPhase::IDLE; phase = RecallPhase::IDLE;
cur_dir_index = 0; cur_dir_index = 0;
@@ -124,9 +160,10 @@ public:
required_files.clear(); required_files.clear();
file_success.clear(); file_success.clear();
// 注意file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响
// ★新增:按需保留直下文件开关和目标名 // ★新增:按需保留直下文件开关和目标名
if (!keep_direct) { if (!keep_target_filetimes) {
direct_mode = false;
target_filetimes.clear(); // ▲列表清空 target_filetimes.clear(); // ▲列表清空
} }
} }

View File

@@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号
extern std::string FRONT_INST; extern std::string FRONT_INST;
extern bool PQD_FLAG;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数 /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
template<typename T, typename... Args> template<typename T, typename... Args>
@@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) {
try { try {
g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_index = std::stoi(val.substr(0, pos));
g_front_seg_num = std::stoi(val.substr(pos + 1)); g_front_seg_num = std::stoi(val.substr(pos + 1));
if (g_front_seg_index == 0) {
PQD_FLAG = true;
}
} catch (...) { } catch (...) {
std::cerr << "Invalid -s format." << std::endl; std::cerr << "Invalid -s format." << std::endl;
} }
@@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) {
try { try {
g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_index = std::stoi(val.substr(0, pos));
g_front_seg_num = std::stoi(val.substr(pos + 1)); g_front_seg_num = std::stoi(val.substr(pos + 1));
if (g_front_seg_index == 0) {
PQD_FLAG = true;
}
} catch (...) { } catch (...) {
std::cerr << "Invalid -s format." << std::endl; std::cerr << "Invalid -s format." << std::endl;
} }
@@ -223,13 +235,13 @@ std::string get_parent_directory() {
//解析模板文件 //解析模板文件
//Set_xml_nodeinfo(); //Set_xml_nodeinfo();
StartFrontThread(); //开启主线程
StartMQConsumerThread(); //开启消费者线程
StartMQProducerThread(); //开启生产者线程 StartMQProducerThread(); //开启生产者线程
StartTimerThread(); //开启定时线程 if(!PQD_FLAG){
StartFrontThread(); //开启主线程
StartMQConsumerThread(); //开启消费者线程
StartTimerThread(); //开启定时线程
}
//启动worker 根据启动标志启动 //启动worker 根据启动标志启动
if(G_TEST_FLAG){ if(G_TEST_FLAG){

View File

@@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
<< ", VOLTAGE=" << rf.VOLTAGE << ", VOLTAGE=" << rf.VOLTAGE
<< "\n"; << "\n";
// ★新增:直下模式与目标时间列表 // ★新增:文件补招类型与目标信息
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false") os << "\r\x1B[K |-- file_type="
<< ", target_filetimes(" << rf.target_filetimes << ")\n"; << (rf.is_steady_file() ? "STEADY_FILE" :
{ rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE");
os << "\r\x1B[K |.. " << rf.target_filetimes << "\n";
if (rf.is_voltage_file()) {
os << ", target_filetimes=" << rf.target_filetimes;
} }
else if (rf.is_steady_file()) {
os << ", time_range=" << rf.StartTime
<< " ~ " << rf.EndTime;
}
os << "\n";
// ★新增:状态机运行态 // ★新增:状态机运行态
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase) os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
@@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
<< ", download_result=" << resultStr(rf.download_result) << "\n"; << ", download_result=" << resultStr(rf.download_result) << "\n";
// ★新增:候选目录 // ★新增:候选目录
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n"; os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n";
{ {
size_t c = 0; size_t c = 0;
for (const auto& d : rf.dir_candidates) { for (const auto& d : rf.active_dirs()) {
if (c++ >= MAX_ITEMS) break; if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << d << "\n"; os << "\r\x1B[K |-- " << d << "\n";
} }
if (rf.dir_candidates.size() > MAX_ITEMS) { if (rf.active_dirs().size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n"; os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n";
} }
} }

View File

@@ -28,6 +28,17 @@
#include "pqdif/include/pqdif_lg.h" #include "pqdif/include/pqdif_lg.h"
#include "pqdif_semantic_ids.h" #include "pqdif_semantic_ids.h"
#include "cloudfront/code/log4.h" //lnk20260526
extern void enqueue_stat_pq(const std::string& max_base64Str,
const std::string& min_base64Str,
const std::string& avg_base64Str,
const std::string& cp95_base64Str,
time_t data_time,
const std::string& mac,
short cid);
extern std::string extract_filename1(const std::string& path);
namespace fs = std::experimental::filesystem; namespace fs = std::experimental::filesystem;
namespace { namespace {
@@ -7877,6 +7888,96 @@ void ClearReadyPqdifStatBase64Queue()
g_pqdif_stat_base64_ready_queue.clear(); g_pqdif_stat_base64_ready_queue.clear();
} }
static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容
StatValueKind kind,
std::string& out)
{
for (const auto& r : tp.records) {
if (r.value_kind == kind) {
out = r.base64_payload;
return !out.empty();
}
}
return false;
}
static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path,
short& point_name)
{
point_name = 0;
std::cout << "[extract_monitor_seq] begin path="
<< path << std::endl;
// 取纯文件名,例如:
// download/192.168.1.10/M1_xxx.pqd
// -> M1_xxx.pqd
std::string fname = extract_filename1(path);
std::cout << "[extract_monitor_seq] filename="
<< fname << std::endl;
if (fname.size() < 3) {
std::cout << "[extract_monitor_seq] filename too short"
<< std::endl;
return false;
}
if (fname[0] != 'M' && fname[0] != 'm') {
std::cout << "[extract_monitor_seq] filename not start with M/m"
<< std::endl;
return false;
}
size_t pos = fname.find('_');
std::cout << "[extract_monitor_seq] underscore pos="
<< pos << std::endl;
if (pos == std::string::npos || pos <= 1) {
std::cout << "[extract_monitor_seq] invalid underscore position"
<< std::endl;
return false;
}
// M1_xxx -> 1
std::string seq_str = fname.substr(1, pos - 1);
std::cout << "[extract_monitor_seq] seq_str="
<< seq_str << std::endl;
for (char c : seq_str) {
if (!std::isdigit(static_cast<unsigned char>(c))) {
std::cout << "[extract_monitor_seq] non-digit char="
<< c << std::endl;
return false;
}
}
try {
point_name = static_cast<short>(std::stoi(seq_str));
std::cout << "[extract_monitor_seq] success point_name="
<< point_name << std::endl;
return point_name > 0;
}
catch (const std::exception& e) {
std::cout << "[extract_monitor_seq] exception="
<< e.what() << std::endl;
point_name = 0;
return false;
}
catch (...) {
std::cout << "[extract_monitor_seq] unknown exception"
<< std::endl;
point_name = 0;
return false;
}
}
void RunPqdifScanLoop() void RunPqdifScanLoop()
{ {
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
@@ -7910,6 +8011,54 @@ void RunPqdifScanLoop()
if (PopReadyPqdifStatBase64FileBatch(batch)) { if (PopReadyPqdifStatBase64FileBatch(batch)) {
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果 // batch 就是一个 PQDIF 文件完整的 Base64 组装结果
// 在此处处理上送逻辑 // 在此处处理上送逻辑
const std::string& mac = batch.mac;
short point_name = 0;
if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) {
std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file="
<< batch.pqdif_file_path << std::endl;
continue;
}
for (const auto& tp : batch.time_points) {
std::string max_base64;
std::string min_base64;
std::string avg_base64;
std::string p95_base64;
bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64);
bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64);
bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64);
bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64);
if (!has_max || !has_min || !has_avg || !has_p95) {
std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file="
<< batch.pqdif_file_path
<< " time=" << tp.timestamp_text
<< " has_max=" << has_max
<< " has_min=" << has_min
<< " has_avg=" << has_avg
<< " has_p95=" << has_p95
<< std::endl;
continue;
}
enqueue_stat_pq(max_base64,
min_base64,
avg_base64,
p95_base64,
tp.timestamp,
mac,
point_name);
std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file="
<< batch.pqdif_file_path
<< " time=" << tp.timestamp_text
<< " mac=" << mac
<< " point=" << point_name
<< std::endl;
}
} }
} }
catch (const std::exception& ex) catch (const std::exception& ex)