diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java index aa4742a..2c9f6ca 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java @@ -2,7 +2,6 @@ package com.njcn.algorithm.service.line; import com.njcn.algorithm.pojo.bo.CalculatedParam; -import java.util.concurrent.CompletableFuture; /** * @author xy diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java index eb1ef64..a2c6cac 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java @@ -52,8 +52,8 @@ public class DataCleanServiceImpl implements IDataCleanService { private static final Logger logger = LoggerFactory.getLogger(DataCleanServiceImpl.class); - @Value("${line.num}") - private Integer NUM = 100; + @Value("${line.num:10}") + private Integer NUM; @Resource private DataVFeignClient dataVFeignClient; @@ -101,7 +101,7 @@ public class DataCleanServiceImpl implements IDataCleanService { public void dataQualityCleanHandler(CalculatedParam calculatedParam) { MemorySizeUtil.getNowMemory(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); - logger.info("{},数据质量清洗算法执行=====》", LocalDateTime.now()); + logger.info("{},{}数据质量清洗算法执行=====》", LocalDateTime.now(),calculatedParam.getDataDate()); //获取监测点的统计间隔 List listOfString = (List) (List) calculatedParam.getIdList(); List lineDetailDataVOS = lineFeignClient.getLineDetailList(listOfString).getData(); @@ -290,7 +290,7 @@ public class DataCleanServiceImpl implements IDataCleanService { DictData dip = dicDataFeignClient.getDicDataByCodeAndType(DicDataEnum.VOLTAGE_DIP.getCode(), DicDataTypeEnum.EVENT_STATIS.getCode()).getData(); DictData rise = dicDataFeignClient.getDicDataByCodeAndType(DicDataEnum.VOLTAGE_RISE.getCode(), DicDataTypeEnum.EVENT_STATIS.getCode()).getData(); MemorySizeUtil.getNowMemory(); - logger.info("{},原始表数据清洗=====》", LocalDateTime.now()); + logger.info("{},{}原始表数据清洗=====》", LocalDateTime.now(),calculatedParam.getDataDate()); //获取标准 Map> map = getStandardData(); //获取监测点台账信息 @@ -304,7 +304,7 @@ public class DataCleanServiceImpl implements IDataCleanService { lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); for (int i = 0; i < lineDetail.size(); i++) { - logger.info( calculatedParam.getDataDate()+"总数据:" + lineDetail.size() + "=====》当前第" + (i + 1)); +// logger.info( calculatedParam.getDataDate()+"总数据:" + lineDetail.size() + "=====》当前第" + (i + 1)); LineDetailVO.Detail item = lineDetail.get(i); flowService.lineDataClean(item, map, calculatedParam.getDataDate(), dip, rise,lineDetail.size(),(i + 1)); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataComAssServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataComAssServiceImpl.java index a5f9cdd..5bd2119 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataComAssServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataComAssServiceImpl.java @@ -3,6 +3,7 @@ package com.njcn.algorithm.serviceimpl.line; import cn.hutool.core.collection.CollUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IDataComAssService; +import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.dataProcess.api.DataComAssFeignClient; import com.njcn.dataProcess.api.DataFlickerFeignClient; import com.njcn.dataProcess.api.DataVFeignClient; @@ -38,8 +39,8 @@ import java.util.stream.Collectors; public class DataComAssServiceImpl implements IDataComAssService { private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); - @Value("${line.num}") - private Integer NUM = 100; + + @Resource private DataVFeignClient dataVFeignClient; @@ -53,7 +54,8 @@ public class DataComAssServiceImpl implements IDataComAssService { @Override public void dataComAssHandler(CalculatedParam calculatedParam) { List list = new ArrayList<>(); - logger.info("{},r_stat_comasses_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}r_stat_comasses_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java index 65ad1d4..d7e7bd9 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java @@ -35,8 +35,9 @@ public class DayDataServiceImpl implements IDayDataService { private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); - @Value("${line.num}") - private Integer NUM = 100; + @Value("${line.num:10}") + private Integer NUM; + @Resource private DataVFeignClient dataVFeignClient; @Resource @@ -75,7 +76,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataVHandler(CalculatedParam calculatedParam) { - logger.info("{},dataV表转r_stat_data_v_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataV表转r_stat_data_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -91,7 +93,6 @@ public class DayDataServiceImpl implements IDayDataService { //获取原始数据 List partList = dataVFeignClient.getBaseData(lineParam).getData(); if (CollUtil.isNotEmpty(partList)) { - logger.info("{}dataV集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(),MemorySizeUtil.getObjectSize(partList)); partList.forEach(item->{ //相别 List phasicTypeList = item.getPhasicTypeList(); @@ -125,7 +126,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataIHandler(CalculatedParam calculatedParam) { - logger.info("{},dataI表转r_stat_data_i_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataI表转r_stat_data_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -173,7 +175,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataFlickerHandler(CalculatedParam calculatedParam) { - logger.info("{},dataFlicker表转r_stat_data_flicker_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataFlicker表转r_stat_data_flicker_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List valueList = Arrays.asList("AVG","MAX","MIN","CP95"); List result = new ArrayList<>(); //远程接口获取分钟数据 @@ -222,7 +225,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataFlucHandler(CalculatedParam calculatedParam) { - logger.info("{},dataFluc表转r_stat_data_fluc_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataFluc表转r_stat_data_fluc_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List valueList = Arrays.asList("AVG","MAX","MIN","CP95"); List result = new ArrayList<>(); //远程接口获取分钟数据 @@ -271,7 +275,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmPhasicIHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmPhasicI表转r_stat_data_harmphasic_i_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmPhasicI表转r_stat_data_harmphasic_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -319,7 +324,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmPhasicVHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmPhasicV表转r_stat_data_harmphasic_v_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmPhasicV表转r_stat_data_harmphasic_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -367,7 +373,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmPowerPHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmPowerP表转r_stat_data_harmpower_p_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmPowerP表转r_stat_data_harmpower_p_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -415,7 +422,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmPowerQHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmPowerQ表转r_stat_data_harmpower_q_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmPowerQ表转r_stat_data_harmpower_q_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -463,7 +471,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmPowerSHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmPowerS表转r_stat_data_harmpower_s_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmPowerS表转r_stat_data_harmpower_s_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -511,7 +520,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmRateIHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmRateI表转r_stat_data_harmRate_i_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmRateI表转r_stat_data_harmRate_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -559,7 +569,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataHarmRateVHandler(CalculatedParam calculatedParam) { - logger.info("{},dataHarmRateV表转r_stat_data_harmRate_v_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataHarmRateV表转r_stat_data_harmRate_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -607,7 +618,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataInHarmIHandler(CalculatedParam calculatedParam) { - logger.info("{},dataInHarmI表转r_stat_data_inharm_i_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataInHarmI表转r_stat_data_inharm_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -655,7 +667,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataInHarmVHandler(CalculatedParam calculatedParam) { - logger.info("{},dataInHarmV表转r_stat_data_inharm_v_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataInHarmV表转r_stat_data_inharm_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -703,7 +716,8 @@ public class DayDataServiceImpl implements IDayDataService { @Override public void dataPltHandler(CalculatedParam calculatedParam) { - logger.info("{},dataPlt表转r_stat_data_plt_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}dataPlt表转r_stat_data_plt_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); List valueList = Arrays.asList("AVG","MAX","MIN","CP95"); //远程接口获取分钟数据 diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowAsyncServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowAsyncServiceImpl.java index 51acadd..ce276f9 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowAsyncServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowAsyncServiceImpl.java @@ -390,16 +390,16 @@ public class FlowAsyncServiceImpl implements FlowAsyncService { bak.setState(1); //存储文件 InputStream reportStream = IoUtil.toStream(new Gson().toJson(resultData), CharsetUtil.UTF_8); + String[] date = dataDate.split("-"); String fileName = fileStorageUtil.uploadStreamSpecifyName( reportStream - , OssPath.DATA_CLEAN + dataDate + "/" + , OssPath.DATA_CLEAN + date[0] + "/"+ date[1] + "/"+ date[2] + "/" , item.getLineId() + ".txt"); //存储数据 bak.setFilePath(fileName); } pqDataVerifyNewFeignClient.insertData(bak); resultData=null; - logger.info( dataDate+"总数据:" + size + "=====》当前第" + i+"已完成!"); System.gc(); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java index 1ca4af5..38ee5ba 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java @@ -5,6 +5,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ObjectUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IDataCrossingService; @@ -42,10 +43,7 @@ import java.lang.reflect.Method; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -61,7 +59,7 @@ import java.util.stream.Collectors; public class IDataCrossingServiceImpl implements IDataCrossingService { private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); - @Value("${line.num}") + @Value("${line.num:10}") private Integer NUM; @Resource @@ -106,29 +104,28 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { } Map overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); //以100个监测点分片处理 - List> pendingIds = ListUtils.partition(lineIds, 1); - ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); MemorySizeUtil.getNowMemory(); + List> pendingIds = ListUtils.partition(lineIds, 20); + ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); - List> futures = new ArrayList<>(); - for (int i = 0; i < pendingIds.size(); i++) { - logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区"); - List list = pendingIds.get(i); - // 获取Future - CompletableFuture future = dataLimitRateAsync.lineDataRate( - calculatedParam.getDataDate(), - list, - phase, - overLimitMap, - pendingIds.size(), - (i + 1), - lineParam.getType() - ); - - futures.add(future); + for (List pendingId : pendingIds) { + List> futures = new ArrayList<>(); + for (String s : pendingId) { + CompletableFuture future = dataLimitRateAsync.lineDataRate( + calculatedParam.getDataDate(), + Arrays.asList(s), + phase, + overLimitMap, + pendingId.size(), + pendingId.size(), + lineParam.getType() + ); + futures.add(future); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } // 等待所有任务完成 - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + System.err.println("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{},执行完成=====》"+calculatedParam.getDataDate()); System.gc(); } @@ -169,7 +166,8 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { @Override public void limitQualifiedDayHandler(CalculatedParam calculatedParam) { - logger.info("{},r_stat_limit_qualified_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}r_stat_limit_qualified_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java index 8c67c98..6136050 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java @@ -5,6 +5,7 @@ import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IDataIntegrityService; +import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.dataProcess.api.DataIntegrityFeignClient; import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.dto.MeasurementCountDTO; @@ -42,8 +43,8 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService { private static final Logger logger = LoggerFactory.getLogger(IDataIntegrityServiceImpl.class); - @Value("${line.num}") - private Integer NUM = 100; + @Value("${line.num:10}") + private Integer NUM; @Resource private CommTerminalGeneralClient commTerminalGeneralClient; @Resource @@ -54,7 +55,8 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService { @Override public void dataIntegrity(CalculatedParam calculatedParam) { - logger.info("{},integrity表转r_stat_integrity_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}integrity表转r_stat_integrity_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List poList = new ArrayList<>(); List lineIds = calculatedParam.getIdList(); String beginDay = LocalDateTimeUtil.format( diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java index 73d0126..5abb704 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java @@ -294,11 +294,11 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync { dataLimitRateDetailFeignClient.batchInsertion(detail); } } - logger.info(dataDate + " 总分区数据:" + size + "=====》当前第" + i + "小分区已完成!"); +// logger.info(dataDate + " 总分区数据:" + size + "=====》当前第" + i + "小分区已完成!"); result = null; - if(i==size){ - MemorySizeUtil.getNowMemory(); - } +// if(i==size){ +// MemorySizeUtil.getNowMemory(); +// } System.gc(); return CompletableFuture.completedFuture(null); diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataOnlineRateServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataOnlineRateServiceImpl.java index c5a6a2b..e13b950 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataOnlineRateServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataOnlineRateServiceImpl.java @@ -9,6 +9,7 @@ import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.ObjectUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IDataOnlineRateService; +import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.dataProcess.api.DataIntegrityFeignClient; import com.njcn.dataProcess.api.DataOnlineRateFeignClient; import com.njcn.dataProcess.api.DataVFeignClient; @@ -48,8 +49,8 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService { private static final Logger logger = LoggerFactory.getLogger(IDataOnlineRateServiceImpl.class); - @Value("${line.num}") - private Integer NUM = 100; + @Value("${line.num:10}") + private Integer NUM; private final Integer online = 1; private final Integer offline = 0; @@ -69,7 +70,8 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService { @Override public void dataOnlineRate(CalculatedParam calculatedParam) { - logger.info("{},onlineRate表转r_stat_onlinerate_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}onlineRate表转r_stat_onlinerate_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IEventDetailServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IEventDetailServiceImpl.java index 0c2b72e..f2dafe3 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IEventDetailServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IEventDetailServiceImpl.java @@ -7,6 +7,7 @@ import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.ObjectUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IEventDetailService; +import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.dataProcess.api.EventDetailFeignClient; import com.njcn.dataProcess.api.RmpEventDetailFeignClient; import com.njcn.dataProcess.dto.RmpEventDetailDTO; @@ -44,8 +45,8 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class IEventDetailServiceImpl implements IEventDetailService { private static final Logger logger = LoggerFactory.getLogger(IDataOnlineRateServiceImpl.class); - @Value("${line.num}") - private Integer NUM = 10; + @Value("${line.num:10}") + private Integer NUM; @Resource private RmpEventDetailFeignClient eventDetailFeignClient; @Resource @@ -55,7 +56,8 @@ public class IEventDetailServiceImpl implements IEventDetailService { @Override public void dataDayHandle(CalculatedParam calculatedParam) { - logger.info("{},r_mp_event_detail_d算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}r_mp_event_detail_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -106,7 +108,8 @@ public class IEventDetailServiceImpl implements IEventDetailService { @Override public void dataMonthHandle(CalculatedParam calculatedParam) { - logger.info("{},r_mp_event_detail_m算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}r_mp_event_detail_m算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -139,7 +142,8 @@ public class IEventDetailServiceImpl implements IEventDetailService { @Override public void dataQuarterHandle(CalculatedParam calculatedParam) { - logger.info("{},r_mp_event_detail_q算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}r_mp_event_detail_q算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -172,7 +176,8 @@ public class IEventDetailServiceImpl implements IEventDetailService { @Override public void dataYearHandle(CalculatedParam calculatedParam) { - logger.info("{},r_mp_event_detail_y算法开始=====》", LocalDateTime.now()); + MemorySizeUtil.getNowMemory(); + logger.info("{},{}r_mp_event_detail_y算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java index 4092d68..ac0be54 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java @@ -49,8 +49,8 @@ import java.util.stream.Stream; @RequiredArgsConstructor public class PollutionServiceImpl implements IPollutionService { - @Value("${line.num}") - private Integer NUM = 100; + @Value("${line.num:10}") + private Integer NUM; @Resource private PqDataVerifyFeignClient pqDataVerifyFeignClient; diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/SpecialAnalysisServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/SpecialAnalysisServiceImpl.java index 85ceb40..7bb7e30 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/SpecialAnalysisServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/SpecialAnalysisServiceImpl.java @@ -65,8 +65,8 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class SpecialAnalysisServiceImpl implements ISpecialAnalysisService { - - private Integer NUM=10; + @Value("${line.num:10}") + private Integer NUM; @Resource private EventDetailFeignClient eventDetailFeignClient; @Resource diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java index 44d0af7..4e1726a 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java @@ -73,11 +73,10 @@ public class LnDataDealServiceImpl implements LnDataDealService { private Executor executor; @Override public void batchInsertion(LnDataDTO lnDataDTO) { - long start = System.currentTimeMillis(); List> futures = new ArrayList<>(); -// 提交每个批量插入任务 + // 提交每个批量插入任务 futures.add(CompletableFuture.runAsync(() -> dataVService.batchInsertion(lnDataDTO.getDataVList()), executor)); futures.add(CompletableFuture.runAsync(() -> dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()), executor)); futures.add(CompletableFuture.runAsync(() -> dataFlickerQuery.batchInsertion(lnDataDTO.getDataFlickerDTOList()), executor)); @@ -95,8 +94,6 @@ public class LnDataDealServiceImpl implements LnDataDealService { // 等待所有任务完成,如果任一任务抛出异常,这里会重新抛出 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - long end = System.currentTimeMillis(); - log.info("入库方法执行耗时:" + (end - start) + " ms"); // dataVService.batchInsertion(lnDataDTO.getDataVList()); @@ -118,8 +115,6 @@ public class LnDataDealServiceImpl implements LnDataDealService { List lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList()); List data = lineFeignClient.getAllLine(lineIdList).getData(); - long end2 = System.currentTimeMillis(); - log.info("查询台账耗时:" + (end2 - end) + " ms"); Map map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4])); @@ -144,9 +139,6 @@ public class LnDataDealServiceImpl implements LnDataDealService { }); CompletableFuture.allOf(updatefutures.toArray(new CompletableFuture[0])).join(); - long end3 = System.currentTimeMillis(); - log.info("更新device表:" + (end3 - end2) + " ms"); - } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java index c332b91..497a3f3 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java @@ -110,19 +110,16 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { @Override public void insertion(PqsCommunicateDto pqsCommunicateDto) { -// log.info("进出Influxdb实现类"); + //获取最新一条数据 if(Objects.equals(pqsCommunicateDto.getUpdateCommunicateFlag(),0)){ - long start = System.currentTimeMillis(); + PqsCommunicate dto = new PqsCommunicate(); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class); influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1); List pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper); - long end1 = System.currentTimeMillis(); - - log.info("查询通讯表最新数据:" + (end1 - start) + " ms"); PqsCommunicate pqsCommunicate = new PqsCommunicate(); pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant()); pqsCommunicate.setDevId(pqsCommunicateDto.getDevId()); @@ -133,11 +130,10 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { pqsCommunicateMapper.insertOne(pqsCommunicate); } - long end2 = System.currentTimeMillis(); - log.info("更新通讯表最新数据:" + (end2 - end1) + " ms"); + } - long end3 = System.currentTimeMillis(); + //更新mysql数据 @@ -149,8 +145,7 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { devComFlagDTO.setStatus(pqsCommunicateDto.getType()); deviceFeignClient.updateDevComFlag(devComFlagDTO); - long end4 = System.currentTimeMillis(); - log.info("更新device表状态和时间:" + (end4 - end3) + " ms"); + }