新增了大文件处理,防止大量pqdif文件导致内存异常。另外添加了解析内存异常时,尝试删除原文件,防止一直溢出。
This commit is contained in:
@@ -20,6 +20,7 @@
|
||||
#include <cstdlib>
|
||||
#include <limits>
|
||||
#include <utility>
|
||||
#include <new>
|
||||
|
||||
// PQDIF 解析库
|
||||
#include "pqdif/PQDIF.h"
|
||||
@@ -37,6 +38,10 @@ namespace {
|
||||
// Per-scan file budget; the scan loop logs this value as "max_per_scan".
constexpr int kMaxPqdifFilesPerScan = 1;
// NOTE(review): presumably the maximum number of entries kept in the parsed-file
// cache; its use is not visible in this part of the file - confirm.
constexpr size_t kParsedCacheLimit = 128;

// Large-file streaming threshold: when the estimated number of expanded points
// exceeds this value, expanded_stat_points is no longer built. Points are instead
// aggregated channel by channel into time buckets and the Base64 output is
// assembled directly, so a single file's intermediate objects stay small.
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;

// Root directories of the file pipeline: files are read from "download" and moved
// to "download_done" on success or "download_fail" on failure.
// Declared constexpr so the pointers themselves cannot be reassigned at runtime.
constexpr const char* kPqdRootDir = "download";
constexpr const char* kDoneRootDir = "download_done";
constexpr const char* kFailRootDir = "download_fail";
|
||||
@@ -5072,6 +5077,250 @@ namespace {
|
||||
return stat_select_best_metric_sources(out);
|
||||
}
|
||||
|
||||
|
||||
size_t stat_estimate_candidate_point_count_for_streaming(const PqdifLogicalFile& lf)
|
||||
{
|
||||
size_t total = 0;
|
||||
for (const auto& obs : lf.observations)
|
||||
{
|
||||
for (const auto& ch : obs.channel_instances)
|
||||
{
|
||||
if (!pqdif_sem::IsQuantityTypeValueLog(ch.quantity_type_id.value))
|
||||
continue;
|
||||
|
||||
for (const auto& si : ch.series_instances)
|
||||
{
|
||||
if (pqdif_sem::IsValueTypeTime(si.value_type_id.value))
|
||||
continue;
|
||||
|
||||
const PqdifSeriesInstance* resolved = stat_resolve_shared_series(obs, si);
|
||||
if (resolved == nullptr || resolved->values.count <= 0)
|
||||
continue;
|
||||
|
||||
total += static_cast<size_t>(resolved->values.count);
|
||||
if (total > kPqdifLargeFileStreamingPointThreshold)
|
||||
return total;
|
||||
}
|
||||
}
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
struct StatStreamingSourceBucket
|
||||
{
|
||||
StatMetricId metric_id = StatMetricId::Unknown;
|
||||
StatMetricSourceKey key;
|
||||
StatMetricSourceStats stats;
|
||||
std::map<time_t, AggregatedStatValues> values_by_time;
|
||||
std::map<time_t, std::string> time_text_by_time;
|
||||
};
|
||||
|
||||
// Writes one expanded point into the aggregated value slot of its timestamp.
//
// agg: slot to update. A negative source_observation_index is treated as
//      "not yet touched", in which case the source fields are filled from p.
// p:   point carrying the statistic kind (Min/Max/Avg/P95) and its value.
//
// If the slot already holds a value of the same kind, the point is ignored: the
// first write wins so that repeated data cannot make the result unstable.
void stat_streaming_set_value(AggregatedStatValues& agg, const ExpandedStatPoint& p)
{
    const bool untouched_slot = agg.source_observation_index < 0;
    if (untouched_slot)
    {
        agg.quality = StatMetricQuality::Normal;
        agg.quality_reason = "ok";
        agg.source_channel_name = p.channel_name;
        agg.source_observation_index = p.observation_index;
        agg.source_channel_instance_index = p.channel_instance_index;
        agg.source_series_instance_index = p.series_instance_index;
    }

    // Duplicate (timestamp, kind) for this source: keep the first value.
    if (stat_has_value_kind(agg, p.stat_kind))
        return;

    // The series index follows the most recently accepted kind, including kinds
    // that end up matching none of the branches below.
    agg.source_series_instance_index = p.series_instance_index;

    if (p.stat_kind == StatValueKind::Min)
    {
        agg.min_value = p.value;
        agg.has_min = true;
    }
    else if (p.stat_kind == StatValueKind::Max)
    {
        agg.max_value = p.value;
        agg.has_max = true;
    }
    else if (p.stat_kind == StatValueKind::Avg)
    {
        agg.avg_value = p.value;
        agg.has_avg = true;
    }
    else if (p.stat_kind == StatValueKind::P95)
    {
        agg.p95_value = p.value;
        agg.has_p95 = true;
    }
    // Any other kind stores no value.
}
|
||||
|
||||
void stat_streaming_add_point(
|
||||
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>>& by_metric,
|
||||
const ExpandedStatPoint& p)
|
||||
{
|
||||
if (p.metric_id == StatMetricId::Unknown || p.stat_kind == StatValueKind::Unknown)
|
||||
return;
|
||||
|
||||
const StatMetricSourceKey key = stat_make_source_key(p);
|
||||
StatStreamingSourceBucket& src = by_metric[p.metric_id][key];
|
||||
if (src.metric_id == StatMetricId::Unknown)
|
||||
{
|
||||
src.metric_id = p.metric_id;
|
||||
src.key = key;
|
||||
}
|
||||
|
||||
src.stats.add(p);
|
||||
AggregatedStatValues& agg = src.values_by_time[p.timestamp];
|
||||
stat_streaming_set_value(agg, p);
|
||||
if (src.time_text_by_time.find(p.timestamp) == src.time_text_by_time.end())
|
||||
src.time_text_by_time[p.timestamp] = p.timestamp_text;
|
||||
}
|
||||
|
||||
std::vector<TimeAggregatedStatBucket> stat_build_aggregated_buckets_streaming(
|
||||
const PqdifLogicalFile& lf,
|
||||
ParsedConnectionKind connection_kind,
|
||||
int& selected_observation_index,
|
||||
std::string& selected_observation_name,
|
||||
size_t& out_raw_point_count,
|
||||
size_t& out_selected_metric_count,
|
||||
size_t& out_selected_dynamic_metric_count)
|
||||
{
|
||||
selected_observation_index = -1;
|
||||
selected_observation_name.clear();
|
||||
out_raw_point_count = 0;
|
||||
out_selected_metric_count = 0;
|
||||
out_selected_dynamic_metric_count = 0;
|
||||
|
||||
const PqdifObservationRecord* primary = stat_select_primary_statistical_observation(lf);
|
||||
if (primary != nullptr)
|
||||
{
|
||||
selected_observation_index = primary->observation_index;
|
||||
selected_observation_name = primary->observation_name;
|
||||
}
|
||||
|
||||
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>> by_metric;
|
||||
|
||||
// 大文件模式:遍历全部 observations,但每次只临时展开一个通道,随后立刻压缩到
|
||||
// metric/source/timestamp/kind 聚合结构中,不保留全量 ExpandedStatPoint。
|
||||
for (const auto& obs : lf.observations)
|
||||
{
|
||||
for (const auto& ch : obs.channel_instances)
|
||||
{
|
||||
std::vector<ExpandedStatPoint> points = stat_expand_channel_points(lf, connection_kind, obs, ch);
|
||||
out_raw_point_count += points.size();
|
||||
for (const auto& p : points)
|
||||
stat_streaming_add_point(by_metric, p);
|
||||
}
|
||||
}
|
||||
|
||||
std::map<time_t, TimeAggregatedStatBucket> global_buckets;
|
||||
size_t duplicate_metric_count = 0;
|
||||
|
||||
for (auto& metric_pair : by_metric)
|
||||
{
|
||||
const StatMetricId metric_id = metric_pair.first;
|
||||
auto& sources = metric_pair.second;
|
||||
if (sources.empty())
|
||||
continue;
|
||||
|
||||
bool has_selected = false;
|
||||
StatMetricSourceKey best_key;
|
||||
int best_score = std::numeric_limits<int>::min();
|
||||
|
||||
for (auto& src_pair : sources)
|
||||
{
|
||||
int score = stat_metric_source_score(src_pair.second.stats);
|
||||
// 大文件流式模式会遍历全部 observations。为尽量保持原逻辑,普通核心指标在分数接近时
|
||||
// 优先使用主统计 observation;谐波等只存在于其他 observation 时仍能正常选中。
|
||||
if (selected_observation_index >= 0 && src_pair.first.observation_index == selected_observation_index)
|
||||
score += 25;
|
||||
|
||||
if (!has_selected || score > best_score)
|
||||
{
|
||||
has_selected = true;
|
||||
best_score = score;
|
||||
best_key = src_pair.first;
|
||||
}
|
||||
}
|
||||
|
||||
if (!has_selected)
|
||||
continue;
|
||||
|
||||
++out_selected_metric_count;
|
||||
if (stat_is_dynamic_metric(metric_id))
|
||||
++out_selected_dynamic_metric_count;
|
||||
if (sources.size() > 1)
|
||||
++duplicate_metric_count;
|
||||
|
||||
StatStreamingSourceBucket& selected_src = sources[best_key];
|
||||
for (auto& time_pair : selected_src.values_by_time)
|
||||
{
|
||||
TimeAggregatedStatBucket& bucket = global_buckets[time_pair.first];
|
||||
if (bucket.timestamp == 0)
|
||||
{
|
||||
bucket.timestamp = time_pair.first;
|
||||
const auto tit = selected_src.time_text_by_time.find(time_pair.first);
|
||||
bucket.timestamp_text = (tit == selected_src.time_text_by_time.end()) ? format_time_text(time_pair.first) : tit->second;
|
||||
}
|
||||
bucket.metrics[metric_id] = time_pair.second;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<TimeAggregatedStatBucket> out;
|
||||
out.reserve(global_buckets.size());
|
||||
for (auto& kv : global_buckets)
|
||||
out.push_back(std::move(kv.second));
|
||||
|
||||
if (pqdif_log_enabled(PqdifLogLevel::Core))
|
||||
{
|
||||
std::cout << "========== PQDIF LARGE FILE STREAMING SUMMARY ==========" << std::endl;
|
||||
std::cout << "raw_points=" << out_raw_point_count
|
||||
<< ", candidate_metric_count=" << by_metric.size()
|
||||
<< ", selected_metric_count=" << out_selected_metric_count
|
||||
<< ", selected_dynamic_metrics=" << out_selected_dynamic_metric_count << "/" << stat_all_dynamic_metric_order().size()
|
||||
<< ", duplicate_metric_count=" << duplicate_metric_count
|
||||
<< ", buckets=" << out.size()
|
||||
<< ", selected_observation_index=" << selected_observation_index
|
||||
<< ", selected_observation_name=" << selected_observation_name
|
||||
<< std::endl;
|
||||
std::cout << "mode=streaming_no_expanded_points; detail=per-channel expand then immediate aggregate" << std::endl;
|
||||
std::cout << "========================================================" << std::endl;
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
void stat_print_aggregated_metric_line(StatMetricId metric_id, const AggregatedStatValues* agg);
|
||||
void dump_grouped_bucket_streaming_preview(const ParsedPqdifFile& parsed_file)
|
||||
{
|
||||
std::cout << "========== GROUPED STAT STREAMING CORE SUMMARY ==========" << std::endl;
|
||||
std::cout << "connection_kind=" << stat_connection_kind_name(parsed_file.connection_kind)
|
||||
<< ", selected_observation_index=" << parsed_file.selected_observation_index
|
||||
<< ", selected_observation_name=" << parsed_file.selected_observation_name
|
||||
<< ", expanded_points=SKIPPED_BY_STREAMING"
|
||||
<< ", buckets=" << parsed_file.aggregated_stat_buckets.size()
|
||||
<< ", core_metric_slots=" << stat_core_metric_print_order().size()
|
||||
<< ", dynamic_spectrum_slots=" << stat_all_dynamic_metric_order().size()
|
||||
<< std::endl;
|
||||
|
||||
const size_t bucket_limit = std::min<size_t>(parsed_file.aggregated_stat_buckets.size(), 3);
|
||||
for (size_t i = 0; i < bucket_limit; ++i)
|
||||
{
|
||||
const auto& b = parsed_file.aggregated_stat_buckets[i];
|
||||
std::cout << " [BUCKET " << i << "] time=" << b.timestamp_text
|
||||
<< ", metric_count_present=" << b.metrics.size()
|
||||
<< std::endl;
|
||||
for (const auto metric_id : stat_core_metric_print_order())
|
||||
{
|
||||
const auto it = b.metrics.find(metric_id);
|
||||
if (it != b.metrics.end())
|
||||
stat_print_aggregated_metric_line(metric_id, &it->second);
|
||||
}
|
||||
}
|
||||
std::cout << "=========================================================" << std::endl;
|
||||
}
|
||||
|
||||
bool pqdif_probe_text_looks_like_flicker(const std::string& text)
|
||||
{
|
||||
const std::string key = normalize_key(text);
|
||||
@@ -7496,9 +7745,41 @@ namespace {
|
||||
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
||||
<< std::endl;
|
||||
|
||||
// 按你的要求,入队前完整打印保存对象内部结构:文件批次 -> 时间点 -> Max/Min/Avg/P95 子记录 -> Base64 内容。
|
||||
// 日志会很长;确认完成后可以临时注释掉这一行,或改成 if (pqdif_log_enabled(PqdifLogLevel::Info)) 包裹。
|
||||
//pqdif_dump_stat_base64_file_batch_full(file_batch);
|
||||
// 完整 Base64 内容日志非常长,尤其大文件会明显放大内存和 I/O 压力。
|
||||
// 现在仅在 Debug/Trace 级别打印完整对象;Core/Info 只打印摘要和前几条子记录。
|
||||
if (pqdif_log_enabled(PqdifLogLevel::Debug))
|
||||
{
|
||||
pqdif_dump_stat_base64_file_batch_full(file_batch);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cout << " [BASE64 FILE BATCH COMPACT] file=" << file_batch.pqdif_file_path
|
||||
<< ", time_points=" << file_batch.time_point_count
|
||||
<< ", records=" << file_batch.total_record_count
|
||||
<< ", total_float_count=" << file_batch.total_float_count
|
||||
<< ", total_placeholder_count=" << file_batch.total_placeholder_count
|
||||
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
||||
<< std::endl;
|
||||
|
||||
size_t shown = 0;
|
||||
for (const auto& tp : file_batch.time_points)
|
||||
{
|
||||
for (const auto& rec : tp.records)
|
||||
{
|
||||
if (shown >= 4)
|
||||
break;
|
||||
std::cout << " [BASE64 SAVED SAMPLE] time=" << rec.timestamp_text
|
||||
<< ", kind=" << rec.value_kind_name
|
||||
<< ", floats=" << rec.float_count
|
||||
<< ", placeholders=" << rec.placeholder_count
|
||||
<< ", base64_len=" << rec.base64_payload.size()
|
||||
<< std::endl;
|
||||
++shown;
|
||||
}
|
||||
if (shown >= 4)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
push_pqdif_stat_base64_file_batch(std::move(file_batch));
|
||||
|
||||
@@ -7548,6 +7829,54 @@ namespace {
|
||||
// 后续指标判断需要先区分星型 / 角型两套规则。
|
||||
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
|
||||
|
||||
// 2) 根据估算展开点数决定是否启用“大文件流式模式”。
|
||||
// 普通文件仍走原 expanded_stat_points -> grouped buckets 流程;
|
||||
// 大文件则直接按通道聚合为 buckets,不再保存全量 expanded_stat_points。
|
||||
const size_t estimated_candidate_points =
|
||||
stat_estimate_candidate_point_count_for_streaming(parsed_file.logical_file);
|
||||
const bool use_large_file_streaming =
|
||||
estimated_candidate_points > kPqdifLargeFileStreamingPointThreshold;
|
||||
|
||||
if (use_large_file_streaming)
|
||||
{
|
||||
std::cout << "[PQDIF][MEMORY][STREAMING] large file detected, use streaming bucket/base64 build. "
|
||||
<< "estimated_candidate_points=" << estimated_candidate_points
|
||||
<< ", threshold=" << kPqdifLargeFileStreamingPointThreshold
|
||||
<< std::endl;
|
||||
|
||||
size_t raw_point_count = 0;
|
||||
size_t selected_metric_count = 0;
|
||||
size_t selected_dynamic_metric_count = 0;
|
||||
parsed_file.aggregated_stat_buckets = stat_build_aggregated_buckets_streaming(
|
||||
parsed_file.logical_file,
|
||||
parsed_file.connection_kind,
|
||||
parsed_file.selected_observation_index,
|
||||
parsed_file.selected_observation_name,
|
||||
raw_point_count,
|
||||
selected_metric_count,
|
||||
selected_dynamic_metric_count);
|
||||
|
||||
if (pqdif_is_trace_log_enabled())
|
||||
dump_semantic_probe(parsed_file);
|
||||
|
||||
dump_grouped_bucket_streaming_preview(parsed_file);
|
||||
|
||||
// 3) 大文件仍完整生成 Base64 文件批次,只是跳过 expanded_stat_points 中间缓存。
|
||||
pqdif_build_and_queue_base64_records(parsed_file);
|
||||
|
||||
// 大文件模式不再把完整 ParsedPqdifFile 放入解析缓存,避免 logical_file + buckets 在内存中长期驻留。
|
||||
// 后续业务请从 PqdifStatBase64FileBatch 队列读取最终 Base64 结果。
|
||||
std::cout << "[PQDIF][MEMORY][STREAMING] skip parsed_file cache for large file, "
|
||||
<< "base64 batch has been queued. file=" << file_path.string()
|
||||
<< std::endl;
|
||||
|
||||
std::cout << "[PQDIF] processed ok(streaming): " << file_path.string()
|
||||
<< ", cache_size=" << GetParsedPqdifCacheSize()
|
||||
<< std::endl;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// 2) 只筛选“统计类 observation”,并对其展开目标统计指标。
|
||||
// 当前阶段不处理全部 observation,避免不同 observation 混入同一时间聚合结果。
|
||||
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
|
||||
@@ -7655,14 +7984,50 @@ namespace {
|
||||
<< ", max_per_scan=" << kMaxPqdifFilesPerScan
|
||||
<< std::endl;
|
||||
|
||||
const bool ok = process_single_pqdif_file(item.path, item.mac);
|
||||
bool ok = false;
|
||||
bool caught_exception = false;
|
||||
std::string exception_text;
|
||||
|
||||
try
|
||||
{
|
||||
ok = process_single_pqdif_file(item.path, item.mac);
|
||||
}
|
||||
catch (const std::bad_alloc& ex)
|
||||
{
|
||||
ok = false;
|
||||
caught_exception = true;
|
||||
exception_text = ex.what();
|
||||
std::cout << "[PQDIF][OOM] std::bad_alloc while processing file="
|
||||
<< item.path.string()
|
||||
<< ", what=" << exception_text
|
||||
<< std::endl;
|
||||
}
|
||||
catch (const std::exception& ex)
|
||||
{
|
||||
ok = false;
|
||||
caught_exception = true;
|
||||
exception_text = ex.what();
|
||||
std::cout << "[PQDIF][ERROR] exception while processing file="
|
||||
<< item.path.string()
|
||||
<< ", what=" << exception_text
|
||||
<< std::endl;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
ok = false;
|
||||
caught_exception = true;
|
||||
exception_text = "unknown";
|
||||
std::cout << "[PQDIF][ERROR] unknown exception while processing file="
|
||||
<< item.path.string()
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
|
||||
const fs::path dst = target_root / item.mac / item.path.filename();
|
||||
|
||||
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
|
||||
// 解析成功:download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
|
||||
// 解析失败:download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
|
||||
// 解析失败或解析过程中出现异常:download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
|
||||
//
|
||||
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
|
||||
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
|
||||
@@ -7672,6 +8037,13 @@ namespace {
|
||||
<< item.path.string() << " -> " << dst.string()
|
||||
<< std::endl;
|
||||
}
|
||||
else if (caught_exception)
|
||||
{
|
||||
std::cout << "[PQDIF] exception file moved to fail dir: "
|
||||
<< dst.string()
|
||||
<< ", reason=" << exception_text
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
|
||||
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
|
||||
|
||||
Reference in New Issue
Block a user