feat(cache): 优化设备模板缓存策略并添加分布式缓存清理功能
- 引入Caffeine本地缓存替代Redis缓存提高性能 - 添加Redis Pub/Sub机制实现多节点缓存同步清理 - 在CsDeviceServiceImpl中注释掉原有的Redis缓存清理逻辑 - 在CsDevModelServiceImpl中添加缓存清理消息发送功能 - 新增ModelCacheClearListener监听器处理缓存清理通知 - 实现IStatService接口的clearModelCache方法 - 添加getModelData方法实现本地缓存优先的数据获取策略 - 配置最大50000条缓存数据,60分钟过期时间
This commit is contained in:
@@ -20,7 +20,6 @@ import com.njcn.access.pojo.po.CsLineModel;
|
||||
import com.njcn.access.service.*;
|
||||
import com.njcn.access.utils.CRC32Utils;
|
||||
import com.njcn.access.utils.JsonUtil;
|
||||
import com.njcn.common.pojo.dto.DeviceLogDTO;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.csdevice.api.CsLineFeignClient;
|
||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||
@@ -48,6 +47,7 @@ import com.njcn.system.pojo.vo.DictTreeVO;
|
||||
import com.njcn.web.utils.RequestUtil;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
@@ -88,6 +88,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
private final DevModelRelationFeignClient devModelRelationFeignClient;
|
||||
private final CsLineFeignClient csLineFeignClient;
|
||||
private final LogMessageTemplate logMessageTemplate;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = {Exception.class})
|
||||
@@ -143,7 +144,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
}
|
||||
}
|
||||
//5.清空模板缓存
|
||||
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//6.通知其他服务节点清理本地缓存
|
||||
stringRedisTemplate.convertAndSend("model_cache_clear", "clear");
|
||||
logMessageTemplate.sendMember(logDto);
|
||||
} catch (Exception e) {
|
||||
logDto.setResult(0);
|
||||
|
||||
@@ -42,6 +42,7 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@@ -89,6 +90,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
private final DeviceMessageFeignClient deviceMessageFeignClient;
|
||||
private final CsHarmonicPlanLineFeignClient csHarmonicPlanLineFeignClient;
|
||||
private final LogMessageTemplate logMessageTemplate;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
|
||||
@Override
|
||||
@@ -766,7 +768,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
try {
|
||||
//删除缓存数据
|
||||
redisUtil.delete(AppRedisKey.MODEL + nDid);
|
||||
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//询问装置当前所用模板
|
||||
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())),1,false);
|
||||
//接收到模板,判断模板是否存在,替换模板,发起接入
|
||||
@@ -837,7 +839,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
Map<Integer,String> modelMap = new HashMap<>();
|
||||
//删除缓存数据
|
||||
redisUtil.delete(AppRedisKey.MODEL + nDid);
|
||||
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//通知其他服务节点清理本地缓存
|
||||
stringRedisTemplate.convertAndSend("model_cache_clear", "clear");
|
||||
//询问装置当前所用模板
|
||||
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())),1,false);
|
||||
//接收到模板,判断模板是否存在,替换模板,发起接入
|
||||
@@ -899,7 +903,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
|
||||
Map<Integer, String> modelMap = new HashMap<>();
|
||||
redisUtil.delete(AppRedisKey.MODEL + nDid);
|
||||
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
//redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
|
||||
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())), 1, false);
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
|
||||
@@ -25,6 +25,11 @@
|
||||
<groupId>com.github.tocrhz</groupId>
|
||||
<artifactId>mqtt-spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
<!-- caffeine 本地缓存 -->
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>common-web</artifactId>
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.njcn.stat.listener;
|
||||
|
||||
import com.njcn.stat.service.IStatService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* @author 徐扬
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ModelCacheClearListener implements MessageListener {
|
||||
|
||||
public static final String MODEL_CACHE_CLEAR_CHANNEL = "model_cache_clear";
|
||||
|
||||
@Resource
|
||||
private RedisMessageListenerContainer redisMessageListenerContainer;
|
||||
|
||||
@Resource
|
||||
private IStatService statService;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(MODEL_CACHE_CLEAR_CHANNEL));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
log.info("收到模板缓存清理通知,channel={}", new String(message.getChannel()));
|
||||
statService.clearModelCache();
|
||||
log.info("本地模板缓存已清理");
|
||||
}
|
||||
}
|
||||
@@ -15,4 +15,5 @@ public interface IStatService {
|
||||
|
||||
void electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage);
|
||||
|
||||
void clearModelCache();
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package com.njcn.stat.service.impl;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.njcn.access.api.CsDeviceFeignClient;
|
||||
import com.njcn.access.api.CsLineLatestDataFeignClient;
|
||||
import com.njcn.access.enums.AccessEnum;
|
||||
@@ -63,6 +65,12 @@ public class StatServiceImpl implements IStatService {
|
||||
private final DeviceMessageFeignClient deviceMessageFeignClient;
|
||||
private final CsCommunicateFeignClient csCommunicateFeignClient;
|
||||
|
||||
// 本地缓存:模板数据变更极少,60分钟过期足够,最大缓存50000条
|
||||
private final Cache<String, List<CsDataArray>> modelCache = Caffeine.newBuilder()
|
||||
.maximumSize(50000)
|
||||
.expireAfterWrite(60, TimeUnit.MINUTES)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void analysis(AppAutoDataMessage appAutoDataMessage) {
|
||||
@@ -137,15 +145,15 @@ public class StatServiceImpl implements IStatService {
|
||||
}
|
||||
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
|
||||
int clDid = flag?1:appAutoDataMessage.getMsg().getClDid();
|
||||
// String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getStatMethod() + dataArrayParam.getIdx());
|
||||
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getIdx());
|
||||
Object object = redisUtil.getObjectByKey(key);
|
||||
List<CsDataArray> dataArrayList;
|
||||
if (Objects.isNull(object)){
|
||||
dataArrayList = saveModelData(dataArrayParam,key);
|
||||
} else {
|
||||
dataArrayList = objectToList(object);
|
||||
}
|
||||
// Object object = redisUtil.getObjectByKey(key);
|
||||
// List<CsDataArray> dataArrayList;
|
||||
// if (Objects.isNull(object)){
|
||||
// dataArrayList = saveModelData(dataArrayParam,key);
|
||||
// } else {
|
||||
// dataArrayList = objectToList(object);
|
||||
// }
|
||||
List<CsDataArray> dataArrayList = getModelData(dataArrayParam, key);
|
||||
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code,po.getDevAccessMethod(),map);
|
||||
recordList.addAll(result);
|
||||
//获取时间
|
||||
@@ -229,13 +237,14 @@ public class StatServiceImpl implements IStatService {
|
||||
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
|
||||
int clDid = flag?1:appAutoDataMessage.getMsg().getClDid();
|
||||
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getIdx());
|
||||
Object object = redisUtil.getObjectByKey(key);
|
||||
List<CsDataArray> dataArrayList;
|
||||
if (Objects.isNull(object)){
|
||||
dataArrayList = saveModelData(dataArrayParam,key);
|
||||
} else {
|
||||
dataArrayList = objectToList(object);
|
||||
}
|
||||
// Object object = redisUtil.getObjectByKey(key);
|
||||
// List<CsDataArray> dataArrayList;
|
||||
// if (Objects.isNull(object)){
|
||||
// dataArrayList = saveModelData(dataArrayParam,key);
|
||||
// } else {
|
||||
// dataArrayList = objectToList(object);
|
||||
// }
|
||||
List<CsDataArray> dataArrayList = getModelData(dataArrayParam, key);
|
||||
List<String> result = assembleDdData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),code,po.getDevAccessMethod(),map);
|
||||
recordList.addAll(result);
|
||||
//获取时间
|
||||
@@ -269,6 +278,15 @@ public class StatServiceImpl implements IStatService {
|
||||
System.gc();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 清理本地模板缓存(由Redis Pub/Sub通知触发)
|
||||
*/
|
||||
@Override
|
||||
public void clearModelCache() {
|
||||
modelCache.invalidateAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* 缓存设备模板信息
|
||||
*/
|
||||
@@ -398,4 +416,19 @@ public class StatServiceImpl implements IStatService {
|
||||
return urlList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备模板信息(本地缓存优先)
|
||||
*/
|
||||
public List<CsDataArray> getModelData(DataArrayParam dataArrayParam, String key) {
|
||||
List<CsDataArray> dataArrayList = modelCache.getIfPresent(key);
|
||||
if (CollectionUtil.isEmpty(dataArrayList)) {
|
||||
dataArrayList = dataArrayFeignClient.findListByParam(dataArrayParam).getData();
|
||||
if (CollectionUtil.isEmpty(dataArrayList)){
|
||||
throw new BusinessException(StatResponseEnum.DATA_ARRAY_NULL);
|
||||
}
|
||||
modelCache.put(key, dataArrayList);
|
||||
}
|
||||
return dataArrayList;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user