diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java index 572771c..3c6aece 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/LnDataDealServiceImpl.java @@ -16,10 +16,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import javax.annotation.Resource; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.stream.Collectors; /** @@ -66,22 +70,45 @@ public class LnDataDealServiceImpl implements LnDataDealService { private LineFeignClient lineFeignClient; @QueryBean private IPqsCommunicate iPqsCommunicate; - + @Resource(name="asyncExecutor") + private Executor executor; @Override public void batchInsertion(LnDataDTO lnDataDTO) { - dataVService.batchInsertion(lnDataDTO.getDataVList()); - dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()); - dataFlickerQuery.batchInsertion(lnDataDTO.getDataFlickerDTOList()); - dataFlucQuery.batchInsertion(lnDataDTO.getDataFlucDTOList()); - dataHarmphasicIQuery.batchInsertion(lnDataDTO.getDataHarmphasicIDTOList()); - dataHarmphasicVQuery.batchInsertion(lnDataDTO.getDataHarmphasicVDTOList()); - dataHarmpowerPService.batchInsertion(lnDataDTO.getDataHarmpowerPDTOList()); - dataHarmpowerQService.batchInsertion(lnDataDTO.getDataHarmpowerQDTOList()); - dataHarmpowerSService.batchInsertion(lnDataDTO.getDataHarmpowerSDTOList()); - dataIService.batchInsertion(lnDataDTO.getDataIDTOList()); - dataInharmIService.batchInsertion(lnDataDTO.getDataInharmIDTOList()); - dataInharmVService.batchInsertion(lnDataDTO.getDataInharmVDTOList()); - dataPltService.batchInsertion(lnDataDTO.getDataPltDTOList()); + + List> futures = new ArrayList<>(); +// 提交每个批量插入任务 + futures.add(CompletableFuture.runAsync(() -> dataVService.batchInsertion(lnDataDTO.getDataVList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataFlickerQuery.batchInsertion(lnDataDTO.getDataFlickerDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataFlucQuery.batchInsertion(lnDataDTO.getDataFlucDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataHarmphasicIQuery.batchInsertion(lnDataDTO.getDataHarmphasicIDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataHarmphasicVQuery.batchInsertion(lnDataDTO.getDataHarmphasicVDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataHarmpowerPService.batchInsertion(lnDataDTO.getDataHarmpowerPDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataHarmpowerQService.batchInsertion(lnDataDTO.getDataHarmpowerQDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataHarmpowerSService.batchInsertion(lnDataDTO.getDataHarmpowerSDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataIService.batchInsertion(lnDataDTO.getDataIDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataInharmIService.batchInsertion(lnDataDTO.getDataInharmIDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataInharmVService.batchInsertion(lnDataDTO.getDataInharmVDTOList()), executor)); + futures.add(CompletableFuture.runAsync(() -> dataPltService.batchInsertion(lnDataDTO.getDataPltDTOList()), executor)); + + // 等待所有任务完成,如果任一任务抛出异常,这里会重新抛出 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + // 关闭线程池(优雅关闭,等待已提交任务完成) + +// dataVService.batchInsertion(lnDataDTO.getDataVList()); +// dataHarmrateVQuery.batchInsertion(lnDataDTO.getDataHarmrateVDTOList()); +// dataFlickerQuery.batchInsertion(lnDataDTO.getDataFlickerDTOList()); +// dataFlucQuery.batchInsertion(lnDataDTO.getDataFlucDTOList()); +// dataHarmphasicIQuery.batchInsertion(lnDataDTO.getDataHarmphasicIDTOList()); +// dataHarmphasicVQuery.batchInsertion(lnDataDTO.getDataHarmphasicVDTOList()); +// dataHarmpowerPService.batchInsertion(lnDataDTO.getDataHarmpowerPDTOList()); +// dataHarmpowerQService.batchInsertion(lnDataDTO.getDataHarmpowerQDTOList()); +// dataHarmpowerSService.batchInsertion(lnDataDTO.getDataHarmpowerSDTOList()); +// dataIService.batchInsertion(lnDataDTO.getDataIDTOList()); +// dataInharmIService.batchInsertion(lnDataDTO.getDataInharmIDTOList()); +// dataInharmVService.batchInsertion(lnDataDTO.getDataInharmVDTOList()); +// dataPltService.batchInsertion(lnDataDTO.getDataPltDTOList()); //更新mysqldevice表最新数据时间 if(!CollectionUtils.isEmpty(lnDataDTO.getDataVList())){