From fce3f74da12b10deede2aac98ae32c733c8b8217 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Wed, 10 Jun 2026 08:44:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E5=85=A5=E5=BA=93=E6=94=B9=E6=88=90?= =?UTF-8?q?=E5=A4=9A=E7=BA=BF=E7=A8=8B=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=EF=BC=8C=E6=8E=92=E6=9F=A5=E8=80=97=E6=97=B6=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/LnDataDealServiceImpl.java | 15 +++++++++++---- .../impl/influxdb/InfluxdbPqsCommunicateImpl.java | 11 +++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java index 2d7c593..a3f8e15 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java @@ -98,7 +98,6 @@ public class LnDataDealServiceImpl implements LnDataDealService { long end = System.currentTimeMillis(); log.info("入库方法执行耗时:" + (end - start) + " ms"); - // 关闭线程池(优雅关闭,等待已提交任务完成) // dataVService.batchInsertion(lnDataDTO.getDataVList()); // dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()); @@ -119,11 +118,17 @@ public class LnDataDealServiceImpl implements LnDataDealService { List lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList()); List data = lineFeignClient.getAllLine(lineIdList).getData(); + long end2 = System.currentTimeMillis(); + log.info("查询台账耗时:" + (end2 - end) + " ms"); + + Map map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4])); lnDataDTO.getDataVList().forEach(temp->{ temp.setDevId(map.get(temp.getLineid())); }); Map> collect = lnDataDTO.getDataVList().stream().collect(Collectors.groupingBy(DataVDTO::getDevId)); + List> updatefutures = new ArrayList<>(); + collect.forEach((temp,dataVDTOList)->{ PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto(); DataVDTO dataVDTO =dataVDTOList.stream().max(Comparator.comparing(DataVDTO::getTimeid)).get(); @@ -132,12 +137,14 @@ public class LnDataDealServiceImpl implements LnDataDealService { pqsCommunicateDto.setDevId(temp); pqsCommunicateDto.setType(1); pqsCommunicateDto.setFlag(1); + updatefutures.add(CompletableFuture.runAsync(() -> iPqsCommunicate.insertion(pqsCommunicateDto), executor)); + - iPqsCommunicate.insertion(pqsCommunicateDto); }); + CompletableFuture.allOf(updatefutures.toArray(new CompletableFuture[0])).join(); - long end2 = System.currentTimeMillis(); - log.info("更新device表:" + (end2 - end) + " ms"); + long end3 = System.currentTimeMillis(); + log.info("更新device表:" + (end3 - end2) + " ms"); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java index d56058e..6c4d058 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java @@ -112,11 +112,16 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { public void insertion(PqsCommunicateDto pqsCommunicateDto) { // log.info("进出Influxdb实现类"); //获取最新一条数据 + long start = System.currentTimeMillis(); + PqsCommunicate dto = new PqsCommunicate(); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class); influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1); List pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper); + long end1 = System.currentTimeMillis(); + + log.info("更新device表:" + (end1 - start) + " ms"); PqsCommunicate pqsCommunicate = new PqsCommunicate(); pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant()); pqsCommunicate.setDevId(pqsCommunicateDto.getDevId()); @@ -127,6 +132,9 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { pqsCommunicateMapper.insertOne(pqsCommunicate); } + long end2 = System.currentTimeMillis(); + log.info("更新device表:" + (end2 - end1) + " ms"); + //更新mysql数据 DevComFlagDTO devComFlagDTO = new DevComFlagDTO(); devComFlagDTO.setId(pqsCommunicateDto.getDevId()); @@ -136,6 +144,9 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { devComFlagDTO.setStatus(pqsCommunicateDto.getType()); deviceFeignClient.updateDevComFlag(devComFlagDTO); + long end3 = System.currentTimeMillis(); + log.info("更新device表状态和时间:" + (end3 - end2) + " ms"); + }