1.优化数据清洗文件上传目录结构,结构导致因目录过多导致阿里云文件服务器卡顿问题;
2.优化算法执行目录删除多余日志输出;
This commit is contained in:
@@ -2,7 +2,6 @@ package com.njcn.algorithm.service.line;
|
|||||||
|
|
||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author xy
|
* @author xy
|
||||||
|
|||||||
@@ -52,8 +52,8 @@ public class DataCleanServiceImpl implements IDataCleanService {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DataCleanServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(DataCleanServiceImpl.class);
|
||||||
|
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM = 100;
|
private Integer NUM;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private DataVFeignClient dataVFeignClient;
|
private DataVFeignClient dataVFeignClient;
|
||||||
@@ -101,7 +101,7 @@ public class DataCleanServiceImpl implements IDataCleanService {
|
|||||||
public void dataQualityCleanHandler(CalculatedParam calculatedParam) {
|
public void dataQualityCleanHandler(CalculatedParam calculatedParam) {
|
||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
|
||||||
logger.info("{},数据质量清洗算法执行=====》", LocalDateTime.now());
|
logger.info("{},{}数据质量清洗算法执行=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
//获取监测点的统计间隔
|
//获取监测点的统计间隔
|
||||||
List<String> listOfString = (List<String>) (List<?>) calculatedParam.getIdList();
|
List<String> listOfString = (List<String>) (List<?>) calculatedParam.getIdList();
|
||||||
List<LineDetailDataVO> lineDetailDataVOS = lineFeignClient.getLineDetailList(listOfString).getData();
|
List<LineDetailDataVO> lineDetailDataVOS = lineFeignClient.getLineDetailList(listOfString).getData();
|
||||||
@@ -290,7 +290,7 @@ public class DataCleanServiceImpl implements IDataCleanService {
|
|||||||
DictData dip = dicDataFeignClient.getDicDataByCodeAndType(DicDataEnum.VOLTAGE_DIP.getCode(), DicDataTypeEnum.EVENT_STATIS.getCode()).getData();
|
DictData dip = dicDataFeignClient.getDicDataByCodeAndType(DicDataEnum.VOLTAGE_DIP.getCode(), DicDataTypeEnum.EVENT_STATIS.getCode()).getData();
|
||||||
DictData rise = dicDataFeignClient.getDicDataByCodeAndType(DicDataEnum.VOLTAGE_RISE.getCode(), DicDataTypeEnum.EVENT_STATIS.getCode()).getData();
|
DictData rise = dicDataFeignClient.getDicDataByCodeAndType(DicDataEnum.VOLTAGE_RISE.getCode(), DicDataTypeEnum.EVENT_STATIS.getCode()).getData();
|
||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
logger.info("{},原始表数据清洗=====》", LocalDateTime.now());
|
logger.info("{},{}原始表数据清洗=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
//获取标准
|
//获取标准
|
||||||
Map<String, List<PqReasonableRangeDto>> map = getStandardData();
|
Map<String, List<PqReasonableRangeDto>> map = getStandardData();
|
||||||
//获取监测点台账信息
|
//获取监测点台账信息
|
||||||
@@ -304,7 +304,7 @@ public class DataCleanServiceImpl implements IDataCleanService {
|
|||||||
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
||||||
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
||||||
for (int i = 0; i < lineDetail.size(); i++) {
|
for (int i = 0; i < lineDetail.size(); i++) {
|
||||||
logger.info( calculatedParam.getDataDate()+"总数据:" + lineDetail.size() + "=====》当前第" + (i + 1));
|
// logger.info( calculatedParam.getDataDate()+"总数据:" + lineDetail.size() + "=====》当前第" + (i + 1));
|
||||||
LineDetailVO.Detail item = lineDetail.get(i);
|
LineDetailVO.Detail item = lineDetail.get(i);
|
||||||
flowService.lineDataClean(item, map, calculatedParam.getDataDate(), dip, rise,lineDetail.size(),(i + 1));
|
flowService.lineDataClean(item, map, calculatedParam.getDataDate(), dip, rise,lineDetail.size(),(i + 1));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package com.njcn.algorithm.serviceimpl.line;
|
|||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
import com.njcn.algorithm.service.line.IDataComAssService;
|
import com.njcn.algorithm.service.line.IDataComAssService;
|
||||||
|
import com.njcn.algorithm.utils.MemorySizeUtil;
|
||||||
import com.njcn.dataProcess.api.DataComAssFeignClient;
|
import com.njcn.dataProcess.api.DataComAssFeignClient;
|
||||||
import com.njcn.dataProcess.api.DataFlickerFeignClient;
|
import com.njcn.dataProcess.api.DataFlickerFeignClient;
|
||||||
import com.njcn.dataProcess.api.DataVFeignClient;
|
import com.njcn.dataProcess.api.DataVFeignClient;
|
||||||
@@ -38,8 +39,8 @@ import java.util.stream.Collectors;
|
|||||||
public class DataComAssServiceImpl implements IDataComAssService {
|
public class DataComAssServiceImpl implements IDataComAssService {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
|
||||||
@Value("${line.num}")
|
|
||||||
private Integer NUM = 100;
|
|
||||||
@Resource
|
@Resource
|
||||||
private DataVFeignClient dataVFeignClient;
|
private DataVFeignClient dataVFeignClient;
|
||||||
|
|
||||||
@@ -53,7 +54,8 @@ public class DataComAssServiceImpl implements IDataComAssService {
|
|||||||
@Override
|
@Override
|
||||||
public void dataComAssHandler(CalculatedParam calculatedParam) {
|
public void dataComAssHandler(CalculatedParam calculatedParam) {
|
||||||
List<DataComassesDPO> list = new ArrayList<>();
|
List<DataComassesDPO> list = new ArrayList<>();
|
||||||
logger.info("{},r_stat_comasses_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}r_stat_comasses_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
||||||
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
||||||
|
|||||||
@@ -35,8 +35,9 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
|
||||||
|
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM = 100;
|
private Integer NUM;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private DataVFeignClient dataVFeignClient;
|
private DataVFeignClient dataVFeignClient;
|
||||||
@Resource
|
@Resource
|
||||||
@@ -75,7 +76,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataVHandler(CalculatedParam calculatedParam) {
|
public void dataVHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataV表转r_stat_data_v_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataV表转r_stat_data_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataVDto> result = new ArrayList<>();
|
List<DataVDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -91,7 +93,6 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
//获取原始数据
|
//获取原始数据
|
||||||
List<CommonMinuteDto> partList = dataVFeignClient.getBaseData(lineParam).getData();
|
List<CommonMinuteDto> partList = dataVFeignClient.getBaseData(lineParam).getData();
|
||||||
if (CollUtil.isNotEmpty(partList)) {
|
if (CollUtil.isNotEmpty(partList)) {
|
||||||
logger.info("{}dataV集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(),MemorySizeUtil.getObjectSize(partList));
|
|
||||||
partList.forEach(item->{
|
partList.forEach(item->{
|
||||||
//相别
|
//相别
|
||||||
List<CommonMinuteDto.PhasicType> phasicTypeList = item.getPhasicTypeList();
|
List<CommonMinuteDto.PhasicType> phasicTypeList = item.getPhasicTypeList();
|
||||||
@@ -125,7 +126,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataIHandler(CalculatedParam calculatedParam) {
|
public void dataIHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataI表转r_stat_data_i_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataI表转r_stat_data_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataIDto> result = new ArrayList<>();
|
List<DataIDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -173,7 +175,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataFlickerHandler(CalculatedParam calculatedParam) {
|
public void dataFlickerHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataFlicker表转r_stat_data_flicker_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataFlicker表转r_stat_data_flicker_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<String> valueList = Arrays.asList("AVG","MAX","MIN","CP95");
|
List<String> valueList = Arrays.asList("AVG","MAX","MIN","CP95");
|
||||||
List<DataFlickerDto> result = new ArrayList<>();
|
List<DataFlickerDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
@@ -222,7 +225,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataFlucHandler(CalculatedParam calculatedParam) {
|
public void dataFlucHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataFluc表转r_stat_data_fluc_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataFluc表转r_stat_data_fluc_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<String> valueList = Arrays.asList("AVG","MAX","MIN","CP95");
|
List<String> valueList = Arrays.asList("AVG","MAX","MIN","CP95");
|
||||||
List<DataFlucDto> result = new ArrayList<>();
|
List<DataFlucDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
@@ -271,7 +275,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmPhasicIHandler(CalculatedParam calculatedParam) {
|
public void dataHarmPhasicIHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmPhasicI表转r_stat_data_harmphasic_i_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmPhasicI表转r_stat_data_harmphasic_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmPhasicIDto> result = new ArrayList<>();
|
List<DataHarmPhasicIDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -319,7 +324,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmPhasicVHandler(CalculatedParam calculatedParam) {
|
public void dataHarmPhasicVHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmPhasicV表转r_stat_data_harmphasic_v_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmPhasicV表转r_stat_data_harmphasic_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmPhasicVDto> result = new ArrayList<>();
|
List<DataHarmPhasicVDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -367,7 +373,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmPowerPHandler(CalculatedParam calculatedParam) {
|
public void dataHarmPowerPHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmPowerP表转r_stat_data_harmpower_p_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmPowerP表转r_stat_data_harmpower_p_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmPowerPDto> result = new ArrayList<>();
|
List<DataHarmPowerPDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -415,7 +422,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmPowerQHandler(CalculatedParam calculatedParam) {
|
public void dataHarmPowerQHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmPowerQ表转r_stat_data_harmpower_q_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmPowerQ表转r_stat_data_harmpower_q_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmPowerQDto> result = new ArrayList<>();
|
List<DataHarmPowerQDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -463,7 +471,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmPowerSHandler(CalculatedParam calculatedParam) {
|
public void dataHarmPowerSHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmPowerS表转r_stat_data_harmpower_s_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmPowerS表转r_stat_data_harmpower_s_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmPowerSDto> result = new ArrayList<>();
|
List<DataHarmPowerSDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -511,7 +520,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmRateIHandler(CalculatedParam calculatedParam) {
|
public void dataHarmRateIHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmRateI表转r_stat_data_harmRate_i_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmRateI表转r_stat_data_harmRate_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmRateIDto> result = new ArrayList<>();
|
List<DataHarmRateIDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -559,7 +569,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataHarmRateVHandler(CalculatedParam calculatedParam) {
|
public void dataHarmRateVHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataHarmRateV表转r_stat_data_harmRate_v_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataHarmRateV表转r_stat_data_harmRate_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataHarmRateVDto> result = new ArrayList<>();
|
List<DataHarmRateVDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -607,7 +618,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataInHarmIHandler(CalculatedParam calculatedParam) {
|
public void dataInHarmIHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataInHarmI表转r_stat_data_inharm_i_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataInHarmI表转r_stat_data_inharm_i_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataInHarmIDto> result = new ArrayList<>();
|
List<DataInHarmIDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -655,7 +667,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataInHarmVHandler(CalculatedParam calculatedParam) {
|
public void dataInHarmVHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataInHarmV表转r_stat_data_inharm_v_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataInHarmV表转r_stat_data_inharm_v_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataInHarmVDto> result = new ArrayList<>();
|
List<DataInHarmVDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -703,7 +716,8 @@ public class DayDataServiceImpl implements IDayDataService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataPltHandler(CalculatedParam calculatedParam) {
|
public void dataPltHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},dataPlt表转r_stat_data_plt_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}dataPlt表转r_stat_data_plt_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataPltDto> result = new ArrayList<>();
|
List<DataPltDto> result = new ArrayList<>();
|
||||||
List<String> valueList = Arrays.asList("AVG","MAX","MIN","CP95");
|
List<String> valueList = Arrays.asList("AVG","MAX","MIN","CP95");
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
|
|||||||
@@ -390,16 +390,16 @@ public class FlowAsyncServiceImpl implements FlowAsyncService {
|
|||||||
bak.setState(1);
|
bak.setState(1);
|
||||||
//存储文件
|
//存储文件
|
||||||
InputStream reportStream = IoUtil.toStream(new Gson().toJson(resultData), CharsetUtil.UTF_8);
|
InputStream reportStream = IoUtil.toStream(new Gson().toJson(resultData), CharsetUtil.UTF_8);
|
||||||
|
String[] date = dataDate.split("-");
|
||||||
String fileName = fileStorageUtil.uploadStreamSpecifyName(
|
String fileName = fileStorageUtil.uploadStreamSpecifyName(
|
||||||
reportStream
|
reportStream
|
||||||
, OssPath.DATA_CLEAN + dataDate + "/"
|
, OssPath.DATA_CLEAN + date[0] + "/"+ date[1] + "/"+ date[2] + "/"
|
||||||
, item.getLineId() + ".txt");
|
, item.getLineId() + ".txt");
|
||||||
//存储数据
|
//存储数据
|
||||||
bak.setFilePath(fileName);
|
bak.setFilePath(fileName);
|
||||||
}
|
}
|
||||||
pqDataVerifyNewFeignClient.insertData(bak);
|
pqDataVerifyNewFeignClient.insertData(bak);
|
||||||
resultData=null;
|
resultData=null;
|
||||||
logger.info( dataDate+"总数据:" + size + "=====》当前第" + i+"已完成!");
|
|
||||||
System.gc();
|
System.gc();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import cn.hutool.core.collection.CollUtil;
|
|||||||
import cn.hutool.core.collection.ListUtil;
|
import cn.hutool.core.collection.ListUtil;
|
||||||
import cn.hutool.core.date.DatePattern;
|
import cn.hutool.core.date.DatePattern;
|
||||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||||
|
import cn.hutool.core.util.ArrayUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
import com.njcn.algorithm.service.line.IDataCrossingService;
|
import com.njcn.algorithm.service.line.IDataCrossingService;
|
||||||
@@ -42,10 +43,7 @@ import java.lang.reflect.Method;
|
|||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -61,7 +59,7 @@ import java.util.stream.Collectors;
|
|||||||
public class IDataCrossingServiceImpl implements IDataCrossingService {
|
public class IDataCrossingServiceImpl implements IDataCrossingService {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM;
|
private Integer NUM;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
@@ -106,29 +104,28 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
}
|
}
|
||||||
Map<String, Overlimit> overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity()));
|
Map<String, Overlimit> overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity()));
|
||||||
//以100个监测点分片处理
|
//以100个监测点分片处理
|
||||||
List<List<String>> pendingIds = ListUtils.partition(lineIds, 1);
|
|
||||||
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
|
|
||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
|
List<List<String>> pendingIds = ListUtils.partition(lineIds, 20);
|
||||||
|
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
|
||||||
|
|
||||||
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
for (List<String> pendingId : pendingIds) {
|
||||||
for (int i = 0; i < pendingIds.size(); i++) {
|
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||||
logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区");
|
for (String s : pendingId) {
|
||||||
List<String> list = pendingIds.get(i);
|
CompletableFuture<Void> future = dataLimitRateAsync.lineDataRate(
|
||||||
// 获取Future
|
calculatedParam.getDataDate(),
|
||||||
CompletableFuture<Void> future = dataLimitRateAsync.lineDataRate(
|
Arrays.asList(s),
|
||||||
calculatedParam.getDataDate(),
|
phase,
|
||||||
list,
|
overLimitMap,
|
||||||
phase,
|
pendingId.size(),
|
||||||
overLimitMap,
|
pendingId.size(),
|
||||||
pendingIds.size(),
|
lineParam.getType()
|
||||||
(i + 1),
|
);
|
||||||
lineParam.getType()
|
futures.add(future);
|
||||||
);
|
}
|
||||||
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||||
futures.add(future);
|
|
||||||
}
|
}
|
||||||
// 等待所有任务完成
|
// 等待所有任务完成
|
||||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
System.err.println("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{},执行完成=====》"+calculatedParam.getDataDate());
|
||||||
System.gc();
|
System.gc();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,7 +166,8 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void limitQualifiedDayHandler(CalculatedParam calculatedParam) {
|
public void limitQualifiedDayHandler(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},r_stat_limit_qualified_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}r_stat_limit_qualified_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
||||||
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import cn.hutool.core.date.DatePattern;
|
|||||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
import com.njcn.algorithm.service.line.IDataIntegrityService;
|
import com.njcn.algorithm.service.line.IDataIntegrityService;
|
||||||
|
import com.njcn.algorithm.utils.MemorySizeUtil;
|
||||||
import com.njcn.dataProcess.api.DataIntegrityFeignClient;
|
import com.njcn.dataProcess.api.DataIntegrityFeignClient;
|
||||||
import com.njcn.dataProcess.api.DataVFeignClient;
|
import com.njcn.dataProcess.api.DataVFeignClient;
|
||||||
import com.njcn.dataProcess.dto.MeasurementCountDTO;
|
import com.njcn.dataProcess.dto.MeasurementCountDTO;
|
||||||
@@ -42,8 +43,8 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(IDataIntegrityServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(IDataIntegrityServiceImpl.class);
|
||||||
|
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM = 100;
|
private Integer NUM;
|
||||||
@Resource
|
@Resource
|
||||||
private CommTerminalGeneralClient commTerminalGeneralClient;
|
private CommTerminalGeneralClient commTerminalGeneralClient;
|
||||||
@Resource
|
@Resource
|
||||||
@@ -54,7 +55,8 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataIntegrity(CalculatedParam<String> calculatedParam) {
|
public void dataIntegrity(CalculatedParam<String> calculatedParam) {
|
||||||
logger.info("{},integrity表转r_stat_integrity_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}integrity表转r_stat_integrity_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataIntegrityDto> poList = new ArrayList<>();
|
List<DataIntegrityDto> poList = new ArrayList<>();
|
||||||
List<String> lineIds = calculatedParam.getIdList();
|
List<String> lineIds = calculatedParam.getIdList();
|
||||||
String beginDay = LocalDateTimeUtil.format(
|
String beginDay = LocalDateTimeUtil.format(
|
||||||
|
|||||||
@@ -294,11 +294,11 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
dataLimitRateDetailFeignClient.batchInsertion(detail);
|
dataLimitRateDetailFeignClient.batchInsertion(detail);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.info(dataDate + " 总分区数据:" + size + "=====》当前第" + i + "小分区已完成!");
|
// logger.info(dataDate + " 总分区数据:" + size + "=====》当前第" + i + "小分区已完成!");
|
||||||
result = null;
|
result = null;
|
||||||
if(i==size){
|
// if(i==size){
|
||||||
MemorySizeUtil.getNowMemory();
|
// MemorySizeUtil.getNowMemory();
|
||||||
}
|
// }
|
||||||
System.gc();
|
System.gc();
|
||||||
|
|
||||||
return CompletableFuture.completedFuture(null);
|
return CompletableFuture.completedFuture(null);
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import cn.hutool.core.util.NumberUtil;
|
|||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
import com.njcn.algorithm.service.line.IDataOnlineRateService;
|
import com.njcn.algorithm.service.line.IDataOnlineRateService;
|
||||||
|
import com.njcn.algorithm.utils.MemorySizeUtil;
|
||||||
import com.njcn.dataProcess.api.DataIntegrityFeignClient;
|
import com.njcn.dataProcess.api.DataIntegrityFeignClient;
|
||||||
import com.njcn.dataProcess.api.DataOnlineRateFeignClient;
|
import com.njcn.dataProcess.api.DataOnlineRateFeignClient;
|
||||||
import com.njcn.dataProcess.api.DataVFeignClient;
|
import com.njcn.dataProcess.api.DataVFeignClient;
|
||||||
@@ -48,8 +49,8 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(IDataOnlineRateServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(IDataOnlineRateServiceImpl.class);
|
||||||
|
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM = 100;
|
private Integer NUM;
|
||||||
|
|
||||||
private final Integer online = 1;
|
private final Integer online = 1;
|
||||||
private final Integer offline = 0;
|
private final Integer offline = 0;
|
||||||
@@ -69,7 +70,8 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataOnlineRate(CalculatedParam calculatedParam) {
|
public void dataOnlineRate(CalculatedParam calculatedParam) {
|
||||||
logger.info("{},onlineRate表转r_stat_onlinerate_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}onlineRate表转r_stat_onlinerate_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
|
||||||
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import cn.hutool.core.date.LocalDateTimeUtil;
|
|||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
import com.njcn.algorithm.service.line.IEventDetailService;
|
import com.njcn.algorithm.service.line.IEventDetailService;
|
||||||
|
import com.njcn.algorithm.utils.MemorySizeUtil;
|
||||||
import com.njcn.dataProcess.api.EventDetailFeignClient;
|
import com.njcn.dataProcess.api.EventDetailFeignClient;
|
||||||
import com.njcn.dataProcess.api.RmpEventDetailFeignClient;
|
import com.njcn.dataProcess.api.RmpEventDetailFeignClient;
|
||||||
import com.njcn.dataProcess.dto.RmpEventDetailDTO;
|
import com.njcn.dataProcess.dto.RmpEventDetailDTO;
|
||||||
@@ -44,8 +45,8 @@ import java.util.stream.Collectors;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class IEventDetailServiceImpl implements IEventDetailService {
|
public class IEventDetailServiceImpl implements IEventDetailService {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(IDataOnlineRateServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(IDataOnlineRateServiceImpl.class);
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM = 10;
|
private Integer NUM;
|
||||||
@Resource
|
@Resource
|
||||||
private RmpEventDetailFeignClient eventDetailFeignClient;
|
private RmpEventDetailFeignClient eventDetailFeignClient;
|
||||||
@Resource
|
@Resource
|
||||||
@@ -55,7 +56,8 @@ public class IEventDetailServiceImpl implements IEventDetailService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataDayHandle(CalculatedParam<String> calculatedParam) {
|
public void dataDayHandle(CalculatedParam<String> calculatedParam) {
|
||||||
logger.info("{},r_mp_event_detail_d算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}r_mp_event_detail_d算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataEventDetailDto> result = new ArrayList<>();
|
List<DataEventDetailDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -106,7 +108,8 @@ public class IEventDetailServiceImpl implements IEventDetailService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataMonthHandle(CalculatedParam<String> calculatedParam) {
|
public void dataMonthHandle(CalculatedParam<String> calculatedParam) {
|
||||||
logger.info("{},r_mp_event_detail_m算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}r_mp_event_detail_m算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataEventDetailDto> result = new ArrayList<>();
|
List<DataEventDetailDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -139,7 +142,8 @@ public class IEventDetailServiceImpl implements IEventDetailService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataQuarterHandle(CalculatedParam<String> calculatedParam) {
|
public void dataQuarterHandle(CalculatedParam<String> calculatedParam) {
|
||||||
logger.info("{},r_mp_event_detail_q算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}r_mp_event_detail_q算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataEventDetailDto> result = new ArrayList<>();
|
List<DataEventDetailDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -172,7 +176,8 @@ public class IEventDetailServiceImpl implements IEventDetailService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataYearHandle(CalculatedParam<String> calculatedParam) {
|
public void dataYearHandle(CalculatedParam<String> calculatedParam) {
|
||||||
logger.info("{},r_mp_event_detail_y算法开始=====》", LocalDateTime.now());
|
MemorySizeUtil.getNowMemory();
|
||||||
|
logger.info("{},{}r_mp_event_detail_y算法开始=====》", LocalDateTime.now(),calculatedParam.getDataDate());
|
||||||
List<DataEventDetailDto> result = new ArrayList<>();
|
List<DataEventDetailDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
|
|||||||
@@ -49,8 +49,8 @@ import java.util.stream.Stream;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class PollutionServiceImpl implements IPollutionService {
|
public class PollutionServiceImpl implements IPollutionService {
|
||||||
|
|
||||||
@Value("${line.num}")
|
@Value("${line.num:10}")
|
||||||
private Integer NUM = 100;
|
private Integer NUM;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private PqDataVerifyFeignClient pqDataVerifyFeignClient;
|
private PqDataVerifyFeignClient pqDataVerifyFeignClient;
|
||||||
|
|||||||
@@ -65,8 +65,8 @@ import java.util.stream.Collectors;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class SpecialAnalysisServiceImpl implements ISpecialAnalysisService {
|
public class SpecialAnalysisServiceImpl implements ISpecialAnalysisService {
|
||||||
|
|
||||||
|
@Value("${line.num:10}")
|
||||||
private Integer NUM=10;
|
private Integer NUM;
|
||||||
@Resource
|
@Resource
|
||||||
private EventDetailFeignClient eventDetailFeignClient;
|
private EventDetailFeignClient eventDetailFeignClient;
|
||||||
@Resource
|
@Resource
|
||||||
|
|||||||
@@ -73,11 +73,10 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
private Executor executor;
|
private Executor executor;
|
||||||
@Override
|
@Override
|
||||||
public void batchInsertion(LnDataDTO lnDataDTO) {
|
public void batchInsertion(LnDataDTO lnDataDTO) {
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
|
|
||||||
|
|
||||||
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||||
// 提交每个批量插入任务
|
// 提交每个批量插入任务
|
||||||
futures.add(CompletableFuture.runAsync(() -> dataVService.batchInsertion(lnDataDTO.getDataVList()), executor));
|
futures.add(CompletableFuture.runAsync(() -> dataVService.batchInsertion(lnDataDTO.getDataVList()), executor));
|
||||||
futures.add(CompletableFuture.runAsync(() -> dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()), executor));
|
futures.add(CompletableFuture.runAsync(() -> dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()), executor));
|
||||||
futures.add(CompletableFuture.runAsync(() -> dataFlickerQuery.batchInsertion(lnDataDTO.getDataFlickerDTOList()), executor));
|
futures.add(CompletableFuture.runAsync(() -> dataFlickerQuery.batchInsertion(lnDataDTO.getDataFlickerDTOList()), executor));
|
||||||
@@ -95,8 +94,6 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
// 等待所有任务完成,如果任一任务抛出异常,这里会重新抛出
|
// 等待所有任务完成,如果任一任务抛出异常,这里会重新抛出
|
||||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||||
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
log.info("入库方法执行耗时:" + (end - start) + " ms");
|
|
||||||
|
|
||||||
|
|
||||||
// dataVService.batchInsertion(lnDataDTO.getDataVList());
|
// dataVService.batchInsertion(lnDataDTO.getDataVList());
|
||||||
@@ -118,8 +115,6 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
|
|
||||||
List<String> lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList());
|
List<String> lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList());
|
||||||
List<LineDeviceStateVO> data = lineFeignClient.getAllLine(lineIdList).getData();
|
List<LineDeviceStateVO> data = lineFeignClient.getAllLine(lineIdList).getData();
|
||||||
long end2 = System.currentTimeMillis();
|
|
||||||
log.info("查询台账耗时:" + (end2 - end) + " ms");
|
|
||||||
|
|
||||||
|
|
||||||
Map<String, String> map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4]));
|
Map<String, String> map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4]));
|
||||||
@@ -144,9 +139,6 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
});
|
});
|
||||||
CompletableFuture.allOf(updatefutures.toArray(new CompletableFuture[0])).join();
|
CompletableFuture.allOf(updatefutures.toArray(new CompletableFuture[0])).join();
|
||||||
|
|
||||||
long end3 = System.currentTimeMillis();
|
|
||||||
log.info("更新device表:" + (end3 - end2) + " ms");
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -110,19 +110,16 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
|
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
|
||||||
// log.info("进出Influxdb实现类");
|
|
||||||
//获取最新一条数据
|
//获取最新一条数据
|
||||||
if(Objects.equals(pqsCommunicateDto.getUpdateCommunicateFlag(),0)){
|
if(Objects.equals(pqsCommunicateDto.getUpdateCommunicateFlag(),0)){
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
|
|
||||||
PqsCommunicate dto = new PqsCommunicate();
|
PqsCommunicate dto = new PqsCommunicate();
|
||||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class);
|
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class);
|
||||||
influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1);
|
influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1);
|
||||||
List<PqsCommunicate> pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper);
|
List<PqsCommunicate> pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper);
|
||||||
|
|
||||||
long end1 = System.currentTimeMillis();
|
|
||||||
|
|
||||||
log.info("查询通讯表最新数据:" + (end1 - start) + " ms");
|
|
||||||
PqsCommunicate pqsCommunicate = new PqsCommunicate();
|
PqsCommunicate pqsCommunicate = new PqsCommunicate();
|
||||||
pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant());
|
pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant());
|
||||||
pqsCommunicate.setDevId(pqsCommunicateDto.getDevId());
|
pqsCommunicate.setDevId(pqsCommunicateDto.getDevId());
|
||||||
@@ -133,11 +130,10 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
pqsCommunicateMapper.insertOne(pqsCommunicate);
|
pqsCommunicateMapper.insertOne(pqsCommunicate);
|
||||||
|
|
||||||
}
|
}
|
||||||
long end2 = System.currentTimeMillis();
|
|
||||||
log.info("更新通讯表最新数据:" + (end2 - end1) + " ms");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long end3 = System.currentTimeMillis();
|
|
||||||
|
|
||||||
|
|
||||||
//更新mysql数据
|
//更新mysql数据
|
||||||
@@ -149,8 +145,7 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
|
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
|
||||||
|
|
||||||
deviceFeignClient.updateDevComFlag(devComFlagDTO);
|
deviceFeignClient.updateDevComFlag(devComFlagDTO);
|
||||||
long end4 = System.currentTimeMillis();
|
|
||||||
log.info("更新device表状态和时间:" + (end4 - end3) + " ms");
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user