将入库改成多线程添加日志,排查耗时情况
This commit is contained in:
@@ -98,7 +98,6 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
long end = System.currentTimeMillis();
|
long end = System.currentTimeMillis();
|
||||||
log.info("入库方法执行耗时:" + (end - start) + " ms");
|
log.info("入库方法执行耗时:" + (end - start) + " ms");
|
||||||
|
|
||||||
// 关闭线程池(优雅关闭,等待已提交任务完成)
|
|
||||||
|
|
||||||
// dataVService.batchInsertion(lnDataDTO.getDataVList());
|
// dataVService.batchInsertion(lnDataDTO.getDataVList());
|
||||||
// dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList());
|
// dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList());
|
||||||
@@ -119,11 +118,17 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
|
|
||||||
List<String> lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList());
|
List<String> lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList());
|
||||||
List<LineDeviceStateVO> data = lineFeignClient.getAllLine(lineIdList).getData();
|
List<LineDeviceStateVO> data = lineFeignClient.getAllLine(lineIdList).getData();
|
||||||
|
long end2 = System.currentTimeMillis();
|
||||||
|
log.info("查询台账耗时:" + (end2 - end) + " ms");
|
||||||
|
|
||||||
|
|
||||||
Map<String, String> map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4]));
|
Map<String, String> map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4]));
|
||||||
lnDataDTO.getDataVList().forEach(temp->{
|
lnDataDTO.getDataVList().forEach(temp->{
|
||||||
temp.setDevId(map.get(temp.getLineid()));
|
temp.setDevId(map.get(temp.getLineid()));
|
||||||
});
|
});
|
||||||
Map<String, List<DataVDTO>> collect = lnDataDTO.getDataVList().stream().collect(Collectors.groupingBy(DataVDTO::getDevId));
|
Map<String, List<DataVDTO>> collect = lnDataDTO.getDataVList().stream().collect(Collectors.groupingBy(DataVDTO::getDevId));
|
||||||
|
List<CompletableFuture<Void>> updatefutures = new ArrayList<>();
|
||||||
|
|
||||||
collect.forEach((temp,dataVDTOList)->{
|
collect.forEach((temp,dataVDTOList)->{
|
||||||
PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto();
|
PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto();
|
||||||
DataVDTO dataVDTO =dataVDTOList.stream().max(Comparator.comparing(DataVDTO::getTimeid)).get();
|
DataVDTO dataVDTO =dataVDTOList.stream().max(Comparator.comparing(DataVDTO::getTimeid)).get();
|
||||||
@@ -132,12 +137,14 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
pqsCommunicateDto.setDevId(temp);
|
pqsCommunicateDto.setDevId(temp);
|
||||||
pqsCommunicateDto.setType(1);
|
pqsCommunicateDto.setType(1);
|
||||||
pqsCommunicateDto.setFlag(1);
|
pqsCommunicateDto.setFlag(1);
|
||||||
|
updatefutures.add(CompletableFuture.runAsync(() -> iPqsCommunicate.insertion(pqsCommunicateDto), executor));
|
||||||
|
|
||||||
|
|
||||||
iPqsCommunicate.insertion(pqsCommunicateDto);
|
|
||||||
});
|
});
|
||||||
|
CompletableFuture.allOf(updatefutures.toArray(new CompletableFuture[0])).join();
|
||||||
|
|
||||||
long end2 = System.currentTimeMillis();
|
long end3 = System.currentTimeMillis();
|
||||||
log.info("更新device表:" + (end2 - end) + " ms");
|
log.info("更新device表:" + (end3 - end2) + " ms");
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -112,11 +112,16 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
|
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
|
||||||
// log.info("进出Influxdb实现类");
|
// log.info("进出Influxdb实现类");
|
||||||
//获取最新一条数据
|
//获取最新一条数据
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
PqsCommunicate dto = new PqsCommunicate();
|
PqsCommunicate dto = new PqsCommunicate();
|
||||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class);
|
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class);
|
||||||
influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1);
|
influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1);
|
||||||
List<PqsCommunicate> pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper);
|
List<PqsCommunicate> pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper);
|
||||||
|
|
||||||
|
long end1 = System.currentTimeMillis();
|
||||||
|
|
||||||
|
log.info("更新device表:" + (end1 - start) + " ms");
|
||||||
PqsCommunicate pqsCommunicate = new PqsCommunicate();
|
PqsCommunicate pqsCommunicate = new PqsCommunicate();
|
||||||
pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant());
|
pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant());
|
||||||
pqsCommunicate.setDevId(pqsCommunicateDto.getDevId());
|
pqsCommunicate.setDevId(pqsCommunicateDto.getDevId());
|
||||||
@@ -127,6 +132,9 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
pqsCommunicateMapper.insertOne(pqsCommunicate);
|
pqsCommunicateMapper.insertOne(pqsCommunicate);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
long end2 = System.currentTimeMillis();
|
||||||
|
log.info("更新device表:" + (end2 - end1) + " ms");
|
||||||
|
|
||||||
//更新mysql数据
|
//更新mysql数据
|
||||||
DevComFlagDTO devComFlagDTO = new DevComFlagDTO();
|
DevComFlagDTO devComFlagDTO = new DevComFlagDTO();
|
||||||
devComFlagDTO.setId(pqsCommunicateDto.getDevId());
|
devComFlagDTO.setId(pqsCommunicateDto.getDevId());
|
||||||
@@ -136,6 +144,9 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
|
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
|
||||||
|
|
||||||
deviceFeignClient.updateDevComFlag(devComFlagDTO);
|
deviceFeignClient.updateDevComFlag(devComFlagDTO);
|
||||||
|
long end3 = System.currentTimeMillis();
|
||||||
|
log.info("更新device表状态和时间:" + (end3 - end2) + " ms");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user