feat(algorithm): 添加监测点接线方式缓存优化数据处理逻辑
- 在ExecutionCenter中引入CsLineDTO和RedisUtil依赖 - 实现监测点、接线方式数据缓存到Redis功能,设置3小时过期时间 - 修改wlMeasurementPointExecutor方法使用缓存的线路ID列表 - 在数据处理服务中添加Gson和TypeToken用于JSON序列化 - 实现根据接线方式过滤电压类型数据的功能 - 添加线电压和相电压类型常量定义 - 优化数据查询逻辑,按监测点分组后根据接线方式过滤多余数据 - 实现线电压数据向相电压数据的特殊处理逻辑 - 统一返回经过接线方式过滤后的数据结果
This commit is contained in:
@@ -16,6 +16,7 @@ import com.njcn.common.pojo.annotation.OperateInfo;
|
||||
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.csdevice.api.CsLineFeignClient;
|
||||
import com.njcn.csdevice.pojo.dto.CsLineDTO;
|
||||
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
||||
import com.njcn.device.biz.pojo.dto.DeptGetChildrenMoreDTO;
|
||||
import com.njcn.device.biz.pojo.dto.DeptGetDeviceDTO;
|
||||
@@ -23,6 +24,7 @@ import com.njcn.device.biz.pojo.dto.DeptGetSubStationDTO;
|
||||
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
|
||||
import com.njcn.device.biz.pojo.param.DeptGetLineParam;
|
||||
import com.njcn.device.pq.api.DeptLineFeignClient;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.user.api.DeptFeignClient;
|
||||
import com.njcn.user.pojo.po.Dept;
|
||||
import com.njcn.web.controller.BaseController;
|
||||
@@ -73,6 +75,8 @@ public class ExecutionCenter extends BaseController {
|
||||
private CsLineFeignClient csLineFeignClient;
|
||||
@Resource
|
||||
private FlowAsyncService flowService;
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
|
||||
/***
|
||||
* 1、校验非全链执行时,tagNames节点标签集合必须为非空,否则提示---无可执行节点
|
||||
@@ -143,9 +147,27 @@ public class ExecutionCenter extends BaseController {
|
||||
String methodDescribe = getMethodDescribe("wlMeasurementPointExecutor");
|
||||
//手动判断参数是否合法,
|
||||
CalculatedParam calculatedParam = judgeExecuteParam(baseParam);
|
||||
//缓存监测点、接线方式数据
|
||||
List<CsLineDTO> csLineDTOList = csLineFeignClient.getAllLineDetail().getData();
|
||||
List<String> lineIdList = new ArrayList<>();
|
||||
Map<String, Integer> lineConTypeMap = new HashMap<>();
|
||||
if (CollectionUtils.isNotEmpty(csLineDTOList)) {
|
||||
lineIdList = csLineDTOList.stream().map(CsLineDTO::getLineId).collect(Collectors.toList());
|
||||
//接线方式(0-星型 1-角型 2-V型)
|
||||
lineConTypeMap = csLineDTOList.stream()
|
||||
.filter(dto -> dto.getLineId() != null)
|
||||
.collect(Collectors.toMap(
|
||||
CsLineDTO::getLineId,
|
||||
dto -> dto.getConType() != null ? dto.getConType() : 0,
|
||||
(v1, v2) -> v1
|
||||
));
|
||||
redisUtil.saveByKeyWithExpire("wlLineDetail", lineConTypeMap, 3600L*3);
|
||||
}
|
||||
// 测点索引
|
||||
if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
|
||||
calculatedParam.setIdList(csLineFeignClient.getAllLine().getData());
|
||||
calculatedParam.setIdList(lineIdList);
|
||||
} else {
|
||||
calculatedParam.setIdList(calculatedParam.getIdList());
|
||||
}
|
||||
LiteflowResponse liteflowResponse;
|
||||
if (baseParam.isRepair()) {
|
||||
|
||||
@@ -3,6 +3,8 @@ package com.njcn.dataProcess.service.impl.influxdb;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||
import com.njcn.dataProcess.dao.imapper.DataFlickerMapper;
|
||||
import com.njcn.dataProcess.dao.relation.mapper.RStatDataFlickerRelationMapper;
|
||||
@@ -15,11 +17,14 @@ import com.njcn.dataProcess.pojo.po.RStatDataFlickerD;
|
||||
import com.njcn.dataProcess.service.IDataFlicker;
|
||||
import com.njcn.dataProcess.util.TimeUtils;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -38,15 +43,19 @@ import java.util.stream.Collectors;
|
||||
public class InfluxdbDataFlickerImpl extends MppServiceImpl<RStatDataFlickerRelationMapper, RStatDataFlickerD> implements IDataFlicker {
|
||||
|
||||
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
|
||||
private final DataFlickerMapper dataFlickerMapper;
|
||||
|
||||
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
|
||||
put("AB", "A");
|
||||
put("BC", "B");
|
||||
put("CA", "C");
|
||||
put("M", "T");
|
||||
}};
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
private static final Set<String> LINE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T")));
|
||||
private static final Set<String> PHASE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T")));
|
||||
|
||||
|
||||
@Override
|
||||
@@ -229,6 +238,13 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl<RStatDataFlickerRela
|
||||
*/
|
||||
public List<DataFlicker> getMinuteData(LineCountEvaluateParam lineParam) {
|
||||
List<DataFlicker> result = new ArrayList<>();
|
||||
List<DataFlicker> data = new ArrayList<>();
|
||||
//获取监测点、接线方式数据
|
||||
Type type = new TypeToken<Map<String, Integer>>(){}.getType();
|
||||
Map<String, Integer> map = new Gson().fromJson(
|
||||
String.valueOf(redisUtil.getObjectByKey("wlLineDetail")),
|
||||
type
|
||||
);
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFlicker.class);
|
||||
influxQueryWrapper.regular(DataFlicker::getLineId, lineParam.getLineId())
|
||||
.select(DataFlicker::getLineId)
|
||||
@@ -242,13 +258,30 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl<RStatDataFlickerRela
|
||||
.eq(DataFlicker::getQualityFlag, "0");
|
||||
quality(result, influxQueryWrapper, lineParam);
|
||||
if (CollectionUtil.isNotEmpty(result)) {
|
||||
result.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
if (!Objects.isNull(map)) {
|
||||
//现根据监测点分组,然后根据接线方式排除多于数据,在修改相别
|
||||
Map<String, List<DataFlicker>> lineMap = result.stream().collect(Collectors.groupingBy(DataFlicker::getLineId));
|
||||
lineMap.forEach((k,v)->{
|
||||
if (Objects.isNull(map.get(k))) {
|
||||
return;
|
||||
}
|
||||
Integer conType = map.get(k);
|
||||
Set<String> validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES;
|
||||
List<DataFlicker> result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList());
|
||||
data.addAll(result2);
|
||||
});
|
||||
} else {
|
||||
data.addAll(result);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(data)) {
|
||||
data.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.njcn.dataProcess.service.impl.influxdb;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||
import com.njcn.dataProcess.dao.imapper.DataFlucMapper;
|
||||
import com.njcn.dataProcess.dao.relation.mapper.RStatDataFlucRelationMapper;
|
||||
@@ -13,11 +15,14 @@ import com.njcn.dataProcess.pojo.po.RStatDataFlucD;
|
||||
import com.njcn.dataProcess.service.IDataFluc;
|
||||
import com.njcn.dataProcess.util.TimeUtils;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -36,15 +41,19 @@ import java.util.stream.Collectors;
|
||||
public class InfluxdbDataFlucImpl extends MppServiceImpl<RStatDataFlucRelationMapper, RStatDataFlucD> implements IDataFluc {
|
||||
|
||||
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
|
||||
private final DataFlucMapper dataFlucMapper;
|
||||
|
||||
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
|
||||
put("AB", "A");
|
||||
put("BC", "B");
|
||||
put("CA", "C");
|
||||
put("M", "T");
|
||||
}};
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
private static final Set<String> LINE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T")));
|
||||
private static final Set<String> PHASE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T")));
|
||||
|
||||
@Override
|
||||
public void batchInsertion(List<DataFlucDTO> dataFlucDTOList) {
|
||||
@@ -147,6 +156,13 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl<RStatDataFlucRelationMa
|
||||
public List<DataFluc> getMinuteData(List<String> lineList, String startTime, String endTime, Map<String,List<String>> timeMap, Boolean dataType) {
|
||||
List<DataFluc> dataList;
|
||||
List<DataFluc> result = new ArrayList<>();
|
||||
List<DataFluc> data = new ArrayList<>();
|
||||
//获取监测点、接线方式数据
|
||||
Type type = new TypeToken<Map<String, Integer>>(){}.getType();
|
||||
Map<String, Integer> map = new Gson().fromJson(
|
||||
String.valueOf(redisUtil.getObjectByKey("wlLineDetail")),
|
||||
type
|
||||
);
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFluc.class);
|
||||
influxQueryWrapper.regular(DataFluc::getLineId, lineList)
|
||||
.select(DataFluc::getLineId)
|
||||
@@ -195,14 +211,31 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl<RStatDataFlucRelationMa
|
||||
}
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(result)) {
|
||||
result.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
if (!Objects.isNull(map)) {
|
||||
//现根据监测点分组,然后根据接线方式排除多于数据,在修改相别
|
||||
Map<String, List<DataFluc>> lineMap = result.stream().collect(Collectors.groupingBy(DataFluc::getLineId));
|
||||
lineMap.forEach((k,v)->{
|
||||
if (Objects.isNull(map.get(k))) {
|
||||
return;
|
||||
}
|
||||
Integer conType = map.get(k);
|
||||
Set<String> validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES;
|
||||
List<DataFluc> result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList());
|
||||
data.addAll(result2);
|
||||
});
|
||||
} else {
|
||||
data.addAll(result);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(data)) {
|
||||
data.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return data;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package com.njcn.dataProcess.service.impl.influxdb;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||
import com.njcn.common.utils.HarmonicTimesUtil;
|
||||
import com.njcn.dataProcess.constant.InfluxDBTableConstant;
|
||||
@@ -9,6 +11,7 @@ import com.njcn.dataProcess.dao.imapper.DataHarmRateVMapper;
|
||||
import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmRateVRelationMapper;
|
||||
import com.njcn.dataProcess.dto.DataHarmrateVDTO;
|
||||
import com.njcn.dataProcess.param.LineCountEvaluateParam;
|
||||
import com.njcn.dataProcess.po.influx.DataFlicker;
|
||||
import com.njcn.dataProcess.po.influx.DataHarmrateV;
|
||||
import com.njcn.dataProcess.po.influx.DataV;
|
||||
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
|
||||
@@ -19,11 +22,14 @@ import com.njcn.dataProcess.service.IDataHarmRateV;
|
||||
import com.njcn.dataProcess.util.TimeUtils;
|
||||
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -36,17 +42,20 @@ import java.util.stream.Collectors;
|
||||
@Service("InfluxdbDataHarmRateVImpl")
|
||||
@RequiredArgsConstructor
|
||||
public class InfluxdbDataHarmRateVImpl extends MppServiceImpl<RStatDataHarmRateVRelationMapper, RStatDataHarmRateVD> implements IDataHarmRateV {
|
||||
|
||||
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
|
||||
private final DataHarmRateVMapper dataHarmRateVMapper;
|
||||
|
||||
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
|
||||
put("AB", "A");
|
||||
put("BC", "B");
|
||||
put("CA", "C");
|
||||
put("M", "T");
|
||||
}};
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
private static final Set<String> LINE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T")));
|
||||
private static final Set<String> PHASE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T")));
|
||||
|
||||
@Override
|
||||
public List<DataHarmDto> getRawData(LineCountEvaluateParam lineParam) {
|
||||
@@ -244,6 +253,13 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl<RStatDataHarmRateV
|
||||
public List<DataHarmrateV> getMinuteData(LineCountEvaluateParam lineParam) {
|
||||
List<DataHarmrateV> dataList;
|
||||
List<DataHarmrateV> result = new ArrayList<>();
|
||||
List<DataHarmrateV> data = new ArrayList<>();
|
||||
//获取监测点、接线方式数据
|
||||
Type type = new TypeToken<Map<String, Integer>>(){}.getType();
|
||||
Map<String, Integer> map = new Gson().fromJson(
|
||||
String.valueOf(redisUtil.getObjectByKey("wlLineDetail")),
|
||||
type
|
||||
);
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmrateV.class);
|
||||
influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1));
|
||||
influxQueryWrapper.regular(DataHarmrateV::getLineId, lineParam.getLineId())
|
||||
@@ -295,13 +311,30 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl<RStatDataHarmRateV
|
||||
}
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(result)) {
|
||||
result.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
if (!Objects.isNull(map)) {
|
||||
//现根据监测点分组,然后根据接线方式排除多于数据,在修改相别
|
||||
Map<String, List<DataHarmrateV>> lineMap = result.stream().collect(Collectors.groupingBy(DataHarmrateV::getLineId));
|
||||
lineMap.forEach((k,v)->{
|
||||
if (Objects.isNull(map.get(k))) {
|
||||
return;
|
||||
}
|
||||
Integer conType = map.get(k);
|
||||
Set<String> validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES;
|
||||
List<DataHarmrateV> result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList());
|
||||
data.addAll(result2);
|
||||
});
|
||||
} else {
|
||||
data.addAll(result);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(data)) {
|
||||
data.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package com.njcn.dataProcess.service.impl.influxdb;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||
import com.njcn.common.utils.HarmonicTimesUtil;
|
||||
import com.njcn.dataProcess.dao.imapper.DataHarmphasicVMapper;
|
||||
@@ -17,11 +19,14 @@ import com.njcn.dataProcess.service.IDataHarmphasicV;
|
||||
import com.njcn.dataProcess.util.TimeUtils;
|
||||
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -41,13 +46,18 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl<RStatDataHarmPha
|
||||
|
||||
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
private final DataHarmphasicVMapper dataHarmphasicVMapper;
|
||||
|
||||
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
|
||||
put("AB", "A");
|
||||
put("BC", "B");
|
||||
put("CA", "C");
|
||||
put("M", "T");
|
||||
}};
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
private static final Set<String> LINE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T")));
|
||||
private static final Set<String> PHASE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T")));
|
||||
|
||||
|
||||
@Override
|
||||
@@ -205,6 +215,13 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl<RStatDataHarmPha
|
||||
public List<DataHarmphasicV> getMinuteData(List<String> lineList, String startTime, String endTime, Map<String,List<String>> timeMap, Boolean dataType) {
|
||||
List<DataHarmphasicV> dataList;
|
||||
List<DataHarmphasicV> result = new ArrayList<>();
|
||||
List<DataHarmphasicV> data = new ArrayList<>();
|
||||
//获取监测点、接线方式数据
|
||||
Type type = new TypeToken<Map<String, Integer>>(){}.getType();
|
||||
Map<String, Integer> map = new Gson().fromJson(
|
||||
String.valueOf(redisUtil.getObjectByKey("wlLineDetail")),
|
||||
type
|
||||
);
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmphasicV.class);
|
||||
influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1));
|
||||
influxQueryWrapper.regular(DataHarmphasicV::getLineId, lineList)
|
||||
@@ -253,13 +270,30 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl<RStatDataHarmPha
|
||||
}
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(result)) {
|
||||
result.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
if (!Objects.isNull(map)) {
|
||||
//现根据监测点分组,然后根据接线方式排除多于数据,在修改相别
|
||||
Map<String, List<DataHarmphasicV>> lineMap = result.stream().collect(Collectors.groupingBy(DataHarmphasicV::getLineId));
|
||||
lineMap.forEach((k,v)->{
|
||||
if (Objects.isNull(map.get(k))) {
|
||||
return;
|
||||
}
|
||||
Integer conType = map.get(k);
|
||||
Set<String> validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES;
|
||||
List<DataHarmphasicV> result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList());
|
||||
data.addAll(result2);
|
||||
});
|
||||
} else {
|
||||
data.addAll(result);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(data)) {
|
||||
data.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package com.njcn.dataProcess.service.impl.influxdb;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||
import com.njcn.dataProcess.dao.imapper.DataPltMapper;
|
||||
import com.njcn.dataProcess.dao.relation.mapper.RStatDataPltRelationMapper;
|
||||
@@ -15,11 +17,14 @@ import com.njcn.dataProcess.pojo.po.RStatDataPltD;
|
||||
import com.njcn.dataProcess.service.IDataPlt;
|
||||
import com.njcn.dataProcess.util.TimeUtils;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -39,13 +44,18 @@ public class InfluxdbDataPltImpl extends MppServiceImpl<RStatDataPltRelationMapp
|
||||
|
||||
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
private final DataPltMapper dataPltMapper;
|
||||
|
||||
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
|
||||
put("AB", "A");
|
||||
put("BC", "B");
|
||||
put("CA", "C");
|
||||
put("M", "T");
|
||||
}};
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
private static final Set<String> LINE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T")));
|
||||
private static final Set<String> PHASE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T")));
|
||||
|
||||
@Override
|
||||
public void batchInsertion(List<DataPltDTO> dataPltDTOList) {
|
||||
@@ -152,6 +162,13 @@ public class InfluxdbDataPltImpl extends MppServiceImpl<RStatDataPltRelationMapp
|
||||
public List<DataPlt> getMinuteDataPlt(LineCountEvaluateParam lineParam) {
|
||||
List<DataPlt> dataList;
|
||||
List<DataPlt> result = new ArrayList<>();
|
||||
List<DataPlt> data = new ArrayList<>();
|
||||
//获取监测点、接线方式数据
|
||||
Type type = new TypeToken<Map<String, Integer>>(){}.getType();
|
||||
Map<String, Integer> map = new Gson().fromJson(
|
||||
String.valueOf(redisUtil.getObjectByKey("wlLineDetail")),
|
||||
type
|
||||
);
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataPlt.class);
|
||||
influxQueryWrapper.regular(DataPlt::getLineId, lineParam.getLineId())
|
||||
.select(DataPlt::getLineId)
|
||||
@@ -202,13 +219,30 @@ public class InfluxdbDataPltImpl extends MppServiceImpl<RStatDataPltRelationMapp
|
||||
}
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(result)) {
|
||||
result.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
if (!Objects.isNull(map)) {
|
||||
//现根据监测点分组,然后根据接线方式排除多于数据,在修改相别
|
||||
Map<String, List<DataPlt>> lineMap = result.stream().collect(Collectors.groupingBy(DataPlt::getLineId));
|
||||
lineMap.forEach((k,v)->{
|
||||
if (Objects.isNull(map.get(k))) {
|
||||
return;
|
||||
}
|
||||
Integer conType = map.get(k);
|
||||
Set<String> validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES;
|
||||
List<DataPlt> result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList());
|
||||
data.addAll(result2);
|
||||
});
|
||||
} else {
|
||||
data.addAll(result);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(data)) {
|
||||
data.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||
import com.njcn.common.utils.HarmonicTimesUtil;
|
||||
import com.njcn.dataProcess.constant.InfluxDBTableConstant;
|
||||
@@ -24,16 +26,19 @@ import com.njcn.dataProcess.service.IDataV;
|
||||
import com.njcn.dataProcess.util.TimeUtils;
|
||||
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -46,16 +51,20 @@ import java.util.stream.Collectors;
|
||||
public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper, RStatDataVD> implements IDataV {
|
||||
|
||||
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
|
||||
|
||||
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
|
||||
put("AB", "A");
|
||||
put("BC", "B");
|
||||
put("CA", "C");
|
||||
put("M", "T");
|
||||
}};
|
||||
|
||||
@Resource
|
||||
private DataVMapper dataVMapper;
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
private static final Set<String> LINE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T")));
|
||||
private static final Set<String> PHASE_VOLTAGE_TYPES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T")));
|
||||
|
||||
/**
|
||||
* 注意:influxdb不推荐采用in函数的方式批量查询监测点的数据,效率很低,容易造成崩溃,故每次单测点查询
|
||||
@@ -437,6 +446,13 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
|
||||
*/
|
||||
public List<DataV> getMinuteDataV(LineCountEvaluateParam lineParam) {
|
||||
List<DataV> result = new ArrayList<>();
|
||||
List<DataV> data = new ArrayList<>();
|
||||
//获取监测点、接线方式数据
|
||||
Type type = new TypeToken<Map<String, Integer>>(){}.getType();
|
||||
Map<String, Integer> map = new Gson().fromJson(
|
||||
String.valueOf(redisUtil.getObjectByKey("wlLineDetail")),
|
||||
type
|
||||
);
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class);
|
||||
influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1));
|
||||
influxQueryWrapper.regular(DataV::getLineId, lineParam.getLineId())
|
||||
@@ -463,14 +479,74 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
|
||||
}
|
||||
quality(result, influxQueryWrapper, lineParam);
|
||||
if (CollectionUtil.isNotEmpty(result)) {
|
||||
result.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
if (!Objects.isNull(map)) {
|
||||
//现根据监测点分组,然后根据接线方式排除多于数据,在修改相别
|
||||
Map<String, List<DataV>> lineMap = result.stream().collect(Collectors.groupingBy(DataV::getLineId));
|
||||
lineMap.forEach((k,v)->{
|
||||
if (Objects.isNull(map.get(k))) {
|
||||
return;
|
||||
}
|
||||
//这边需要特殊处理下,将线电压数据赋值
|
||||
Map<String, DataV> lineVoltageIndex = v.stream()
|
||||
.filter(d -> PHASE_MAPPING.containsKey(d.getPhasicType()))
|
||||
.filter(d -> d.getRmsLvr() != null)
|
||||
.collect(Collectors.toMap(
|
||||
d -> buildKey(d.getTime(), d.getValueType(), d.getPhasicType()),
|
||||
Function.identity(),
|
||||
(existing, replacement) -> existing
|
||||
));
|
||||
v.stream()
|
||||
.filter(d -> PHASE_VOLTAGE_TYPES.contains(d.getPhasicType()))
|
||||
.forEach(phaseData -> {
|
||||
// 根据当前相电压反查对应的线电压相别
|
||||
String targetLinePhasic = getReverseLinePhasic(phaseData.getPhasicType());
|
||||
if (targetLinePhasic == null) {
|
||||
return;
|
||||
}
|
||||
String key = buildKey(phaseData.getTime(), phaseData.getValueType(), targetLinePhasic);
|
||||
DataV matchedLineData = lineVoltageIndex.get(key);
|
||||
if (matchedLineData != null && matchedLineData.getRmsLvr() != null) {
|
||||
phaseData.setRmsLvr(matchedLineData.getRmsLvr());
|
||||
}
|
||||
});
|
||||
Integer conType = map.get(k);
|
||||
Set<String> validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES;
|
||||
List<DataV> result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList());
|
||||
data.addAll(result2);
|
||||
});
|
||||
} else {
|
||||
data.addAll(result);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(data)) {
|
||||
data.forEach(item -> {
|
||||
String newType = PHASE_MAPPING.get(item.getPhasicType());
|
||||
if (newType != null) {
|
||||
item.setPhasicType(newType);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
private static String buildKey(Object time, Object valueType, Object phasicType) {
|
||||
return time + "|" + valueType + "|" + phasicType;
|
||||
}
|
||||
|
||||
private static String getReverseLinePhasic(String phaseType) {
|
||||
if (phaseType == null) {
|
||||
return null;
|
||||
}
|
||||
switch (phaseType) {
|
||||
case "A":
|
||||
return "AB";
|
||||
case "B":
|
||||
return "BC";
|
||||
case "C":
|
||||
return "CA";
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void quality(List<DataV> result, InfluxQueryWrapper influxQueryWrapper, LineCountEvaluateParam lineParam) {
|
||||
|
||||
Reference in New Issue
Block a user