海南暂降推送至Kafka

This commit is contained in:
hzj
2026-06-04 14:58:57 +08:00
parent 66d5364317
commit 271c178a0f
21 changed files with 1299 additions and 0 deletions

38
kafka-event-data/.gitignore vendored Normal file
View File

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

153
kafka-event-data/pom.xml Normal file
View File

@@ -0,0 +1,153 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.njcn</groupId>
<artifactId>kafka-event-data</artifactId>
<parent>
<groupId>com.njcn.product</groupId>
<artifactId>CN_Product</artifactId>
<version>1.0.0</version>
</parent>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.njcn</groupId>-->
<!-- <artifactId>common-redis</artifactId>-->
<!-- <version>1.0.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.12</version>
</dependency>
<!-- 多数据源切换当数据源为oracle时需要使用 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>mybatis-plus</artifactId>
<version>0.0.1</version>
</dependency>
<!--oracle驱动-->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>21.6.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.nls</groupId>
<artifactId>orai18n</artifactId>
<version>21.1.0.0</version> <!-- 版本号需要与你的ojdbc版本匹配 -->
</dependency>
<!-- Spring Security -->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-security</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.14.RELEASE</version> <!-- 与Spring Boot 2.3.x兼容 -->
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version> <!-- 使用最新稳定版 -->
</dependency>
</dependencies>
<build>
<finalName>kafka-event-data</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@@ -0,0 +1,17 @@
package com.njcn.kafka.event;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@Slf4j
@SpringBootApplication(scanBasePackages = "com.njcn")
@MapperScan("com.njcn.**.mapper")
public class KafkaEventtApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaEventtApplication.class, args);
}
}

View File

@@ -0,0 +1,29 @@
package com.njcn.kafka.event.controller;
import com.njcn.kafka.event.pojo.param.PushParam;
import com.njcn.kafka.event.service.EventDataService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Description:
* Date: 2025/05/27 下午 2:56【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@RestController
@RequestMapping("/eventdata")
@AllArgsConstructor
public class EventDataController {
private final EventDataService eventDataService;
@PostMapping("/push")
public void push(@RequestBody PushParam pushParam) throws Exception {
eventDataService.push(pushParam);
}
}

View File

@@ -0,0 +1,31 @@
package com.njcn.kafka.event.controller;
import com.njcn.kafka.event.service.KafkaProducerService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Description:
* Date: 2025/05/27 下午 2:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@RestController
@RequestMapping("/test")
@AllArgsConstructor
public class KafkaTestController {
private final KafkaProducerService kafkaProducerService;
@GetMapping("/testkafka")
public void test(@RequestParam("context") String context) {
kafkaProducerService.sendMessageWithCallback("eventdata",context);
}
}

View File

@@ -0,0 +1,20 @@
package com.njcn.kafka.event.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.kafka.event.pojo.po.OracleRmpEventDetailPO;
import java.util.List;
/**
* data-migration
*
* @author cdf
* @date 2024/2/19
*/
public interface OracleRmpEventDetailPOMapper extends BaseMapper<OracleRmpEventDetailPO> {
}

View File

@@ -0,0 +1,27 @@
package com.njcn.kafka.event.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.kafka.event.pojo.dto.LedgerBaseInfoDTO;
import com.njcn.kafka.event.pojo.po.PqLine;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
*
* Description:
* Date: 2025/06/19 下午 1:43【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface PqLineMapper extends BaseMapper<PqLine> {
List<LedgerBaseInfoDTO> getBaseLedger(@Param("ids")List<Integer> ids, @Param("searchValue")String searchValue);
}

View File

@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.kafka.event.mapper.PqLineMapper">
<select id="getBaseLedger" resultType="com.njcn.kafka.event.pojo.dto.LedgerBaseInfoDTO">
select
PQ_GDINFORMATION.name gdName,
pq_line.GD_INDEX gdIndex,
pq_line.line_index lineId,
pq_line.name lineName,
PQ_SUBVOLTAGE.SUBV_INDEX busBarId,
PQ_SUBVOLTAGE.name busBarName,
PQ_SUBVOLTAGE.SCALE,
pq_device.dev_index devId,
pq_device.name devName,
pq_device.ip ip,
pq_line.pt1/pq_line.pt2 pt,
pq_line.ct1/pq_line.ct2 ct,
pq_device.Status runFlag,
PQ_SUBSTATION.sub_index stationId,
PQ_SUBSTATION.name stationName
from
pq_line inner JOIN PQ_SUBVOLTAGE on pq_line.SUBV_INDEX = PQ_SUBVOLTAGE.SUBV_INDEX
inner JOIN pq_device on PQ_SUBVOLTAGE.DEV_INDEX = pq_device.DEV_INDEX
inner JOIN PQ_SUBSTATION on pq_device.SUB_INDEX = PQ_SUBSTATION.SUB_INDEX
inner JOIN PQ_GDINFORMATION on pq_line.GD_INDEX =PQ_GDINFORMATION.GD_INDEX
<where>
and pq_line.line_index in
<foreach collection="ids" item="item" open="(" close=")" separator=",">
#{item}
</foreach>
<if test="searchValue!=null and searchValue!=''">
and (
PQ_GDINFORMATION.name LIKE '%' || #{searchValue} || '%'
or PQ_SUBSTATION.name LIKE '%' || #{searchValue} || '%'
or pq_line.name LIKE '%' || #{searchValue} || '%'
)
</if>
</where>
</select>
</mapper>

View File

@@ -0,0 +1,68 @@
package com.njcn.kafka.event.pojo.dto;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
/**
* Description:
* Date: 2026/06/04 上午 9:14【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class EventPushDTO {
/**
* type :
* time : 2025-10-09 13:25:14
* body : [{"eventdetail_index":"d4963f8f-201f-443f-b293-dda8ef870ee6","lineid":"1591","timeid":"2025-10-09 13:25:14","ms":"509","describe":"","wavetype":"1","persisttime":"23.000","eventvalue":"0.6","eventreason":"","eventtype":"","gdname":"","bdname":"110kV皇后店变","busname":"XX名称","phasicType":"A","pointname":"232待用","wavePath":"192.168.1.102/3_20240515_163022_349"}]
*/
private String type;
private LocalDateTime time;
private List<EventDTO> body;
@Data
public static class EventDTO {
/**
* eventdetail_index : d4963f8f-201f-443f-b293-dda8ef870ee6
* lineid : 1591
* timeid : 2025-10-09 13:25:14
* ms : 509
* describe :
* wavetype : 1
* persisttime : 23.000
* eventvalue : 0.6
* eventreason :
* eventtype :
* gdname :
* bdname : 110kV皇后店变
* busname : XX名称
* phasicType : A
* pointname : 232待用
* wavePath : 192.168.1.102/3_20240515_163022_349
*/
private String eventdetail_index;
private Integer lineid;
private LocalDateTime timeid;
private Integer ms;
private String describe;
private String wavetype;
private BigDecimal persisttime;
private BigDecimal eventvalue;
private String eventreason;
private String eventtype;
private String gdname;
private String bdname;
private String busname;
private String phasicType;
private String pointname;
private String wavePath;
}
}

View File

@@ -0,0 +1,43 @@
package com.njcn.kafka.event.pojo.dto;
import lombok.Data;
/**
* @Author: cdf
* @CreateTime: 2025-06-25
* @Description:
*/
@Data
public class LedgerBaseInfoDTO {
private String gdName;
private String gdIndex;
private Integer lineId;
private String lineName;
private Integer busBarId;
private String busBarName;
private String scale;
private Integer devId;
private String devName;
private String objName;
private Integer stationId;
private String stationName;
//通讯状态
private Integer runFlag=0;
private Integer eventCount;
private String ip;
private int pt;
private int ct;
}

View File

@@ -0,0 +1,34 @@
package com.njcn.kafka.event.pojo.param;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
/**
* Description:
* Date: 2025/05/28 上午 8:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class PushParam {
//补招起始日期_yyyy-MM-dd(按小时跑的任务可加时分秒
private LocalDateTime beginTime;
//补招截止日期_yyyy-MM-dd(按小时跑的任务可加时分秒)
private LocalDateTime endTime;
//时间日期_yyyy-MM-dd(按小时跑的任务可加时分秒)
private String dataDate;
/**
* 待计算的对象索引集合,监测点、设备、母线、变电站、单位等等
*/
private List<String> idList;
}

View File

@@ -0,0 +1,174 @@
package com.njcn.kafka.event.pojo.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Date;
/**
* data-migration
*
* @author cdf
* @date 2024/2/19
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("PQS_EVENTDETAIL")
public class OracleRmpEventDetailPO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 暂时事件ID
*/
@TableId(value = "EVENTDETAIL_INDEX",type = IdType.ASSIGN_ID)
private String eventdetailIndex;
/**
* 监测点ID
*/
@TableField(value = "LINEID")
private Integer lineid;
/**
* 统计指标类型
*/
@TableField(value = "MS")
private Integer ms;
/**
* 统计指标类型
*/
@TableField(value = "WAVETYPE")
private String wavetype;
/**
* 暂降原因字典表PQS_Dicdata
*/
@TableField(value = "EVENTREASON")
private String eventreason;
/**
* 暂降类型字典表PQS_Dicdata
*/
@TableField(value = "EVENTTYPE")
private String eventtype;
/**
* 事件关联分析表Guid
*/
@TableField(value = "EVENTASS_INDEX")
private String eventassIndex;
@TableField(value = "DQTIME")
private Double dqTime;
/**
* 特征值计算更新时间外键PQS_Relevance的Time字段
*/
@TableField(value = "DEALTIME")
private Date dealTime;
/**
* 默认事件个数为0
*/
@TableField(value = "NUM")
private Integer num;
/**
* 波形文件是否从装置招到本地(0未招1已招)默认值为0
*/
@TableField(value = "FILEFLAG")
private Integer fileFlag;
/**
* 特征值计算标志0未处理1已处理; 2已处理无结果;3计算失败默认值为0
*/
@TableField(value = "DEALFLAG")
private Integer dealFlag;
/**
* 处理结果第一条事件发生时间(读comtra文件获取)
*/
@TableField(value = "FIRSTTIME")
private LocalDateTime firstTime;
/**
* 处理结果第一条事件暂降类型字典表PQS_Dicdata
*/
@TableField(value = "FIRSTTYPE")
private String firstType;
/**
* 处理结果第一条事件发生时间毫秒(读comtra文件获取)
*/
@TableField(value = "FIRSTMS")
private BigDecimal firstMs;
/**
* 暂降能量
*/
@TableField(value = "ENERGY")
private Double energy;
/**
* 暂降严重度
*/
@TableField(value = "SEVERITY")
private Double severity;
/**
* 暂降源与监测位置关系 Upper:上游Lower :下游Unknown :未知;为空则是未计算
*/
@TableField(value = "SAGSOURCE")
private String sagsource;
/**
* 开始时间
*/
@TableField(value = "TIMEID")
private LocalDateTime timeid;
/**
* 持续时间,单位秒
*/
@TableField(value = "PERSISTTIME")
private BigDecimal persisttime;
/**
* 特征幅值
*/
@TableField(value = "EVENTVALUE")
private BigDecimal eventvalue;
/**
* 相别
*/
@TableField(value = "PHASIC_TYPE")
private String phasicType;
/**
* 事件描述
*/
@TableField(value = "DESCRIBE")
private String describe;
/**
* 波形路径
*/
@TableField(value = "WAVENAME")
private String wavePath;
@TableField(value = "TRANSIENTVALUE")
private Double transientValue;
@TableField(value = "CREATE_TIME")
private LocalDateTime createTime;
}

View File

@@ -0,0 +1,133 @@
package com.njcn.kafka.event.pojo.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* Description:
* Date: 2025/06/19 下午 1:43【需求编号】
*
* @author clam
* @version V1.0.0
*/
/**
* 靠靠?
*/
@Data
@NoArgsConstructor
@TableName(value = "PQ_LINE")
public class PqLine {
/**
* 靠靠
*/
@TableId(value = "LINE_INDEX", type = IdType.INPUT)
private Integer lineIndex;
/**
* 靠靠靠
*/
@TableField(value = "GD_INDEX")
private Integer gdIndex;
/**
* 靠靠?
*/
@TableField(value = "SUB_INDEX")
private Integer subIndex;
/**
* 靠靠
*/
@TableField(value = "SUBV_INDEX")
private Integer subvIndex;
/**
* 靠靠
*/
@TableField(value = "DEV_INDEX")
private Integer devIndex;
/**
* 靠靠
*/
@TableField(value = "\"NAME\"")
private String name;
/**
* PT靠靠
*/
@TableField(value = "PT1")
private Double pt1;
/**
* PT靠靠
*/
@TableField(value = "PT2")
private Double pt2;
/**
* CT靠靠
*/
@TableField(value = "CT1")
private Double ct1;
/**
* CT靠靠
*/
@TableField(value = "CT2")
private Double ct2;
/**
* 靠靠
*/
@TableField(value = "DEVCMP")
private Double devcmp;
/**
* 靠靠
*/
@TableField(value = "DLCMP")
private Double dlcmp;
/**
* 靠靠
*/
@TableField(value = "JZCMP")
private Double jzcmp;
/**
* 靠靠
*/
@TableField(value = "XYCMP")
private Double xycmp;
/**
* 靠?靠靠靠靠靠靠?
*/
@TableField(value = "SUBV_NO")
private Integer subvNo;
/**
* (靠PQS_Dictionary?靠靠Guid
*/
@TableField(value = "\"SCALE\"")
private String scale;
/**
* 靠靠
*/
@TableField(value = "SUBV_NAME")
private String subvName;
@TableField(exist = false)
private String subName;
@TableField(exist = false)
private String deptName;
}

View File

@@ -0,0 +1,15 @@
package com.njcn.kafka.event.service;
import com.njcn.kafka.event.pojo.param.PushParam;
/**
* Description:
* Date: 2025/05/28 上午 8:38【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface EventDataService {
void push(PushParam pushParam) throws Exception;
}

View File

@@ -0,0 +1,15 @@
package com.njcn.kafka.event.service;
/**
* Description:
* Date: 2025/05/27 下午 1:57【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface KafkaProducerService {
void sendMessage(String topic, String message);
// 带回调的发送
void sendMessageWithCallback(String topic, String message);
}

View File

@@ -0,0 +1,74 @@
package com.njcn.kafka.event.service.impl;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.kafka.event.mapper.OracleRmpEventDetailPOMapper;
import com.njcn.kafka.event.mapper.PqLineMapper;
import com.njcn.kafka.event.pojo.dto.EventPushDTO;
import com.njcn.kafka.event.pojo.dto.LedgerBaseInfoDTO;
import com.njcn.kafka.event.pojo.param.PushParam;
import com.njcn.kafka.event.pojo.po.OracleRmpEventDetailPO;
import com.njcn.kafka.event.service.EventDataService;
import com.njcn.kafka.event.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Description:
* Date: 2025/05/28 上午 8:38【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class EventDataServiceImpl implements EventDataService {
private final KafkaProducerService kafkaProducerService;
private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper;
private final PqLineMapper pqLineMapper;
@Override
public void push(PushParam pushParam) throws Exception {
LocalDateTime beginTime = pushParam.getBeginTime();
LocalDateTime endTime = pushParam.getEndTime();
List<OracleRmpEventDetailPO> oracleRmpEventDetailPOList = oracleRmpEventDetailPOMapper.selectList(
new LambdaQueryWrapper<OracleRmpEventDetailPO>()
.between(OracleRmpEventDetailPO::getCreateTime,beginTime,endTime));
if(!CollectionUtils.isEmpty(oracleRmpEventDetailPOList)){
List<Integer> lineIds = oracleRmpEventDetailPOList.stream().map(OracleRmpEventDetailPO::getLineid).collect(Collectors.toList());
List<LedgerBaseInfoDTO> baseLedger = pqLineMapper.getBaseLedger(lineIds, null);
Map<Integer, LedgerBaseInfoDTO> ledgerBaseInfoDTOMap = baseLedger.stream().collect(Collectors.toMap(LedgerBaseInfoDTO::getLineId, Function.identity()));
EventPushDTO eventPushDTO = new EventPushDTO();
eventPushDTO.setTime(LocalDateTime.now());
List<EventPushDTO.EventDTO> collect = oracleRmpEventDetailPOList.stream().map(temp -> {
EventPushDTO.EventDTO eventDTO = new EventPushDTO.EventDTO();
BeanUtils.copyProperties(temp,eventDTO);
eventDTO.setEventdetail_index(temp.getEventdetailIndex());
eventDTO.setBusname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getBusBarName());
eventDTO.setBdname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getStationName());
eventDTO.setPointname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getLineName());
eventDTO.setGdname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getGdName());
return eventDTO;
}).collect(Collectors.toList());
eventPushDTO.setBody(collect);
String jsonStr = JSONUtil.toJsonStr(collect);
kafkaProducerService.sendMessage("eventdata",jsonStr);
}
}
}

View File

@@ -0,0 +1,50 @@
package com.njcn.kafka.event.service.impl;
import com.njcn.kafka.event.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Description:
* Date: 2025/05/27 下午 2:00【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl implements KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Override
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
@Override
// 带回调的发送
public void sendMessageWithCallback(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=[" +
message + "] due to : " + ex.getMessage());
}
});
}
}

View File

@@ -0,0 +1,88 @@
spring:
application:
name: kafka-event-data
kafka:
# Kafka 服务器地址
bootstrap-servers: 192.168.2.130:9092
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 可选配置
acks: all
retries: 3
batch-size: 16384
buffer-memory: 33554432
# 消费者配置
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 可选配置
enable-auto-commit: false
max-poll-records: 500
datasource:
dynamic:
primary: master
strict: false # 是否严格匹配数据源默认false
druid: # 如果使用Druid连接池
validation-query: SELECT 1 FROM DUAL # 达梦专用校验SQL
initial-size: 10
# 初始化大小,最小,最大
min-idle: 20
maxActive: 500
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
testWhileIdle: true
testOnBorrow: true
testOnReturn: false
# 打开PSCache并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
datasource:
master:
url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase
username: pqsadmin_hn
password: pqsadmin
driver-class-name: oracle.jdbc.OracleDriver
# salve:
# driver-class-name: dm.jdbc.driver.DmDriver
# url: jdbc:dm://192.168.1.21:5236/PQSADMIN?useUnicode=true&characterEncoding=utf-8
# username: PQSADMINLN
# password: Pqsadmin123
redis:
database: 10
host: localhost
port: 6379
timeout: 5000
jedis:
pool:
max-active: 20
max-wait: 8000
max-idle: 8
min-idle: 2
test-on-borrow: true # 借出连接时验证
test-on-return: true # 归还连接时验证
test-while-idle: true # 空闲时验证
smsServer:
info: http://22.33.194.50:18096
netInfo: http://22.33.191.206:18096
account: xbjbpt
password: WLv8w071
aliyun:
oss:
endpoint: oss-cn-beijing.aliyuncs.com
accessKeyId: LTAI5tQYuyu1PpiCdeM74PT6
accessKeySecret: vTGHcQOCF9u7w9FL3HAHJO1oufVWru
bucketName: cn-comtrade

View File

@@ -0,0 +1,83 @@
#当前服务的基本信息
microservice:
ename: 12345
name: 12345
server:
port: 18094
spring:
application:
name: kafka-event-data
profiles:
active: dev
#mybatis配置信息
mybatis-plus:
mapper-locations: classpath*:com/njcn/**/mapping/*.xml
#别名扫描
type-aliases-package: com.njcn.product.event.**.pojo
configuration:
#驼峰命名
map-underscore-to-camel-case: true
#配置sql日志输出
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#关闭日志输出
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
global-config:
db-config:
#指定主键生成策略
id-type: assign_uuid
SYS_TYPE_ZT: 1cfcd6e2-c5fe-4b15-988a-32b90f1170c1
SYS_TYPE_WT: 983f9dfe-4f9a-4c96-89d8-7d425a1f1d6c
db:
type: oracle
#文件位置配置
business:
#处理波形数据位置
wavePath: D://Comtrade
targetPath: /pqmonitor
exportBaseDir: D://exportComtrade
eventCronExpression: 0 0/2 * * * ?
failCronExpression: 0 5/10 * * * ?
userCronExpression: 0 5 1 * * ?
sendMessageCronExpression : 0 */3 * * * ?
syncinterval: 2
failsyncinterval: 1440
#实时短信功能
RealTimeSMSSwitch: false
#wavePath: /usr/local/comtrade
#处理临时数据
tempPath: D://file
#tempPath: /usr/local/file
#文件存储的方式 3.本地存储
file:
storage: 3
#oss服务器配置
min:
io:
endpoint: http://192.168.1.13:9009
accessKey: minio
secretKey: minio@123
bucket: excelreport
#华为obs服务器配置
huawei:
access-key: J9GS9EA79PZ60OK23LWP
security-key: BirGrAFDSLxU8ow5fffyXgZRAmMRb1R1AdqCI60d
obs:
bucket: test-8601
endpoint: https://obs.cn-east-3.myhuaweicloud.com
# 单位为秒
expire: 3600
#线程池配置信息
threadPool:
corePoolSize: 10
maxPoolSize: 20
queueCapacity: 500
keepAliveSeconds: 60

View File

@@ -0,0 +1,145 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="20 seconds" debug="false">
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<springProperty scope="context" name="log.projectName" source="spring.application.name" defaultValue="event_msg"/>
<springProperty scope="context" name="logCommonLevel" source="log.commonLevel" defaultValue="info"/>
<springProperty scope="context" name="logHomeDir" source="log.homeDir" defaultValue="D:\logs"/>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="ec"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!--日志输出格式-->
<property name="log.pattern" value="|-%d{yyyy-MM-dd HH:mm:ss.SSS} ${LOG_LEVEL_PATTERN:-%level} ${log.projectName} -- %t %logger{100}.%M ==> %m%n${Log_EXCEPTION_CONVERSION_WORD:-%ec}}"/>
<property name="log.maxHistory" value="30"/>
<!-- 控制台输出(可选) -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<charset>UTF-8</charset> <!-- 控制台也建议指定 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!--客户端输出日志-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--系统中常规的debug日志-->
<!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 RollingFileAppender -->
<appender name="DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
${logHomeDir}/${log.projectName}/debug/debug.log
</file>
<!-- 如果日志级别等于配置级别过滤器会根据onMath 和 onMismatch接收或拒绝日志。 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 设置过滤级别 -->
<level>DEBUG</level>
<!-- 用于配置符合过滤条件的操作 -->
<onMatch>ACCEPT</onMatch>
<!-- 用于配置不符合过滤条件的操作 -->
<onMismatch>DENY</onMismatch>
</filter>
<!-- 最常用的滚动策略,它根据时间来制定滚动策略.既负责滚动也负责触发滚动 SizeAndTimeBasedRollingPolicy-->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!--日志输出位置 可相对、和绝对路径 -->
<fileNamePattern>
${logHomeDir}/${log.projectName}/debug/debug.log.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<!-- 可选节点,控制保留的归档文件的最大数量,超出数量就删除旧文件,假设设置每个月滚动,且<maxHistory>是6
则只保存最近6个月的文件删除之前的旧文件。注意删除旧文件是那些为了归档而创建的目录也会被删除 -->
<maxHistory>${log.maxHistory:-30}</maxHistory>
<!--重启清理日志文件-->
<!-- <cleanHistoryOnStart>true</cleanHistoryOnStart>-->
<!--每个文件最多100MB保留N天的历史记录但最多20GB-->
<!--<totalSizeCap>20GB</totalSizeCap>-->
<!--日志文件最大的大小-->
<!--<MaxFileSize>${log.maxSize}</MaxFileSize>-->
</rollingPolicy>
<encoder>
<pattern>
${log.pattern}
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--系统中常规的info日志-->
<appender name="INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>
${logHomeDir}/${log.projectName}/info/info.log
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>
${logHomeDir}/${log.projectName}/info/info.log.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>${log.maxHistory:-30}</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
${log.pattern}
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--系统中常规的error日志-->
<appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
${logHomeDir}/${log.projectName}/error/error.log
</file>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>
${logHomeDir}/${log.projectName}/error/error.log.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>${log.maxHistory:-30}</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
${log.pattern}
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR"/>
<logger name="org.apache.catalina.util.LifecycleBase" level="ERROR"/>
<logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN"/>
<logger name="com.njcn" level="INFO" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG"/>
<appender-ref ref="INFO"/>
<appender-ref ref="ERROR"/>
</logger>
<root level="${logCommonLevel}">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG"/>
<appender-ref ref="INFO"/>
<appender-ref ref="ERROR"/>
</root>
</configuration>

View File

@@ -0,0 +1,12 @@
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8" />
<title></title>
</head>
<body>
<p>
<span th:text="${hello}">Hello!</span>
</p>
</body>
</html>