Compare commits

6 Commits

7 changed files with 1169 additions and 217 deletions

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@@ -68,6 +68,12 @@ enum class ActionResult {
};
// ====== ★修改:扩展 RecallFile支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
enum class RecallFileType {
NONE = 0,
STEADY_FILE, // 稳态文件
VOLTAGE_FILE // 暂态直下文件
};
class RecallFile
{
public:
@@ -75,18 +81,35 @@ public:
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
std::string StartTime; // 数据补招起始时间yyyy-MM-dd HH:mm:ss
std::string EndTime; // 数据补招结束时间yyyy-MM-dd HH:mm:ss
// ===== 业务类型 =====
// STEADY稳态文件补招
// VOLTAGE暂态事件补招
std::string STEADY; // 补招历史统计数据标识 0-不补招1-补招
std::string VOLTAGE; // 补招暂态事件标识 0-不补招1-补招
// ===== 文件下载类型 =====
RecallFileType file_type = RecallFileType::NONE;
//暂态文件用
bool direct_mode = false; // 直下文件开关true 表示不按时间窗,仅按目标文件名
std::string target_filetimes; // 直下文件匹配时间点yyyyMMdd_HHmmss仅 direct_mode=true 时有效
std::string target_filetimes; // 直下文件匹配时间点yyyyMMdd_HHmmss
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
std::map<std::string, std::vector<tag_dir_info>> dir_files;
// ★新增:候选目录(可扩展)
std::vector<std::string> dir_candidates{
std::vector<std::string> steady_dir_candidates{
"/cf/pqdif", //580绝对真实路径
"/bd0/pqdif", //chemengyu提供包含bd0
"/bd0/pqdif/%DESC%/%DAY%",
"/bd0/pqdif/Line%SEQ%",
"/bd0/historyFile/%DESC%",
"/pqdif", // 默认版本 / 新疆 可能取到其他测点文件
"/pqdif/%DESC%/%DAY%", // 上海pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件
"/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件
"/historyFile/%DESC%" // 广东 不会取到其他测点文件
};
std::vector<std::string> voltage_dir_candidates{
"/cf/COMTRADE",
"/bd0/COMTRADE",
"/sd0/COMTRADE",
@@ -102,6 +125,9 @@ public:
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
// 稳态文件用:一个 guid 下的多个补招时间段
std::vector<std::pair<long long, long long>> recall_ranges;
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
std::list<std::string> download_queue; //一个时间可能对应多个文件
std::string downloading_file; // 当前正在下载的文件(完整路径)
@@ -109,8 +135,22 @@ public:
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
std::unordered_set<std::string> file_success; // 已下载成功的文件集合
// 是否稳态文件任务
bool is_steady_file() const {
return file_type == RecallFileType::STEADY_FILE;
}
// 是否暂态直下文件任务
bool is_voltage_file() const {
return file_type == RecallFileType::VOLTAGE_FILE;
}
const std::vector<std::string>& active_dirs() const {
return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates;
}
// ★新增:一个便捷复位
void reset_runtime(bool keep_direct = false)
void reset_runtime(bool keep_target_filetimes = false)
{
phase = RecallPhase::IDLE;
cur_dir_index = 0;
@@ -124,9 +164,10 @@ public:
required_files.clear();
file_success.clear();
// 注意file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响
// ★新增:按需保留直下文件开关和目标名
if (!keep_direct) {
direct_mode = false;
if (!keep_target_filetimes) {
target_filetimes.clear(); // ▲列表清空
}
}

View File

@@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号
extern std::string FRONT_INST;
extern bool PQD_FLAG;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
template<typename T, typename... Args>
@@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) {
try {
g_front_seg_index = std::stoi(val.substr(0, pos));
g_front_seg_num = std::stoi(val.substr(pos + 1));
if (g_front_seg_index == 0) {
PQD_FLAG = true;
}
} catch (...) {
std::cerr << "Invalid -s format." << std::endl;
}
@@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) {
try {
g_front_seg_index = std::stoi(val.substr(0, pos));
g_front_seg_num = std::stoi(val.substr(pos + 1));
if (g_front_seg_index == 0) {
PQD_FLAG = true;
}
} catch (...) {
std::cerr << "Invalid -s format." << std::endl;
}
@@ -223,13 +235,13 @@ std::string get_parent_directory() {
//解析模板文件
//Set_xml_nodeinfo();
StartFrontThread(); //开启主线程
StartMQConsumerThread(); //开启消费者线程
StartMQProducerThread(); //开启生产者线程
StartTimerThread(); //开启定时线程
if(!PQD_FLAG){
StartFrontThread(); //开启主线程
StartMQConsumerThread(); //开启消费者线程
StartTimerThread(); //开启定时线程
}
//启动worker 根据启动标志启动
if(G_TEST_FLAG){

View File

@@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
<< ", VOLTAGE=" << rf.VOLTAGE
<< "\n";
// ★新增:直下模式与目标时间列表
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false")
<< ", target_filetimes(" << rf.target_filetimes << ")\n";
{
os << "\r\x1B[K |.. " << rf.target_filetimes << "\n";
// ★新增:文件补招类型与目标信息
os << "\r\x1B[K |-- file_type="
<< (rf.is_steady_file() ? "STEADY_FILE" :
rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE");
if (rf.is_voltage_file()) {
os << ", target_filetimes=" << rf.target_filetimes;
}
else if (rf.is_steady_file()) {
os << ", time_range=" << rf.StartTime
<< " ~ " << rf.EndTime;
}
os << "\n";
// ★新增:状态机运行态
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
@@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
<< ", download_result=" << resultStr(rf.download_result) << "\n";
// ★新增:候选目录
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n";
os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n";
{
size_t c = 0;
for (const auto& d : rf.dir_candidates) {
for (const auto& d : rf.active_dirs()) {
if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << d << "\n";
}
if (rf.dir_candidates.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n";
if (rf.active_dirs().size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n";
}
}

View File

@@ -350,11 +350,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
append_qvvr_event(id,event.head.name,
if(record.nType != 0){
append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
transfer_json_qvvr_data(id,event.head.name,
transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
"");
}
//事件主动上送处理完成,不需要通知状态机
}
@@ -2449,12 +2451,14 @@ void process_received_message(string mac, string id,const char* data, size_t len
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
//记录补招上来的暂态事件
append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
if(record.nType != 0){
append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
//直接发走暂态事件
transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
}
//通知状态机补招暂态事件成功
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));

View File

@@ -20,6 +20,7 @@
#include <cstdlib>
#include <limits>
#include <utility>
#include <new>
// PQDIF 解析库
#include "pqdif/PQDIF.h"
@@ -28,6 +29,17 @@
#include "pqdif/include/pqdif_lg.h"
#include "pqdif_semantic_ids.h"
#include "cloudfront/code/log4.h" //lnk20260526
extern void enqueue_stat_pq(const std::string& max_base64Str,
const std::string& min_base64Str,
const std::string& avg_base64Str,
const std::string& cp95_base64Str,
time_t data_time,
const std::string& mac,
short cid);
extern std::string extract_filename1(const std::string& path);
namespace fs = std::experimental::filesystem;
namespace {
@@ -37,7 +49,11 @@ namespace {
constexpr int kMaxPqdifFilesPerScan = 1;
constexpr size_t kParsedCacheLimit = 128;
const char* kPqdRootDir = "download";
// 大文件流式阈值:估算展开点数超过该值时,不再构造 expanded_stat_points
// 而是按通道逐步聚合成时间桶并直接组装 Base64避免单文件中间对象占用过大内存。
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;
const char* kPqdRootDir = "download_pqdif";
const char* kDoneRootDir = "download_done";
const char* kFailRootDir = "download_fail";
@@ -5072,6 +5088,250 @@ namespace {
return stat_select_best_metric_sources(out);
}
size_t stat_estimate_candidate_point_count_for_streaming(const PqdifLogicalFile& lf)
{
size_t total = 0;
for (const auto& obs : lf.observations)
{
for (const auto& ch : obs.channel_instances)
{
if (!pqdif_sem::IsQuantityTypeValueLog(ch.quantity_type_id.value))
continue;
for (const auto& si : ch.series_instances)
{
if (pqdif_sem::IsValueTypeTime(si.value_type_id.value))
continue;
const PqdifSeriesInstance* resolved = stat_resolve_shared_series(obs, si);
if (resolved == nullptr || resolved->values.count <= 0)
continue;
total += static_cast<size_t>(resolved->values.count);
if (total > kPqdifLargeFileStreamingPointThreshold)
return total;
}
}
}
return total;
}
struct StatStreamingSourceBucket
{
StatMetricId metric_id = StatMetricId::Unknown;
StatMetricSourceKey key;
StatMetricSourceStats stats;
std::map<time_t, AggregatedStatValues> values_by_time;
std::map<time_t, std::string> time_text_by_time;
};
void stat_streaming_set_value(AggregatedStatValues& agg, const ExpandedStatPoint& p)
{
if (agg.source_observation_index < 0)
{
agg.source_observation_index = p.observation_index;
agg.source_channel_instance_index = p.channel_instance_index;
agg.source_series_instance_index = p.series_instance_index;
agg.source_channel_name = p.channel_name;
agg.quality = StatMetricQuality::Normal;
agg.quality_reason = "ok";
}
// 同一来源同一时间同一种统计值如果重复,保留首次写入,避免一次性覆盖造成结果不稳定。
if (stat_has_value_kind(agg, p.stat_kind))
return;
agg.source_series_instance_index = p.series_instance_index;
switch (p.stat_kind)
{
case StatValueKind::Min:
agg.has_min = true;
agg.min_value = p.value;
break;
case StatValueKind::Max:
agg.has_max = true;
agg.max_value = p.value;
break;
case StatValueKind::Avg:
agg.has_avg = true;
agg.avg_value = p.value;
break;
case StatValueKind::P95:
agg.has_p95 = true;
agg.p95_value = p.value;
break;
default:
break;
}
}
void stat_streaming_add_point(
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>>& by_metric,
const ExpandedStatPoint& p)
{
if (p.metric_id == StatMetricId::Unknown || p.stat_kind == StatValueKind::Unknown)
return;
const StatMetricSourceKey key = stat_make_source_key(p);
StatStreamingSourceBucket& src = by_metric[p.metric_id][key];
if (src.metric_id == StatMetricId::Unknown)
{
src.metric_id = p.metric_id;
src.key = key;
}
src.stats.add(p);
AggregatedStatValues& agg = src.values_by_time[p.timestamp];
stat_streaming_set_value(agg, p);
if (src.time_text_by_time.find(p.timestamp) == src.time_text_by_time.end())
src.time_text_by_time[p.timestamp] = p.timestamp_text;
}
std::vector<TimeAggregatedStatBucket> stat_build_aggregated_buckets_streaming(
const PqdifLogicalFile& lf,
ParsedConnectionKind connection_kind,
int& selected_observation_index,
std::string& selected_observation_name,
size_t& out_raw_point_count,
size_t& out_selected_metric_count,
size_t& out_selected_dynamic_metric_count)
{
selected_observation_index = -1;
selected_observation_name.clear();
out_raw_point_count = 0;
out_selected_metric_count = 0;
out_selected_dynamic_metric_count = 0;
const PqdifObservationRecord* primary = stat_select_primary_statistical_observation(lf);
if (primary != nullptr)
{
selected_observation_index = primary->observation_index;
selected_observation_name = primary->observation_name;
}
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>> by_metric;
// 大文件模式:遍历全部 observations但每次只临时展开一个通道随后立刻压缩到
// metric/source/timestamp/kind 聚合结构中,不保留全量 ExpandedStatPoint。
for (const auto& obs : lf.observations)
{
for (const auto& ch : obs.channel_instances)
{
std::vector<ExpandedStatPoint> points = stat_expand_channel_points(lf, connection_kind, obs, ch);
out_raw_point_count += points.size();
for (const auto& p : points)
stat_streaming_add_point(by_metric, p);
}
}
std::map<time_t, TimeAggregatedStatBucket> global_buckets;
size_t duplicate_metric_count = 0;
for (auto& metric_pair : by_metric)
{
const StatMetricId metric_id = metric_pair.first;
auto& sources = metric_pair.second;
if (sources.empty())
continue;
bool has_selected = false;
StatMetricSourceKey best_key;
int best_score = std::numeric_limits<int>::min();
for (auto& src_pair : sources)
{
int score = stat_metric_source_score(src_pair.second.stats);
// 大文件流式模式会遍历全部 observations。为尽量保持原逻辑普通核心指标在分数接近时
// 优先使用主统计 observation谐波等只存在于其他 observation 时仍能正常选中。
if (selected_observation_index >= 0 && src_pair.first.observation_index == selected_observation_index)
score += 25;
if (!has_selected || score > best_score)
{
has_selected = true;
best_score = score;
best_key = src_pair.first;
}
}
if (!has_selected)
continue;
++out_selected_metric_count;
if (stat_is_dynamic_metric(metric_id))
++out_selected_dynamic_metric_count;
if (sources.size() > 1)
++duplicate_metric_count;
StatStreamingSourceBucket& selected_src = sources[best_key];
for (auto& time_pair : selected_src.values_by_time)
{
TimeAggregatedStatBucket& bucket = global_buckets[time_pair.first];
if (bucket.timestamp == 0)
{
bucket.timestamp = time_pair.first;
const auto tit = selected_src.time_text_by_time.find(time_pair.first);
bucket.timestamp_text = (tit == selected_src.time_text_by_time.end()) ? format_time_text(time_pair.first) : tit->second;
}
bucket.metrics[metric_id] = time_pair.second;
}
}
std::vector<TimeAggregatedStatBucket> out;
out.reserve(global_buckets.size());
for (auto& kv : global_buckets)
out.push_back(std::move(kv.second));
if (pqdif_log_enabled(PqdifLogLevel::Core))
{
std::cout << "========== PQDIF LARGE FILE STREAMING SUMMARY ==========" << std::endl;
std::cout << "raw_points=" << out_raw_point_count
<< ", candidate_metric_count=" << by_metric.size()
<< ", selected_metric_count=" << out_selected_metric_count
<< ", selected_dynamic_metrics=" << out_selected_dynamic_metric_count << "/" << stat_all_dynamic_metric_order().size()
<< ", duplicate_metric_count=" << duplicate_metric_count
<< ", buckets=" << out.size()
<< ", selected_observation_index=" << selected_observation_index
<< ", selected_observation_name=" << selected_observation_name
<< std::endl;
std::cout << "mode=streaming_no_expanded_points; detail=per-channel expand then immediate aggregate" << std::endl;
std::cout << "========================================================" << std::endl;
}
return out;
}
void stat_print_aggregated_metric_line(StatMetricId metric_id, const AggregatedStatValues* agg);
void dump_grouped_bucket_streaming_preview(const ParsedPqdifFile& parsed_file)
{
std::cout << "========== GROUPED STAT STREAMING CORE SUMMARY ==========" << std::endl;
std::cout << "connection_kind=" << stat_connection_kind_name(parsed_file.connection_kind)
<< ", selected_observation_index=" << parsed_file.selected_observation_index
<< ", selected_observation_name=" << parsed_file.selected_observation_name
<< ", expanded_points=SKIPPED_BY_STREAMING"
<< ", buckets=" << parsed_file.aggregated_stat_buckets.size()
<< ", core_metric_slots=" << stat_core_metric_print_order().size()
<< ", dynamic_spectrum_slots=" << stat_all_dynamic_metric_order().size()
<< std::endl;
const size_t bucket_limit = std::min<size_t>(parsed_file.aggregated_stat_buckets.size(), 3);
for (size_t i = 0; i < bucket_limit; ++i)
{
const auto& b = parsed_file.aggregated_stat_buckets[i];
std::cout << " [BUCKET " << i << "] time=" << b.timestamp_text
<< ", metric_count_present=" << b.metrics.size()
<< std::endl;
for (const auto metric_id : stat_core_metric_print_order())
{
const auto it = b.metrics.find(metric_id);
if (it != b.metrics.end())
stat_print_aggregated_metric_line(metric_id, &it->second);
}
}
std::cout << "=========================================================" << std::endl;
}
bool pqdif_probe_text_looks_like_flicker(const std::string& text)
{
const std::string key = normalize_key(text);
@@ -7496,9 +7756,41 @@ namespace {
<< ", total_base64_chars=" << file_batch.total_base64_chars
<< std::endl;
// 按你的要求,入队前完整打印保存对象内部结构:文件批次 -> 时间点 -> Max/Min/Avg/P95 子记录 -> Base64 内容
// 日志会很长;确认完成后可以临时注释掉这一行,或改成 if (pqdif_log_enabled(PqdifLogLevel::Info)) 包裹
//pqdif_dump_stat_base64_file_batch_full(file_batch);
// 完整 Base64 内容日志非常长,尤其大文件会明显放大内存和 I/O 压力
// 现在仅在 Debug/Trace 级别打印完整对象Core/Info 只打印摘要和前几条子记录
if (pqdif_log_enabled(PqdifLogLevel::Debug))
{
pqdif_dump_stat_base64_file_batch_full(file_batch);
}
else
{
std::cout << " [BASE64 FILE BATCH COMPACT] file=" << file_batch.pqdif_file_path
<< ", time_points=" << file_batch.time_point_count
<< ", records=" << file_batch.total_record_count
<< ", total_float_count=" << file_batch.total_float_count
<< ", total_placeholder_count=" << file_batch.total_placeholder_count
<< ", total_base64_chars=" << file_batch.total_base64_chars
<< std::endl;
size_t shown = 0;
for (const auto& tp : file_batch.time_points)
{
for (const auto& rec : tp.records)
{
if (shown >= 4)
break;
std::cout << " [BASE64 SAVED SAMPLE] time=" << rec.timestamp_text
<< ", kind=" << rec.value_kind_name
<< ", floats=" << rec.float_count
<< ", placeholders=" << rec.placeholder_count
<< ", base64_len=" << rec.base64_payload.size()
<< std::endl;
++shown;
}
if (shown >= 4)
break;
}
}
push_pqdif_stat_base64_file_batch(std::move(file_batch));
@@ -7548,6 +7840,54 @@ namespace {
// 后续指标判断需要先区分星型 / 角型两套规则。
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
// 2) 根据估算展开点数决定是否启用“大文件流式模式”。
// 普通文件仍走原 expanded_stat_points -> grouped buckets 流程;
// 大文件则直接按通道聚合为 buckets不再保存全量 expanded_stat_points。
const size_t estimated_candidate_points =
stat_estimate_candidate_point_count_for_streaming(parsed_file.logical_file);
const bool use_large_file_streaming =
estimated_candidate_points > kPqdifLargeFileStreamingPointThreshold;
if (use_large_file_streaming)
{
std::cout << "[PQDIF][MEMORY][STREAMING] large file detected, use streaming bucket/base64 build. "
<< "estimated_candidate_points=" << estimated_candidate_points
<< ", threshold=" << kPqdifLargeFileStreamingPointThreshold
<< std::endl;
size_t raw_point_count = 0;
size_t selected_metric_count = 0;
size_t selected_dynamic_metric_count = 0;
parsed_file.aggregated_stat_buckets = stat_build_aggregated_buckets_streaming(
parsed_file.logical_file,
parsed_file.connection_kind,
parsed_file.selected_observation_index,
parsed_file.selected_observation_name,
raw_point_count,
selected_metric_count,
selected_dynamic_metric_count);
if (pqdif_is_trace_log_enabled())
dump_semantic_probe(parsed_file);
dump_grouped_bucket_streaming_preview(parsed_file);
// 3) 大文件仍完整生成 Base64 文件批次,只是跳过 expanded_stat_points 中间缓存。
pqdif_build_and_queue_base64_records(parsed_file);
// 大文件模式不再把完整 ParsedPqdifFile 放入解析缓存,避免 logical_file + buckets 在内存中长期驻留。
// 后续业务请从 PqdifStatBase64FileBatch 队列读取最终 Base64 结果。
std::cout << "[PQDIF][MEMORY][STREAMING] skip parsed_file cache for large file, "
<< "base64 batch has been queued. file=" << file_path.string()
<< std::endl;
std::cout << "[PQDIF] processed ok(streaming): " << file_path.string()
<< ", cache_size=" << GetParsedPqdifCacheSize()
<< std::endl;
return true;
}
// 2) 只筛选“统计类 observation”并对其展开目标统计指标。
// 当前阶段不处理全部 observation避免不同 observation 混入同一时间聚合结果。
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
@@ -7655,14 +7995,50 @@ namespace {
<< ", max_per_scan=" << kMaxPqdifFilesPerScan
<< std::endl;
const bool ok = process_single_pqdif_file(item.path, item.mac);
bool ok = false;
bool caught_exception = false;
std::string exception_text;
try
{
ok = process_single_pqdif_file(item.path, item.mac);
}
catch (const std::bad_alloc& ex)
{
ok = false;
caught_exception = true;
exception_text = ex.what();
std::cout << "[PQDIF][OOM] std::bad_alloc while processing file="
<< item.path.string()
<< ", what=" << exception_text
<< std::endl;
}
catch (const std::exception& ex)
{
ok = false;
caught_exception = true;
exception_text = ex.what();
std::cout << "[PQDIF][ERROR] exception while processing file="
<< item.path.string()
<< ", what=" << exception_text
<< std::endl;
}
catch (...)
{
ok = false;
caught_exception = true;
exception_text = "unknown";
std::cout << "[PQDIF][ERROR] unknown exception while processing file="
<< item.path.string()
<< std::endl;
}
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
const fs::path dst = target_root / item.mac / item.path.filename();
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
// 解析成功download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
// 解析失败download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
// 解析失败或解析过程中出现异常download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
//
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
@@ -7672,6 +8048,13 @@ namespace {
<< item.path.string() << " -> " << dst.string()
<< std::endl;
}
else if (caught_exception)
{
std::cout << "[PQDIF] exception file moved to fail dir: "
<< dst.string()
<< ", reason=" << exception_text
<< std::endl;
}
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
@@ -7877,6 +8260,96 @@ void ClearReadyPqdifStatBase64Queue()
g_pqdif_stat_base64_ready_queue.clear();
}
static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容
StatValueKind kind,
std::string& out)
{
for (const auto& r : tp.records) {
if (r.value_kind == kind) {
out = r.base64_payload;
return !out.empty();
}
}
return false;
}
static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path,
short& point_name)
{
point_name = 0;
std::cout << "[extract_monitor_seq] begin path="
<< path << std::endl;
// 取纯文件名,例如:
// download/192.168.1.10/M1_xxx.pqd
// -> M1_xxx.pqd
std::string fname = extract_filename1(path);
std::cout << "[extract_monitor_seq] filename="
<< fname << std::endl;
if (fname.size() < 3) {
std::cout << "[extract_monitor_seq] filename too short"
<< std::endl;
return false;
}
if (fname[0] != 'M' && fname[0] != 'm') {
std::cout << "[extract_monitor_seq] filename not start with M/m"
<< std::endl;
return false;
}
size_t pos = fname.find('_');
std::cout << "[extract_monitor_seq] underscore pos="
<< pos << std::endl;
if (pos == std::string::npos || pos <= 1) {
std::cout << "[extract_monitor_seq] invalid underscore position"
<< std::endl;
return false;
}
// M1_xxx -> 1
std::string seq_str = fname.substr(1, pos - 1);
std::cout << "[extract_monitor_seq] seq_str="
<< seq_str << std::endl;
for (char c : seq_str) {
if (!std::isdigit(static_cast<unsigned char>(c))) {
std::cout << "[extract_monitor_seq] non-digit char="
<< c << std::endl;
return false;
}
}
try {
point_name = static_cast<short>(std::stoi(seq_str));
std::cout << "[extract_monitor_seq] success point_name="
<< point_name << std::endl;
return point_name > 0;
}
catch (const std::exception& e) {
std::cout << "[extract_monitor_seq] exception="
<< e.what() << std::endl;
point_name = 0;
return false;
}
catch (...) {
std::cout << "[extract_monitor_seq] unknown exception"
<< std::endl;
point_name = 0;
return false;
}
}
void RunPqdifScanLoop()
{
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
@@ -7910,6 +8383,54 @@ void RunPqdifScanLoop()
if (PopReadyPqdifStatBase64FileBatch(batch)) {
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果
// 在此处处理上送逻辑
const std::string& mac = batch.mac;
short point_name = 0;
if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) {
std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file="
<< batch.pqdif_file_path << std::endl;
continue;
}
for (const auto& tp : batch.time_points) {
std::string max_base64;
std::string min_base64;
std::string avg_base64;
std::string p95_base64;
bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64);
bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64);
bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64);
bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64);
if (!has_max || !has_min || !has_avg || !has_p95) {
std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file="
<< batch.pqdif_file_path
<< " time=" << tp.timestamp_text
<< " has_max=" << has_max
<< " has_min=" << has_min
<< " has_avg=" << has_avg
<< " has_p95=" << has_p95
<< std::endl;
continue;
}
enqueue_stat_pq(max_base64,
min_base64,
avg_base64,
p95_base64,
tp.timestamp,
mac,
point_name);
std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file="
<< batch.pqdif_file_path
<< " time=" << tp.timestamp_text
<< " mac=" << mac
<< " point=" << point_name
<< std::endl;
}
}
}
catch (const std::exception& ex)