feat(access): 新增电度数据处理功能并优化设备接入流程

- 新增电度数据分析接口和实现逻辑
- 更新消息消费者以支持电度数据分发处理
- 扩展数据枚举定义以支持新的数据类型
- 增加设备心跳检测定时器单元测试
- 优化Redis等待响应超时时间配置
- 移除未使用的线路服务依赖注入
- 更新设备状态处理逻辑以支持电度数据
- 完善错误处理和异常降级机制
This commit is contained in:
xy
2026-06-03 10:23:18 +08:00
parent 9fc3e126d9
commit b2b055c44c
11 changed files with 383 additions and 15 deletions

View File

@@ -4,8 +4,11 @@ import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.stat.api.fallback.StatClientFallbackFactory;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author xy
@@ -15,4 +18,8 @@ public interface StatFeignClient {
@PostMapping("/analysis")
HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage);
@PostMapping("/electricityMeterAnalysis")
@ApiOperation("电度数据解析")
HttpResult<String> electricityMeterAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage);
}

View File

@@ -27,7 +27,13 @@ public class StatClientFallbackFactory implements FallbackFactory<StatFeignClien
@Override
public HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage) {
log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString());
log.error("{}异常,降级处理,异常为:{}",appAutoDataMessage.getId() + "数据解析异常",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<String> electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage) {
log.error("{}异常,降级处理,异常为:{}",appAutoDataMessage.getId() + "电度数据解析异常",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};

View File

@@ -45,4 +45,14 @@ public class StatController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/electricityMeterAnalysis")
@ApiOperation("电度数据解析")
@ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true)
public HttpResult<String> electricityMeterAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){
String methodDescribe = getMethodDescribe("electricityMeterAnalysis");
statService.electricityMeterAnalysis(appAutoDataMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -13,4 +13,6 @@ public interface IStatService {
*/
void analysis(AppAutoDataMessage appAutoDataMessage);
void electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage);
}

View File

@@ -11,7 +11,6 @@ import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsCommunicateFeignClient;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.param.LineInfoParam;
@@ -63,7 +62,6 @@ public class StatServiceImpl implements IStatService {
private final CsDeviceFeignClient csDeviceFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private final CsCommunicateFeignClient csCommunicateFeignClient;
private final CsLineFeignClient csLineFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -181,6 +179,96 @@ public class StatServiceImpl implements IStatService {
System.gc();
}
@Override
public void electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage) {
LocalDateTime time = null;
log.info("开始消费{},发送时间{}",appAutoDataMessage.getKey(),appAutoDataMessage.getSendTime());
DataArrayParam dataArrayParam = new DataArrayParam();
dataArrayParam.setId(appAutoDataMessage.getId());
dataArrayParam.setDid(appAutoDataMessage.getDid());
dataArrayParam.setCldId(appAutoDataMessage.getMsg().getClDid());
dataArrayParam.setIdx(appAutoDataMessage.getMsg().getDsNameIdx());
List<AppAutoDataMessage.DataArray> list = appAutoDataMessage.getMsg().getDataArray();
//获取监测点id
String lineId = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId());
if (Objects.isNull(object1)){
LineInfoParam param = new LineInfoParam();
param.setNDid(appAutoDataMessage.getId());
deviceMessageFeignClient.getLineInfo(param);
}
//获取当前设备信息判断装置型号,来筛选监测点
List<CsEquipmentDeliveryPO> poList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DEVICE_LIST),CsEquipmentDeliveryPO.class);
CsEquipmentDeliveryPO po = poList.stream().filter(item->Objects.equals(item.getNdid(),appAutoDataMessage.getId())).findFirst().orElse(null);
List<SysDicTreePO> dictTreeList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE),SysDicTreePO.class);
String code = Objects.requireNonNull(dictTreeList.stream().filter(item -> Objects.equals(item.getId(), po.getDevType())).findFirst().orElse(null)).getCode();
//便携式设备
if (Objects.equals(DicDataEnum.PORTABLE.getCode(),code)) {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString();
}
//直连设备
else if (Objects.equals(DicDataEnum.CONNECT_DEV.getCode(),code)) {
if (Objects.equals(appAutoDataMessage.getDid(),1)) {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get("0").toString();
} else if (Objects.equals(appAutoDataMessage.getDid(),2)) {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString();
}
}
//云前置设备
else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
}
//获取当前设备信息
if (CollectionUtil.isNotEmpty(list)) {
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
List<String> recordList = new ArrayList<>();
for (AppAutoDataMessage.DataArray item : list) {
dataArrayParam.setStatMethod("T");
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
int clDid = flag?1:appAutoDataMessage.getMsg().getClDid();
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getIdx());
Object object = redisUtil.getObjectByKey(key);
List<CsDataArray> dataArrayList;
if (Objects.isNull(object)){
dataArrayList = saveModelData(dataArrayParam,key);
} else {
dataArrayList = objectToList(object);
}
List<String> result = assembleDdData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),code,po.getDevAccessMethod(),map);
recordList.addAll(result);
//获取时间
boolean timeFlag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
long devTime = timeFlag?item.getDataTimeSec():item.getDataTimeSec()-8*3600;
time = Instant.ofEpochSecond(devTime)
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}
if (CollectionUtil.isNotEmpty(recordList)){
//influx数据批量入库
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.SECONDS, recordList);
//记录监测点最新数据时间
CsLineLatestData csLineLatestData = new CsLineLatestData();
csLineLatestData.setLineId(lineId);
csLineLatestData.setTimeId(Objects.isNull(time) ? LocalDateTime.now() : time);
csLineLatestDataFeignClient.addData(csLineLatestData);
}
//判断设备运行状态
if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) {
csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode());
//记录设备上线
PqsCommunicateDto dto = new PqsCommunicateDto();
dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
dto.setDevId(appAutoDataMessage.getId());
dto.setType(1);
dto.setDescription("通讯正常");
csCommunicateFeignClient.insertion(dto);
}
}
System.gc();
}
/**
* 缓存设备模板信息
*/
@@ -253,6 +341,51 @@ public class StatServiceImpl implements IStatService {
return records;
}
/**
* 电度influxDB数据组装
* 电度数据15分钟入库一次
*/
public List<String> assembleDdData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String devType,String accessMethod, Map<String,String> map) {
List<String> records = new ArrayList<String>();
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
if (CollectionUtil.isEmpty(floats)){
throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL);
}
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
}
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
//fixme 捂脸设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
long originalTimeSec = flag ? item.getDataTimeSec() : item.getDataTimeSec() - 8 * 3600;
for (int i = 0; i < dataArrayList.size(); i++) {
String tableName = map.get(dataArrayList.get(i).getName());
long adjustedTimeSec = (originalTimeSec / 900) * 900;
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
if (Objects.isNull(item.getDataTag())) {
tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
} else {
tags.put(InfluxDBTableConstant.QUALITY_FLAG,String.valueOf(item.getDataTag()));
}
Map<String,Object> fields = new HashMap<>();
//这边特殊处理如果数据为3.14159则将数据置为null
if (Objects.isNull(dataArrayList.get(i).getInfluxDbName())) {
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
} else {
fields.put(dataArrayList.get(i).getInfluxDbName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
}
fields.put(InfluxDBTableConstant.CL_DID,clDid.toString());
Point point = influxDbUtils.pointBuilder(tableName, adjustedTimeSec, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
return records;
}
public List<CsDataArray> objectToList(Object object) {
List<CsDataArray> urlList = new ArrayList<>();
if (object != null) {