Compare commits
4 Commits
626aac1fce
...
测试2
| Author | SHA1 | Date | |
|---|---|---|---|
| 3191422869 | |||
| a91672a994 | |||
| e64d2e2318 | |||
| 671fc6702e |
BIN
LFtid1056.rar
BIN
LFtid1056.rar
Binary file not shown.
File diff suppressed because it is too large
Load Diff
@@ -68,6 +68,12 @@ enum class ActionResult {
|
||||
};
|
||||
|
||||
// ====== ★修改:扩展 RecallFile,支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
|
||||
enum class RecallFileType {
|
||||
NONE = 0,
|
||||
STEADY_FILE, // 稳态文件
|
||||
VOLTAGE_FILE // 暂态直下文件
|
||||
};
|
||||
|
||||
class RecallFile
|
||||
{
|
||||
public:
|
||||
@@ -75,18 +81,35 @@ public:
|
||||
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
|
||||
std::string StartTime; // 数据补招起始时间(yyyy-MM-dd HH:mm:ss)
|
||||
std::string EndTime; // 数据补招结束时间(yyyy-MM-dd HH:mm:ss)
|
||||
|
||||
// ===== 业务类型 =====
|
||||
// STEADY:稳态文件补招
|
||||
// VOLTAGE:暂态事件补招
|
||||
std::string STEADY; // 补招历史统计数据标识 0-不补招;1-补招
|
||||
std::string VOLTAGE; // 补招暂态事件标识 0-不补招;1-补招
|
||||
|
||||
// ===== 文件下载类型 =====
|
||||
RecallFileType file_type = RecallFileType::NONE;
|
||||
|
||||
//暂态文件用
|
||||
bool direct_mode = false; // 直下文件开关:true 表示不按时间窗,仅按目标文件名
|
||||
std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss),仅 direct_mode=true 时有效
|
||||
std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss)
|
||||
|
||||
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
|
||||
std::map<std::string, std::vector<tag_dir_info>> dir_files;
|
||||
|
||||
// ★新增:候选目录(可扩展)
|
||||
std::vector<std::string> dir_candidates{
|
||||
std::vector<std::string> steady_dir_candidates{
|
||||
"/cf/pqdif", //580绝对真实路径
|
||||
"/bd0/pqdif", //chemengyu提供包含bd0
|
||||
"/bd0/pqdif/%DESC%/%DAY%",
|
||||
"/bd0/pqdif/Line%SEQ%",
|
||||
"/bd0/historyFile/%DESC%",
|
||||
"/pqdif", // 默认版本 / 新疆 可能取到其他测点文件
|
||||
"/pqdif/%DESC%/%DAY%", // 上海:pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件
|
||||
"/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件
|
||||
"/historyFile/%DESC%" // 广东 不会取到其他测点文件
|
||||
};
|
||||
|
||||
std::vector<std::string> voltage_dir_candidates{
|
||||
"/cf/COMTRADE",
|
||||
"/bd0/COMTRADE",
|
||||
"/sd0/COMTRADE",
|
||||
@@ -102,6 +125,9 @@ public:
|
||||
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
|
||||
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
|
||||
|
||||
// 稳态文件用:一个 guid 下的多个补招时间段
|
||||
std::vector<std::pair<long long, long long>> recall_ranges;
|
||||
|
||||
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
|
||||
std::list<std::string> download_queue; //一个时间可能对应多个文件
|
||||
std::string downloading_file; // 当前正在下载的文件(完整路径)
|
||||
@@ -109,8 +135,22 @@ public:
|
||||
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
|
||||
std::unordered_set<std::string> file_success; // 已下载成功的文件集合
|
||||
|
||||
// 是否稳态文件任务
|
||||
bool is_steady_file() const {
|
||||
return file_type == RecallFileType::STEADY_FILE;
|
||||
}
|
||||
|
||||
// 是否暂态直下文件任务
|
||||
bool is_voltage_file() const {
|
||||
return file_type == RecallFileType::VOLTAGE_FILE;
|
||||
}
|
||||
|
||||
const std::vector<std::string>& active_dirs() const {
|
||||
return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates;
|
||||
}
|
||||
|
||||
// ★新增:一个便捷复位
|
||||
void reset_runtime(bool keep_direct = false)
|
||||
void reset_runtime(bool keep_target_filetimes = false)
|
||||
{
|
||||
phase = RecallPhase::IDLE;
|
||||
cur_dir_index = 0;
|
||||
@@ -124,9 +164,10 @@ public:
|
||||
required_files.clear();
|
||||
file_success.clear();
|
||||
|
||||
// 注意:file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响
|
||||
|
||||
// ★新增:按需保留直下文件开关和目标名
|
||||
if (!keep_direct) {
|
||||
direct_mode = false;
|
||||
if (!keep_target_filetimes) {
|
||||
target_filetimes.clear(); // ▲列表清空
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号
|
||||
|
||||
extern std::string FRONT_INST;
|
||||
|
||||
extern bool PQD_FLAG;
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
|
||||
|
||||
template<typename T, typename... Args>
|
||||
@@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) {
|
||||
try {
|
||||
g_front_seg_index = std::stoi(val.substr(0, pos));
|
||||
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
||||
|
||||
if (g_front_seg_index == 0) {
|
||||
PQD_FLAG = true;
|
||||
}
|
||||
|
||||
} catch (...) {
|
||||
std::cerr << "Invalid -s format." << std::endl;
|
||||
}
|
||||
@@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) {
|
||||
try {
|
||||
g_front_seg_index = std::stoi(val.substr(0, pos));
|
||||
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
||||
|
||||
if (g_front_seg_index == 0) {
|
||||
PQD_FLAG = true;
|
||||
}
|
||||
|
||||
} catch (...) {
|
||||
std::cerr << "Invalid -s format." << std::endl;
|
||||
}
|
||||
@@ -223,13 +235,13 @@ std::string get_parent_directory() {
|
||||
//解析模板文件
|
||||
//Set_xml_nodeinfo();
|
||||
|
||||
StartFrontThread(); //开启主线程
|
||||
|
||||
StartMQConsumerThread(); //开启消费者线程
|
||||
|
||||
StartMQProducerThread(); //开启生产者线程
|
||||
|
||||
StartTimerThread(); //开启定时线程
|
||||
if(!PQD_FLAG){
|
||||
StartFrontThread(); //开启主线程
|
||||
StartMQConsumerThread(); //开启消费者线程
|
||||
StartTimerThread(); //开启定时线程
|
||||
}
|
||||
|
||||
//启动worker 根据启动标志启动
|
||||
if(G_TEST_FLAG){
|
||||
|
||||
@@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
||||
<< ", VOLTAGE=" << rf.VOLTAGE
|
||||
<< "\n";
|
||||
|
||||
// ★新增:直下模式与目标时间列表
|
||||
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false")
|
||||
<< ", target_filetimes(" << rf.target_filetimes << ")\n";
|
||||
{
|
||||
os << "\r\x1B[K |.. " << rf.target_filetimes << "\n";
|
||||
// ★新增:文件补招类型与目标信息
|
||||
os << "\r\x1B[K |-- file_type="
|
||||
<< (rf.is_steady_file() ? "STEADY_FILE" :
|
||||
rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE");
|
||||
|
||||
if (rf.is_voltage_file()) {
|
||||
os << ", target_filetimes=" << rf.target_filetimes;
|
||||
}
|
||||
else if (rf.is_steady_file()) {
|
||||
os << ", time_range=" << rf.StartTime
|
||||
<< " ~ " << rf.EndTime;
|
||||
}
|
||||
|
||||
os << "\n";
|
||||
|
||||
// ★新增:状态机运行态
|
||||
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
|
||||
@@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
||||
<< ", download_result=" << resultStr(rf.download_result) << "\n";
|
||||
|
||||
// ★新增:候选目录
|
||||
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n";
|
||||
os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n";
|
||||
{
|
||||
size_t c = 0;
|
||||
for (const auto& d : rf.dir_candidates) {
|
||||
for (const auto& d : rf.active_dirs()) {
|
||||
if (c++ >= MAX_ITEMS) break;
|
||||
os << "\r\x1B[K |-- " << d << "\n";
|
||||
}
|
||||
if (rf.dir_candidates.size() > MAX_ITEMS) {
|
||||
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n";
|
||||
if (rf.active_dirs().size() > MAX_ITEMS) {
|
||||
os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -350,11 +350,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
||||
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
||||
|
||||
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
|
||||
append_qvvr_event(id,event.head.name,
|
||||
if(record.nType != 0){
|
||||
append_qvvr_event(id,event.head.name,
|
||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||
transfer_json_qvvr_data(id,event.head.name,
|
||||
transfer_json_qvvr_data(id,event.head.name,
|
||||
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
|
||||
"");
|
||||
}
|
||||
|
||||
//事件主动上送处理完成,不需要通知状态机
|
||||
}
|
||||
@@ -2449,12 +2451,14 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
||||
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
||||
|
||||
//记录补招上来的暂态事件
|
||||
append_qvvr_event(id,event.head.name,
|
||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||
if(record.nType != 0){
|
||||
append_qvvr_event(id,event.head.name,
|
||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||
|
||||
//直接发走暂态事件
|
||||
transfer_json_qvvr_data(id,event.head.name,
|
||||
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
|
||||
|
||||
}
|
||||
//通知状态机补招暂态事件成功
|
||||
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));
|
||||
|
||||
|
||||
@@ -29,6 +29,17 @@
|
||||
#include "pqdif/include/pqdif_lg.h"
|
||||
#include "pqdif_semantic_ids.h"
|
||||
|
||||
#include "cloudfront/code/log4.h" //lnk20260526
|
||||
|
||||
extern void enqueue_stat_pq(const std::string& max_base64Str,
|
||||
const std::string& min_base64Str,
|
||||
const std::string& avg_base64Str,
|
||||
const std::string& cp95_base64Str,
|
||||
time_t data_time,
|
||||
const std::string& mac,
|
||||
short cid);
|
||||
extern std::string extract_filename1(const std::string& path);
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
namespace {
|
||||
@@ -42,7 +53,7 @@ namespace {
|
||||
// 而是按通道逐步聚合成时间桶并直接组装 Base64,避免单文件中间对象占用过大内存。
|
||||
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;
|
||||
|
||||
const char* kPqdRootDir = "download";
|
||||
const char* kPqdRootDir = "download_pqdif";
|
||||
const char* kDoneRootDir = "download_done";
|
||||
const char* kFailRootDir = "download_fail";
|
||||
|
||||
@@ -8249,6 +8260,96 @@ void ClearReadyPqdifStatBase64Queue()
|
||||
g_pqdif_stat_base64_ready_queue.clear();
|
||||
}
|
||||
|
||||
static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容
|
||||
StatValueKind kind,
|
||||
std::string& out)
|
||||
{
|
||||
for (const auto& r : tp.records) {
|
||||
if (r.value_kind == kind) {
|
||||
out = r.base64_payload;
|
||||
return !out.empty();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path,
|
||||
short& point_name)
|
||||
{
|
||||
point_name = 0;
|
||||
|
||||
std::cout << "[extract_monitor_seq] begin path="
|
||||
<< path << std::endl;
|
||||
|
||||
// 取纯文件名,例如:
|
||||
// download/192.168.1.10/M1_xxx.pqd
|
||||
// -> M1_xxx.pqd
|
||||
std::string fname = extract_filename1(path);
|
||||
|
||||
std::cout << "[extract_monitor_seq] filename="
|
||||
<< fname << std::endl;
|
||||
|
||||
if (fname.size() < 3) {
|
||||
std::cout << "[extract_monitor_seq] filename too short"
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fname[0] != 'M' && fname[0] != 'm') {
|
||||
std::cout << "[extract_monitor_seq] filename not start with M/m"
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t pos = fname.find('_');
|
||||
|
||||
std::cout << "[extract_monitor_seq] underscore pos="
|
||||
<< pos << std::endl;
|
||||
|
||||
if (pos == std::string::npos || pos <= 1) {
|
||||
std::cout << "[extract_monitor_seq] invalid underscore position"
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
// M1_xxx -> 1
|
||||
std::string seq_str = fname.substr(1, pos - 1);
|
||||
|
||||
std::cout << "[extract_monitor_seq] seq_str="
|
||||
<< seq_str << std::endl;
|
||||
|
||||
for (char c : seq_str) {
|
||||
if (!std::isdigit(static_cast<unsigned char>(c))) {
|
||||
std::cout << "[extract_monitor_seq] non-digit char="
|
||||
<< c << std::endl;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
point_name = static_cast<short>(std::stoi(seq_str));
|
||||
|
||||
std::cout << "[extract_monitor_seq] success point_name="
|
||||
<< point_name << std::endl;
|
||||
|
||||
return point_name > 0;
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
std::cout << "[extract_monitor_seq] exception="
|
||||
<< e.what() << std::endl;
|
||||
|
||||
point_name = 0;
|
||||
return false;
|
||||
}
|
||||
catch (...) {
|
||||
std::cout << "[extract_monitor_seq] unknown exception"
|
||||
<< std::endl;
|
||||
|
||||
point_name = 0;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void RunPqdifScanLoop()
|
||||
{
|
||||
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
|
||||
@@ -8282,6 +8383,54 @@ void RunPqdifScanLoop()
|
||||
if (PopReadyPqdifStatBase64FileBatch(batch)) {
|
||||
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果
|
||||
// 在此处处理上送逻辑
|
||||
const std::string& mac = batch.mac;
|
||||
|
||||
short point_name = 0;
|
||||
|
||||
if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) {
|
||||
std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file="
|
||||
<< batch.pqdif_file_path << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const auto& tp : batch.time_points) {
|
||||
std::string max_base64;
|
||||
std::string min_base64;
|
||||
std::string avg_base64;
|
||||
std::string p95_base64;
|
||||
|
||||
bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64);
|
||||
bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64);
|
||||
bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64);
|
||||
bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64);
|
||||
|
||||
if (!has_max || !has_min || !has_avg || !has_p95) {
|
||||
std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file="
|
||||
<< batch.pqdif_file_path
|
||||
<< " time=" << tp.timestamp_text
|
||||
<< " has_max=" << has_max
|
||||
<< " has_min=" << has_min
|
||||
<< " has_avg=" << has_avg
|
||||
<< " has_p95=" << has_p95
|
||||
<< std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
enqueue_stat_pq(max_base64,
|
||||
min_base64,
|
||||
avg_base64,
|
||||
p95_base64,
|
||||
tp.timestamp,
|
||||
mac,
|
||||
point_name);
|
||||
|
||||
std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file="
|
||||
<< batch.pqdif_file_path
|
||||
<< " time=" << tp.timestamp_text
|
||||
<< " mac=" << mac
|
||||
<< " point=" << point_name
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const std::exception& ex)
|
||||
|
||||
Reference in New Issue
Block a user