diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java index 2c60b7c..e01a5fe 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java @@ -16,6 +16,7 @@ import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.pojo.dto.CsLineDTO; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; import com.njcn.device.biz.pojo.dto.DeptGetChildrenMoreDTO; import com.njcn.device.biz.pojo.dto.DeptGetDeviceDTO; @@ -23,6 +24,7 @@ import com.njcn.device.biz.pojo.dto.DeptGetSubStationDTO; import com.njcn.device.biz.pojo.dto.LineDevGetDTO; import com.njcn.device.biz.pojo.param.DeptGetLineParam; import com.njcn.device.pq.api.DeptLineFeignClient; +import com.njcn.redis.utils.RedisUtil; import com.njcn.user.api.DeptFeignClient; import com.njcn.user.pojo.po.Dept; import com.njcn.web.controller.BaseController; @@ -73,6 +75,8 @@ public class ExecutionCenter extends BaseController { private CsLineFeignClient csLineFeignClient; @Resource private FlowAsyncService flowService; + @Resource + private RedisUtil redisUtil; /*** * 1、校验非全链执行时,tagNames节点标签集合必须为非空,否则提示---无可执行节点 @@ -143,9 +147,27 @@ public class ExecutionCenter extends BaseController { String methodDescribe = getMethodDescribe("wlMeasurementPointExecutor"); //手动判断参数是否合法, CalculatedParam calculatedParam = judgeExecuteParam(baseParam); + //缓存监测点、接线方式数据 + List csLineDTOList = csLineFeignClient.getAllLineDetail().getData(); + List lineIdList = new ArrayList<>(); + Map lineConTypeMap = new HashMap<>(); + if (CollectionUtils.isNotEmpty(csLineDTOList)) { + lineIdList = csLineDTOList.stream().map(CsLineDTO::getLineId).collect(Collectors.toList()); + //接线方式(0-星型 1-角型 2-V型) + lineConTypeMap = csLineDTOList.stream() + .filter(dto -> dto.getLineId() != null) + .collect(Collectors.toMap( + CsLineDTO::getLineId, + dto -> dto.getConType() != null ? dto.getConType() : 0, + (v1, v2) -> v1 + )); + redisUtil.saveByKeyWithExpire("wlLineDetail", lineConTypeMap, 3600L*3); + } // 测点索引 if (CollectionUtils.isEmpty(calculatedParam.getIdList())) { - calculatedParam.setIdList(csLineFeignClient.getAllLine().getData()); + calculatedParam.setIdList(lineIdList); + } else { + calculatedParam.setIdList(calculatedParam.getIdList()); } LiteflowResponse liteflowResponse; if (baseParam.isRepair()) { diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java index e9e804e..c4a22a3 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java @@ -3,6 +3,8 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.dataProcess.dao.imapper.DataFlickerMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataFlickerRelationMapper; @@ -15,11 +17,14 @@ import com.njcn.dataProcess.pojo.po.RStatDataFlickerD; import com.njcn.dataProcess.service.IDataFlicker; import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.lang.reflect.Type; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -38,15 +43,19 @@ import java.util.stream.Collectors; public class InfluxdbDataFlickerImpl extends MppServiceImpl implements IDataFlicker { private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); - private final DataFlickerMapper dataFlickerMapper; - private static final Map PHASE_MAPPING = new HashMap() {{ put("AB", "A"); put("BC", "B"); put("CA", "C"); put("M", "T"); }}; + @Resource + private RedisUtil redisUtil; + private static final Set LINE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T"))); + private static final Set PHASE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T"))); @Override @@ -229,6 +238,13 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl getMinuteData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); + List data = new ArrayList<>(); + //获取监测点、接线方式数据 + Type type = new TypeToken>(){}.getType(); + Map map = new Gson().fromJson( + String.valueOf(redisUtil.getObjectByKey("wlLineDetail")), + type + ); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFlicker.class); influxQueryWrapper.regular(DataFlicker::getLineId, lineParam.getLineId()) .select(DataFlicker::getLineId) @@ -242,13 +258,30 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl { - String newType = PHASE_MAPPING.get(item.getPhasicType()); - if (newType != null) { - item.setPhasicType(newType); - } - }); + if (!Objects.isNull(map)) { + //现根据监测点分组,然后根据接线方式排除多于数据,在修改相别 + Map> lineMap = result.stream().collect(Collectors.groupingBy(DataFlicker::getLineId)); + lineMap.forEach((k,v)->{ + if (Objects.isNull(map.get(k))) { + return; + } + Integer conType = map.get(k); + Set validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES; + List result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList()); + data.addAll(result2); + }); + } else { + data.addAll(result); + } + if (CollectionUtil.isNotEmpty(data)) { + data.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } } - return result; + return data; } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java index 4b7ac26..486ace8 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java @@ -1,6 +1,8 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.dataProcess.dao.imapper.DataFlucMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataFlucRelationMapper; @@ -13,11 +15,14 @@ import com.njcn.dataProcess.pojo.po.RStatDataFlucD; import com.njcn.dataProcess.service.IDataFluc; import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.lang.reflect.Type; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -36,15 +41,19 @@ import java.util.stream.Collectors; public class InfluxdbDataFlucImpl extends MppServiceImpl implements IDataFluc { private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); - private final DataFlucMapper dataFlucMapper; - private static final Map PHASE_MAPPING = new HashMap() {{ put("AB", "A"); put("BC", "B"); put("CA", "C"); put("M", "T"); }}; + @Resource + private RedisUtil redisUtil; + private static final Set LINE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T"))); + private static final Set PHASE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T"))); @Override public void batchInsertion(List dataFlucDTOList) { @@ -147,6 +156,13 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl getMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { List dataList; List result = new ArrayList<>(); + List data = new ArrayList<>(); + //获取监测点、接线方式数据 + Type type = new TypeToken>(){}.getType(); + Map map = new Gson().fromJson( + String.valueOf(redisUtil.getObjectByKey("wlLineDetail")), + type + ); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFluc.class); influxQueryWrapper.regular(DataFluc::getLineId, lineList) .select(DataFluc::getLineId) @@ -195,14 +211,31 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl { - String newType = PHASE_MAPPING.get(item.getPhasicType()); - if (newType != null) { - item.setPhasicType(newType); - } - }); + if (!Objects.isNull(map)) { + //现根据监测点分组,然后根据接线方式排除多于数据,在修改相别 + Map> lineMap = result.stream().collect(Collectors.groupingBy(DataFluc::getLineId)); + lineMap.forEach((k,v)->{ + if (Objects.isNull(map.get(k))) { + return; + } + Integer conType = map.get(k); + Set validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES; + List result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList()); + data.addAll(result2); + }); + } else { + data.addAll(result); + } + if (CollectionUtil.isNotEmpty(data)) { + data.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } } - return result; + return data; } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java index 052e359..615ac8a 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java @@ -2,6 +2,8 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; import com.njcn.dataProcess.constant.InfluxDBTableConstant; @@ -9,6 +11,7 @@ import com.njcn.dataProcess.dao.imapper.DataHarmRateVMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmRateVRelationMapper; import com.njcn.dataProcess.dto.DataHarmrateVDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataFlicker; import com.njcn.dataProcess.po.influx.DataHarmrateV; import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; @@ -19,11 +22,14 @@ import com.njcn.dataProcess.service.IDataHarmRateV; import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.constant.InfluxDbSqlConstant; import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.lang.reflect.Type; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -36,17 +42,20 @@ import java.util.stream.Collectors; @Service("InfluxdbDataHarmRateVImpl") @RequiredArgsConstructor public class InfluxdbDataHarmRateVImpl extends MppServiceImpl implements IDataHarmRateV { - private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); - private final DataHarmRateVMapper dataHarmRateVMapper; - private static final Map PHASE_MAPPING = new HashMap() {{ put("AB", "A"); put("BC", "B"); put("CA", "C"); put("M", "T"); }}; + @Resource + private RedisUtil redisUtil; + private static final Set LINE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T"))); + private static final Set PHASE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T"))); @Override public List getRawData(LineCountEvaluateParam lineParam) { @@ -244,6 +253,13 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl getMinuteData(LineCountEvaluateParam lineParam) { List dataList; List result = new ArrayList<>(); + List data = new ArrayList<>(); + //获取监测点、接线方式数据 + Type type = new TypeToken>(){}.getType(); + Map map = new Gson().fromJson( + String.valueOf(redisUtil.getObjectByKey("wlLineDetail")), + type + ); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmrateV.class); influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); influxQueryWrapper.regular(DataHarmrateV::getLineId, lineParam.getLineId()) @@ -295,13 +311,30 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl { - String newType = PHASE_MAPPING.get(item.getPhasicType()); - if (newType != null) { - item.setPhasicType(newType); - } - }); + if (!Objects.isNull(map)) { + //现根据监测点分组,然后根据接线方式排除多于数据,在修改相别 + Map> lineMap = result.stream().collect(Collectors.groupingBy(DataHarmrateV::getLineId)); + lineMap.forEach((k,v)->{ + if (Objects.isNull(map.get(k))) { + return; + } + Integer conType = map.get(k); + Set validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES; + List result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList()); + data.addAll(result2); + }); + } else { + data.addAll(result); + } + if (CollectionUtil.isNotEmpty(data)) { + data.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } } - return result; + return data; } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java index 8e297c5..719849e 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java @@ -2,6 +2,8 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; import com.njcn.dataProcess.dao.imapper.DataHarmphasicVMapper; @@ -17,11 +19,14 @@ import com.njcn.dataProcess.service.IDataHarmphasicV; import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.constant.InfluxDbSqlConstant; import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.lang.reflect.Type; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -41,13 +46,18 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ put("AB", "A"); put("BC", "B"); put("CA", "C"); put("M", "T"); }}; + @Resource + private RedisUtil redisUtil; + private static final Set LINE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T"))); + private static final Set PHASE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T"))); @Override @@ -205,6 +215,13 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl getMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { List dataList; List result = new ArrayList<>(); + List data = new ArrayList<>(); + //获取监测点、接线方式数据 + Type type = new TypeToken>(){}.getType(); + Map map = new Gson().fromJson( + String.valueOf(redisUtil.getObjectByKey("wlLineDetail")), + type + ); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmphasicV.class); influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); influxQueryWrapper.regular(DataHarmphasicV::getLineId, lineList) @@ -253,13 +270,30 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl { - String newType = PHASE_MAPPING.get(item.getPhasicType()); - if (newType != null) { - item.setPhasicType(newType); - } - }); + if (!Objects.isNull(map)) { + //现根据监测点分组,然后根据接线方式排除多于数据,在修改相别 + Map> lineMap = result.stream().collect(Collectors.groupingBy(DataHarmphasicV::getLineId)); + lineMap.forEach((k,v)->{ + if (Objects.isNull(map.get(k))) { + return; + } + Integer conType = map.get(k); + Set validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES; + List result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList()); + data.addAll(result2); + }); + } else { + data.addAll(result); + } + if (CollectionUtil.isNotEmpty(data)) { + data.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } } - return result; + return data; } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java index 960dbcc..95cdbdf 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java @@ -2,6 +2,8 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.dataProcess.dao.imapper.DataPltMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataPltRelationMapper; @@ -15,11 +17,14 @@ import com.njcn.dataProcess.pojo.po.RStatDataPltD; import com.njcn.dataProcess.service.IDataPlt; import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.lang.reflect.Type; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -39,13 +44,18 @@ public class InfluxdbDataPltImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ put("AB", "A"); put("BC", "B"); put("CA", "C"); put("M", "T"); }}; + @Resource + private RedisUtil redisUtil; + private static final Set LINE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T"))); + private static final Set PHASE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T"))); @Override public void batchInsertion(List dataPltDTOList) { @@ -152,6 +162,13 @@ public class InfluxdbDataPltImpl extends MppServiceImpl getMinuteDataPlt(LineCountEvaluateParam lineParam) { List dataList; List result = new ArrayList<>(); + List data = new ArrayList<>(); + //获取监测点、接线方式数据 + Type type = new TypeToken>(){}.getType(); + Map map = new Gson().fromJson( + String.valueOf(redisUtil.getObjectByKey("wlLineDetail")), + type + ); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataPlt.class); influxQueryWrapper.regular(DataPlt::getLineId, lineParam.getLineId()) .select(DataPlt::getLineId) @@ -202,13 +219,30 @@ public class InfluxdbDataPltImpl extends MppServiceImpl { - String newType = PHASE_MAPPING.get(item.getPhasicType()); - if (newType != null) { - item.setPhasicType(newType); - } - }); + if (!Objects.isNull(map)) { + //现根据监测点分组,然后根据接线方式排除多于数据,在修改相别 + Map> lineMap = result.stream().collect(Collectors.groupingBy(DataPlt::getLineId)); + lineMap.forEach((k,v)->{ + if (Objects.isNull(map.get(k))) { + return; + } + Integer conType = map.get(k); + Set validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES; + List result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList()); + data.addAll(result2); + }); + } else { + data.addAll(result); + } + if (CollectionUtil.isNotEmpty(data)) { + data.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } } - return result; + return data; } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java index 89e91e6..20d47d4 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java @@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; import com.njcn.dataProcess.constant.InfluxDBTableConstant; @@ -24,16 +26,19 @@ import com.njcn.dataProcess.service.IDataV; import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.constant.InfluxDbSqlConstant; import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.redis.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.lang.reflect.Type; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -46,16 +51,20 @@ import java.util.stream.Collectors; public class InfluxdbDataVImpl extends MppServiceImpl implements IDataV { private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); - private static final Map PHASE_MAPPING = new HashMap() {{ put("AB", "A"); put("BC", "B"); put("CA", "C"); put("M", "T"); }}; - @Resource private DataVMapper dataVMapper; + @Resource + private RedisUtil redisUtil; + private static final Set LINE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("AB", "BC", "CA", "T"))); + private static final Set PHASE_VOLTAGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("A", "B", "C", "T"))); /** * 注意:influxdb不推荐采用in函数的方式批量查询监测点的数据,效率很低,容易造成崩溃,故每次单测点查询 @@ -437,6 +446,13 @@ public class InfluxdbDataVImpl extends MppServiceImpl getMinuteDataV(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); + List data = new ArrayList<>(); + //获取监测点、接线方式数据 + Type type = new TypeToken>(){}.getType(); + Map map = new Gson().fromJson( + String.valueOf(redisUtil.getObjectByKey("wlLineDetail")), + type + ); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class); influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); influxQueryWrapper.regular(DataV::getLineId, lineParam.getLineId()) @@ -463,14 +479,74 @@ public class InfluxdbDataVImpl extends MppServiceImpl { - String newType = PHASE_MAPPING.get(item.getPhasicType()); - if (newType != null) { - item.setPhasicType(newType); - } - }); + if (!Objects.isNull(map)) { + //现根据监测点分组,然后根据接线方式排除多于数据,在修改相别 + Map> lineMap = result.stream().collect(Collectors.groupingBy(DataV::getLineId)); + lineMap.forEach((k,v)->{ + if (Objects.isNull(map.get(k))) { + return; + } + //这边需要特殊处理下,将线电压数据赋值 + Map lineVoltageIndex = v.stream() + .filter(d -> PHASE_MAPPING.containsKey(d.getPhasicType())) + .filter(d -> d.getRmsLvr() != null) + .collect(Collectors.toMap( + d -> buildKey(d.getTime(), d.getValueType(), d.getPhasicType()), + Function.identity(), + (existing, replacement) -> existing + )); + v.stream() + .filter(d -> PHASE_VOLTAGE_TYPES.contains(d.getPhasicType())) + .forEach(phaseData -> { + // 根据当前相电压反查对应的线电压相别 + String targetLinePhasic = getReverseLinePhasic(phaseData.getPhasicType()); + if (targetLinePhasic == null) { + return; + } + String key = buildKey(phaseData.getTime(), phaseData.getValueType(), targetLinePhasic); + DataV matchedLineData = lineVoltageIndex.get(key); + if (matchedLineData != null && matchedLineData.getRmsLvr() != null) { + phaseData.setRmsLvr(matchedLineData.getRmsLvr()); + } + }); + Integer conType = map.get(k); + Set validPhasicTypes = (conType != 0) ? LINE_VOLTAGE_TYPES : PHASE_VOLTAGE_TYPES; + List result2 = v.stream().filter(item -> validPhasicTypes.contains(item.getPhasicType())).collect(Collectors.toList()); + data.addAll(result2); + }); + } else { + data.addAll(result); + } + if (CollectionUtil.isNotEmpty(data)) { + data.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return data; + } + + private static String buildKey(Object time, Object valueType, Object phasicType) { + return time + "|" + valueType + "|" + phasicType; + } + + private static String getReverseLinePhasic(String phaseType) { + if (phaseType == null) { + return null; + } + switch (phaseType) { + case "A": + return "AB"; + case "B": + return "BC"; + case "C": + return "CA"; + default: + return null; } - return result; } private void quality(List result, InfluxQueryWrapper influxQueryWrapper, LineCountEvaluateParam lineParam) {