diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 3238e87..72fd1bd 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -20,7 +20,6 @@ import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.service.*; import com.njcn.access.utils.CRC32Utils; import com.njcn.access.utils.JsonUtil; -import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; @@ -48,6 +47,7 @@ import com.njcn.system.pojo.vo.DictTreeVO; import com.njcn.web.utils.RequestUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; @@ -88,6 +88,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { private final DevModelRelationFeignClient devModelRelationFeignClient; private final CsLineFeignClient csLineFeignClient; private final LogMessageTemplate logMessageTemplate; + private final StringRedisTemplate stringRedisTemplate; @Override @Transactional(rollbackFor = {Exception.class}) @@ -143,7 +144,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } } //5.清空模板缓存 - redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + //redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + //6.通知其他服务节点清理本地缓存 + stringRedisTemplate.convertAndSend("model_cache_clear", "clear"); logMessageTemplate.sendMember(logDto); } catch (Exception e) { logDto.setResult(0); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 3f37dc6..db26426 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -42,6 +42,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -89,6 +90,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final DeviceMessageFeignClient deviceMessageFeignClient; private final CsHarmonicPlanLineFeignClient csHarmonicPlanLineFeignClient; private final LogMessageTemplate logMessageTemplate; + private final StringRedisTemplate stringRedisTemplate; @Override @@ -766,7 +768,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { try { //删除缓存数据 redisUtil.delete(AppRedisKey.MODEL + nDid); - redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + //redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); //询问装置当前所用模板 publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())),1,false); //接收到模板,判断模板是否存在,替换模板,发起接入 @@ -837,7 +839,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService { Map modelMap = new HashMap<>(); //删除缓存数据 redisUtil.delete(AppRedisKey.MODEL + nDid); - redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + //redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + //通知其他服务节点清理本地缓存 + stringRedisTemplate.convertAndSend("model_cache_clear", "clear"); //询问装置当前所用模板 publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())),1,false); //接收到模板,判断模板是否存在,替换模板,发起接入 @@ -899,7 +903,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { Map modelMap = new HashMap<>(); redisUtil.delete(AppRedisKey.MODEL + nDid); - redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + //redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())), 1, false); try { Thread.sleep(2000); diff --git a/iot-analysis/analysis-stat/stat-boot/pom.xml b/iot-analysis/analysis-stat/stat-boot/pom.xml index 1a41c8f..43f6b07 100644 --- a/iot-analysis/analysis-stat/stat-boot/pom.xml +++ b/iot-analysis/analysis-stat/stat-boot/pom.xml @@ -25,6 +25,11 @@ com.github.tocrhz mqtt-spring-boot-starter + + + com.github.ben-manes.caffeine + caffeine + com.njcn common-web diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/ModelCacheClearListener.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/ModelCacheClearListener.java new file mode 100644 index 0000000..c60feee --- /dev/null +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/ModelCacheClearListener.java @@ -0,0 +1,40 @@ +package com.njcn.stat.listener; + +import com.njcn.stat.service.IStatService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +/** + * @author 徐扬 + */ +@Slf4j +@Component +public class ModelCacheClearListener implements MessageListener { + + public static final String MODEL_CACHE_CLEAR_CHANNEL = "model_cache_clear"; + + @Resource + private RedisMessageListenerContainer redisMessageListenerContainer; + + @Resource + private IStatService statService; + + @PostConstruct + public void init() { + redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(MODEL_CACHE_CLEAR_CHANNEL)); + } + + @Override + public void onMessage(Message message, byte[] pattern) { + log.info("收到模板缓存清理通知,channel={}", new String(message.getChannel())); + statService.clearModelCache(); + log.info("本地模板缓存已清理"); + } +} diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java index bbcab5b..f727ab0 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java @@ -15,4 +15,5 @@ public interface IStatService { void electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage); + void clearModelCache(); } diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 5df74ec..5eb31ad 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -3,6 +3,8 @@ package com.njcn.stat.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.njcn.access.api.CsDeviceFeignClient; import com.njcn.access.api.CsLineLatestDataFeignClient; import com.njcn.access.enums.AccessEnum; @@ -63,6 +65,12 @@ public class StatServiceImpl implements IStatService { private final DeviceMessageFeignClient deviceMessageFeignClient; private final CsCommunicateFeignClient csCommunicateFeignClient; + // 本地缓存:模板数据变更极少,60分钟过期足够,最大缓存50000条 + private final Cache> modelCache = Caffeine.newBuilder() + .maximumSize(50000) + .expireAfterWrite(60, TimeUnit.MINUTES) + .build(); + @Override @Transactional(rollbackFor = Exception.class) public void analysis(AppAutoDataMessage appAutoDataMessage) { @@ -137,15 +145,15 @@ public class StatServiceImpl implements IStatService { } boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD"); int clDid = flag?1:appAutoDataMessage.getMsg().getClDid(); -// String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getStatMethod() + dataArrayParam.getIdx()); String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getIdx()); - Object object = redisUtil.getObjectByKey(key); - List dataArrayList; - if (Objects.isNull(object)){ - dataArrayList = saveModelData(dataArrayParam,key); - } else { - dataArrayList = objectToList(object); - } +// Object object = redisUtil.getObjectByKey(key); +// List dataArrayList; +// if (Objects.isNull(object)){ +// dataArrayList = saveModelData(dataArrayParam,key); +// } else { +// dataArrayList = objectToList(object); +// } + List dataArrayList = getModelData(dataArrayParam, key); List result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code,po.getDevAccessMethod(),map); recordList.addAll(result); //获取时间 @@ -229,13 +237,14 @@ public class StatServiceImpl implements IStatService { boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD"); int clDid = flag?1:appAutoDataMessage.getMsg().getClDid(); String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getIdx()); - Object object = redisUtil.getObjectByKey(key); - List dataArrayList; - if (Objects.isNull(object)){ - dataArrayList = saveModelData(dataArrayParam,key); - } else { - dataArrayList = objectToList(object); - } +// Object object = redisUtil.getObjectByKey(key); +// List dataArrayList; +// if (Objects.isNull(object)){ +// dataArrayList = saveModelData(dataArrayParam,key); +// } else { +// dataArrayList = objectToList(object); +// } + List dataArrayList = getModelData(dataArrayParam, key); List result = assembleDdData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),code,po.getDevAccessMethod(),map); recordList.addAll(result); //获取时间 @@ -269,6 +278,15 @@ public class StatServiceImpl implements IStatService { System.gc(); } + + /** + * 清理本地模板缓存(由Redis Pub/Sub通知触发) + */ + @Override + public void clearModelCache() { + modelCache.invalidateAll(); + } + /** * 缓存设备模板信息 */ @@ -398,4 +416,19 @@ public class StatServiceImpl implements IStatService { return urlList; } + /** + * 获取设备模板信息(本地缓存优先) + */ + public List getModelData(DataArrayParam dataArrayParam, String key) { + List dataArrayList = modelCache.getIfPresent(key); + if (CollectionUtil.isEmpty(dataArrayList)) { + dataArrayList = dataArrayFeignClient.findListByParam(dataArrayParam).getData(); + if (CollectionUtil.isEmpty(dataArrayList)){ + throw new BusinessException(StatResponseEnum.DATA_ARRAY_NULL); + } + modelCache.put(key, dataArrayList); + } + return dataArrayList; + } + }