1.添加内部服务消息调用

This commit is contained in:
2026-06-23 10:39:21 +08:00
parent d1ba92bd03
commit e83c92ec66
17 changed files with 623 additions and 139 deletions

View File

@@ -38,6 +38,59 @@
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>model-server-api</artifactId>
<version>2021</version>
<exclusions>
<exclusion>
<groupId>com.driver.dm7</groupId>
<artifactId>dm</artifactId>
</exclusion>
<exclusion>
<groupId>com.driver.ifxjdbc</groupId>
<artifactId>ifxjdbc</artifactId>
</exclusion>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.epri</groupId>
<artifactId>zeus-core</artifactId>
<version>2.0.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
</dependencies>

View File

@@ -3,10 +3,13 @@ package com.njcn.relational;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(scanBasePackages = {"com.njcn.relational"} )
@MapperScan(value = "com.njcn.relational")
@SpringBootApplication(
scanBasePackages = {"com.njcn.relational"}
)
@MapperScan(value = "com.njcn.relational.mapper")
@EnableScheduling
public class RelationalTargetApplication {

View File

@@ -1,10 +1,11 @@
package com.njcn.relational.controller;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.njcn.relational.service.SyncTableConfigService;
import com.njcn.relational.service.SyncTableParseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;

View File

@@ -0,0 +1,25 @@
package com.njcn.relational.controller;
import com.njcn.relational.service.DcloudBusService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: cdf
* @CreateTime: 2026-06-17
* @Description:
*/
@RestController
@RequiredArgsConstructor
@RequestMapping("/bus")
public class DcloudBusController {
private final DcloudBusService dcloudBusService;
@GetMapping("/sync")
public void syncData() {
dcloudBusService.syncDataToDb();
}
}

View File

@@ -0,0 +1,18 @@
package com.njcn.relational.service;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import dcloud.common.InnerServiceBus.ServiceBus;
import dcloud.model.common.ConModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
public interface DcloudBusService {
boolean syncDataToDb();
}

View File

@@ -1,79 +0,0 @@
//package com.njcn.relational.service;
//
//import com.njcn.relational.utils.PowerDataParser;
//import lombok.AllArgsConstructor;
//import lombok.Data;
//import lombok.RequiredArgsConstructor;
//import org.apache.ibatis.session.ExecutorType;
//import org.apache.ibatis.session.SqlSession;
//import org.apache.ibatis.session.SqlSessionFactory;
//import org.springframework.stereotype.Service;
//import org.springframework.transaction.annotation.Transactional;
//
//import java.io.IOException;
//import java.util.List;
//import java.util.Map;
//import java.util.Set;
//import java.util.stream.Collectors;
//import java.util.stream.IntStream;
//
///**
// * @Author: cdf
// * @CreateTime: 2025-05-28
// * @Description:
// */
//@Service
//@RequiredArgsConstructor
//public class DynamicDataImportService {
// private final PowerDataParser powerDataParser;
// private final SqlSessionFactory sqlSessionFactory;
//
// @Transactional
// public void dynamicImportData(String filePath) throws IOException {
// Map<String, List<Map<String, Object>>> tableDataMap = powerDataParser.parseMultiTableFile(filePath);
//
// // 批量大小设置为1000可根据实际情况调整
// final int batchSize = 1000;
//
// try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
// for (Map.Entry<String, List<Map<String, Object>>> entry : tableDataMap.entrySet()) {
// String tableName = entry.getKey();
// List<Map<String, Object>> dataList = entry.getValue();
//
// if (dataList.isEmpty()) continue;
//
// // 获取列名
// Set<String> columns = dataList.get(0).keySet();
//
// // 分批处理
// for (int i = 0; i < dataList.size(); i += batchSize) {
// List<Map<String, Object>> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size()));
//
// // 执行批量插入
// sqlSession.insert("dynamicBatchInsert", new DynamicInsertParam(
// tableName,
// columns,
// batch,
// batch.size()
// ));
//
// // 定期刷新并清空缓存
// sqlSession.flushStatements();
// sqlSession.clearCache();
// }
// System.out.println(tableName+"——表数据转移成功");
// }
// }
// }
//
//
//
// @Data
// @AllArgsConstructor
// public static class DynamicInsertParam {
// private String tableName;
// private Set<String> columns;
// private List<Map<String, Object>> batchData;
// private int batchSize;
// }
//}

View File

@@ -0,0 +1,15 @@
package com.njcn.relational.service;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.relational.pojo.po.SgConPlantC;
public interface SgConPlantCService extends IService<SgConPlantC> {
/**
* 批量同步DCloud接口JSON数据主键存在更新、不存在新增
* @param dataArr dcloud返回的数据集
* @return 同步结果
*/
boolean batchSyncFromDcloud(JSONArray dataArr);
}

View File

@@ -0,0 +1,254 @@
package com.njcn.relational.service.impl;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.njcn.relational.pojo.dto.DcloudSyncQuery;
import com.njcn.relational.service.DcloudBusService;
import com.njcn.relational.service.SgConPlantCService;
import dcloud.common.InnerServiceBus.ServiceBus;
import dcloud.model.common.ConModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @Author: cdf
* @CreateTime: 2026-06-17
* @Description: 云平台模型服务实现类
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DcloudBusServiceImpl implements DcloudBusService {
private static final String MODEL_STR = "dcloud.model.common.ConModel";
private static final int DEFAULT_PAGE_SIZE = 1000;
private static final int MAX_RETRY = 3;
private final SgConPlantCService sgConPlantCService;
/**
* 统一同步入口
*/
public void syncData(DcloudSyncQuery query) {
query.emptyCover();
String tableName = query.getTableName();
log.info("【{}】开始同步数据,查询参数:{}", MODEL_STR, query);
ServiceBus serviceBus = initServiceBus();
if (serviceBus == null) {
log.error("ServiceBus初始化失败终止同步tableName={}", tableName);
return;
}
handleConModelSync(serviceBus, query);
}
/**
* 初始化服务总线
*/
private ServiceBus initServiceBus() {
ServiceBus bus = new ServiceBus();
try {
bus.init();
log.info("ServiceBus初始化成功");
return bus;
} catch (Exception e) {
log.error("ServiceBus初始化异常", e);
return null;
}
}
/**
* 获取数据总数
*/
private Integer getDataSize(ConModel model, String tableName, String filter) {
try {
JSONObject result = model.getDataSize(tableName, filter);
log.info("[{}] 获取数据总数filter: {}", tableName, filter);
if (result == null) {
log.error("[{}] 获取数据总数返回空", tableName);
return 0;
}
String code = StrUtil.nullToEmpty(result.getString("code"));
if (!"200".equals(code)) {
log.error("[{}] 获取数据总数失败,响应码:{}", tableName, code);
return 0;
}
Integer dataSize = result.getInteger("dataSize");
log.info("[{}] 数据总数: {}", tableName, dataSize);
return dataSize != null ? dataSize : 0;
} catch (Exception e) {
log.error("[{}] 获取数据总数异常", tableName, e);
return 0;
}
}
/**
* 分页查询数据
*/
private JSONArray getDataList(ConModel model, DcloudSyncQuery query, int pageIndex, int pageSize) {
try {
JSONObject result = model.getDataList(
query.getTableName(),
pageSize,
pageIndex,
query.getFilter(),
query.getOrderStr(),
query.getPropertyList()
);
if (result == null) {
log.error("[{}] 分页查询返回空pageIndex: {}", query.getTableName(), pageIndex);
return new JSONArray();
}
String code = StrUtil.nullToEmpty(result.getString("code"));
if (!"200".equals(code)) {
log.error("[{}] 分页查询失败,响应码:{}pageIndex: {}", query.getTableName(), code, pageIndex);
return new JSONArray();
}
JSONArray dataArr = result.getJSONArray("data");
return dataArr != null ? dataArr : new JSONArray();
} catch (Exception e) {
log.error("[{}] 分页查询异常pageIndex: {}", query.getTableName(), pageIndex, e);
return new JSONArray();
}
}
/**
* ConModel模型同步逻辑先查总数再分批查询入库
*/
private void handleConModelSync(ServiceBus bus, DcloudSyncQuery query) {
String tableName = query.getTableName();
int successCount = 0;
int failCount = 0;
try {
ConModel model = (ConModel) bus.locateService(MODEL_STR);
// 1. 先查询总数
Integer totalCount = getDataSize(model, tableName, query.getFilter());
if (totalCount == null || totalCount == 0) {
log.info("[{}] 无数据需要同步", tableName);
return;
}
log.info("[{}] 总共需要同步 {} 条数据", tableName, totalCount);
// 2. 计算分页参数
int pageSize = query.getPageSize() != null ? query.getPageSize() : DEFAULT_PAGE_SIZE;
int totalPages = (totalCount + pageSize - 1) / pageSize;
log.info("[{}] 分页参数: 每页{}条, 共{}页", tableName, pageSize, totalPages);
// 3. 分批查询并入库
for (int pageIndex = 1; pageIndex <= totalPages; pageIndex++) {
log.info("[{}] 开始查询第 {}/{} 页", tableName, pageIndex, totalPages);
// 带重试的分页查询
JSONArray pageData = null;
for (int retry = 0; retry < MAX_RETRY; retry++) {
pageData = getDataList(model, query, pageIndex, pageSize);
if (pageData != null && !pageData.isEmpty()) {
break;
}
if (retry < MAX_RETRY - 1) {
log.warn("[{}] 第{}页查询失败(第{}次重试),等待后重试", tableName, pageIndex, retry + 1);
try {
Thread.sleep(1000 * (retry + 1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
if (pageData == null || pageData.isEmpty()) {
log.warn("[{}] 第{}页无数据,跳过", tableName, pageIndex);
failCount++;
continue;
}
log.info("[{}] 第{}页查询到 {} 条数据,开始入库", tableName, pageIndex, pageData.size());
// 分批入库(每页数据直接入库)
boolean batchResult = sgConPlantCService.batchSyncFromDcloud(pageData);
if (batchResult) {
successCount += pageData.size();
log.info("[{}] 第{}页入库成功,当前累计成功 {} 条", tableName, pageIndex, successCount);
} else {
failCount += pageData.size();
log.error("[{}] 第{}页入库失败", tableName, pageIndex);
}
}
log.info("[{}] 同步完成: 成功={}, 失败={}, 总计={}", tableName, successCount, failCount, totalCount);
} catch (Exception e) {
log.error("[{}] ConModel同步数据异常", tableName, e);
}
}
/**
* 数据入库入口
*/
@Override
public boolean syncDataToDb() {
log.info("开始从云平台同步厂站数据到本地数据库");
DcloudSyncQuery query = DcloudSyncQuery.buildPlantDefault(null, null, 1, DEFAULT_PAGE_SIZE);
ServiceBus serviceBus = initServiceBus();
if (serviceBus == null) {
log.error("ServiceBus初始化失败终止同步");
return false;
}
try {
handleConModelSync(serviceBus, query);
log.info("厂站数据同步完成");
return true;
} catch (Exception e) {
log.error("厂站数据同步失败", e);
return false;
}
}
// ===================== 便捷方法 =====================
/**
* 快速同步厂站表数据
*/
public void syncPlantConData(Integer pageIndex, Integer pageSize, String filter, String orderStr) {
DcloudSyncQuery query = DcloudSyncQuery.buildPlantDefault(filter, orderStr, pageIndex, pageSize);
syncData(query);
}
/**
* 快速同步厂站表数据(使用默认分页)
*/
public void syncPlantConData() {
syncPlantConData(1, DEFAULT_PAGE_SIZE, null, null);
}
/**
* 根据更新时间增量同步厂站数据
*/
public void syncPlantConDataByUpdateTime(String updateTime) {
String filter = "UPDATE_TIME >= '" + updateTime + "'";
DcloudSyncQuery query = DcloudSyncQuery.buildPlantDefault(filter, null, 1, DEFAULT_PAGE_SIZE);
syncData(query);
}
/**
* 根据ID同步指定厂站数据
*/
public void syncPlantConDataById(String dcloudId) {
String filter = "ID = '" + dcloudId + "'";
DcloudSyncQuery query = DcloudSyncQuery.buildPlantDefault(filter, null, 1, DEFAULT_PAGE_SIZE);
syncData(query);
}
}

View File

@@ -0,0 +1,71 @@
package com.njcn.relational.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.relational.mapper.SgConPlantCMapper;
import com.njcn.relational.pojo.po.SgConPlantC;
import com.njcn.relational.service.SgConPlantCService;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class SgConPlantCServiceImpl extends ServiceImpl<SgConPlantCMapper, SgConPlantC> implements SgConPlantCService {
@Override
public boolean batchSyncFromDcloud(JSONArray dataArr) {
if (CollUtil.isEmpty(dataArr)) {
return true;
}
List<SgConPlantC> entityList = new ArrayList<>(dataArr.size());
for (Object obj : dataArr) {
JSONObject json = (JSONObject) obj;
SgConPlantC plant = new SgConPlantC();
plant.setD5000Id(json.getString("D5000_ID"));
plant.setD5000Name(json.getString("D5000_NAME"));
plant.setDcloudId(json.getString("DCLOUD_ID"));
plant.setDcloudName(json.getString("DCLOUD_NAME"));
plant.setDcloudVoltagelevel(json.getString("DCLOUD_VOLTAGELEVEL"));
plant.setOwner(json.getString("OWNER"));
plant.setStatus(json.getString("STATUS"));
plant.setUpdateTime(json.getString("UPDATE_TIME"));
entityList.add(plant);
}
// 批次500条复合主键冲突自动更新
return saveOrUpdateBatch(entityList, 500);
}
/**
* 根据双主键查询单条
*/
public SgConPlantC getByPk(String d5000Id, String dcloudId) {
LambdaQueryWrapper<SgConPlantC> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(SgConPlantC::getD5000Id, d5000Id)
.eq(SgConPlantC::getDcloudId, dcloudId);
return getOne(wrapper);
}
/**
* 根据双主键删除
*/
public boolean removeByPk(String d5000Id, String dcloudId) {
LambdaQueryWrapper<SgConPlantC> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(SgConPlantC::getD5000Id, d5000Id)
.eq(SgConPlantC::getDcloudId, dcloudId);
return remove(wrapper);
}
/**
* 根据双主键更新
*/
public boolean updateByPk(SgConPlantC entity) {
LambdaQueryWrapper<SgConPlantC> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(SgConPlantC::getD5000Id, entity.getD5000Id())
.eq(SgConPlantC::getDcloudId, entity.getDcloudId());
return update(entity, wrapper);
}
}

View File

@@ -3,8 +3,8 @@ spring:
datasource:
druid:
driver-class-name: dm.jdbc.driver.DmDriver
url: jdbc:dm://192.168.1.21:5236/PQSINFO_LN?useUnicode=true&characterEncoding=utf-8
username: PQSINFO_LN
url: jdbc:dm://127.0.0.1:5236/PQSADMIN?useUnicode=true&characterEncoding=utf-8
username: PQSADMIN
password: Pqsadmin123
#初始化建立物理连接的个数、最小、最大连接数
initial-size: 5

View File

@@ -34,3 +34,22 @@ sync:
export:
localPath: D:/data/export/
remotePath: /home/export/
# Dubbo 下面的配置用于调控云消息总线
dubbo:
application:
# ???????????
name: cloud.njcndnzl
registry:
# zookeeper??????????????
address: 28.34.131.11:2181,28.34.131.12:2181,28.34.131.13:2181
protocol:
# Dubbo??????
port: 20991
# ElasticSearch ??
es:
hosts: 28.34.133.3,28.34.133.4
cluster:
name: yundiz-log

View File

@@ -10,11 +10,11 @@ spring:
name: dmTransport
profiles:
#active: query_up
active: query_up
active: insert_up
security:
user:
name: data_njcn
password: dnzl@#002
name: njcn
password: dnzl@#001
#mybatis配置信息
mybatis-plus:
#别名扫描

View File

@@ -12,29 +12,36 @@
</encoder>
</appender>
<!-- ========================== 全部日志 ========================== -->
<appender name="FILE_ALL" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/home/lnyw/logs/transport/transport${appName}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/home/lnyw/logs/transport/transport${appName}-%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>15</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- 默认路径(不带末尾斜杠) -->
<property name="LOG_PATH" value="/home/lnyw/logs/transport"/>
<!-- ========================== INFO 日志(修复版) ========================== -->
<!-- 根据 profile 设置不同路径 -->
<springProfile name="insert_up">
<property name="LOG_PATH" value="/home/dcloud/logs/transport"/>
</springProfile>
<springProfile name="query_up">
<property name="LOG_PATH" value="/home/lnyw/logs/transport"/>
</springProfile>
<springProfile name="insert">
<property name="LOG_PATH" value="/home/dcloud/logs/transport"/>
</springProfile>
<springProfile name="query">
<property name="LOG_PATH" value="/home/lnyw/logs/transport"/>
</springProfile>
<!-- ========================== INFO 日志 ========================== -->
<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/home/lnyw/logs/transport/transport${appName}-info.log</file>
<file>${LOG_PATH}/info/transport-${appName}-info.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/home/lnyw/logs/transport/transport${appName}-info-%d{yyyy-MM-dd}.log</fileNamePattern>
<fileNamePattern>${LOG_PATH}/info/transport-${appName}-info-%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>15</maxHistory>
<totalSizeCap>5GB</totalSizeCap>
</rollingPolicy>
<!-- 过滤 INFO -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
@@ -48,9 +55,9 @@
<!-- ========================== ERROR 日志 ========================== -->
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/home/lnyw/logs/transport/transport${appName}-error.log</file>
<file>${LOG_PATH}/error/transport-${appName}-error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/home/lnyw/logs/transport/transport${appName}-error-%d{yyyy-MM-dd}.log</fileNamePattern>
<fileNamePattern>${LOG_PATH}/error/transport-${appName}-error-%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
<totalSizeCap>5GB</totalSizeCap>
</rollingPolicy>
@@ -68,7 +75,6 @@
<!-- ========================== 根日志 ========================== -->
<root level="info">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE_ALL"/>
<appender-ref ref="FILE_INFO"/>
<appender-ref ref="FILE_ERROR"/>
</root>
@@ -78,4 +84,4 @@
<logger name="org.springframework" level="info"/>
<logger name="org.mybatis" level="error"/>
</configuration>
</configuration>