From 5090e023b191fd140e0ed55b105f044d5b83bae9 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Wed, 10 Jun 2026 10:05:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=9C=80=E6=96=B0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=9D=A5=E4=BA=86=E4=B8=8D=E6=9B=B4=E6=96=B0=E9=80=9A?= =?UTF-8?q?=E8=AE=AF=E8=A1=A8=E6=95=B0=E6=8D=AE=EF=BC=8C=E9=80=9A=E8=AE=AF?= =?UTF-8?q?=E8=A1=A8=E7=94=B1=E5=89=8D=E7=BD=AE=E7=8A=B6=E6=80=81=E7=BB=B4?= =?UTF-8?q?=E6=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pojo/dto/PqsCommunicateDto.java | 3 ++ .../service/impl/LnDataDealServiceImpl.java | 1 + .../influxdb/InfluxdbPqsCommunicateImpl.java | 45 ++++++++++--------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/PqsCommunicateDto.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/PqsCommunicateDto.java index 6271716..4b7f81a 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/PqsCommunicateDto.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/PqsCommunicateDto.java @@ -20,6 +20,9 @@ public class PqsCommunicateDto { private Integer type; //是否更新updateTime标志;数据上送更新1,状态翻转不更新0 private Integer flag=0; + //是否更新、pqs_communicate标志,0更新,1不更新 + + private Integer updateCommunicateFlag=0; } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java index a3f8e15..44d0af7 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java @@ -137,6 +137,7 @@ public class LnDataDealServiceImpl implements LnDataDealService { pqsCommunicateDto.setDevId(temp); pqsCommunicateDto.setType(1); pqsCommunicateDto.setFlag(1); + pqsCommunicateDto.setUpdateCommunicateFlag(1); updatefutures.add(CompletableFuture.runAsync(() -> iPqsCommunicate.insertion(pqsCommunicateDto), executor)); diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java index 6a771f2..c332b91 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java @@ -112,28 +112,33 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { public void insertion(PqsCommunicateDto pqsCommunicateDto) { // log.info("进出Influxdb实现类"); //获取最新一条数据 - long start = System.currentTimeMillis(); + if(Objects.equals(pqsCommunicateDto.getUpdateCommunicateFlag(),0)){ + long start = System.currentTimeMillis(); - PqsCommunicate dto = new PqsCommunicate(); - InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class); - influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1); - List pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper); + PqsCommunicate dto = new PqsCommunicate(); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class); + influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1); + List pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper); - long end1 = System.currentTimeMillis(); + long end1 = System.currentTimeMillis(); - log.info("查询通讯表最新数据:" + (end1 - start) + " ms"); - PqsCommunicate pqsCommunicate = new PqsCommunicate(); - pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant()); - pqsCommunicate.setDevId(pqsCommunicateDto.getDevId()); - pqsCommunicate.setType(pqsCommunicateDto.getType()); - //如果不存数据或者状态不一样则插入数据 - //可能存在掉线后最后一组数据还未入库,添加时间判断 - if(CollectionUtils.isEmpty(pqsCommunicates)|| (!Objects.equals( pqsCommunicates.get(0).getType(),pqsCommunicateDto.getType())&&pqsCommunicates.get(0).getTime().isBefore(pqsCommunicate.getTime()))){ - pqsCommunicateMapper.insertOne(pqsCommunicate); + log.info("查询通讯表最新数据:" + (end1 - start) + " ms"); + PqsCommunicate pqsCommunicate = new PqsCommunicate(); + pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant()); + pqsCommunicate.setDevId(pqsCommunicateDto.getDevId()); + pqsCommunicate.setType(pqsCommunicateDto.getType()); + //如果不存数据或者状态不一样则插入数据 + //可能存在掉线后最后一组数据还未入库,添加时间判断 + if(CollectionUtils.isEmpty(pqsCommunicates)|| (!Objects.equals( pqsCommunicates.get(0).getType(),pqsCommunicateDto.getType())&&pqsCommunicates.get(0).getTime().isBefore(pqsCommunicate.getTime()))){ + pqsCommunicateMapper.insertOne(pqsCommunicate); + + } + long end2 = System.currentTimeMillis(); + log.info("更新通讯表最新数据:" + (end2 - end1) + " ms"); + } + + long end3 = System.currentTimeMillis(); - } - long end2 = System.currentTimeMillis(); - log.info("更新通讯表最新数据:" + (end2 - end1) + " ms"); //更新mysql数据 DevComFlagDTO devComFlagDTO = new DevComFlagDTO(); @@ -144,8 +149,8 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { devComFlagDTO.setStatus(pqsCommunicateDto.getType()); deviceFeignClient.updateDevComFlag(devComFlagDTO); - long end3 = System.currentTimeMillis(); - log.info("更新device表状态和时间:" + (end3 - end2) + " ms"); + long end4 = System.currentTimeMillis(); + log.info("更新device表状态和时间:" + (end4 - end3) + " ms"); }