diff --git a/pqs-common/common-mq/src/main/java/com/njcn/mq/constant/BusinessTopic.java b/pqs-common/common-mq/src/main/java/com/njcn/mq/constant/BusinessTopic.java index 2c7ca0fee..896cc731e 100644 --- a/pqs-common/common-mq/src/main/java/com/njcn/mq/constant/BusinessTopic.java +++ b/pqs-common/common-mq/src/main/java/com/njcn/mq/constant/BusinessTopic.java @@ -77,6 +77,12 @@ public interface BusinessTopic { */ String REPLY_RECALL_TOPIC = "reply_recall_Topic"; + /** + * 治理心跳过期处理主题 + */ + String HEARTBEAT_TIMEOUT_TOPIC = "heartbeat_timeout_topic"; + + /********************************数据中心*********************************/ String RMP_EVENT_DETAIL_TOPIC = "rmpEventDetailTopic"; @@ -147,4 +153,17 @@ public interface BusinessTopic { String STREAM_TAG = "streamInfo"; } + interface HeartTag { + + /** + * apf 心跳 + */ + String APF_TAG = "apf"; + + /** + * cld 心跳 + */ + String CLD_TAG = "cld"; + } + } diff --git a/pqs-common/common-mq/src/main/java/com/njcn/mq/message/HeartbeatTimeoutMessage.java b/pqs-common/common-mq/src/main/java/com/njcn/mq/message/HeartbeatTimeoutMessage.java new file mode 100644 index 000000000..96ecc712e --- /dev/null +++ b/pqs-common/common-mq/src/main/java/com/njcn/mq/message/HeartbeatTimeoutMessage.java @@ -0,0 +1,27 @@ +package com.njcn.mq.message; + +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author xy + */ +@Data +@EqualsAndHashCode(callSuper = true) +@AllArgsConstructor +@NoArgsConstructor +public class HeartbeatTimeoutMessage extends BaseMessage implements Serializable { + + private static final long serialVersionUID = 1L; + + private String nDid; + + private Long timestamp; + + private Integer delayLevel; +} \ No newline at end of file diff --git a/pqs-common/common-mq/src/main/java/com/njcn/mq/template/HeartbeatTimeoutMessageTemplate.java b/pqs-common/common-mq/src/main/java/com/njcn/mq/template/HeartbeatTimeoutMessageTemplate.java new file mode 100644 index 000000000..49d325f0e --- /dev/null +++ b/pqs-common/common-mq/src/main/java/com/njcn/mq/template/HeartbeatTimeoutMessageTemplate.java @@ -0,0 +1,29 @@ +package com.njcn.mq.template; + +import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; +import com.njcn.mq.constant.BusinessResource; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Component; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:28 + */ +@Component +public class HeartbeatTimeoutMessageTemplate extends RocketMQEnhanceTemplate { + + public HeartbeatTimeoutMessageTemplate(RocketMQTemplate template) { + super(template); + } + + public SendResult sendMember(HeartbeatTimeoutMessage heartbeatTimeoutMessage) { + heartbeatTimeoutMessage.setSource(BusinessResource.WEB_RESOURCE); + return send(BusinessTopic.HEARTBEAT_TIMEOUT_TOPIC, BusinessTopic.HeartTag.APF_TAG, heartbeatTimeoutMessage, heartbeatTimeoutMessage.getDelayLevel()); + } +}