完成补招功能

This commit is contained in:
lnk
2026-06-05 16:05:44 +08:00
parent e64d2e2318
commit a91672a994
4 changed files with 166 additions and 21 deletions

Binary file not shown.

View File

@@ -4711,10 +4711,14 @@ static std::string resolve_recall_dir(const std::string& raw_dir,
// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ======
static bool extract_pqdif_range_from_filename(const std::string& fname,
long long& fs,
long long& fe)
long long& fe,
int& file_seq,
std::string& file_monitor_id)
{
fs = -1;
fe = -1;
file_seq = -1;
file_monitor_id.clear();
auto to_epoch = [](int y, int mon, int d, int h, int m, int s)->long long {
struct tm t {};
@@ -4755,30 +4759,46 @@ static bool extract_pqdif_range_from_filename(const std::string& fname,
}
}
// 新疆B8B7FEE6D4792D30-20230915-003000-p.pqd,按 1 小时窗口估算 设备ID-时间点-p.pqd
// 新疆B8B7FEE6D4792D30-20230915-003000-p.pqd
{
std::regex re(R"([^-]+-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)",
std::regex::icase);
std::regex re(
R"(([^-]+)-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6]));
file_monitor_id = m[1]; // B8B7FEE6D4792D30
fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]),
std::stoi(m[5]), std::stoi(m[6]), std::stoi(m[7]));
fe = fs + 3600;
return fs >= 0;
}
}
// 默认ied_ld_20230915_0030_60.pqd,最后的 60 是间隔分钟 设备名_逻辑设备_开始日期_开始时间_间隔分钟
// 默认ied_ld_20230915_0030_60.pqd
// 例如PQMonitor_PQM1_20260601_0000_24.pqd
// 第二段 PQM1 最后的数字作为 seq
{
std::regex re(R"(.*_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)",
std::regex re(R"([^_]+_([^_]+)_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)",
std::regex::icase);
std::smatch m;
if (std::regex_search(fname, m, re)) {
fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]),
std::stoi(m[4]), std::stoi(m[5]), 0);
int intv_min = std::stoi(m[6]);
std::string ld = m[1]; // 例如 PQM1、PQM2
std::regex seq_re(R"((\d+)$)");
std::smatch seq_m;
if (std::regex_search(ld, seq_m, seq_re)) {
file_seq = std::stoi(seq_m[1]);
}
fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]),
std::stoi(m[5]), std::stoi(m[6]), 0);
int intv_min = std::stoi(m[7]);
fe = fs + intv_min * 60;
return fs >= 0 && intv_min > 0;
}
}
@@ -4871,6 +4891,34 @@ static void dircache_clear_device(const std::string& dev_id)
g_device_dir_cache.erase(dev_id);
}
//复制文件
bool move_file_crossfs(const std::string& src, const std::string& dst)
{
if (std::rename(src.c_str(), dst.c_str()) == 0) {
return true;
}
if (errno != EXDEV) {
return false;
}
std::ifstream in(src.c_str(), std::ios::binary);
std::ofstream out(dst.c_str(), std::ios::binary | std::ios::trunc);
if (!in || !out) {
return false;
}
out << in.rdbuf();
out.close();
in.close();
if (std::remove(src.c_str()) != 0) {
return false;
}
return true;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑
// ====== ★修改check_recall_stat —— 加入“两步法”状态机 ======
void check_recall_file() {
@@ -5338,8 +5386,10 @@ void check_recall_file() {
long long fs = -1;
long long fe = -1;
if (!extract_pqdif_range_from_filename(fname, fs, fe)) {
int file_seq = -1;
std::string file_monitor_id;
if (!extract_pqdif_range_from_filename(fname, fs, fe, file_seq, file_monitor_id)) {
std::cout << "[check_recall_file] steady skip invalid pqdif filename dev="
<< dev.terminal_id
<< " monitor=" << lm.monitor_id
@@ -5347,6 +5397,43 @@ void check_recall_file() {
<< " file=" << fname << std::endl;
continue;
}
int cur_seq = -1;
try {
cur_seq = std::stoi(lm.logical_device_seq);
} catch (...) {
cur_seq = -1;
}
// 新疆:用文件名里的 monitor_id 找 seq
if (!file_monitor_id.empty()) {
if (file_monitor_id != lm.monitor_id) {//跳过id不匹配的新疆文件
std::cout << "[check_recall_file] steady skip other xj monitor file dev="
<< dev.terminal_id
<< " cur_monitor=" << lm.monitor_id
<< " file_monitor_id=" << file_monitor_id
<< " file=" << fname
<< std::endl;
continue;
}
// 到这里说明新疆文件的测点ID就是当前测点
file_seq = cur_seq;
}
// 默认:文件名能解析出 PQM 序号
else if (file_seq > 0) {
if (cur_seq > 0 && file_seq != cur_seq) {//跳过不同测点的文件
std::cout << "[check_recall_file] steady skip other seq file dev="
<< dev.terminal_id
<< " cur_monitor=" << lm.monitor_id
<< " cur_seq=" << cur_seq
<< " file_seq=" << file_seq
<< " file=" << fname
<< std::endl;
continue;
}
}
// 广东/上海/云南file_monitor_id 为空file_seq 也为 -1不过滤
// 文件时间段和补招时间段有交集就下载
bool hit = false;
@@ -5653,16 +5740,20 @@ void check_recall_file() {
std::string fname = sanitize(extract_filename1(front.downloading_file));
if (!fname.empty()) {
std::string old_local = base_dir + "/" + fname;
std::string new_local = base_dir + "/M" + seq + "_" + fname;
std::string new_fname = "M" + seq + "_" + fname;
std::string new_local = base_dir + "/" + new_fname;
// 先把下载目录下文件名改成 M1_xxx.pqd
if (old_local != new_local) {
if (std::rename(old_local.c_str(), new_local.c_str()) != 0) { //修改下载好的文件名为 M1_开头的格式方便后续处理如果失败不影响结果只是文件名不规范
if (std::rename(old_local.c_str(), new_local.c_str()) != 0) {
std::cout << "[check_recall_file][WARN] rename steady local file failed"
<< " old=" << old_local
<< " new=" << new_local
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件失败");
new_local = old_local; // 重命名失败,后续仍尝试用原文件
new_fname = fname;
} else {
std::cout << "[check_recall_file] rename steady local file ok"
<< " old=" << old_local
@@ -5671,6 +5762,56 @@ void check_recall_file() {
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "重命名本地文件成功");
}
}
// 只处理 .pqd 文件
if (new_fname.size() >= 4 &&
new_fname.substr(new_fname.size() - 4) == ".pqd") {
std::string pqdif_dir = std::string("download_pqdif/") + sanitize(dev.addr_str);
// 如果你已有递归建目录函数,用你的函数即可
create_directory_recursive(pqdif_dir.c_str());
std::string tmp_fname = new_fname.substr(0, new_fname.size() - 4) + ".tmp";
std::string local_tmp = base_dir + "/" + tmp_fname;
std::string dst_tmp = pqdif_dir + "/" + tmp_fname;
std::string dst_pqd = pqdif_dir + "/" + new_fname;
// 1. .pqd -> .tmp
if (std::rename(new_local.c_str(), local_tmp.c_str()) != 0) {
std::cout << "[check_recall_file][WARN] rename .pqd to .tmp failed"
<< " old=" << new_local
<< " new=" << local_tmp
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), ".pqd改.tmp失败");
}
// 2. 移动 .tmp 到 download_pqdif
else if (!move_file_crossfs(local_tmp, dst_tmp)) {
std::cout << "[check_recall_file][WARN] move tmp file failed"
<< " old=" << local_tmp
<< " new=" << dst_tmp
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "移动tmp文件失败");
}
// 3. download_pqdif 下 .tmp -> .pqd
else if (std::rename(dst_tmp.c_str(), dst_pqd.c_str()) != 0) {//原子操作,不会影响解析线程扫描到不完整文件
std::cout << "[check_recall_file][WARN] rename .tmp to .pqd failed"
<< " old=" << dst_tmp
<< " new=" << dst_pqd
<< " errno=" << errno
<< std::endl;
DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), ".tmp改.pqd失败");
} else {
std::cout << "[check_recall_file] move pqd file to pqdif dir ok"
<< " src=" << new_local
<< " dst=" << dst_pqd
<< std::endl;
DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast<int>(LogCode::LOG_CODE_RECALL), "pqd文件移动到解析目录成功");
}
}
}
}

View File

@@ -98,11 +98,15 @@ public:
std::map<std::string, std::vector<tag_dir_info>> dir_files;
std::vector<std::string> steady_dir_candidates{
"/pqdif", // 默认版本 / 新疆
"/pqdif/%DAY%", // 上海:日期子目录
"/pqdif/%DESC%/%DAY%", // 上海pqdif_dir_cfg 或描述目录 + 日期
"/pqdif/Line%SEQ%", // 云南
"/historyFile/%DESC%" // 广东
"/cf/pqdif", //580绝对真实路径
"/bd0/pqdif", //chemengyu提供包含bd0
"/bd0/pqdif/%DESC%/%DAY%",
"/bd0/pqdif/Line%SEQ%",
"/bd0/historyFile/%DESC%",
"/pqdif", // 默认版本 / 新疆 可能取到其他测点文件
"/pqdif/%DESC%/%DAY%", // 上海pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件
"/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件
"/historyFile/%DESC%" // 广东 不会取到其他测点文件
};
std::vector<std::string> voltage_dir_candidates{

View File

@@ -53,7 +53,7 @@ namespace {
// 而是按通道逐步聚合成时间桶并直接组装 Base64避免单文件中间对象占用过大内存。
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;
const char* kPqdRootDir = "download";
const char* kPqdRootDir = "download_pqdif";
const char* kDoneRootDir = "download_done";
const char* kFailRootDir = "download_fail";