diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 3d51079..2a1863a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -22,9 +22,11 @@ import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.pojo.po.CsTopic; -import com.njcn.access.service.*; +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.service.ICsLineModelService; +import com.njcn.access.service.ICsTopicService; +import com.njcn.access.service.IHeartbeatService; import com.njcn.access.utils.ChannelObjectUtil; -import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.*; import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; @@ -36,10 +38,8 @@ import com.njcn.device.biz.utils.COverlimitUtil; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.AppFileMessage; -import com.njcn.mq.template.AppAutoDataMessageTemplate; -import com.njcn.mq.template.AppEventMessageTemplate; -import com.njcn.mq.template.AppFileMessageTemplate; -import com.njcn.mq.template.AppFileStreamMessageTemplate; +import com.njcn.mq.message.LogMessage; +import com.njcn.mq.template.*; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.rt.api.RtFeignClient; @@ -86,10 +86,8 @@ public class MqttMessageHandler { private final DataSetFeignClient dataSetFeignClient; private final AppAutoDataMessageTemplate appAutoDataMessageTemplate; private final AppEventMessageTemplate appEventMessageTemplate; - private final CsLogsFeignClient csLogsFeignClient; private final AppFileMessageTemplate appFileMessageTemplate; private final AppFileStreamMessageTemplate appFileStreamMessageTemplate; - private final ICsDeviceOnlineLogsService onlineLogsService; private final CsSoftInfoFeignClient csSoftInfoFeignClient; private final CsLineFeignClient csLineFeignClient; private final DevCapacityFeignClient devCapacityFeignClient; @@ -100,6 +98,7 @@ public class MqttMessageHandler { private final RtFeignClient rtFeignClient; private final CsCommunicateFeignClient csCommunicateFeignClient; private final IHeartbeatService heartbeatService; + private final LogMessageTemplate logMessageTemplate; @Autowired Validator validator; @@ -107,14 +106,9 @@ public class MqttMessageHandler { @Transactional(rollbackFor = Exception.class) public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ //日志记录 - DeviceLogDTO logDto = new DeviceLogDTO(); - try{ - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - } catch (Exception e) { - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - } + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统"); + logDto.setLoginName("系统"); logDto.setOperate(nDid + "设备主题录入"); logDto.setResult(1); //业务流程开始 @@ -122,9 +116,9 @@ public class MqttMessageHandler { ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class); //检验传递的参数是否准确 Set> validate = validator.validate(res); - validate.forEach(constraintViolation -> { - System.out.println(constraintViolation.getMessage()); - }); +// validate.forEach(constraintViolation -> { +// System.out.println(constraintViolation.getMessage()); +// }); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ if (Objects.equals(res.getType(), Integer.parseInt(TypeEnum.TYPE_16.getCode()))){ List list = new ArrayList<>(); @@ -141,18 +135,18 @@ public class MqttMessageHandler { list.add(csTopic); }); csTopicService.addTopic(nDid,list); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); } else { logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); - log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); + logMessageTemplate.sendMember(logDto); + //log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); } } else { logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.RESPONSE_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); - log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage()); + logMessageTemplate.sendMember(logDto); + //log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage()); } } @@ -169,14 +163,9 @@ public class MqttMessageHandler { public void devOperation(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ log.info("收到注册应答响应--->{}", nDid); //日志记录 - DeviceLogDTO logDto = new DeviceLogDTO(); - try{ - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - } catch (Exception e) { - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - } + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统"); + logDto.setLoginName("系统"); logDto.setOperate("收到设备"+nDid+"注册应答响应"); logDto.setResult(1); //业务处理 @@ -193,18 +182,18 @@ public class MqttMessageHandler { reqAndResParam.setExpire(-1); String version = csTopicService.getVersion(nDid); publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); } else { logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); - log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); + logMessageTemplate.sendMember(logDto); + //log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); } } else { logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); - log.info(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage()); + logMessageTemplate.sendMember(logDto); + //log.info(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage()); } } @@ -220,20 +209,15 @@ public class MqttMessageHandler { @Transactional(rollbackFor = Exception.class) public void devAccessOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - try{ - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - } catch (Exception e) { - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - } + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统"); + logDto.setLoginName("系统"); logDto.setResult(1); //业务处理 Gson gson = new Gson(); ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class); redisUtil.saveByKeyWithExpire("devResponse",res.getCode(),5L); - if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ + if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())) { switch (res.getType()){ /** * 装置类型模板应答 @@ -242,15 +226,17 @@ public class MqttMessageHandler { * 3.平台端需读取装置的DevMod来判断网关支持的设备模板(包含设备型号和模板版本),根据app提交的接入子设备DID匹配数据模板(型号及版本),生成DevCfg下发给网关,网关根据下发信息生成就地设备点表。 */ case 4611: - log.info("{},装置模板应答,应答code {}",nDid,res.getCode()); + logDto.setOperate("装置模板应答"); + //log.info("{},装置模板应答,应答code {}",nDid,res.getCode()); ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); List list = modelDto.getMsg().getDevMod(); List list2 = modelDto.getMsg().getDevCfg(); - if (CollectionUtils.isEmpty(list)){ - log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); + if (CollectionUtils.isEmpty(list)) { + //log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); + logDto.setOperate("查看装置端模板报文数据"); logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); //有异常删除缓存的模板信息 redisUtil.delete(AppRedisKey.MODEL + nDid); throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); @@ -267,11 +253,11 @@ public class MqttMessageHandler { CsModelDto csModelDto = new CsModelDto(); CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); if (Objects.isNull(po)){ - log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); - logDto.setOperate(nDid + "模板缺失"); + //log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); + logDto.setOperate(nDid + "查询系统中是否存在模板"); logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); //有异常删除缓存的模板信息 redisUtil.delete(AppRedisKey.MODEL + nDid); throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); @@ -279,9 +265,10 @@ public class MqttMessageHandler { if (Objects.equals(po.getType(),0)){ List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); if (CollectionUtils.isEmpty(dataSetList)){ + logDto.setOperate("查看APF模块个数"); logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); //有异常删除缓存的模板信息 redisUtil.delete(AppRedisKey.MODEL + nDid); throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); @@ -302,10 +289,11 @@ public class MqttMessageHandler { List lineList = csLineModelService.getMonitorNumByModelId(modelId); String key = AppRedisKey.LINE + nDid; redisUtil.saveByKeyWithExpire(key,lineList,600L); + logMessageTemplate.sendMember(logDto); break; case 4613: logDto.setOperate(nDid + "设备接入"); - log.info("{},收到接入应答响应,应答code {}",nDid,res.getCode()); + //log.info("{},收到接入应答响应,应答code {}",nDid,res.getCode()); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ int mid = 1; //修改装置状态 @@ -333,13 +321,13 @@ public class MqttMessageHandler { //录波任务倒计时 redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L); } else { - log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); + //log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AccessResponseEnum.ACCESS_RESPONSE_ERROR); } - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); break; case 4614: RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class); @@ -371,11 +359,13 @@ public class MqttMessageHandler { csSoftInfoFeignClient.removeSoftInfo(soft); } equipmentFeignClient.updateSoftInfo(nDid,csSoftInfoPo.getId()); + logMessageTemplate.sendMember(logDto); break; case 2: List devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class); if (CollectionUtil.isNotEmpty(devInfo)){ if (Objects.equals(res.getDid(),1)){ + logDto.setOperate(nDid + "更新APF容量"); List list3 = new ArrayList<>(); boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0); //治理设备 @@ -411,19 +401,23 @@ public class MqttMessageHandler { }); } } + logMessageTemplate.sendMember(logDto); break; case 15: + logDto.setOperate(nDid + "更新设备软件信息"); JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(res)); AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class); appAutoDataMessage.setId(nDid); rtFeignClient.apfRtAnalysis(appAutoDataMessage); + logMessageTemplate.sendMember(logDto); break; case 48: - logDto.setUserName("运维管理员"); + logDto.setUserIndex("系统"); logDto.setOperate("监测点:" + (nDid + rspDataDto.getClDid()) + "询问项目列表"); List projectInfoList = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.ProjectInfo.class); String key3 = AppRedisKey.PROJECT_INFO + nDid + rspDataDto.getClDid(); redisUtil.saveByKeyWithExpire(key3,projectInfoList,60L); + logMessageTemplate.sendMember(logDto); break; default: break; @@ -441,10 +435,11 @@ public class MqttMessageHandler { } } else { String result = getEnum(res.getCode()); - log.info(result); + //log.info(result); + logDto.setOperate("装置响应"); logDto.setResult(0); logDto.setFailReason(result); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(result); } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index d933915..d923e95 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -3,10 +3,7 @@ package com.njcn.access.listener; import cn.hutool.core.util.ObjectUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; -import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.utils.RedisSetUtil; -import com.njcn.access.utils.SendMessageUtil; -import com.njcn.csdevice.api.*; import com.njcn.redis.utils.RedisUtil; import com.njcn.rt.pojo.dto.BaseRealDataSet; import lombok.extern.slf4j.Slf4j; @@ -27,27 +24,12 @@ import java.util.Set; @Slf4j @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { - - @Resource - private ICsEquipmentDeliveryService csEquipmentDeliveryService; - @Resource - private CsLogsFeignClient csLogsFeignClient; - @Resource - private CsLedgerFeignClient csLedgerFeignclient; - @Resource - private EquipmentFeignClient equipmentFeignClient; @Resource private RedisUtil redisUtil; @Resource - private SendMessageUtil sendMessageUtil; - @Resource - private CsCommunicateFeignClient csCommunicateFeignClient; - @Resource private MqttPublisher publisher; @Resource private RedisSetUtil redisSetUtil; - @Resource - private DeviceMessageFeignClient deviceMessageFeignClient; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java index a0e557b..b57a084 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java @@ -4,11 +4,14 @@ import cn.hutool.core.collection.CollUtil; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; +import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.mq.message.LogMessage; +import com.njcn.mq.template.LogMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; -import com.njcn.system.api.DictTreeFeignClient; import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.pojo.po.SysDicTreePO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; @@ -17,8 +20,10 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.*; +import java.util.stream.Collectors; /** * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 @@ -36,7 +41,8 @@ public class AccessApplicationRunner implements ApplicationRunner { private final ICsEquipmentDeliveryService csEquipmentDeliveryService; private final ICsTopicService csTopicService; private final CsDeviceServiceImpl csDeviceService; - private final DictTreeFeignClient dictTreeFeignClient; + private final LogMessageTemplate logMessageTemplate; + private final ChannelObjectUtil channelObjectUtil; ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private static final long ACCESS_TIME = 60L; @@ -48,6 +54,13 @@ public class AccessApplicationRunner implements ApplicationRunner { log.info("系统重启,所有符合条件的装置发起接入!"); List list = csEquipmentDeliveryService.getOnlineDev(); if (CollUtil.isNotEmpty(list)) { + //获取字典数据 + List dictTreeKey = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE), SysDicTreePO.class); + Map dictTreeMap = dictTreeKey.stream().collect(Collectors.toMap(SysDicTreePO::getId, item -> item)); + //获取主题版本信息 + List nDidIds = list.stream().map(CsEquipmentDeliveryPO::getNdid).collect(Collectors.toList()); + Map topicVersions = csTopicService.getVersion(nDidIds); + ExecutorService executor = Executors.newFixedThreadPool(10); // 将任务平均分配给10个子列表 List> subLists = new ArrayList<>(); @@ -65,7 +78,7 @@ public class AccessApplicationRunner implements ApplicationRunner { futures.add(executor.submit(new Callable() { @Override public Void call() { - accessDev(subLists.get(index)); + accessDev(subLists.get(index), dictTreeMap, topicVersions); return null; } })); @@ -86,25 +99,48 @@ public class AccessApplicationRunner implements ApplicationRunner { scheduler.shutdown(); } - public void accessDev(List list) { + public void accessDev(List list, Map dictTreeMap, Map topicVersions) { if (CollUtil.isNotEmpty(list)) { + List successIds = new ArrayList<>(); + List failIds = new ArrayList<>(); try { list.forEach(item->{ //System.out.println(Thread.currentThread().getName() + ": reboot : nDid : " + item.getNdid()); //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 - String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); - if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) { + if (Objects.equals(dictTreeMap.get(item.getDevType()).getCode(), DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) { //csDeviceService.wlDevRegister(item.getNdid()); log.info("请先手动注册、接入"); } else { - String version = csTopicService.getVersion(item.getNdid()); + String version = topicVersions.get(item.getNdid()); if (Objects.isNull(version)) { version = "V1"; } - csDeviceService.autoAccess(item.getNdid(),version,1); + boolean result = csDeviceService.autoAccess(item.getNdid(),version,1); + if (result) { + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); + successIds.add(item.getNdid()); + } else { + failIds.add(item.getNdid()); + } } - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); }); + if (CollUtil.isNotEmpty(successIds)) { + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统首次启动"); + logDto.setLoginName("系统首次启动"); + logDto.setResult(1); + logDto.setOperate(String.join(",", successIds) + "装置接入"); + logMessageTemplate.sendMember(logDto); + } + if (CollUtil.isNotEmpty(failIds)) { + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统首次启动"); + logDto.setLoginName("系统首次启动"); + logDto.setResult(0); + logDto.setOperate(String.join(",", failIds) + "装置接入"); + logDto.setFailReason("装置不在线或者系统询问装置模板信息,装置未响应"); + logMessageTemplate.sendMember(logDto); + } } catch (Exception e) { log.error(e.getMessage()); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java index 1c1b48f..4166400 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java @@ -4,11 +4,14 @@ import cn.hutool.core.collection.CollUtil; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; +import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.mq.message.LogMessage; +import com.njcn.mq.template.LogMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; -import com.njcn.system.api.DictTreeFeignClient; import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.pojo.po.SysDicTreePO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; @@ -17,8 +20,10 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.*; +import java.util.stream.Collectors; /** * @author xy @@ -34,7 +39,8 @@ public class AutoAccessTimer implements ApplicationRunner { private final ICsEquipmentDeliveryService csEquipmentDeliveryService; private final ICsTopicService csTopicService; private final CsDeviceServiceImpl csDeviceService; - private final DictTreeFeignClient dictTreeFeignClient; + private final ChannelObjectUtil channelObjectUtil; + private final LogMessageTemplate logMessageTemplate; @Override public void run(ApplicationArguments args) { @@ -87,6 +93,13 @@ public class AutoAccessTimer implements ApplicationRunner { log.info("轮询定时任务执行中!"); List list = csEquipmentDeliveryService.getOfflineDev(); if (CollUtil.isNotEmpty(list)) { + //获取字典数据 + List dictTreeKey = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE), SysDicTreePO.class); + Map dictTreeMap = dictTreeKey.stream().collect(Collectors.toMap(SysDicTreePO::getId, item -> item)); + //获取主题版本信息 + List nDidIds = list.stream().map(CsEquipmentDeliveryPO::getNdid).collect(Collectors.toList()); + Map topicVersions = csTopicService.getVersion(nDidIds); + ExecutorService executor = Executors.newFixedThreadPool(10); try { List> subLists = CollUtil.split(list, 10); @@ -94,7 +107,7 @@ public class AutoAccessTimer implements ApplicationRunner { for (List subList : subLists) { futures.add(executor.submit(() -> { try { - accessDevSafely(subList); // 使用安全版本 + accessDevSafely(subList,dictTreeMap,topicVersions); } catch (Exception e) { log.error("处理设备子列表异常", e); } @@ -125,40 +138,55 @@ public class AutoAccessTimer implements ApplicationRunner { } //安全的accessDev版本 - private void accessDevSafely(List list) { + private void accessDevSafely(List list, Map dictTreeMap, Map topicVersions) { if (CollUtil.isNotEmpty(list)) { + List successIds = new ArrayList<>(); + List failIds = new ArrayList<>(); for (CsEquipmentDeliveryPO item : list) { try { - processSingleDevice(item); + if (Objects.equals(dictTreeMap.get(item.getDevType()).getCode(), DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { + log.info("设备 {} 需要手动注册、接入", item.getNdid()); + } else { + String version = topicVersions.get(item.getNdid()); + if (Objects.isNull(version)) { + version = "V1"; + } + // 使用try-catch确保单个设备失败不影响其他设备 + try { + boolean success = csDeviceService.autoAccess2(item.getNdid(), version, 1); + if (success) { + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); + successIds.add(item.getNdid()); + } else { + log.warn("设备 {} 接入失败", item.getNdid()); + failIds.add(item.getNdid()); + } + } catch (Exception e) { + log.error("设备 {} 接入异常: {}", item.getNdid(), e.getMessage()); + } + } } catch (Exception e) { log.error("处理设备 {} 失败: {}", item.getNdid(), e.getMessage()); } } - } - } - - private void processSingleDevice(CsEquipmentDeliveryPO item) { - //System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); - String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); - if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { - log.info("设备 {} 需要手动注册、接入", item.getNdid()); - } else { - String version = csTopicService.getVersion(item.getNdid()); - if (Objects.isNull(version)) { - version = "V1"; + if (CollUtil.isNotEmpty(successIds)) { + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统首次启动"); + logDto.setLoginName("系统首次启动"); + logDto.setResult(1); + logDto.setOperate(String.join(",", successIds) + "装置接入"); + logMessageTemplate.sendMember(logDto); } - // 使用try-catch确保单个设备失败不影响其他设备 - try { - boolean success = csDeviceService.autoAccess2(item.getNdid(), version, 1); - if (success) { - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); - } else { - log.warn("设备 {} 接入失败", item.getNdid()); - } - } catch (Exception e) { - log.error("设备 {} 接入异常: {}", item.getNdid(), e.getMessage()); + if (CollUtil.isNotEmpty(failIds)) { + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统首次启动"); + logDto.setLoginName("系统首次启动"); + logDto.setResult(0); + logDto.setOperate(String.join(",", failIds) + "装置接入"); + logDto.setFailReason("装置不在线或者系统询问装置模板信息,装置未响应"); + logMessageTemplate.sendMember(logDto); } } - } + } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java index 912237b..9336a6a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java @@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import com.njcn.access.pojo.po.CsTopic; import java.util.List; +import java.util.Map; /** *

@@ -28,5 +29,7 @@ public interface ICsTopicService extends IService { */ String getVersion(String nDid); + Map getVersion(List list); + void deleteByNDid(String nDid); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index a7408d3..3238e87 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -22,13 +22,18 @@ import com.njcn.access.utils.CRC32Utils; import com.njcn.access.utils.JsonUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.csdevice.api.*; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.DevModelFeignClient; +import com.njcn.csdevice.api.DevModelRelationFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.pojo.param.CsDevModelAddParm; import com.njcn.csdevice.pojo.po.CsDataArray; import com.njcn.csdevice.pojo.po.CsDataSet; import com.njcn.csdevice.pojo.po.CsDevModelPO; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.mq.message.LogMessage; +import com.njcn.mq.template.LogMessageTemplate; import com.njcn.oss.constant.OssPath; import com.njcn.oss.utils.FileStorageUtil; import com.njcn.redis.pojo.enums.AppRedisKey; @@ -74,7 +79,6 @@ public class CsDevModelServiceImpl implements ICsDevModelService { private final ICsLineModelService csLineModelService; private final ICsGroupService csGroupService; private final ICsGroArrService csGroArrService; - private final CsLogsFeignClient csLogsFeignClient; private final EleWaveFeignClient waveFeignClient; private final DictTreeFeignClient dictTreeFeignClient; private final MqttPublisher publisher; @@ -83,13 +87,14 @@ public class CsDevModelServiceImpl implements ICsDevModelService { private final EquipmentFeignClient eequipmentFeignClient; private final DevModelRelationFeignClient devModelRelationFeignClient; private final CsLineFeignClient csLineFeignClient; + private final LogMessageTemplate logMessageTemplate; @Override @Transactional(rollbackFor = {Exception.class}) public void addModel(MultipartFile file) { //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); logDto.setResult(1); String json = null; @@ -139,11 +144,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } //5.清空模板缓存 redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); } catch (Exception e) { logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MODEL_ANALYSIS_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(e.getMessage()); } } @@ -152,8 +157,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { @Transactional(rollbackFor = {Exception.class}) public void addDict(MultipartFile file) { //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); logDto.setResult(1); String json = null; @@ -163,11 +168,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService { TemplateDto templateDto = gson.fromJson(json, TemplateDto.class); logDto.setOperate(templateDto.getDevType() + "录入通用字典"); analysisDict(templateDto); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); } catch (Exception e) { logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.DICT_ANALYSIS_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AccessResponseEnum.DICT_ANALYSIS_ERROR); } } @@ -201,7 +206,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { //发送数据给前端 String json = "{fileName:"+file.getOriginalFilename()+",allStep:"+times+",nowStep:"+i+"}"; publisher.send("/Web/Progress/" + id, new Gson().toJson(json), 1, false); - DeviceLogDTO logDto = new DeviceLogDTO(); + LogMessage logDto = new LogMessage(); byte[] lsBytes; if (length > 50*1024) { lsBytes = Arrays.copyOfRange(bytes, (i - 1) * cap, i * cap); @@ -227,25 +232,26 @@ public class CsDevModelServiceImpl implements ICsDevModelService { //判断是否重发 sendNextStep(logDto,path,file,bytes.length,lsBytes,(i-1)*cap,version,id,i,hexString,true); } - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); } } else { String json = "{fileName:"+file.getOriginalFilename()+",allStep:\""+1+"\",nowStep:"+1+"}"; publisher.send("/Web/Progress", new Gson().toJson(json), 1, false); ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0,hexString); publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); - DeviceLogDTO logDto = new DeviceLogDTO(); + LogMessage logDto = new LogMessage(); logDto.setOperate(id + "系统上送文件,当前文件只有1帧"); logDto.setResult(1); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); //判断是否重发 sendNextStep(logDto,path,file,length,bytes,0,version,id,1,hexString,false); } } catch (Exception e) { - DeviceLogDTO logDto = new DeviceLogDTO(); + LogMessage logDto = new LogMessage(); logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.UPLOAD_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logDto.setOperate("系统上传文件"); + logDto.setFailReason(e.getMessage()); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AccessResponseEnum.UPLOAD_ERROR); } } @@ -279,7 +285,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { /** * 根据装置响应来判断发送的内容 */ - public void sendNextStep(DeviceLogDTO logDto, String path, MultipartFile file, int length, byte[] bytes, Integer offset, String version, String id, int mid, String fileCheck, boolean result) { + public void sendNextStep(LogMessage logDto, String path, MultipartFile file, int length, byte[] bytes, Integer offset, String version, String id, int mid, String fileCheck, boolean result) { try { for (int i = 0; i < 30; i++) { if (result) { @@ -303,16 +309,17 @@ public class CsDevModelServiceImpl implements ICsDevModelService { publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); logDto.setOperate(id + "系统上送文件,装置响应失败,重新发送,这是第" + (i+1) + "次"); logDto.setResult(1); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); } } } } catch (InterruptedException e) { assert logDto != null; logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.RELOAD_UPLOAD_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); - throw new RuntimeException(e); + logDto.setOperate("平台重新上送文件"); + logDto.setFailReason(e.getMessage()); + logMessageTemplate.sendMember(logDto); + throw new BusinessException(AccessResponseEnum.RELOAD_UPLOAD_ERROR); } } @@ -322,8 +329,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { */ private CsDevModelPO addCsDevModel(TemplateDto templateDto, String filePath){ //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); logDto.setOperate("新增"+templateDto.getDevType()+"模板数据"); logDto.setResult(1); @@ -331,23 +338,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (!Objects.isNull(po)){ logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MODEL_REPEAT.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AccessResponseEnum.MODEL_REPEAT); } -// CsDevModelPO model = new CsDevModelPO(); -// model.setDevTypeName(templateDto.getDevType()); -// model.setName(templateDto.getDevType()); -// model.setVersionNo(templateDto.getVersion()); -// model.setVersionDate(Date.valueOf(templateDto.getTime())); -// model.setFilePath(filePath); -// model.setStatus ("1"); -// //fixme 先用数据类型来区分模板的类型 -// if (templateDto.getDataList().contains("Apf") || templateDto.getDataList().contains("Dvr")){ -// model.setType(0); -// } else { -// model.setType(1); -// } -// csDevModelMapper.insert(model); CsDevModelAddParm csDevModelAddParm = new CsDevModelAddParm(); csDevModelAddParm.setDevTypeName(templateDto.getDevType()); csDevModelAddParm.setName(templateDto.getDevType()); @@ -361,7 +354,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { csDevModelAddParm.setType(1); } CsDevModelPO model = devModelFeignClient.addDevModel(csDevModelAddParm).getData(); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); return model; } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 0dc2c69..44c77a6 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -27,6 +27,8 @@ import com.njcn.csdevice.pojo.param.*; import com.njcn.csdevice.pojo.po.*; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import com.njcn.csharmonic.api.CsHarmonicPlanLineFeignClient; +import com.njcn.mq.message.LogMessage; +import com.njcn.mq.template.LogMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.DicDataFeignClient; @@ -75,7 +77,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final MqttUtil mqttUtil; private final ICsTopicService csTopicService; private final DicDataFeignClient dicDataFeignClient; - private final CsLogsFeignClient csLogsFeignClient; private final ProcessFeignClient processFeignClient; private final CsLinePOService csLinePOService; private final CsDeviceUserPOService csDeviceUserPOService; @@ -89,56 +90,61 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final AppProjectFeignClient appProjectFeignClient; private final DeviceMessageFeignClient deviceMessageFeignClient; private final CsHarmonicPlanLineFeignClient csHarmonicPlanLineFeignClient; + private final LogMessageTemplate logMessageTemplate; @Override @Transactional(rollbackFor = {Exception.class}) public void devRegister(String nDid,Integer type) { //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); - logDto.setLoginName(RequestUtil.getUsername()); - logDto.setOperate("直连设备"+nDid+"注册"); - logDto.setResult(1); + LogMessage message = new LogMessage(); + message.setUserIndex(RequestUtil.getUserNickname()); + message.setLoginName(RequestUtil.getUsername()); + message.setResult(1); + //1.判断客户端是否在线 + String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (!mqttClient){ + message.setOperate("直连装置"+nDid+"注册中,判断装置是否连接MQTT服务器"); + message.setResult(0); + message.setFailReason(AlgorithmResponseEnum.DEV_OFFLINE.getMessage()); + logMessageTemplate.sendMember(message); + throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); + } //1.判断nDid是否存在 CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid); if (Objects.isNull(csEquipmentDeliveryVO.getNdid())){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage()); - csLogsFeignClient.addUserLog(logDto); + message.setOperate("直连装置"+nDid+"注册中,判断nDid是否存在"); + message.setResult(0); + message.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage()); + logMessageTemplate.sendMember(message); throw new BusinessException(AccessResponseEnum.NDID_NO_FIND); } - //2.判断设备是否是直连设备 + //2.判断装置是否是直连装置 SysDicTreePO sysDicTreePo = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData(); if (Objects.isNull(sysDicTreePo)){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.DEV_NOT_FIND.getMessage()); - csLogsFeignClient.addUserLog(logDto); + message.setOperate("直连装置"+nDid+"注册中,判断装置型号"); + message.setResult(0); + message.setFailReason(AccessResponseEnum.DEV_NOT_FIND.getMessage()); + logMessageTemplate.sendMember(message); throw new BusinessException(AccessResponseEnum.DEV_NOT_FIND); } String code = sysDicTreePo.getCode(); if (!Objects.equals(code, DicDataEnum.CONNECT_DEV.getCode())){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.DEV_IS_NOT_ZL.getMessage()); - csLogsFeignClient.addUserLog(logDto); + message.setOperate("直连装置"+nDid+"注册中,判断装置是否为直连装置"); + message.setResult(0); + message.setFailReason(AccessResponseEnum.DEV_IS_NOT_ZL.getMessage()); + logMessageTemplate.sendMember(message); throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL); } //3.判断是否已经注册过 if (!Objects.isNull(csEquipmentDeliveryVO.getNdid()) && Objects.equals(type,csEquipmentDeliveryVO.getProcess()) && Objects.equals(AccessEnum.ACCESS.getCode(),csEquipmentDeliveryVO.getStatus())){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.NDID_SAME_STEP.getMessage()); - csLogsFeignClient.addUserLog(logDto); + message.setOperate("直连装置"+nDid+"注册中,判断装置是否已经注册"); + message.setResult(0); + message.setFailReason(AccessResponseEnum.NDID_SAME_STEP.getMessage()); + logMessageTemplate.sendMember(message); throw new BusinessException(AccessResponseEnum.NDID_SAME_STEP); } - //4.判断客户端是否在线 - String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); - boolean mqttClient = mqttUtil.judgeClientOnline(clientName); - if (!mqttClient){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.MISSING_CLIENT.getMessage()); - csLogsFeignClient.addUserLog(logDto); - throw new BusinessException(AccessResponseEnum.MISSING_CLIENT); - } - //5.判断当前流程是否是合法的 + //4.判断当前流程是否是合法的 //note(重要说明) 这边流程原先是三个阶段,在实际应用中嫌麻烦,简化为一个流程 // if (csEquipmentDeliveryVO.getProcess() > type){ // logDto.setResult(0); @@ -149,30 +155,32 @@ public class CsDeviceServiceImpl implements ICsDeviceService { // logDto.setFailReason(AccessResponseEnum.PROCESS_MISSING_ERROR.getMessage()); // throw new BusinessException(AccessResponseEnum.PROCESS_MISSING_ERROR); // } - //6.询问设备支持的主题信息 + //5.询问装置支持的主题信息 //将支持的主题入库 askTopic(nDid); //7.MQTT询问装置用的模板,并判断库中是否存在模板 //存在则建立关系;不存在则告警出来 SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData(); if (Objects.isNull(dictData)){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.DEV_MODEL_NOT_FIND.getMessage()); - csLogsFeignClient.addUserLog(logDto); + message.setOperate("直连装置"+nDid+"注册中,判断系统中是否存在装置型号"); + message.setResult(0); + message.setFailReason(AccessResponseEnum.DEV_MODEL_NOT_FIND.getMessage()); + logMessageTemplate.sendMember(message); throw new BusinessException(AccessResponseEnum.DEV_MODEL_NOT_FIND); } String devModel = dictData.getCode(); zhiLianRegister(nDid,devModel); - csLogsFeignClient.addUserLog(logDto); + message.setOperate("直连装置"+nDid+"下发注册报文"); + logMessageTemplate.sendMember(message); } @Override public Object getModel(String nDid) { //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); - logDto.setOperate("获取"+nDid+"设备模板信息"); + logDto.setOperate("获取"+nDid+"装置模板信息"); logDto.setResult(1); Object model = null; try { @@ -182,13 +190,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService { if (Objects.isNull(model)){ logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.MODEL_MISS.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AccessResponseEnum.MODEL_MISS); } } catch (InterruptedException e) { e.printStackTrace(); } - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); return model; } @@ -197,10 +205,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //fixme 这边事务不起作用,中途出错会导致数据部分录入,再次接入会报主键冲突,所以暂时加了个重置按钮,清空台账数据的 public void devAccess(DevAccessParam devAccessParam) { //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); - logDto.setOperate("设备"+devAccessParam.getNDid()+"接入"); + logDto.setOperate("装置"+devAccessParam.getNDid()+"接入"); logDto.setResult(1); try { //获取版本 @@ -217,7 +225,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csLedgerParam.setSort(0); csLedgerService.addLedgerTree(csLedgerParam); List modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + devAccessParam.getNDid()),CsModelDto.class); - //2.新增装置-模板关系、获取电能质量的逻辑设备id + //2.新增装置-模板关系、获取电能质量的逻辑装置id for (CsModelDto item : modelId) { CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); csDevModelRelationAddParm.setDevId(vo.getId()); @@ -287,7 +295,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { if (CollectionUtil.isNotEmpty(lineList)){ logDto.setResult(0); logDto.setFailReason(AccessResponseEnum.LINE_POSITION_REPEAT.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AccessResponseEnum.LINE_POSITION_REPEAT); } //删除监测点稳态指标告警的默认指标配置 @@ -310,16 +318,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService { po.setSubUserId(RequestUtil.getUserIndex()); po.setDeviceId(vo.getId()); csDeviceUserService.saveBatch(Collections.singletonList(po)); - //6.修改装置状态;修改设备接入的工程、项目 + //6.修改装置状态;修改装置接入的工程、项目 csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode(),devAccessParam.getEngineeringId(), devAccessParam.getProjectId()); //7.发起自动接入请求 devAccessAskTemplate(devAccessParam.getNDid(),version,1); //8.删除redis监测点模板信息 redisUtil.delete(AppRedisKey.MODEL + devAccessParam.getNDid()); redisUtil.delete(AppRedisKey.LINE + devAccessParam.getNDid()); - //9.存储日志 - csLogsFeignClient.addUserLog(logDto); - //10.存储设备调试日志表 + //9.存储装置调试日志表 CsEquipmentProcessPO csEquipmentProcess = new CsEquipmentProcessPO(); csEquipmentProcess.setDevId(devAccessParam.getNDid()); csEquipmentProcess.setOperator(RequestUtil.getUserIndex()); @@ -330,7 +336,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csEquipmentProcess.setStatus(1); } processFeignClient.add(csEquipmentProcess); - //11.这里会出现工程用户接入设备时,如果当前工程用户并没有关注,接入之后应该将用户和工程关联起来 + //11.这里会出现工程用户接入装置时,如果当前工程用户并没有关注,接入之后应该将用户和工程关联起来 List users = userFeignClient.getUserVOByIdList(Collections.singletonList(RequestUtil.getUserIndex())).getData(); if (CollectionUtil.isNotEmpty(users)) { UserVO userVO = users.get(0); @@ -338,10 +344,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csMarketDataFeignClient.insertData(userVO.getId(), devAccessParam.getEngineeringId()); } } + //10.存储日志 + logMessageTemplate.sendMember(logDto); } catch (Exception e) { logDto.setResult(0); logDto.setFailReason(e.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(CommonResponseEnum.FAIL); } } @@ -367,7 +375,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //清除关系表 QueryWrapper csLedgerQueryWrapper = new QueryWrapper<>(); /** - * 删除设备 + * 删除装置 */ csLedgerQueryWrapper.clear(); csLedgerQueryWrapper.eq("id",devId); @@ -412,16 +420,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService { @Transactional(rollbackFor = Exception.class) public String wlDevRegister(String nDid) { String result = "fail"; - // 根据模板接入设备 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + // 根据模板接入装置 + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); - logDto.setOperate("便携式设备"+nDid+"注册、接入"); + logDto.setOperate("便携式装置"+nDid+"注册、接入"); logDto.setResult(1); try { - // 设备状态判断 + // 装置状态判断 checkDeviceStatus(nDid); - // 询问设备支持的主题信息,并将支持的主题入库 + // 询问装置支持的主题信息,并将支持的主题入库 askAndStoreTopics(nDid); Thread.sleep(2000); // MQTT询问装置用的模板,并判断库中是否存在模板 @@ -430,10 +438,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService { CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); List csLinePoList = new ArrayList<>(); //1.录入装置台账信息 - //note 1、这边发现便携式设备注册时,如果没有工程 项目,后期特殊处理非常的麻烦,这边接入时,先查询工程 项目,如果没有则创建;如果存在则直接使用; - //note 2、查询之前已经接入过的便携式设备,如果存在修改台账信息,添加工程、项目 + //note 1、这边发现便携式装置注册时,如果没有工程 项目,后期特殊处理非常的麻烦,这边接入时,先查询工程 项目,如果没有则创建;如果存在则直接使用; + //note 2、查询之前已经接入过的便携式装置,如果存在修改台账信息,添加工程、项目 String projectId = this.autoPortableLedger(); - //新增便携式设备 + //新增便携式装置 CsLedgerParam csLedgerParam = new CsLedgerParam(); csLedgerParam.setId(vo.getId()); csLedgerParam.setPid(projectId); @@ -491,11 +499,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //String version = csTopicService.getVersion(nDid); String version = "V1"; devAccessAskTemplate(nDid,version,1); - //6.修改流程,便携式设备接入成功即为实际环境 + //6.修改流程,便携式装置接入成功即为实际环境 csEquipmentDeliveryService.updateProcessBynDid(nDid,4); - //7.存储日志 - csLogsFeignClient.addUserLog(logDto); - //9.删除redis监测点模板信息 + //7.删除redis监测点模板信息 redisUtil.delete(AppRedisKey.MODEL + nDid); redisUtil.delete(AppRedisKey.LINE + nDid); //判断接入状态 @@ -504,10 +510,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { if (Objects.nonNull(object)) { result = "success"; } + //8.存储日志 + logMessageTemplate.sendMember(logDto); } catch (Exception e) { logDto.setResult(0); logDto.setFailReason(e.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); resetFactory(nDid); throw new BusinessException(AccessResponseEnum.ACCESS_ERROR); } @@ -516,7 +524,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { @Override public void wlAccess(String nDid) { - //设备状态判断 + //装置状态判断 checkDeviceStatus(nDid); //获取版本 String version = csTopicService.getVersion(nDid); @@ -550,7 +558,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { param.setSort(Integer.MAX_VALUE); csProjectPO = appProjectFeignClient.addPortableProject(param).getData(); } - //修改已存在的便携式设备 + //修改已存在的便携式装置 csLedgerService.updatePortableLedger(csEngineeringPO.getId(),csProjectPO.getId()); return csProjectPO.getId(); } @@ -559,16 +567,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService { @Transactional(rollbackFor = Exception.class) public String onlineRegister(String projectId,String nDid) { String result = "fail"; - // 根据模板接入设备 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); + // 根据模板接入装置 + LogMessage logDto = new LogMessage(); + logDto.setUserIndex(RequestUtil.getUserNickname()); logDto.setLoginName(RequestUtil.getUsername()); - logDto.setOperate("监测设备"+nDid+"注册、接入"); + logDto.setOperate("监测装置"+nDid+"注册、接入"); logDto.setResult(1); try { - // 设备状态判断 + // 装置状态判断 checkDeviceStatus(nDid); - // 询问设备支持的主题信息,并将支持的主题入库 + // 询问装置支持的主题信息,并将支持的主题入库 askAndStoreTopics(nDid); Thread.sleep(2000); // MQTT询问装置用的模板,并判断库中是否存在模板 @@ -577,7 +585,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); List csLinePoList = new ArrayList<>(); //1.录入装置台账信息 - //新增监测设备 + //新增监测装置 CsLedgerParam csLedgerParam = new CsLedgerParam(); csLedgerParam.setId(vo.getId()); csLedgerParam.setPid(projectId); @@ -646,9 +654,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { devAccessAskTemplate(nDid,version,1); //6.修改流程,接入成功即为实际环境 csEquipmentDeliveryService.updateProcessBynDid(nDid,4); - //7.存储日志 - csLogsFeignClient.addUserLog(logDto); - //9.删除redis监测点模板信息 + //7.删除redis监测点模板信息 redisUtil.delete(AppRedisKey.MODEL + nDid); redisUtil.delete(AppRedisKey.LINE + nDid); //判断接入状态 @@ -657,10 +663,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { if (Objects.nonNull(object)) { result = "success"; } + //8.存储日志 + logMessageTemplate.sendMember(logDto); } catch (Exception e) { logDto.setResult(0); logDto.setFailReason(e.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); resetFactory(nDid); throw new BusinessException(e.getMessage()); } @@ -668,7 +676,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } private void checkDeviceStatus(String nDid) { - DeviceLogDTO logDto = createLogDto("当前设备"+nDid+"状态判断"); + LogMessage logDto = createLogDto("当前装置"+nDid+"状态判断"); CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid); if (Objects.isNull(csEquipmentDeliveryVO.getNdid())) { throwExceptionAndLog(nDid,AccessResponseEnum.NDID_NO_FIND, logDto); @@ -687,12 +695,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } private void askAndStoreTopics(String nDid) { - // 询问设备支持的主题信息 + // 询问装置支持的主题信息 this.askTopic(nDid); } private void checkDeviceModel(String nDid) { - DeviceLogDTO logDto = createLogDto("MQTT询问装置用的模板,并判断库中是否存在模板"); + LogMessage logDto = createLogDto("MQTT询问装置用的模板,并判断库中是否存在模板"); CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid); SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData(); if (Objects.isNull(dictData)) { @@ -702,19 +710,19 @@ public class CsDeviceServiceImpl implements ICsDeviceService { zhiLianRegister(nDid,devModel); } - private DeviceLogDTO createLogDto(String operate) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); + private LogMessage createLogDto(String operate) { + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统"); + logDto.setLoginName("系统"); logDto.setOperate(operate); logDto.setResult(1); return logDto; } - private void throwExceptionAndLog(String nDid,AccessResponseEnum responseEnum, DeviceLogDTO logDto) { + private void throwExceptionAndLog(String nDid,AccessResponseEnum responseEnum, LogMessage logDto) { logDto.setResult(0); logDto.setFailReason(responseEnum.getMessage()); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); resetFactory(nDid); throw new BusinessException(responseEnum); } @@ -728,13 +736,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService { public boolean devAccessAskTemplate(String nDid,String version,Integer mid) { String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统"); + logDto.setLoginName("系统"); if (!mqttClient) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); - logDto.setOperate(nDid + "接入失败,装置客户端不在线"); - csLogsFeignClient.addUserLog(logDto); + logDto.setResult(0); + logDto.setOperate(nDid + "系统向装置询问模板信息"); + logDto.setFailReason(AlgorithmResponseEnum.DEV_OFFLINE.getMessage()); + logMessageTemplate.sendMember(logDto); throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); } boolean result = false; @@ -778,14 +787,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //录波任务倒计时 redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L); result = true; + } else { + logDto.setResult(0); + logDto.setOperate(nDid + "系统向装置询问模板信息"); + logDto.setFailReason("装置未应答"); + logMessageTemplate.sendMember(logDto); } } catch (Exception e) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); + logDto.setResult(0); logDto.setOperate(nDid + "装置接入失败"); - csLogsFeignClient.addUserLog(logDto); + logMessageTemplate.sendMember(logDto); throw new BusinessException(e.getMessage()); } return result; @@ -803,13 +814,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); boolean mqttClient = mqttUtil.judgeClientOnline(clientName); if (!mqttClient) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); - logDto.setOperate(nDid + "接入失败,装置客户端不在线"); - csLogsFeignClient.addUserLog(logDto); - throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); + return result; } Map modelMap = new HashMap<>(); //删除缓存数据 @@ -852,13 +857,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService { result = true; } } catch (Exception e) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); - logDto.setOperate(nDid + "装置接入失败"); - csLogsFeignClient.addUserLog(logDto); - throw new BusinessException(e.getMessage()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("系统首次启动"); + logDto.setLoginName("系统首次启动"); + logDto.setResult(0); + logDto.setOperate(nDid + "装置接入"); + logDto.setFailReason(e.getMessage()); + logMessageTemplate.sendMember(logDto); } return result; } @@ -870,14 +875,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); boolean mqttClient = mqttUtil.judgeClientOnline(clientName); if (!mqttClient) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); - logDto.setOperate(nDid + "接入失败,装置客户端不在线"); - csLogsFeignClient.addUserLog(logDto); - // 改为返回false而不是抛出异常 - log.warn("设备 {} 客户端不在线", nDid); + //log.warn("装置 {} 客户端不在线", nDid); return false; } @@ -889,12 +887,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.warn("线程休眠被中断: {}", e.getMessage()); + //log.warn("线程休眠被中断: {}", e.getMessage()); return false; } List modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid), CsModelDto.class); if (CollUtil.isEmpty(modelId)) { - log.warn("设备 {} 未获取到模板信息", nDid); + //log.warn("装置 {} 未获取到模板信息", nDid); return false; } CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); @@ -916,13 +914,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService { publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())), 1, false); result = true; } catch (Exception e) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); - logDto.setOperate(nDid + "装置接入失败"); - csLogsFeignClient.addUserLog(logDto); - log.error("设备 {} 接入失败: {}", nDid, e.getMessage()); + LogMessage logDto = new LogMessage(); + logDto.setUserIndex("定时任务"); + logDto.setLoginName("定时任务"); + logDto.setResult(0); + logDto.setOperate(nDid + "装置接入"); + logDto.setFailReason(e.getMessage()); + logMessageTemplate.sendMember(logDto); + //log.error("装置 {} 接入失败: {}", nDid, e.getMessage()); } return result; } @@ -953,7 +952,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } /** - * 平台对设备发起主题询问命令 + * 平台对装置发起主题询问命令 */ public void askTopic(String nDid) { ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); @@ -967,7 +966,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } /** - * 平台对设备发起注册命令 + * 平台对装置发起注册命令 * @param nDid * @param devType */ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index e783612..f4f1863 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -9,15 +9,16 @@ import com.njcn.access.mapper.CsEquipmentDeliveryMapper; import com.njcn.access.pojo.param.DeviceStatusParam; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.utils.MqttUtil; -import com.njcn.common.pojo.dto.DeviceLogDTO; -import com.njcn.csdevice.api.CsLogsFeignClient; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -35,8 +36,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl lambdaUpdateWrapper = new LambdaUpdateWrapper<>(); @@ -110,13 +109,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl @@ -42,6 +45,17 @@ public class CsTopicServiceImpl extends ServiceImpl impl return version; } + @Override + public Map getVersion(List list) { + Map map = new HashMap<>(); + List topicList = this.lambdaQuery().in(CsTopic::getNDid,list).isNotNull(CsTopic::getVersion).list(); + Map> topicMap = topicList.stream().collect(Collectors.groupingBy(CsTopic::getNDid)); + topicMap.forEach((key,value)->{ + map.put(key,value.get(0).getVersion()); + }); + return map; + } + @Override public void deleteByNDid(String nDid) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); diff --git a/iot-message/message-boot/pom.xml b/iot-message/message-boot/pom.xml index 171829d..9bf5625 100644 --- a/iot-message/message-boot/pom.xml +++ b/iot-message/message-boot/pom.xml @@ -20,6 +20,11 @@ + + com.njcn + cs-system-api + ${project.version} + com.njcn common-web diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/LogConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/LogConsumer.java new file mode 100644 index 0000000..db0bb58 --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/LogConsumer.java @@ -0,0 +1,138 @@ +package com.njcn.message.consumer; + +import com.njcn.common.pojo.dto.DeviceLogDTO; +import com.njcn.cssystem.api.CsLogsFeignClient; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.constant.MessageStatus; +import com.njcn.mq.message.LogMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:32 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.ZL_LOG_TOPIC, + consumerGroup = BusinessTopic.ZL_LOG_TOPIC, + selectorExpression = BusinessTopic.LogTag.LOG_TAG, + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class LogConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Resource + private CsLogsFeignClient csLogsFeignClient; + + @Override + protected void handleMessage(LogMessage logMessage) { + DeviceLogDTO deviceLogDTO = new DeviceLogDTO(); + BeanUtils.copyProperties(logMessage,deviceLogDTO); + csLogsFeignClient.addUserLog(deviceLogDTO); + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(LogMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis 5分钟,避免重复消费 + */ + @Override + protected void consumeSuccess(LogMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 5 * 60L); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(LogMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(LogMessage appAutoDataMessage) { + super.dispatchMessage(appAutoDataMessage); + } +}