Compare commits
6 Commits
2161629fe0
...
测试2
| Author | SHA1 | Date | |
|---|---|---|---|
| 3191422869 | |||
| a91672a994 | |||
| e64d2e2318 | |||
| 671fc6702e | |||
| 626aac1fce | |||
| f925356d63 |
BIN
LFtid1056.rar
BIN
LFtid1056.rar
Binary file not shown.
@@ -22,6 +22,8 @@
|
|||||||
#include <fnmatch.h>
|
#include <fnmatch.h>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
|
#include <regex>
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@@ -657,6 +659,9 @@ void init_config() {
|
|||||||
MULTIPLE_NODE_FLAG = 0;
|
MULTIPLE_NODE_FLAG = 0;
|
||||||
std::cout << "this is single process" << std::endl;
|
std::cout << "this is single process" << std::endl;
|
||||||
}
|
}
|
||||||
|
else if(g_front_seg_num > 0 && g_front_seg_index == 0){
|
||||||
|
std::cout << "this is pqdif process" << std::endl;
|
||||||
|
}
|
||||||
else{
|
else{
|
||||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出");
|
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出");
|
||||||
exit(-1039);
|
exit(-1039);
|
||||||
@@ -1406,7 +1411,7 @@ int recall_json_handle_from_mq(const std::string& body)
|
|||||||
// 根据 monitorId 和提取的数字初始化补招记录
|
// 根据 monitorId 和提取的数字初始化补招记录
|
||||||
init_recall_record_file(guid, terminalId, monId, "", "");
|
init_recall_record_file(guid, terminalId, monId, "", "");
|
||||||
|
|
||||||
//根据时间戳单独补招事件
|
//根据时间戳单独补招事件,暂不使用
|
||||||
if(0)// ★新增(dt==2 同步生成“按小时”的事件补招到 recall_list,与 dt==1 逻辑一致)——开始
|
if(0)// ★新增(dt==2 同步生成“按小时”的事件补招到 recall_list,与 dt==1 逻辑一致)——开始
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度
|
std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度
|
||||||
@@ -1490,34 +1495,43 @@ int recall_json_handle_from_mq(const std::string& body)
|
|||||||
}
|
}
|
||||||
if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; }
|
if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; }
|
||||||
|
|
||||||
|
// 稳态文件一个 guid + monitor 只生成一条 RecallFile
|
||||||
|
RecallFile rm_all;
|
||||||
|
bool has_steady_range = false;
|
||||||
|
|
||||||
|
if (stat == 1) {
|
||||||
|
rm_all.recall_guid = guid;
|
||||||
|
rm_all.recall_status = 0;
|
||||||
|
rm_all.STEADY = std::to_string(stat);
|
||||||
|
rm_all.VOLTAGE = std::to_string(voltage);
|
||||||
|
rm_all.file_type = RecallFileType::STEADY_FILE;
|
||||||
|
}
|
||||||
|
|
||||||
for (const auto& ti : rec["timeInterval"]) {
|
for (const auto& ti : rec["timeInterval"]) {
|
||||||
if (!ti.is_string()) continue;
|
if (!ti.is_string()) continue;
|
||||||
|
|
||||||
std::string s = ti.get<std::string>();
|
std::string s = ti.get<std::string>();
|
||||||
|
|
||||||
std::string::size_type pos = s.find('~');
|
std::string::size_type pos = s.find('~');
|
||||||
if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; }
|
if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; }
|
||||||
|
|
||||||
std::string start = s.substr(0, pos);
|
std::string start = s.substr(0, pos);
|
||||||
std::string end = s.substr(pos + 1);
|
std::string end = s.substr(pos + 1);
|
||||||
|
|
||||||
RecallFile rm_all;
|
|
||||||
rm_all.recall_guid = guid;
|
|
||||||
rm_all.recall_status = 0;
|
|
||||||
rm_all.StartTime = start;
|
|
||||||
rm_all.EndTime = end;
|
|
||||||
rm_all.STEADY = std::to_string(stat);
|
|
||||||
rm_all.VOLTAGE = std::to_string(voltage);
|
|
||||||
|
|
||||||
//lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应
|
//lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应
|
||||||
init_recall_record_file(guid, terminalId, monId, start, end);
|
init_recall_record_file(guid, terminalId, monId, start, end);
|
||||||
|
|
||||||
if (voltage == 1) {
|
if (voltage == 1) {
|
||||||
std::vector<RecallInfo> recallinfo_list_hour;
|
std::vector<RecallInfo> recallinfo_list_hour;
|
||||||
Get_Recall_Time_Char(start, end, recallinfo_list_hour);
|
Get_Recall_Time_Char(start, end, recallinfo_list_hour); //暂态将整个时间段划分为多个一小时的时间段
|
||||||
for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) {
|
for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) {
|
||||||
const RecallInfo& info = recallinfo_list_hour[i];
|
const RecallInfo& info = recallinfo_list_hour[i];
|
||||||
RecallMonitor rm;
|
RecallMonitor rm;
|
||||||
rm.recall_guid = guid;
|
rm.recall_guid = guid;
|
||||||
rm.recall_status = 0;
|
rm.recall_status = 0;
|
||||||
rm.StartTime = epoch_to_datetime_str(info.starttime);
|
rm.StartTime = epoch_to_datetime_str(info.starttime); //每个记录是一个小时的时间段,张文暂态事件接口是一小时一条
|
||||||
rm.EndTime = epoch_to_datetime_str(info.endtime);
|
rm.EndTime = epoch_to_datetime_str(info.endtime);
|
||||||
rm.STEADY = std::to_string(stat);
|
rm.STEADY = std::to_string(stat);
|
||||||
rm.VOLTAGE = std::to_string(voltage);
|
rm.VOLTAGE = std::to_string(voltage);
|
||||||
@@ -1525,10 +1539,62 @@ int recall_json_handle_from_mq(const std::string& body)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (stat == 1) {
|
if (stat == 1) {
|
||||||
lm->recall_list_static.push_back(rm_all);
|
long long beg = parse_time_to_epoch(start); //转为整数时间
|
||||||
|
long long ed = parse_time_to_epoch(end);
|
||||||
|
|
||||||
|
if (beg < 0 || ed < 0 || beg > ed) {
|
||||||
|
std::cout << "[recall_file] invalid steady time range guid="
|
||||||
|
<< guid
|
||||||
|
<< " terminal=" << terminalId
|
||||||
|
<< " monitor=" << monId
|
||||||
|
<< " range=" << start << "~" << end
|
||||||
|
<< std::endl;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rm_all.recall_ranges.push_back(std::make_pair(beg, ed));//每个时间段都记录到补招稳态时间列表里
|
||||||
|
|
||||||
|
if (!has_steady_range) {
|
||||||
|
rm_all.StartTime = start; //记录多个时间段的第一条开始和结束
|
||||||
|
rm_all.EndTime = end;
|
||||||
|
has_steady_range = true;
|
||||||
|
} else {
|
||||||
|
long long old_beg = parse_time_to_epoch(rm_all.StartTime);
|
||||||
|
long long old_end = parse_time_to_epoch(rm_all.EndTime);
|
||||||
|
|
||||||
|
if (old_beg < 0 || beg < old_beg) {
|
||||||
|
rm_all.StartTime = start; //对比多个时间段更新最早和最晚
|
||||||
|
}
|
||||||
|
if (old_end < 0 || ed > old_end) {
|
||||||
|
rm_all.EndTime = end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "[recall_file] add steady range guid="
|
||||||
|
<< guid
|
||||||
|
<< " terminal=" << terminalId
|
||||||
|
<< " monitor=" << monId
|
||||||
|
<< " range=" << start << "~" << end
|
||||||
|
<< " total_ranges=" << rm_all.recall_ranges.size()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
if (stat == 0 && voltage == 0) return 10003;
|
if (stat == 0 && voltage == 0) return 10003;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 所有 timeInterval 遍历完后,稳态 RecallFile 只入队一次
|
||||||
|
if (stat == 1 && has_steady_range) { //一次补招指令的处理结束
|
||||||
|
lm->recall_list_static.push_back(rm_all);//记录稳态补招
|
||||||
|
|
||||||
|
std::cout << "[recall_file] push steady recall once guid="
|
||||||
|
<< guid
|
||||||
|
<< " terminal=" << terminalId
|
||||||
|
<< " monitor=" << monId
|
||||||
|
<< " ranges=" << rm_all.recall_ranges.size()
|
||||||
|
<< " StartTime=" << rm_all.StartTime
|
||||||
|
<< " EndTime=" << rm_all.EndTime
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 未知 dataType,忽略
|
// 未知 dataType,忽略
|
||||||
@@ -4592,71 +4658,153 @@ void check_recall_event() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据
|
||||||
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ======
|
static std::string replace_all_copy(std::string s,
|
||||||
static bool extract_epoch_from_filename(const std::string& name,
|
const std::string& from,
|
||||||
long long& out_epoch,
|
const std::string& to)
|
||||||
int logical_device_seq)
|
|
||||||
{
|
{
|
||||||
// 拆分
|
size_t pos = 0;
|
||||||
std::vector<std::string> parts;
|
while ((pos = s.find(from, pos)) != std::string::npos) {
|
||||||
parts.reserve(8);
|
s.replace(pos, from.length(), to);
|
||||||
size_t start = 0, pos;
|
pos += to.length();
|
||||||
while ((pos = name.find('_', start)) != std::string::npos) {
|
|
||||||
parts.emplace_back(name.substr(start, pos - start));
|
|
||||||
start = pos + 1;
|
|
||||||
}
|
}
|
||||||
parts.emplace_back(name.substr(start)); // 最后一段(含扩展名)
|
return s;
|
||||||
|
|
||||||
if (parts.size() < 4) return false;
|
|
||||||
|
|
||||||
// 第二段序号是倒数第4段
|
|
||||||
const std::string& seq_str = parts[parts.size() - 4];
|
|
||||||
// 允许前导 0:把字符串转 int 后比较
|
|
||||||
for (char c : seq_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
|
|
||||||
int seq_val = 0;
|
|
||||||
try {
|
|
||||||
seq_val = std::stoi(seq_str);
|
|
||||||
} catch (...) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (seq_val != logical_device_seq) return false;
|
|
||||||
|
|
||||||
// 其余与上面相同
|
|
||||||
const std::string& date_str = parts[parts.size() - 3];
|
|
||||||
const std::string& time_str = parts[parts.size() - 2];
|
|
||||||
std::string ms_str = parts.back();
|
|
||||||
size_t dot = ms_str.find('.');
|
|
||||||
if (dot != std::string::npos) {
|
|
||||||
ms_str.erase(dot);
|
|
||||||
}
|
|
||||||
if (date_str.size() != 8 || time_str.size() != 6) return false;
|
|
||||||
for (char c : date_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
|
|
||||||
for (char c : time_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
|
|
||||||
for (char c : ms_str) if (!std::isdigit(static_cast<unsigned char>(c))) return false;
|
|
||||||
|
|
||||||
int year = std::stoi(date_str.substr(0, 4));
|
|
||||||
int month = std::stoi(date_str.substr(4, 2));
|
|
||||||
int day = std::stoi(date_str.substr(6, 2));
|
|
||||||
int hour = std::stoi(time_str.substr(0, 2));
|
|
||||||
int min = std::stoi(time_str.substr(2, 2));
|
|
||||||
int sec = std::stoi(time_str.substr(4, 2));
|
|
||||||
// int msec = std::stoi(ms_str);
|
|
||||||
|
|
||||||
std::tm tm{}; tm.tm_isdst = -1;
|
|
||||||
tm.tm_year = year - 1900;
|
|
||||||
tm.tm_mon = month - 1;
|
|
||||||
tm.tm_mday = day;
|
|
||||||
tm.tm_hour = hour;
|
|
||||||
tm.tm_min = min;
|
|
||||||
tm.tm_sec = sec;
|
|
||||||
|
|
||||||
time_t t = timegm(&tm);
|
|
||||||
if (t < 0) return false;
|
|
||||||
|
|
||||||
out_epoch = static_cast<long long>(t); // 秒级
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static std::string make_day_from_datetime(const std::string& t)
|
||||||
|
{
|
||||||
|
// yyyy-MM-dd HH:mm:ss -> yyyyMMdd
|
||||||
|
if (t.size() < 10) return "";
|
||||||
|
std::string day = t.substr(0, 10);
|
||||||
|
day.erase(std::remove(day.begin(), day.end(), '-'), day.end());
|
||||||
|
return day;
|
||||||
|
}
|
||||||
|
|
||||||
|
static std::string resolve_recall_dir(const std::string& raw_dir,
|
||||||
|
const RecallFile& rf,
|
||||||
|
const ledger_monitor& lm)
|
||||||
|
{
|
||||||
|
std::string dir = raw_dir;
|
||||||
|
|
||||||
|
std::string day = make_day_from_datetime(rf.StartTime);
|
||||||
|
|
||||||
|
std::string seq = "1";
|
||||||
|
try {
|
||||||
|
seq = std::to_string(std::stoi(lm.logical_device_seq));
|
||||||
|
} catch (...) {
|
||||||
|
seq = "1";
|
||||||
|
}
|
||||||
|
|
||||||
|
// DESC 建议优先用你台账里的监测点描述/名称
|
||||||
|
// 如果 lm.monitor_name 是“监测点1”,广东目录就是 /historyFile/监测点1
|
||||||
|
std::string desc = lm.monitor_name;
|
||||||
|
if (desc.empty()) {
|
||||||
|
desc = std::string("Line") + seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
dir = replace_all_copy(dir, "%DAY%", day);
|
||||||
|
dir = replace_all_copy(dir, "%SEQ%", seq);
|
||||||
|
dir = replace_all_copy(dir, "%DESC%", desc);
|
||||||
|
|
||||||
|
return dir;
|
||||||
|
}
|
||||||
|
/////////////////////////////目录解析
|
||||||
|
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ======
|
||||||
|
static bool extract_pqdif_range_from_filename(const std::string& fname,
|
||||||
|
long long& fs,
|
||||||
|
long long& fe,
|
||||||
|
int& file_seq,
|
||||||
|
std::string& file_monitor_id)
|
||||||
|
{
|
||||||
|
fs = -1;
|
||||||
|
fe = -1;
|
||||||
|
file_seq = -1;
|
||||||
|
file_monitor_id.clear();
|
||||||
|
|
||||||
|
auto to_epoch = [](int y, int mon, int d, int h, int m, int s)->long long {
|
||||||
|
struct tm t {};
|
||||||
|
t.tm_year = y - 1900;
|
||||||
|
t.tm_mon = mon - 1;
|
||||||
|
t.tm_mday = d;
|
||||||
|
t.tm_hour = h;
|
||||||
|
t.tm_min = m;
|
||||||
|
t.tm_sec = s;
|
||||||
|
t.tm_isdst = -1;
|
||||||
|
return static_cast<long long>(mktime(&t));
|
||||||
|
};
|
||||||
|
|
||||||
|
// 广东/上海:20230915000000_20230915010000.pqd 开始时间_结束时间
|
||||||
|
{
|
||||||
|
std::regex re(R"((\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})_(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\.pqd$)",
|
||||||
|
std::regex::icase);
|
||||||
|
std::smatch m;
|
||||||
|
if (std::regex_search(fname, m, re)) {
|
||||||
|
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
|
||||||
|
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
|
||||||
|
fe = to_epoch(std::stoi(m[7]), std::stoi(m[8]), std::stoi(m[9]),
|
||||||
|
std::stoi(m[10]), std::stoi(m[11]), std::stoi(m[12]));
|
||||||
|
return fs >= 0 && fe >= fs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 云南:20230915T010000.pqd,文件时间按 1 小时窗口估算 时间点
|
||||||
|
{
|
||||||
|
std::regex re(R"((\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})\.pqd$)",
|
||||||
|
std::regex::icase);
|
||||||
|
std::smatch m;
|
||||||
|
if (std::regex_search(fname, m, re)) {
|
||||||
|
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
|
||||||
|
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
|
||||||
|
fe = fs + 3600;
|
||||||
|
return fs >= 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 新疆:B8B7FEE6D4792D30-20230915-003000-p.pqd
|
||||||
|
{
|
||||||
|
std::regex re(
|
||||||
|
R"(([^-]+)-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)",
|
||||||
|
std::regex::icase);
|
||||||
|
|
||||||
|
std::smatch m;
|
||||||
|
if (std::regex_search(fname, m, re)) {
|
||||||
|
file_monitor_id = m[1]; // B8B7FEE6D4792D30
|
||||||
|
|
||||||
|
fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]),
|
||||||
|
std::stoi(m[5]), std::stoi(m[6]), std::stoi(m[7]));
|
||||||
|
fe = fs + 3600;
|
||||||
|
return fs >= 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 默认:ied_ld_20230915_0030_60.pqd
|
||||||
|
// 例如:PQMonitor_PQM1_20260601_0000_24.pqd
|
||||||
|
// 第二段 PQM1 最后的数字作为 seq
|
||||||
|
{
|
||||||
|
std::regex re(R"([^_]+_([^_]+)_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)",
|
||||||
|
std::regex::icase);
|
||||||
|
std::smatch m;
|
||||||
|
if (std::regex_search(fname, m, re)) {
|
||||||
|
|
||||||
|
std::string ld = m[1]; // 例如 PQM1、PQM2
|
||||||
|
|
||||||
|
std::regex seq_re(R"((\d+)$)");
|
||||||
|
std::smatch seq_m;
|
||||||
|
if (std::regex_search(ld, seq_m, seq_re)) {
|
||||||
|
file_seq = std::stoi(seq_m[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]),
|
||||||
|
std::stoi(m[5]), std::stoi(m[6]), 0);
|
||||||
|
|
||||||
|
int intv_min = std::stoi(m[7]);
|
||||||
|
fe = fs + intv_min * 60;
|
||||||
|
|
||||||
|
return fs >= 0 && intv_min > 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true
|
// 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true
|
||||||
static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) {
|
static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) {
|
||||||
@@ -4743,14 +4891,41 @@ static void dircache_clear_device(const std::string& dev_id)
|
|||||||
g_device_dir_cache.erase(dev_id);
|
g_device_dir_cache.erase(dev_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑
|
//复制文件
|
||||||
|
bool move_file_crossfs(const std::string& src, const std::string& dst)
|
||||||
|
{
|
||||||
|
if (std::rename(src.c_str(), dst.c_str()) == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (errno != EXDEV) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ifstream in(src.c_str(), std::ios::binary);
|
||||||
|
std::ofstream out(dst.c_str(), std::ios::binary | std::ios::trunc);
|
||||||
|
if (!in || !out) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
out << in.rdbuf();
|
||||||
|
out.close();
|
||||||
|
in.close();
|
||||||
|
|
||||||
|
if (std::remove(src.c_str()) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑
|
||||||
// ====== ★修改:check_recall_stat —— 加入“两步法”状态机 ======
|
// ====== ★修改:check_recall_stat —— 加入“两步法”状态机 ======
|
||||||
void check_recall_file() {
|
void check_recall_file() {
|
||||||
|
|
||||||
std::vector<RecallTask> tasks; // 本轮要发送的“每终端一条”(目录请求 或 文件下载 请求)
|
std::vector<RecallTask> tasks; // 本轮要发送的“每终端一条”(目录请求 或 文件下载 请求)
|
||||||
|
|
||||||
// ★修改开始:新增“待上传动作”容器与两个小工具(局部作用域,函数私有)
|
// “待上传动作”容器与两个小工具(局部作用域,函数私有)
|
||||||
struct PendingUpload {
|
struct PendingUpload {
|
||||||
std::string terminal_id;
|
std::string terminal_id;
|
||||||
unsigned short logical_seq = 0;
|
unsigned short logical_seq = 0;
|
||||||
@@ -4783,6 +4958,12 @@ void check_recall_file() {
|
|||||||
};
|
};
|
||||||
// ★修改结束
|
// ★修改结束
|
||||||
|
|
||||||
|
auto RecallTypeName = [](const RecallFile& rf)->std::string {
|
||||||
|
if (rf.is_steady_file()) return "稳态文件";
|
||||||
|
if (rf.is_voltage_file()) return "波形文件";
|
||||||
|
return "文件";
|
||||||
|
};
|
||||||
|
|
||||||
std::vector<PendingUpload> pending_uploads; // ★修改:锁外执行上传与清理
|
std::vector<PendingUpload> pending_uploads; // ★修改:锁外执行上传与清理
|
||||||
|
|
||||||
|
|
||||||
@@ -4809,7 +4990,7 @@ void check_recall_file() {
|
|||||||
// ★优先级结束
|
// ★优先级结束
|
||||||
|
|
||||||
for (auto& dev : terminal_devlist) {
|
for (auto& dev : terminal_devlist) {
|
||||||
// 仅处理“正在补招/空闲”终端,与你原逻辑一致
|
// 仅处理“正在补招/空闲”终端
|
||||||
if (dev.busytype != static_cast<int>(DeviceState::READING_STATSFILE) &&
|
if (dev.busytype != static_cast<int>(DeviceState::READING_STATSFILE) &&
|
||||||
dev.busytype != static_cast<int>(DeviceState::IDLE)) {
|
dev.busytype != static_cast<int>(DeviceState::IDLE)) {
|
||||||
continue;
|
continue;
|
||||||
@@ -4834,7 +5015,6 @@ void check_recall_file() {
|
|||||||
<< " size=" << lm.recall_list_static.size()
|
<< " size=" << lm.recall_list_static.size()
|
||||||
<< " first{guid=" << front.recall_guid
|
<< " first{guid=" << front.recall_guid
|
||||||
<< ", status=" << front.recall_status
|
<< ", status=" << front.recall_status
|
||||||
<< ", direct=" << (front.direct_mode ? 1 : 0)
|
|
||||||
<< ", phase=" << static_cast<int>(front.phase)
|
<< ", phase=" << static_cast<int>(front.phase)
|
||||||
<< ", cur_dir=" << front.cur_dir
|
<< ", cur_dir=" << front.cur_dir
|
||||||
<< ", cur_file=" << front.downloading_file
|
<< ", cur_file=" << front.downloading_file
|
||||||
@@ -4868,7 +5048,6 @@ void check_recall_file() {
|
|||||||
<< " size=" << lm.recall_list_static.size()
|
<< " size=" << lm.recall_list_static.size()
|
||||||
<< " first{guid=" << nf.recall_guid
|
<< " first{guid=" << nf.recall_guid
|
||||||
<< ", status=" << nf.recall_status
|
<< ", status=" << nf.recall_status
|
||||||
<< ", direct=" << (nf.direct_mode ? 1 : 0)
|
|
||||||
<< ", phase=" << static_cast<int>(nf.phase)
|
<< ", phase=" << static_cast<int>(nf.phase)
|
||||||
<< ", cur_dir=" << nf.cur_dir
|
<< ", cur_dir=" << nf.cur_dir
|
||||||
<< ", cur_file=" << nf.downloading_file
|
<< ", cur_file=" << nf.downloading_file
|
||||||
@@ -4900,13 +5079,13 @@ void check_recall_file() {
|
|||||||
|
|
||||||
//20251218添加记录
|
//20251218添加记录
|
||||||
std::string msg_fail;
|
std::string msg_fail;
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件失败,目标时标:"
|
+ " 补招" + RecallTypeName(front) + "失败,目标时标:"
|
||||||
+ front.target_filetimes;
|
+ front.target_filetimes;
|
||||||
} else {
|
} else {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件失败,时间范围:"
|
+ " 补招" + RecallTypeName(front) + "失败,时间范围:"
|
||||||
+ front.StartTime + " ~ " + front.EndTime;
|
+ front.StartTime + " ~ " + front.EndTime;
|
||||||
}
|
}
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
@@ -4924,7 +5103,6 @@ void check_recall_file() {
|
|||||||
<< " size=" << lm.recall_list_static.size()
|
<< " size=" << lm.recall_list_static.size()
|
||||||
<< " first{guid=" << nf.recall_guid
|
<< " first{guid=" << nf.recall_guid
|
||||||
<< ", status=" << nf.recall_status
|
<< ", status=" << nf.recall_status
|
||||||
<< ", direct=" << (nf.direct_mode ? 1 : 0)
|
|
||||||
<< ", phase=" << static_cast<int>(nf.phase)
|
<< ", phase=" << static_cast<int>(nf.phase)
|
||||||
<< ", cur_dir=" << nf.cur_dir
|
<< ", cur_dir=" << nf.cur_dir
|
||||||
<< ", cur_file=" << nf.downloading_file
|
<< ", cur_file=" << nf.downloading_file
|
||||||
@@ -5034,8 +5212,12 @@ void check_recall_file() {
|
|||||||
front.file_success.clear();
|
front.file_success.clear();
|
||||||
|
|
||||||
// 立即发起第一个目录请求
|
// 立即发起第一个目录请求
|
||||||
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) {
|
if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
|
||||||
front.cur_dir = front.dir_candidates[front.cur_dir_index];//从第一个目录开始
|
front.cur_dir = resolve_recall_dir(
|
||||||
|
front.active_dirs()[front.cur_dir_index],
|
||||||
|
front,
|
||||||
|
lm
|
||||||
|
);//从第一个目录开始
|
||||||
|
|
||||||
// ★新增:先查设备级目录缓存
|
// ★新增:先查设备级目录缓存
|
||||||
std::vector<tag_dir_info> cached;
|
std::vector<tag_dir_info> cached;
|
||||||
@@ -5061,13 +5243,13 @@ void check_recall_file() {
|
|||||||
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
||||||
|
|
||||||
std::string msg_fail;
|
std::string msg_fail;
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,目标时标:"
|
+ " 补招" + RecallTypeName(front) + "未找到,目标时标:"
|
||||||
+ front.target_filetimes;
|
+ front.target_filetimes;
|
||||||
} else {
|
} else {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,时间范围:"
|
+ " 补招" + RecallTypeName(front) + "未找到,时间范围:"
|
||||||
+ front.StartTime + " ~ " + front.EndTime;
|
+ front.StartTime + " ~ " + front.EndTime;
|
||||||
}
|
}
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
@@ -5096,8 +5278,12 @@ void check_recall_file() {
|
|||||||
front.required_files.clear();
|
front.required_files.clear();
|
||||||
front.file_success.clear();
|
front.file_success.clear();
|
||||||
|
|
||||||
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) {
|
if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
|
||||||
front.cur_dir = front.dir_candidates[front.cur_dir_index];
|
front.cur_dir = resolve_recall_dir(
|
||||||
|
front.active_dirs()[front.cur_dir_index],
|
||||||
|
front,
|
||||||
|
lm
|
||||||
|
);
|
||||||
|
|
||||||
// ★新增:先查缓存
|
// ★新增:先查缓存
|
||||||
std::vector<tag_dir_info> cached;
|
std::vector<tag_dir_info> cached;
|
||||||
@@ -5121,13 +5307,13 @@ void check_recall_file() {
|
|||||||
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
||||||
|
|
||||||
std::string msg_fail;
|
std::string msg_fail;
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,目标时标:"
|
+ " 补招" + RecallTypeName(front) + "未找到,目标时标:"
|
||||||
+ front.target_filetimes;
|
+ front.target_filetimes;;
|
||||||
} else {
|
} else {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,时间范围:"
|
+ " 补招" + RecallTypeName(front) + "未找到,时间范围:"
|
||||||
+ front.StartTime + " ~ " + front.EndTime;
|
+ front.StartTime + " ~ " + front.EndTime;
|
||||||
}
|
}
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
@@ -5140,88 +5326,142 @@ void check_recall_file() {
|
|||||||
|
|
||||||
// OK:根据起止时间筛选文件
|
// OK:根据起止时间筛选文件
|
||||||
{
|
{
|
||||||
/*//这部分用于稳态数据文件
|
long long beg = -1;
|
||||||
long long beg = parse_time_to_epoch(front.StartTime);
|
long long end = -1;
|
||||||
long long end = parse_time_to_epoch(front.EndTime);
|
|
||||||
|
if (front.is_steady_file()) {
|
||||||
|
beg = parse_time_to_epoch(front.StartTime);
|
||||||
|
end = parse_time_to_epoch(front.EndTime);
|
||||||
|
|
||||||
//错误判断:如果是直下文件的方式,会给默认的正确的时间范围
|
|
||||||
if (beg < 0 || end < 0 || beg > end) {
|
if (beg < 0 || end < 0 || beg > end) {
|
||||||
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
||||||
std::cout << "[check_recall_stat] time parse ERR, FAIL dev=" << dev.terminal_id
|
std::cout << "[check_recall_file] steady time parse ERR, FAIL dev="
|
||||||
|
<< dev.terminal_id
|
||||||
<< " monitor=" << lm.monitor_id
|
<< " monitor=" << lm.monitor_id
|
||||||
<< " start=" << front.StartTime
|
<< " start=" << front.StartTime
|
||||||
<< " end=" << front.EndTime << std::endl;
|
<< " end=" << front.EndTime << std::endl;
|
||||||
break;//跳出循环,一个装置一次只能处理一个测点的一个补招记录;如果失败,下个循环会弹出
|
|
||||||
}*/
|
|
||||||
|
|
||||||
//装置消息返回后通知成功的处理:
|
front.phase = RecallPhase::IDLE;
|
||||||
auto it = front.dir_files.find(front.cur_dir);//在map中查找当前目录名对应的目录下的文件名列表
|
front.downloading_file.clear();
|
||||||
|
front.download_result = ActionResult::PENDING;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto it = front.dir_files.find(front.cur_dir);
|
||||||
if (it != front.dir_files.end()) {
|
if (it != front.dir_files.end()) {
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) { //暂态波形下载
|
||||||
// 目标时间戳(不含毫秒、形如 yyyyMMdd_HHmmss)
|
|
||||||
const std::string& want_ts = front.target_filetimes;
|
const std::string& want_ts = front.target_filetimes;
|
||||||
|
|
||||||
for (const auto& ent : it->second) {
|
for (const auto& ent : it->second) {
|
||||||
// 打印目录下的所有条目
|
std::cout << "[check_recall_file] voltage dir file dev=" << dev.terminal_id
|
||||||
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
|
|
||||||
<< " monitor=" << lm.monitor_id
|
<< " monitor=" << lm.monitor_id
|
||||||
<< " dir=" << front.cur_dir
|
<< " dir=" << front.cur_dir
|
||||||
<< " file=" << ent.name
|
<< " file=" << ent.name
|
||||||
<< " flag=" << ent.flag
|
<< " flag=" << ent.flag
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
if (ent.flag == 0) continue; // 只要文件,跳过目录
|
if (ent.flag == 0) continue;
|
||||||
|
|
||||||
size_t n = ::strnlen(ent.name, sizeof(ent.name));
|
size_t n = ::strnlen(ent.name, sizeof(ent.name));
|
||||||
std::string fname(ent.name, n);
|
std::string fname(ent.name, n);
|
||||||
|
|
||||||
// 解析出 key = 监测点号_YYYYMMDD_HHMMSS(注意:确保你的 make_target_key_from_filename
|
|
||||||
// 已经改成“去掉毫秒”的版本;若还带毫秒,请先调整该函数)
|
|
||||||
std::string key;
|
std::string key;
|
||||||
if (!make_target_key_from_filename(fname, key)) {
|
if (!make_target_key_from_filename(fname, key)) {
|
||||||
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (has_suffix(key, want_ts)) {
|
||||||
|
front.download_queue.push_back(front.cur_dir + "/" + fname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (front.is_steady_file()) { //稳态文件下载
|
||||||
|
for (const auto& ent : it->second) {
|
||||||
|
if (ent.flag != 1) continue;
|
||||||
|
|
||||||
|
size_t n = ::strnlen(ent.name, sizeof(ent.name));
|
||||||
|
std::string fname(ent.name, n);
|
||||||
|
|
||||||
|
long long fs = -1;
|
||||||
|
long long fe = -1;
|
||||||
|
int file_seq = -1;
|
||||||
|
std::string file_monitor_id;
|
||||||
|
|
||||||
|
if (!extract_pqdif_range_from_filename(fname, fs, fe, file_seq, file_monitor_id)) {
|
||||||
|
std::cout << "[check_recall_file] steady skip invalid pqdif filename dev="
|
||||||
|
<< dev.terminal_id
|
||||||
<< " monitor=" << lm.monitor_id
|
<< " monitor=" << lm.monitor_id
|
||||||
<< " dir=" << front.cur_dir
|
<< " dir=" << front.cur_dir
|
||||||
|
<< " file=" << fname << std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int cur_seq = -1;
|
||||||
|
try {
|
||||||
|
cur_seq = std::stoi(lm.logical_device_seq);
|
||||||
|
} catch (...) {
|
||||||
|
cur_seq = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 新疆:用文件名里的 monitor_id 找 seq
|
||||||
|
if (!file_monitor_id.empty()) {
|
||||||
|
if (file_monitor_id != lm.monitor_id) {//跳过id不匹配的新疆文件
|
||||||
|
std::cout << "[check_recall_file] steady skip other xj monitor file dev="
|
||||||
|
<< dev.terminal_id
|
||||||
|
<< " cur_monitor=" << lm.monitor_id
|
||||||
|
<< " file_monitor_id=" << file_monitor_id
|
||||||
<< " file=" << fname
|
<< " file=" << fname
|
||||||
<< " key=" << key
|
|
||||||
<< " ERR: invalid filename format, skip"
|
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// key 形如 MON_YYYYMMDD_HHMMSS,目标是只按时间戳匹配:
|
// 到这里说明新疆文件的测点ID就是当前测点
|
||||||
if (has_suffix(key, want_ts)) {
|
file_seq = cur_seq;
|
||||||
//打印放入的文件名
|
}
|
||||||
std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id
|
// 默认:文件名能解析出 PQM 序号
|
||||||
<< " monitor=" << lm.monitor_id
|
else if (file_seq > 0) {
|
||||||
<< " dir=" << front.cur_dir
|
if (cur_seq > 0 && file_seq != cur_seq) {//跳过不同测点的文件
|
||||||
|
std::cout << "[check_recall_file] steady skip other seq file dev="
|
||||||
|
<< dev.terminal_id
|
||||||
|
<< " cur_monitor=" << lm.monitor_id
|
||||||
|
<< " cur_seq=" << cur_seq
|
||||||
|
<< " file_seq=" << file_seq
|
||||||
<< " file=" << fname
|
<< " file=" << fname
|
||||||
<< " key=" << key
|
|
||||||
<< " MATCH, add to download queue"
|
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 广东/上海/云南:file_monitor_id 为空,file_seq 也为 -1,不过滤
|
||||||
|
|
||||||
front.download_queue.push_back(front.cur_dir + "/" + fname);
|
// 文件时间段和补招时间段有交集就下载
|
||||||
}
|
bool hit = false;
|
||||||
}
|
|
||||||
} else { //稳态文件
|
|
||||||
// ☆原有:按时间窗筛选
|
|
||||||
long long beg = parse_time_to_epoch(front.StartTime);
|
|
||||||
long long end = parse_time_to_epoch(front.EndTime);
|
|
||||||
|
|
||||||
for (const auto& ent : it->second) {
|
for (const auto& r : front.recall_ranges) {//每个文件的时间段和补招的多个时间段比对,只要有一个交集就算命中
|
||||||
if (ent.flag != 1) continue; // 只要文件
|
long long rb = r.first;
|
||||||
// 文件名
|
long long re = r.second;
|
||||||
size_t n = ::strnlen(ent.name, sizeof(ent.name));
|
|
||||||
std::string fname(ent.name, n);
|
|
||||||
|
|
||||||
long long fe = -1;
|
if (fs <= re && fe >= rb) {
|
||||||
int seq = 0;
|
hit = true;
|
||||||
try { seq = std::stoi(lm.logical_device_seq); } catch (...) { seq = 0; }
|
break;
|
||||||
if (!extract_epoch_from_filename(fname, fe, seq)) continue;
|
|
||||||
if (fe >= beg && fe <= end) {
|
|
||||||
front.download_queue.push_back(front.cur_dir + "/" + fname);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (hit) {
|
||||||
|
std::string remote_file = front.cur_dir + "/" + fname;
|
||||||
|
|
||||||
|
if (front.required_files.insert(remote_file).second) {
|
||||||
|
front.download_queue.push_back(remote_file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
||||||
|
append_recall_record_line(dev.guid, lm.monitor_id,
|
||||||
|
std::string("监测点:") + lm.monitor_name + " 文件补招类型为空,执行失败");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -5230,8 +5470,12 @@ void check_recall_file() {
|
|||||||
// 当前目录无匹配文件 -> 试下一个目录
|
// 当前目录无匹配文件 -> 试下一个目录
|
||||||
front.cur_dir_index++;
|
front.cur_dir_index++;
|
||||||
front.list_result = ActionResult::PENDING;
|
front.list_result = ActionResult::PENDING;
|
||||||
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) {
|
if (front.cur_dir_index < static_cast<int>(front.active_dirs().size())) {
|
||||||
front.cur_dir = front.dir_candidates[front.cur_dir_index];
|
front.cur_dir = resolve_recall_dir(
|
||||||
|
front.active_dirs()[front.cur_dir_index],
|
||||||
|
front,
|
||||||
|
lm
|
||||||
|
);
|
||||||
|
|
||||||
// ★新增:先查缓存
|
// ★新增:先查缓存
|
||||||
std::vector<tag_dir_info> cached;
|
std::vector<tag_dir_info> cached;
|
||||||
@@ -5255,13 +5499,13 @@ void check_recall_file() {
|
|||||||
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
front.recall_status = static_cast<int>(RecallStatus::FAILED);
|
||||||
|
|
||||||
std::string msg_fail;
|
std::string msg_fail;
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,目标时标:"
|
+ " 补招" + RecallTypeName(front) + "未找到,目标时标:"
|
||||||
+ front.target_filetimes;
|
+ front.target_filetimes;
|
||||||
} else {
|
} else {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,时间范围:"
|
+ " 补招" + RecallTypeName(front) + "未找到,时间范围:"
|
||||||
+ front.StartTime + " ~ " + front.EndTime;
|
+ front.StartTime + " ~ " + front.EndTime;
|
||||||
}
|
}
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
@@ -5275,20 +5519,12 @@ void check_recall_file() {
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// 进入下载阶段
|
// 进入下载阶段
|
||||||
|
// required_files 记录远端文件全集,用于和 file_success 对比
|
||||||
front.required_files.clear();
|
front.required_files.clear();
|
||||||
//for (const auto& p : front.download_queue) front.required_files.insert(p);
|
|
||||||
|
|
||||||
//转成本地保存路径 download/<dev.addr_str>/<fname>
|
|
||||||
front.required_files.clear();
|
|
||||||
{
|
|
||||||
const std::string base_dir = std::string("download/") + sanitize(dev.addr_str);
|
|
||||||
for (const auto& p : front.download_queue) {
|
for (const auto& p : front.download_queue) {
|
||||||
// p 形如 "<cur_dir>/<fname>" —— 提取纯文件名
|
if (!p.empty()) {
|
||||||
std::string fname = sanitize(extract_filename1(p));
|
front.required_files.insert(p); // 保留远端路径
|
||||||
if (!fname.empty()) {
|
|
||||||
front.required_files.insert(base_dir + "/" + fname);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5297,9 +5533,12 @@ void check_recall_file() {
|
|||||||
front.phase = RecallPhase::DOWNLOADING;
|
front.phase = RecallPhase::DOWNLOADING;
|
||||||
front.download_result = ActionResult::PENDING;
|
front.download_result = ActionResult::PENDING;
|
||||||
front.downloading_file.clear();
|
front.downloading_file.clear();
|
||||||
|
|
||||||
std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id
|
std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id
|
||||||
<< " monitor=" << lm.monitor_id
|
<< " monitor=" << lm.monitor_id
|
||||||
<< " count=" << front.download_queue.size() << std::endl;
|
<< " count=" << front.download_queue.size()
|
||||||
|
<< " required_remote_count=" << front.required_files.size()
|
||||||
|
<< std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5320,6 +5559,23 @@ void check_recall_file() {
|
|||||||
|
|
||||||
front.recall_status = static_cast<int>(RecallStatus::DONE);//两个文件都下好了标记为成功
|
front.recall_status = static_cast<int>(RecallStatus::DONE);//两个文件都下好了标记为成功
|
||||||
|
|
||||||
|
//稳态文件就不会走下面的逻辑
|
||||||
|
if (front.is_steady_file()) {
|
||||||
|
std::string msg_ok = std::string("监测点:") + lm.monitor_name
|
||||||
|
+ " 补招" + RecallTypeName(front) + "完成,文件数:"
|
||||||
|
+ std::to_string(front.file_success.size());
|
||||||
|
|
||||||
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
|
||||||
|
|
||||||
|
std::cout << "[check_recall_file] STEADY DONE dev=" << dev.terminal_id
|
||||||
|
<< " monitor=" << lm.monitor_id
|
||||||
|
<< " ok=" << front.file_success.size()
|
||||||
|
<< " total=" << front.required_files.size()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
//更新事件
|
//更新事件
|
||||||
// ★修改开始:替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)”
|
// ★修改开始:替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)”
|
||||||
// 组装完整路径列表
|
// 组装完整路径列表
|
||||||
@@ -5419,13 +5675,13 @@ void check_recall_file() {
|
|||||||
|
|
||||||
//20251218添加记录
|
//20251218添加记录
|
||||||
std::string msg_fail;
|
std::string msg_fail;
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件下载失败,目标时标:"
|
+ " 补招" + RecallTypeName(front) + "下载失败,目标时标:"
|
||||||
+ front.target_filetimes;
|
+ front.target_filetimes;
|
||||||
} else {
|
} else {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件下载失败,时间范围:"
|
+ " 补招" + RecallTypeName(front) + "下载失败,时间范围:"
|
||||||
+ front.StartTime + " ~ " + front.EndTime;
|
+ front.StartTime + " ~ " + front.EndTime;
|
||||||
}
|
}
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
@@ -5456,10 +5712,114 @@ void check_recall_file() {
|
|||||||
}
|
}
|
||||||
if (front.download_result == ActionResult::OK) {
|
if (front.download_result == ActionResult::OK) {
|
||||||
|
|
||||||
|
if (front.is_steady_file()) {
|
||||||
|
const std::string base_dir = std::string("download/") + sanitize(dev.addr_str);
|
||||||
|
|
||||||
|
std::string seq = "1";
|
||||||
|
try {
|
||||||
|
int nseq = std::stoi(lm.logical_device_seq);
|
||||||
|
if (nseq > 0) {
|
||||||
|
seq = std::to_string(nseq);
|
||||||
|
} else {
|
||||||
|
std::cout << "[check_recall_file][WARN] invalid logical_device_seq, use default M1_"
|
||||||
|
<< " dev=" << dev.terminal_id
|
||||||
|
<< " monitor=" << lm.monitor_id
|
||||||
|
<< " logical_device_seq=" << lm.logical_device_seq
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_");
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
std::cout << "[check_recall_file][WARN] parse logical_device_seq failed, use default M1_"
|
||||||
|
<< " dev=" << dev.terminal_id
|
||||||
|
<< " monitor=" << lm.monitor_id
|
||||||
|
<< " logical_device_seq=" << lm.logical_device_seq
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_");
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string fname = sanitize(extract_filename1(front.downloading_file));
|
||||||
|
if (!fname.empty()) {
|
||||||
|
std::string old_local = base_dir + "/" + fname;
|
||||||
|
std::string new_fname = "M" + seq + "_" + fname;
|
||||||
|
std::string new_local = base_dir + "/" + new_fname;
|
||||||
|
|
||||||
|
// 先把下载目录下文件名改成 M1_xxx.pqd
|
||||||
|
if (old_local != new_local) {
|
||||||
|
if (std::rename(old_local.c_str(), new_local.c_str()) != 0) {
|
||||||
|
std::cout << "[check_recall_file][WARN] rename steady local file failed"
|
||||||
|
<< " old=" << old_local
|
||||||
|
<< " new=" << new_local
|
||||||
|
<< " errno=" << errno
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件失败");
|
||||||
|
new_local = old_local; // 重命名失败,后续仍尝试用原文件
|
||||||
|
new_fname = fname;
|
||||||
|
} else {
|
||||||
|
std::cout << "[check_recall_file] rename steady local file ok"
|
||||||
|
<< " old=" << old_local
|
||||||
|
<< " new=" << new_local
|
||||||
|
<< std::endl;
|
||||||
|
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件成功");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 只处理 .pqd 文件
|
||||||
|
if (new_fname.size() >= 4 &&
|
||||||
|
new_fname.substr(new_fname.size() - 4) == ".pqd") {
|
||||||
|
|
||||||
|
std::string pqdif_dir = std::string("download_pqdif/") + sanitize(dev.addr_str);
|
||||||
|
|
||||||
|
// 如果你已有递归建目录函数,用你的函数即可
|
||||||
|
create_directory_recursive(pqdif_dir.c_str());
|
||||||
|
|
||||||
|
std::string tmp_fname = new_fname.substr(0, new_fname.size() - 4) + ".tmp";
|
||||||
|
|
||||||
|
std::string local_tmp = base_dir + "/" + tmp_fname;
|
||||||
|
std::string dst_tmp = pqdif_dir + "/" + tmp_fname;
|
||||||
|
std::string dst_pqd = pqdif_dir + "/" + new_fname;
|
||||||
|
|
||||||
|
// 1. .pqd -> .tmp
|
||||||
|
if (std::rename(new_local.c_str(), local_tmp.c_str()) != 0) {
|
||||||
|
std::cout << "[check_recall_file][WARN] rename .pqd to .tmp failed"
|
||||||
|
<< " old=" << new_local
|
||||||
|
<< " new=" << local_tmp
|
||||||
|
<< " errno=" << errno
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), ".pqd改.tmp失败");
|
||||||
|
}
|
||||||
|
// 2. 移动 .tmp 到 download_pqdif
|
||||||
|
else if (!move_file_crossfs(local_tmp, dst_tmp)) {
|
||||||
|
std::cout << "[check_recall_file][WARN] move tmp file failed"
|
||||||
|
<< " old=" << local_tmp
|
||||||
|
<< " new=" << dst_tmp
|
||||||
|
<< " errno=" << errno
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "移动tmp文件失败");
|
||||||
|
}
|
||||||
|
// 3. download_pqdif 下 .tmp -> .pqd
|
||||||
|
else if (std::rename(dst_tmp.c_str(), dst_pqd.c_str()) != 0) {//原子操作,不会影响解析线程扫描到不完整文件
|
||||||
|
std::cout << "[check_recall_file][WARN] rename .tmp to .pqd failed"
|
||||||
|
<< " old=" << dst_tmp
|
||||||
|
<< " new=" << dst_pqd
|
||||||
|
<< " errno=" << errno
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), ".tmp改.pqd失败");
|
||||||
|
} else {
|
||||||
|
std::cout << "[check_recall_file] move pqd file to pqdif dir ok"
|
||||||
|
<< " src=" << new_local
|
||||||
|
<< " dst=" << dst_pqd
|
||||||
|
<< std::endl;
|
||||||
|
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "pqd文件移动到解析目录成功");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
front.file_success.insert(front.downloading_file);
|
front.file_success.insert(front.downloading_file);
|
||||||
|
|
||||||
std::string msg_ok = std::string("监测点:") + lm.monitor_name
|
std::string msg_ok = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件:" + front.downloading_file
|
+ " 补招" + RecallTypeName(front) + ":"
|
||||||
|
+ front.downloading_file
|
||||||
+ " 执行完成";
|
+ " 执行完成";
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_ok);
|
||||||
|
|
||||||
@@ -5477,7 +5837,8 @@ void check_recall_file() {
|
|||||||
} else {
|
} else {
|
||||||
|
|
||||||
std::string msg_fail = std::string("监测点:") + lm.monitor_name
|
std::string msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件:" + front.downloading_file
|
+ " 补招" + RecallTypeName(front) + ":"
|
||||||
|
+ front.downloading_file
|
||||||
+ " 执行失败";
|
+ " 执行失败";
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
|
|
||||||
@@ -5509,6 +5870,7 @@ void check_recall_file() {
|
|||||||
|
|
||||||
RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录
|
RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录
|
||||||
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始
|
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始
|
||||||
|
|
||||||
// 标记为 RUNNING,并设置终端忙状态
|
// 标记为 RUNNING,并设置终端忙状态
|
||||||
front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中
|
front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中
|
||||||
dev.isbusy = 1; //装置由idle标记为忙
|
dev.isbusy = 1; //装置由idle标记为忙
|
||||||
@@ -5519,9 +5881,13 @@ void check_recall_file() {
|
|||||||
// 初始化状态机并发出第一个目录请求
|
// 初始化状态机并发出第一个目录请求
|
||||||
front.reset_runtime(true);//保留直下文件信息
|
front.reset_runtime(true);//保留直下文件信息
|
||||||
front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表”
|
front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表”
|
||||||
if (!front.dir_candidates.empty()) {//目录列表非空
|
if (!front.active_dirs().empty()) {//目录列表非空
|
||||||
front.cur_dir_index = 0; //正在尝试的目录下标
|
front.cur_dir_index = 0; //正在尝试的目录下标
|
||||||
front.cur_dir = front.dir_candidates[0]; //第一个目录
|
front.cur_dir = resolve_recall_dir(
|
||||||
|
front.active_dirs()[0],
|
||||||
|
front,
|
||||||
|
lm
|
||||||
|
); //第一个目录
|
||||||
front.list_result = ActionResult::PENDING; //目录状态:等待返回
|
front.list_result = ActionResult::PENDING; //目录状态:等待返回
|
||||||
|
|
||||||
// ★新增:先查缓存
|
// ★新增:先查缓存
|
||||||
@@ -5550,18 +5916,18 @@ void check_recall_file() {
|
|||||||
front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败
|
front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败
|
||||||
|
|
||||||
std::string msg_fail;
|
std::string msg_fail;
|
||||||
if (front.direct_mode) {
|
if (front.is_voltage_file()) {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,目标时标:"
|
+ " 补招" + RecallTypeName(front) + "未找到,目标时标:"
|
||||||
+ front.target_filetimes;
|
+ front.target_filetimes;
|
||||||
} else {
|
} else {
|
||||||
msg_fail = std::string("监测点:") + lm.monitor_name
|
msg_fail = std::string("监测点:") + lm.monitor_name
|
||||||
+ " 补招波形文件未找到,时间范围:"
|
+ " 补招" + RecallTypeName(front) + "未找到,时间范围:"
|
||||||
+ front.StartTime + " ~ " + front.EndTime;
|
+ front.StartTime + " ~ " + front.EndTime;
|
||||||
}
|
}
|
||||||
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
append_recall_record_line(dev.guid, lm.monitor_id, msg_fail);
|
||||||
|
|
||||||
std::cout << "[check_recall_stat] empty dir_candidates, FAIL dev=" << dev.terminal_id
|
std::cout << "[check_recall_stat] empty active_dirs, FAIL dev=" << dev.terminal_id
|
||||||
<< " monitor=" << lm.monitor_id << std::endl;
|
<< " monitor=" << lm.monitor_id << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5700,10 +6066,10 @@ bool enqueue_direct_download(const std::string& dev_id,
|
|||||||
rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED);
|
rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED);
|
||||||
rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗
|
rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗
|
||||||
rf.EndTime = "1970-01-01 00:00:01";
|
rf.EndTime = "1970-01-01 00:00:01";
|
||||||
//rf.dir_candidates = dir_candidates; // 要检索的目录列表和默认的一致
|
|
||||||
rf.direct_mode = true; // ★关键:直下文件
|
|
||||||
rf.target_filetimes=filetime; // ▲单个文件时间入“列表”
|
rf.target_filetimes=filetime; // ▲单个文件时间入“列表”
|
||||||
|
|
||||||
|
rf.file_type = RecallFileType::VOLTAGE_FILE; // 直下波形文件
|
||||||
|
|
||||||
lm_it->recall_list_static.push_back(std::move(rf));
|
lm_it->recall_list_static.push_back(std::move(rf));
|
||||||
|
|
||||||
// 若设备空闲,可直接置忙(可选,视你的流程而定)
|
// 若设备空闲,可直接置忙(可选,视你的流程而定)
|
||||||
|
|||||||
@@ -68,6 +68,12 @@ enum class ActionResult {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// ====== ★修改:扩展 RecallFile,支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
|
// ====== ★修改:扩展 RecallFile,支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
|
||||||
|
enum class RecallFileType {
|
||||||
|
NONE = 0,
|
||||||
|
STEADY_FILE, // 稳态文件
|
||||||
|
VOLTAGE_FILE // 暂态直下文件
|
||||||
|
};
|
||||||
|
|
||||||
class RecallFile
|
class RecallFile
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -75,18 +81,35 @@ public:
|
|||||||
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
|
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
|
||||||
std::string StartTime; // 数据补招起始时间(yyyy-MM-dd HH:mm:ss)
|
std::string StartTime; // 数据补招起始时间(yyyy-MM-dd HH:mm:ss)
|
||||||
std::string EndTime; // 数据补招结束时间(yyyy-MM-dd HH:mm:ss)
|
std::string EndTime; // 数据补招结束时间(yyyy-MM-dd HH:mm:ss)
|
||||||
|
|
||||||
|
// ===== 业务类型 =====
|
||||||
|
// STEADY:稳态文件补招
|
||||||
|
// VOLTAGE:暂态事件补招
|
||||||
std::string STEADY; // 补招历史统计数据标识 0-不补招;1-补招
|
std::string STEADY; // 补招历史统计数据标识 0-不补招;1-补招
|
||||||
std::string VOLTAGE; // 补招暂态事件标识 0-不补招;1-补招
|
std::string VOLTAGE; // 补招暂态事件标识 0-不补招;1-补招
|
||||||
|
|
||||||
|
// ===== 文件下载类型 =====
|
||||||
|
RecallFileType file_type = RecallFileType::NONE;
|
||||||
|
|
||||||
//暂态文件用
|
//暂态文件用
|
||||||
bool direct_mode = false; // 直下文件开关:true 表示不按时间窗,仅按目标文件名
|
std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss)
|
||||||
std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss),仅 direct_mode=true 时有效
|
|
||||||
|
|
||||||
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
|
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
|
||||||
std::map<std::string, std::vector<tag_dir_info>> dir_files;
|
std::map<std::string, std::vector<tag_dir_info>> dir_files;
|
||||||
|
|
||||||
// ★新增:候选目录(可扩展)
|
std::vector<std::string> steady_dir_candidates{
|
||||||
std::vector<std::string> dir_candidates{
|
"/cf/pqdif", //580绝对真实路径
|
||||||
|
"/bd0/pqdif", //chemengyu提供包含bd0
|
||||||
|
"/bd0/pqdif/%DESC%/%DAY%",
|
||||||
|
"/bd0/pqdif/Line%SEQ%",
|
||||||
|
"/bd0/historyFile/%DESC%",
|
||||||
|
"/pqdif", // 默认版本 / 新疆 可能取到其他测点文件
|
||||||
|
"/pqdif/%DESC%/%DAY%", // 上海:pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件
|
||||||
|
"/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件
|
||||||
|
"/historyFile/%DESC%" // 广东 不会取到其他测点文件
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::string> voltage_dir_candidates{
|
||||||
"/cf/COMTRADE",
|
"/cf/COMTRADE",
|
||||||
"/bd0/COMTRADE",
|
"/bd0/COMTRADE",
|
||||||
"/sd0/COMTRADE",
|
"/sd0/COMTRADE",
|
||||||
@@ -102,6 +125,9 @@ public:
|
|||||||
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
|
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
|
||||||
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
|
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
|
||||||
|
|
||||||
|
// 稳态文件用:一个 guid 下的多个补招时间段
|
||||||
|
std::vector<std::pair<long long, long long>> recall_ranges;
|
||||||
|
|
||||||
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
|
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
|
||||||
std::list<std::string> download_queue; //一个时间可能对应多个文件
|
std::list<std::string> download_queue; //一个时间可能对应多个文件
|
||||||
std::string downloading_file; // 当前正在下载的文件(完整路径)
|
std::string downloading_file; // 当前正在下载的文件(完整路径)
|
||||||
@@ -109,8 +135,22 @@ public:
|
|||||||
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
|
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
|
||||||
std::unordered_set<std::string> file_success; // 已下载成功的文件集合
|
std::unordered_set<std::string> file_success; // 已下载成功的文件集合
|
||||||
|
|
||||||
|
// 是否稳态文件任务
|
||||||
|
bool is_steady_file() const {
|
||||||
|
return file_type == RecallFileType::STEADY_FILE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 是否暂态直下文件任务
|
||||||
|
bool is_voltage_file() const {
|
||||||
|
return file_type == RecallFileType::VOLTAGE_FILE;
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::vector<std::string>& active_dirs() const {
|
||||||
|
return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates;
|
||||||
|
}
|
||||||
|
|
||||||
// ★新增:一个便捷复位
|
// ★新增:一个便捷复位
|
||||||
void reset_runtime(bool keep_direct = false)
|
void reset_runtime(bool keep_target_filetimes = false)
|
||||||
{
|
{
|
||||||
phase = RecallPhase::IDLE;
|
phase = RecallPhase::IDLE;
|
||||||
cur_dir_index = 0;
|
cur_dir_index = 0;
|
||||||
@@ -124,9 +164,10 @@ public:
|
|||||||
required_files.clear();
|
required_files.clear();
|
||||||
file_success.clear();
|
file_success.clear();
|
||||||
|
|
||||||
|
// 注意:file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响
|
||||||
|
|
||||||
// ★新增:按需保留直下文件开关和目标名
|
// ★新增:按需保留直下文件开关和目标名
|
||||||
if (!keep_direct) {
|
if (!keep_target_filetimes) {
|
||||||
direct_mode = false;
|
|
||||||
target_filetimes.clear(); // ▲列表清空
|
target_filetimes.clear(); // ▲列表清空
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号
|
|||||||
|
|
||||||
extern std::string FRONT_INST;
|
extern std::string FRONT_INST;
|
||||||
|
|
||||||
|
extern bool PQD_FLAG;
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
|
||||||
|
|
||||||
template<typename T, typename... Args>
|
template<typename T, typename... Args>
|
||||||
@@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) {
|
|||||||
try {
|
try {
|
||||||
g_front_seg_index = std::stoi(val.substr(0, pos));
|
g_front_seg_index = std::stoi(val.substr(0, pos));
|
||||||
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
||||||
|
|
||||||
|
if (g_front_seg_index == 0) {
|
||||||
|
PQD_FLAG = true;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
std::cerr << "Invalid -s format." << std::endl;
|
std::cerr << "Invalid -s format." << std::endl;
|
||||||
}
|
}
|
||||||
@@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) {
|
|||||||
try {
|
try {
|
||||||
g_front_seg_index = std::stoi(val.substr(0, pos));
|
g_front_seg_index = std::stoi(val.substr(0, pos));
|
||||||
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
||||||
|
|
||||||
|
if (g_front_seg_index == 0) {
|
||||||
|
PQD_FLAG = true;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
std::cerr << "Invalid -s format." << std::endl;
|
std::cerr << "Invalid -s format." << std::endl;
|
||||||
}
|
}
|
||||||
@@ -223,13 +235,13 @@ std::string get_parent_directory() {
|
|||||||
//解析模板文件
|
//解析模板文件
|
||||||
//Set_xml_nodeinfo();
|
//Set_xml_nodeinfo();
|
||||||
|
|
||||||
StartFrontThread(); //开启主线程
|
|
||||||
|
|
||||||
StartMQConsumerThread(); //开启消费者线程
|
|
||||||
|
|
||||||
StartMQProducerThread(); //开启生产者线程
|
StartMQProducerThread(); //开启生产者线程
|
||||||
|
|
||||||
|
if(!PQD_FLAG){
|
||||||
|
StartFrontThread(); //开启主线程
|
||||||
|
StartMQConsumerThread(); //开启消费者线程
|
||||||
StartTimerThread(); //开启定时线程
|
StartTimerThread(); //开启定时线程
|
||||||
|
}
|
||||||
|
|
||||||
//启动worker 根据启动标志启动
|
//启动worker 根据启动标志启动
|
||||||
if(G_TEST_FLAG){
|
if(G_TEST_FLAG){
|
||||||
|
|||||||
@@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
|||||||
<< ", VOLTAGE=" << rf.VOLTAGE
|
<< ", VOLTAGE=" << rf.VOLTAGE
|
||||||
<< "\n";
|
<< "\n";
|
||||||
|
|
||||||
// ★新增:直下模式与目标时间列表
|
// ★新增:文件补招类型与目标信息
|
||||||
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false")
|
os << "\r\x1B[K |-- file_type="
|
||||||
<< ", target_filetimes(" << rf.target_filetimes << ")\n";
|
<< (rf.is_steady_file() ? "STEADY_FILE" :
|
||||||
{
|
rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE");
|
||||||
os << "\r\x1B[K |.. " << rf.target_filetimes << "\n";
|
|
||||||
|
if (rf.is_voltage_file()) {
|
||||||
|
os << ", target_filetimes=" << rf.target_filetimes;
|
||||||
}
|
}
|
||||||
|
else if (rf.is_steady_file()) {
|
||||||
|
os << ", time_range=" << rf.StartTime
|
||||||
|
<< " ~ " << rf.EndTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
os << "\n";
|
||||||
|
|
||||||
// ★新增:状态机运行态
|
// ★新增:状态机运行态
|
||||||
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
|
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
|
||||||
@@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
|||||||
<< ", download_result=" << resultStr(rf.download_result) << "\n";
|
<< ", download_result=" << resultStr(rf.download_result) << "\n";
|
||||||
|
|
||||||
// ★新增:候选目录
|
// ★新增:候选目录
|
||||||
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n";
|
os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n";
|
||||||
{
|
{
|
||||||
size_t c = 0;
|
size_t c = 0;
|
||||||
for (const auto& d : rf.dir_candidates) {
|
for (const auto& d : rf.active_dirs()) {
|
||||||
if (c++ >= MAX_ITEMS) break;
|
if (c++ >= MAX_ITEMS) break;
|
||||||
os << "\r\x1B[K |-- " << d << "\n";
|
os << "\r\x1B[K |-- " << d << "\n";
|
||||||
}
|
}
|
||||||
if (rf.dir_candidates.size() > MAX_ITEMS) {
|
if (rf.active_dirs().size() > MAX_ITEMS) {
|
||||||
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n";
|
os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -350,11 +350,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
||||||
|
|
||||||
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
|
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
|
||||||
|
if(record.nType != 0){
|
||||||
append_qvvr_event(id,event.head.name,
|
append_qvvr_event(id,event.head.name,
|
||||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||||
transfer_json_qvvr_data(id,event.head.name,
|
transfer_json_qvvr_data(id,event.head.name,
|
||||||
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
|
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
|
||||||
"");
|
"");
|
||||||
|
}
|
||||||
|
|
||||||
//事件主动上送处理完成,不需要通知状态机
|
//事件主动上送处理完成,不需要通知状态机
|
||||||
}
|
}
|
||||||
@@ -2449,12 +2451,14 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
||||||
|
|
||||||
//记录补招上来的暂态事件
|
//记录补招上来的暂态事件
|
||||||
|
if(record.nType != 0){
|
||||||
append_qvvr_event(id,event.head.name,
|
append_qvvr_event(id,event.head.name,
|
||||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||||
|
|
||||||
//直接发走暂态事件
|
//直接发走暂态事件
|
||||||
transfer_json_qvvr_data(id,event.head.name,
|
transfer_json_qvvr_data(id,event.head.name,
|
||||||
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
|
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
|
||||||
|
}
|
||||||
//通知状态机补招暂态事件成功
|
//通知状态机补招暂态事件成功
|
||||||
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));
|
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
#include <new>
|
||||||
|
|
||||||
// PQDIF 解析库
|
// PQDIF 解析库
|
||||||
#include "pqdif/PQDIF.h"
|
#include "pqdif/PQDIF.h"
|
||||||
@@ -28,6 +29,17 @@
|
|||||||
#include "pqdif/include/pqdif_lg.h"
|
#include "pqdif/include/pqdif_lg.h"
|
||||||
#include "pqdif_semantic_ids.h"
|
#include "pqdif_semantic_ids.h"
|
||||||
|
|
||||||
|
#include "cloudfront/code/log4.h" //lnk20260526
|
||||||
|
|
||||||
|
extern void enqueue_stat_pq(const std::string& max_base64Str,
|
||||||
|
const std::string& min_base64Str,
|
||||||
|
const std::string& avg_base64Str,
|
||||||
|
const std::string& cp95_base64Str,
|
||||||
|
time_t data_time,
|
||||||
|
const std::string& mac,
|
||||||
|
short cid);
|
||||||
|
extern std::string extract_filename1(const std::string& path);
|
||||||
|
|
||||||
namespace fs = std::experimental::filesystem;
|
namespace fs = std::experimental::filesystem;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
@@ -37,7 +49,11 @@ namespace {
|
|||||||
constexpr int kMaxPqdifFilesPerScan = 1;
|
constexpr int kMaxPqdifFilesPerScan = 1;
|
||||||
constexpr size_t kParsedCacheLimit = 128;
|
constexpr size_t kParsedCacheLimit = 128;
|
||||||
|
|
||||||
const char* kPqdRootDir = "download";
|
// 大文件流式阈值:估算展开点数超过该值时,不再构造 expanded_stat_points,
|
||||||
|
// 而是按通道逐步聚合成时间桶并直接组装 Base64,避免单文件中间对象占用过大内存。
|
||||||
|
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;
|
||||||
|
|
||||||
|
const char* kPqdRootDir = "download_pqdif";
|
||||||
const char* kDoneRootDir = "download_done";
|
const char* kDoneRootDir = "download_done";
|
||||||
const char* kFailRootDir = "download_fail";
|
const char* kFailRootDir = "download_fail";
|
||||||
|
|
||||||
@@ -5072,6 +5088,250 @@ namespace {
|
|||||||
return stat_select_best_metric_sources(out);
|
return stat_select_best_metric_sources(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
size_t stat_estimate_candidate_point_count_for_streaming(const PqdifLogicalFile& lf)
|
||||||
|
{
|
||||||
|
size_t total = 0;
|
||||||
|
for (const auto& obs : lf.observations)
|
||||||
|
{
|
||||||
|
for (const auto& ch : obs.channel_instances)
|
||||||
|
{
|
||||||
|
if (!pqdif_sem::IsQuantityTypeValueLog(ch.quantity_type_id.value))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
for (const auto& si : ch.series_instances)
|
||||||
|
{
|
||||||
|
if (pqdif_sem::IsValueTypeTime(si.value_type_id.value))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
const PqdifSeriesInstance* resolved = stat_resolve_shared_series(obs, si);
|
||||||
|
if (resolved == nullptr || resolved->values.count <= 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
total += static_cast<size_t>(resolved->values.count);
|
||||||
|
if (total > kPqdifLargeFileStreamingPointThreshold)
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct StatStreamingSourceBucket
|
||||||
|
{
|
||||||
|
StatMetricId metric_id = StatMetricId::Unknown;
|
||||||
|
StatMetricSourceKey key;
|
||||||
|
StatMetricSourceStats stats;
|
||||||
|
std::map<time_t, AggregatedStatValues> values_by_time;
|
||||||
|
std::map<time_t, std::string> time_text_by_time;
|
||||||
|
};
|
||||||
|
|
||||||
|
void stat_streaming_set_value(AggregatedStatValues& agg, const ExpandedStatPoint& p)
|
||||||
|
{
|
||||||
|
if (agg.source_observation_index < 0)
|
||||||
|
{
|
||||||
|
agg.source_observation_index = p.observation_index;
|
||||||
|
agg.source_channel_instance_index = p.channel_instance_index;
|
||||||
|
agg.source_series_instance_index = p.series_instance_index;
|
||||||
|
agg.source_channel_name = p.channel_name;
|
||||||
|
agg.quality = StatMetricQuality::Normal;
|
||||||
|
agg.quality_reason = "ok";
|
||||||
|
}
|
||||||
|
|
||||||
|
// 同一来源同一时间同一种统计值如果重复,保留首次写入,避免一次性覆盖造成结果不稳定。
|
||||||
|
if (stat_has_value_kind(agg, p.stat_kind))
|
||||||
|
return;
|
||||||
|
|
||||||
|
agg.source_series_instance_index = p.series_instance_index;
|
||||||
|
switch (p.stat_kind)
|
||||||
|
{
|
||||||
|
case StatValueKind::Min:
|
||||||
|
agg.has_min = true;
|
||||||
|
agg.min_value = p.value;
|
||||||
|
break;
|
||||||
|
case StatValueKind::Max:
|
||||||
|
agg.has_max = true;
|
||||||
|
agg.max_value = p.value;
|
||||||
|
break;
|
||||||
|
case StatValueKind::Avg:
|
||||||
|
agg.has_avg = true;
|
||||||
|
agg.avg_value = p.value;
|
||||||
|
break;
|
||||||
|
case StatValueKind::P95:
|
||||||
|
agg.has_p95 = true;
|
||||||
|
agg.p95_value = p.value;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void stat_streaming_add_point(
|
||||||
|
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>>& by_metric,
|
||||||
|
const ExpandedStatPoint& p)
|
||||||
|
{
|
||||||
|
if (p.metric_id == StatMetricId::Unknown || p.stat_kind == StatValueKind::Unknown)
|
||||||
|
return;
|
||||||
|
|
||||||
|
const StatMetricSourceKey key = stat_make_source_key(p);
|
||||||
|
StatStreamingSourceBucket& src = by_metric[p.metric_id][key];
|
||||||
|
if (src.metric_id == StatMetricId::Unknown)
|
||||||
|
{
|
||||||
|
src.metric_id = p.metric_id;
|
||||||
|
src.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
src.stats.add(p);
|
||||||
|
AggregatedStatValues& agg = src.values_by_time[p.timestamp];
|
||||||
|
stat_streaming_set_value(agg, p);
|
||||||
|
if (src.time_text_by_time.find(p.timestamp) == src.time_text_by_time.end())
|
||||||
|
src.time_text_by_time[p.timestamp] = p.timestamp_text;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<TimeAggregatedStatBucket> stat_build_aggregated_buckets_streaming(
|
||||||
|
const PqdifLogicalFile& lf,
|
||||||
|
ParsedConnectionKind connection_kind,
|
||||||
|
int& selected_observation_index,
|
||||||
|
std::string& selected_observation_name,
|
||||||
|
size_t& out_raw_point_count,
|
||||||
|
size_t& out_selected_metric_count,
|
||||||
|
size_t& out_selected_dynamic_metric_count)
|
||||||
|
{
|
||||||
|
selected_observation_index = -1;
|
||||||
|
selected_observation_name.clear();
|
||||||
|
out_raw_point_count = 0;
|
||||||
|
out_selected_metric_count = 0;
|
||||||
|
out_selected_dynamic_metric_count = 0;
|
||||||
|
|
||||||
|
const PqdifObservationRecord* primary = stat_select_primary_statistical_observation(lf);
|
||||||
|
if (primary != nullptr)
|
||||||
|
{
|
||||||
|
selected_observation_index = primary->observation_index;
|
||||||
|
selected_observation_name = primary->observation_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>> by_metric;
|
||||||
|
|
||||||
|
// 大文件模式:遍历全部 observations,但每次只临时展开一个通道,随后立刻压缩到
|
||||||
|
// metric/source/timestamp/kind 聚合结构中,不保留全量 ExpandedStatPoint。
|
||||||
|
for (const auto& obs : lf.observations)
|
||||||
|
{
|
||||||
|
for (const auto& ch : obs.channel_instances)
|
||||||
|
{
|
||||||
|
std::vector<ExpandedStatPoint> points = stat_expand_channel_points(lf, connection_kind, obs, ch);
|
||||||
|
out_raw_point_count += points.size();
|
||||||
|
for (const auto& p : points)
|
||||||
|
stat_streaming_add_point(by_metric, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<time_t, TimeAggregatedStatBucket> global_buckets;
|
||||||
|
size_t duplicate_metric_count = 0;
|
||||||
|
|
||||||
|
for (auto& metric_pair : by_metric)
|
||||||
|
{
|
||||||
|
const StatMetricId metric_id = metric_pair.first;
|
||||||
|
auto& sources = metric_pair.second;
|
||||||
|
if (sources.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
bool has_selected = false;
|
||||||
|
StatMetricSourceKey best_key;
|
||||||
|
int best_score = std::numeric_limits<int>::min();
|
||||||
|
|
||||||
|
for (auto& src_pair : sources)
|
||||||
|
{
|
||||||
|
int score = stat_metric_source_score(src_pair.second.stats);
|
||||||
|
// 大文件流式模式会遍历全部 observations。为尽量保持原逻辑,普通核心指标在分数接近时
|
||||||
|
// 优先使用主统计 observation;谐波等只存在于其他 observation 时仍能正常选中。
|
||||||
|
if (selected_observation_index >= 0 && src_pair.first.observation_index == selected_observation_index)
|
||||||
|
score += 25;
|
||||||
|
|
||||||
|
if (!has_selected || score > best_score)
|
||||||
|
{
|
||||||
|
has_selected = true;
|
||||||
|
best_score = score;
|
||||||
|
best_key = src_pair.first;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!has_selected)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
++out_selected_metric_count;
|
||||||
|
if (stat_is_dynamic_metric(metric_id))
|
||||||
|
++out_selected_dynamic_metric_count;
|
||||||
|
if (sources.size() > 1)
|
||||||
|
++duplicate_metric_count;
|
||||||
|
|
||||||
|
StatStreamingSourceBucket& selected_src = sources[best_key];
|
||||||
|
for (auto& time_pair : selected_src.values_by_time)
|
||||||
|
{
|
||||||
|
TimeAggregatedStatBucket& bucket = global_buckets[time_pair.first];
|
||||||
|
if (bucket.timestamp == 0)
|
||||||
|
{
|
||||||
|
bucket.timestamp = time_pair.first;
|
||||||
|
const auto tit = selected_src.time_text_by_time.find(time_pair.first);
|
||||||
|
bucket.timestamp_text = (tit == selected_src.time_text_by_time.end()) ? format_time_text(time_pair.first) : tit->second;
|
||||||
|
}
|
||||||
|
bucket.metrics[metric_id] = time_pair.second;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<TimeAggregatedStatBucket> out;
|
||||||
|
out.reserve(global_buckets.size());
|
||||||
|
for (auto& kv : global_buckets)
|
||||||
|
out.push_back(std::move(kv.second));
|
||||||
|
|
||||||
|
if (pqdif_log_enabled(PqdifLogLevel::Core))
|
||||||
|
{
|
||||||
|
std::cout << "========== PQDIF LARGE FILE STREAMING SUMMARY ==========" << std::endl;
|
||||||
|
std::cout << "raw_points=" << out_raw_point_count
|
||||||
|
<< ", candidate_metric_count=" << by_metric.size()
|
||||||
|
<< ", selected_metric_count=" << out_selected_metric_count
|
||||||
|
<< ", selected_dynamic_metrics=" << out_selected_dynamic_metric_count << "/" << stat_all_dynamic_metric_order().size()
|
||||||
|
<< ", duplicate_metric_count=" << duplicate_metric_count
|
||||||
|
<< ", buckets=" << out.size()
|
||||||
|
<< ", selected_observation_index=" << selected_observation_index
|
||||||
|
<< ", selected_observation_name=" << selected_observation_name
|
||||||
|
<< std::endl;
|
||||||
|
std::cout << "mode=streaming_no_expanded_points; detail=per-channel expand then immediate aggregate" << std::endl;
|
||||||
|
std::cout << "========================================================" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
void stat_print_aggregated_metric_line(StatMetricId metric_id, const AggregatedStatValues* agg);
|
||||||
|
void dump_grouped_bucket_streaming_preview(const ParsedPqdifFile& parsed_file)
|
||||||
|
{
|
||||||
|
std::cout << "========== GROUPED STAT STREAMING CORE SUMMARY ==========" << std::endl;
|
||||||
|
std::cout << "connection_kind=" << stat_connection_kind_name(parsed_file.connection_kind)
|
||||||
|
<< ", selected_observation_index=" << parsed_file.selected_observation_index
|
||||||
|
<< ", selected_observation_name=" << parsed_file.selected_observation_name
|
||||||
|
<< ", expanded_points=SKIPPED_BY_STREAMING"
|
||||||
|
<< ", buckets=" << parsed_file.aggregated_stat_buckets.size()
|
||||||
|
<< ", core_metric_slots=" << stat_core_metric_print_order().size()
|
||||||
|
<< ", dynamic_spectrum_slots=" << stat_all_dynamic_metric_order().size()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
const size_t bucket_limit = std::min<size_t>(parsed_file.aggregated_stat_buckets.size(), 3);
|
||||||
|
for (size_t i = 0; i < bucket_limit; ++i)
|
||||||
|
{
|
||||||
|
const auto& b = parsed_file.aggregated_stat_buckets[i];
|
||||||
|
std::cout << " [BUCKET " << i << "] time=" << b.timestamp_text
|
||||||
|
<< ", metric_count_present=" << b.metrics.size()
|
||||||
|
<< std::endl;
|
||||||
|
for (const auto metric_id : stat_core_metric_print_order())
|
||||||
|
{
|
||||||
|
const auto it = b.metrics.find(metric_id);
|
||||||
|
if (it != b.metrics.end())
|
||||||
|
stat_print_aggregated_metric_line(metric_id, &it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::cout << "=========================================================" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
bool pqdif_probe_text_looks_like_flicker(const std::string& text)
|
bool pqdif_probe_text_looks_like_flicker(const std::string& text)
|
||||||
{
|
{
|
||||||
const std::string key = normalize_key(text);
|
const std::string key = normalize_key(text);
|
||||||
@@ -7496,9 +7756,41 @@ namespace {
|
|||||||
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
// 按你的要求,入队前完整打印保存对象内部结构:文件批次 -> 时间点 -> Max/Min/Avg/P95 子记录 -> Base64 内容。
|
// 完整 Base64 内容日志非常长,尤其大文件会明显放大内存和 I/O 压力。
|
||||||
// 日志会很长;确认完成后可以临时注释掉这一行,或改成 if (pqdif_log_enabled(PqdifLogLevel::Info)) 包裹。
|
// 现在仅在 Debug/Trace 级别打印完整对象;Core/Info 只打印摘要和前几条子记录。
|
||||||
//pqdif_dump_stat_base64_file_batch_full(file_batch);
|
if (pqdif_log_enabled(PqdifLogLevel::Debug))
|
||||||
|
{
|
||||||
|
pqdif_dump_stat_base64_file_batch_full(file_batch);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << " [BASE64 FILE BATCH COMPACT] file=" << file_batch.pqdif_file_path
|
||||||
|
<< ", time_points=" << file_batch.time_point_count
|
||||||
|
<< ", records=" << file_batch.total_record_count
|
||||||
|
<< ", total_float_count=" << file_batch.total_float_count
|
||||||
|
<< ", total_placeholder_count=" << file_batch.total_placeholder_count
|
||||||
|
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
size_t shown = 0;
|
||||||
|
for (const auto& tp : file_batch.time_points)
|
||||||
|
{
|
||||||
|
for (const auto& rec : tp.records)
|
||||||
|
{
|
||||||
|
if (shown >= 4)
|
||||||
|
break;
|
||||||
|
std::cout << " [BASE64 SAVED SAMPLE] time=" << rec.timestamp_text
|
||||||
|
<< ", kind=" << rec.value_kind_name
|
||||||
|
<< ", floats=" << rec.float_count
|
||||||
|
<< ", placeholders=" << rec.placeholder_count
|
||||||
|
<< ", base64_len=" << rec.base64_payload.size()
|
||||||
|
<< std::endl;
|
||||||
|
++shown;
|
||||||
|
}
|
||||||
|
if (shown >= 4)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
push_pqdif_stat_base64_file_batch(std::move(file_batch));
|
push_pqdif_stat_base64_file_batch(std::move(file_batch));
|
||||||
|
|
||||||
@@ -7548,6 +7840,54 @@ namespace {
|
|||||||
// 后续指标判断需要先区分星型 / 角型两套规则。
|
// 后续指标判断需要先区分星型 / 角型两套规则。
|
||||||
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
|
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
|
||||||
|
|
||||||
|
// 2) 根据估算展开点数决定是否启用“大文件流式模式”。
|
||||||
|
// 普通文件仍走原 expanded_stat_points -> grouped buckets 流程;
|
||||||
|
// 大文件则直接按通道聚合为 buckets,不再保存全量 expanded_stat_points。
|
||||||
|
const size_t estimated_candidate_points =
|
||||||
|
stat_estimate_candidate_point_count_for_streaming(parsed_file.logical_file);
|
||||||
|
const bool use_large_file_streaming =
|
||||||
|
estimated_candidate_points > kPqdifLargeFileStreamingPointThreshold;
|
||||||
|
|
||||||
|
if (use_large_file_streaming)
|
||||||
|
{
|
||||||
|
std::cout << "[PQDIF][MEMORY][STREAMING] large file detected, use streaming bucket/base64 build. "
|
||||||
|
<< "estimated_candidate_points=" << estimated_candidate_points
|
||||||
|
<< ", threshold=" << kPqdifLargeFileStreamingPointThreshold
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
size_t raw_point_count = 0;
|
||||||
|
size_t selected_metric_count = 0;
|
||||||
|
size_t selected_dynamic_metric_count = 0;
|
||||||
|
parsed_file.aggregated_stat_buckets = stat_build_aggregated_buckets_streaming(
|
||||||
|
parsed_file.logical_file,
|
||||||
|
parsed_file.connection_kind,
|
||||||
|
parsed_file.selected_observation_index,
|
||||||
|
parsed_file.selected_observation_name,
|
||||||
|
raw_point_count,
|
||||||
|
selected_metric_count,
|
||||||
|
selected_dynamic_metric_count);
|
||||||
|
|
||||||
|
if (pqdif_is_trace_log_enabled())
|
||||||
|
dump_semantic_probe(parsed_file);
|
||||||
|
|
||||||
|
dump_grouped_bucket_streaming_preview(parsed_file);
|
||||||
|
|
||||||
|
// 3) 大文件仍完整生成 Base64 文件批次,只是跳过 expanded_stat_points 中间缓存。
|
||||||
|
pqdif_build_and_queue_base64_records(parsed_file);
|
||||||
|
|
||||||
|
// 大文件模式不再把完整 ParsedPqdifFile 放入解析缓存,避免 logical_file + buckets 在内存中长期驻留。
|
||||||
|
// 后续业务请从 PqdifStatBase64FileBatch 队列读取最终 Base64 结果。
|
||||||
|
std::cout << "[PQDIF][MEMORY][STREAMING] skip parsed_file cache for large file, "
|
||||||
|
<< "base64 batch has been queued. file=" << file_path.string()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
std::cout << "[PQDIF] processed ok(streaming): " << file_path.string()
|
||||||
|
<< ", cache_size=" << GetParsedPqdifCacheSize()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// 2) 只筛选“统计类 observation”,并对其展开目标统计指标。
|
// 2) 只筛选“统计类 observation”,并对其展开目标统计指标。
|
||||||
// 当前阶段不处理全部 observation,避免不同 observation 混入同一时间聚合结果。
|
// 当前阶段不处理全部 observation,避免不同 observation 混入同一时间聚合结果。
|
||||||
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
|
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
|
||||||
@@ -7655,14 +7995,50 @@ namespace {
|
|||||||
<< ", max_per_scan=" << kMaxPqdifFilesPerScan
|
<< ", max_per_scan=" << kMaxPqdifFilesPerScan
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
const bool ok = process_single_pqdif_file(item.path, item.mac);
|
bool ok = false;
|
||||||
|
bool caught_exception = false;
|
||||||
|
std::string exception_text;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ok = process_single_pqdif_file(item.path, item.mac);
|
||||||
|
}
|
||||||
|
catch (const std::bad_alloc& ex)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
caught_exception = true;
|
||||||
|
exception_text = ex.what();
|
||||||
|
std::cout << "[PQDIF][OOM] std::bad_alloc while processing file="
|
||||||
|
<< item.path.string()
|
||||||
|
<< ", what=" << exception_text
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
catch (const std::exception& ex)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
caught_exception = true;
|
||||||
|
exception_text = ex.what();
|
||||||
|
std::cout << "[PQDIF][ERROR] exception while processing file="
|
||||||
|
<< item.path.string()
|
||||||
|
<< ", what=" << exception_text
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
caught_exception = true;
|
||||||
|
exception_text = "unknown";
|
||||||
|
std::cout << "[PQDIF][ERROR] unknown exception while processing file="
|
||||||
|
<< item.path.string()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
|
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
|
||||||
const fs::path dst = target_root / item.mac / item.path.filename();
|
const fs::path dst = target_root / item.mac / item.path.filename();
|
||||||
|
|
||||||
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
|
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
|
||||||
// 解析成功:download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
|
// 解析成功:download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
|
||||||
// 解析失败:download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
|
// 解析失败或解析过程中出现异常:download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
|
||||||
//
|
//
|
||||||
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
|
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
|
||||||
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
|
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
|
||||||
@@ -7672,6 +8048,13 @@ namespace {
|
|||||||
<< item.path.string() << " -> " << dst.string()
|
<< item.path.string() << " -> " << dst.string()
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
}
|
}
|
||||||
|
else if (caught_exception)
|
||||||
|
{
|
||||||
|
std::cout << "[PQDIF] exception file moved to fail dir: "
|
||||||
|
<< dst.string()
|
||||||
|
<< ", reason=" << exception_text
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
|
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
|
||||||
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
|
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
|
||||||
@@ -7877,6 +8260,96 @@ void ClearReadyPqdifStatBase64Queue()
|
|||||||
g_pqdif_stat_base64_ready_queue.clear();
|
g_pqdif_stat_base64_ready_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容
|
||||||
|
StatValueKind kind,
|
||||||
|
std::string& out)
|
||||||
|
{
|
||||||
|
for (const auto& r : tp.records) {
|
||||||
|
if (r.value_kind == kind) {
|
||||||
|
out = r.base64_payload;
|
||||||
|
return !out.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path,
|
||||||
|
short& point_name)
|
||||||
|
{
|
||||||
|
point_name = 0;
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] begin path="
|
||||||
|
<< path << std::endl;
|
||||||
|
|
||||||
|
// 取纯文件名,例如:
|
||||||
|
// download/192.168.1.10/M1_xxx.pqd
|
||||||
|
// -> M1_xxx.pqd
|
||||||
|
std::string fname = extract_filename1(path);
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] filename="
|
||||||
|
<< fname << std::endl;
|
||||||
|
|
||||||
|
if (fname.size() < 3) {
|
||||||
|
std::cout << "[extract_monitor_seq] filename too short"
|
||||||
|
<< std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fname[0] != 'M' && fname[0] != 'm') {
|
||||||
|
std::cout << "[extract_monitor_seq] filename not start with M/m"
|
||||||
|
<< std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t pos = fname.find('_');
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] underscore pos="
|
||||||
|
<< pos << std::endl;
|
||||||
|
|
||||||
|
if (pos == std::string::npos || pos <= 1) {
|
||||||
|
std::cout << "[extract_monitor_seq] invalid underscore position"
|
||||||
|
<< std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// M1_xxx -> 1
|
||||||
|
std::string seq_str = fname.substr(1, pos - 1);
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] seq_str="
|
||||||
|
<< seq_str << std::endl;
|
||||||
|
|
||||||
|
for (char c : seq_str) {
|
||||||
|
if (!std::isdigit(static_cast<unsigned char>(c))) {
|
||||||
|
std::cout << "[extract_monitor_seq] non-digit char="
|
||||||
|
<< c << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
point_name = static_cast<short>(std::stoi(seq_str));
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] success point_name="
|
||||||
|
<< point_name << std::endl;
|
||||||
|
|
||||||
|
return point_name > 0;
|
||||||
|
}
|
||||||
|
catch (const std::exception& e) {
|
||||||
|
std::cout << "[extract_monitor_seq] exception="
|
||||||
|
<< e.what() << std::endl;
|
||||||
|
|
||||||
|
point_name = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
std::cout << "[extract_monitor_seq] unknown exception"
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
point_name = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RunPqdifScanLoop()
|
void RunPqdifScanLoop()
|
||||||
{
|
{
|
||||||
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
|
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
|
||||||
@@ -7910,6 +8383,54 @@ void RunPqdifScanLoop()
|
|||||||
if (PopReadyPqdifStatBase64FileBatch(batch)) {
|
if (PopReadyPqdifStatBase64FileBatch(batch)) {
|
||||||
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果
|
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果
|
||||||
// 在此处处理上送逻辑
|
// 在此处处理上送逻辑
|
||||||
|
const std::string& mac = batch.mac;
|
||||||
|
|
||||||
|
short point_name = 0;
|
||||||
|
|
||||||
|
if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) {
|
||||||
|
std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file="
|
||||||
|
<< batch.pqdif_file_path << std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& tp : batch.time_points) {
|
||||||
|
std::string max_base64;
|
||||||
|
std::string min_base64;
|
||||||
|
std::string avg_base64;
|
||||||
|
std::string p95_base64;
|
||||||
|
|
||||||
|
bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64);
|
||||||
|
bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64);
|
||||||
|
bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64);
|
||||||
|
bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64);
|
||||||
|
|
||||||
|
if (!has_max || !has_min || !has_avg || !has_p95) {
|
||||||
|
std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file="
|
||||||
|
<< batch.pqdif_file_path
|
||||||
|
<< " time=" << tp.timestamp_text
|
||||||
|
<< " has_max=" << has_max
|
||||||
|
<< " has_min=" << has_min
|
||||||
|
<< " has_avg=" << has_avg
|
||||||
|
<< " has_p95=" << has_p95
|
||||||
|
<< std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueue_stat_pq(max_base64,
|
||||||
|
min_base64,
|
||||||
|
avg_base64,
|
||||||
|
p95_base64,
|
||||||
|
tp.timestamp,
|
||||||
|
mac,
|
||||||
|
point_name);
|
||||||
|
|
||||||
|
std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file="
|
||||||
|
<< batch.pqdif_file_path
|
||||||
|
<< " time=" << tp.timestamp_text
|
||||||
|
<< " mac=" << mac
|
||||||
|
<< " point=" << point_name
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const std::exception& ex)
|
catch (const std::exception& ex)
|
||||||
|
|||||||
Reference in New Issue
Block a user