获取下行状态

This commit is contained in:
caozehui
2026-03-06 08:42:19 +08:00
parent b048ecfc53
commit 8806ba7afa
15 changed files with 378 additions and 146 deletions

View File

@@ -2,6 +2,8 @@ package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import com.aliyun.dysmsapi20170525.Client;
import com.aliyun.dysmsapi20170525.models.QuerySendDetailsRequest;
import com.aliyun.dysmsapi20170525.models.QuerySendDetailsResponse;
import com.aliyun.dysmsapi20170525.models.SendSmsRequest;
import com.aliyun.dysmsapi20170525.models.SendSmsResponse;
import com.aliyun.teaopenapi.models.Config;
@@ -10,14 +12,17 @@ import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString;
@@ -35,6 +40,13 @@ public class AliyunSmsSender implements SmsSender {
private Client smsClient;
/**
* 存放发送完成的消息。key为其返回的mid
*/
// private Map<String, MessageRecordDO> completeSendMessageMap = new HashMap<>();
private ScheduledExecutorService scheduledExecutorService;
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
this.sender = sender;
if (ObjectUtil.isNotNull(aliYunSmsSetting)) {
@@ -43,7 +55,6 @@ public class AliyunSmsSender implements SmsSender {
.setAccessKeySecret(aliYunSmsSetting.getAccessKeySecret())
.setRegionId(aliYunSmsSetting.getRegionId())
.setEndpoint(aliYunSmsSetting.getEndpoint());
try {
this.smsClient = new Client(config);
} catch (Exception e) {
@@ -74,11 +85,12 @@ public class AliyunSmsSender implements SmsSender {
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (OK.equals(response.body.code)) {
this.getDownInfo(response.body.bizId, message);
return true;
} else {
message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message);
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
this.sender.messageRetryQueueService.addRetryMessage(message);
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
@@ -105,4 +117,42 @@ public class AliyunSmsSender implements SmsSender {
}
return res;
}
/**
* 获取下行信息
*
* @param bizId
* @param message
*/
private void getDownInfo(String bizId, MessageRecordDO message) {
if (ObjectUtil.isNull(this.scheduledExecutorService)) {
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
}
this.scheduledExecutorService.schedule(() -> {
QuerySendDetailsRequest request = new QuerySendDetailsRequest()
.setPhoneNumber(message.getReceiver())
.setBizId(bizId)
.setSendDate(message.getSendTime().format(DateTimeFormatter.ofPattern("yyyyMMdd")))
.setCurrentPage(1L)
.setPageSize(10L);
try {
QuerySendDetailsResponse response = this.smsClient.querySendDetails(request);
System.out.println(toJSONString(response));
// if (response.statusCode != HttpStatus.OK.value()) {
response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> {
if (!"DELIVERED".equals(detail.errCode)) {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode);
message.setErrorCode(detail.errCode);
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message);
}
});
// }
} catch (Exception e) {
throw new RuntimeException(e);
}
this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null;
}, 10, TimeUnit.SECONDS);
}
}

View File

@@ -8,7 +8,6 @@ import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import lombok.Data;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
@@ -39,11 +38,6 @@ public class TelecomSmsSender implements SmsSender {
private TelecomSmsSetting telecomSmsSetting;
private Sender sender;
/**
* 存放发送完成的消息。key为其返回的mid
*/
private Map<String, MessageRecordDO> completeSendMessageMap = new HashMap<>();
private ScheduledExecutorService scheduledExecutorService;
@Data
@@ -118,22 +112,14 @@ public class TelecomSmsSender implements SmsSender {
if (response1.getStatusCode() == HttpStatus.OK) {
String mid = telecomSmsSendResponse.list.get(0).mid;
completeSendMessageMap.put(mid, message);
// 定时任务,指定时间间隔后获取下行信息
if (ObjectUtil.isNull(this.scheduledExecutorService)) {
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
}
this.scheduledExecutorService.schedule(() -> {
this.getDownInfo(mid);
this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null;
}, 10, TimeUnit.SECONDS);
this.getDownInfo(mid, message);
return true;
} else {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
this.sender.messageRetryQueueService.addRetryMessage(message);
return false;
}
});
@@ -193,45 +179,48 @@ public class TelecomSmsSender implements SmsSender {
* 获取下行信息
*
* @param mid
* @param message
*/
private void getDownInfo(String mid) {
System.out.println("getDownInfo" + LocalDateTime.now());
// 构建请求参数
Map<String, Object> request = new HashMap<>();
request.put("action", "select");
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
request.put("date", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
request.put("condition", "APMID");
request.put("valueList", mid);
private void getDownInfo(String mid, MessageRecordDO message) {
if (ObjectUtil.isNull(this.scheduledExecutorService)) {
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
}
this.scheduledExecutorService.schedule(() -> {
// 构建请求参数
Map<String, Object> request = new HashMap<>();
request.put("action", "select");
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
request.put("date", message.getSendTime().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
request.put("condition", "APMID");
request.put("valueList", mid);
// request.put("condition", "MOBILE");
// request.put("valueList", message.getReceiver());
request.remove("mobile");
request.remove("content");
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", CONTENT_TYPE);
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
System.out.println(JSON.toJSONString(JSON.toJSONString(response)));
request.remove("mobile");
request.remove("content");
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", CONTENT_TYPE);
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
System.out.println(JSON.toJSONString(JSON.toJSONString(response)));
if (response.getStatusCode() == HttpStatus.OK) {
// if (response.getStatusCode() == HttpStatus.OK) {
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
if (telecomSmsSelectDetailRes.getStatus() == 4) {
return;
} else if (telecomSmsSelectDetailRes.getStatus() == 5) {
MessageRecordDO message = completeSendMessageMap.get(mid);
if (telecomSmsSelectDetailRes.getStatus() == 5) {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat());
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
completeSendMessageMap.remove(mid);
this.sender.messageRetryQueueService.addRetryMessage(message);
}
}
// }
this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null;
}, 10, TimeUnit.SECONDS);
}
}

View File

@@ -49,7 +49,6 @@ public class ChannelProviderConfigController {
@PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:channel:page')")
@Parameter(name = "reqVO", description = "分页查询参数", required = true)
public CommonResult<Page<ChannelProviderConfigDO>> pageChannelProviderConfig(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
Page<ChannelProviderConfigDO> res = channelProviderConfigService.getPage(reqVO);
return success(res);
@@ -58,7 +57,6 @@ public class ChannelProviderConfigController {
@PostMapping("/add")
@Operation(summary = "新增渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:add')")
@Parameter(name = "reqVO", description = "新增参数", required = true)
public CommonResult<Boolean> addChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
boolean res = channelProviderConfigService.save(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class));
return success(res);
@@ -67,7 +65,6 @@ public class ChannelProviderConfigController {
@PostMapping("/update")
@Operation(summary = "更新渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:update')")
@Parameter(name = "reqVO", description = "更新参数", required = true)
public CommonResult<Boolean> updateChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
boolean res = channelProviderConfigService.updateById(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class));
return success(res);
@@ -77,7 +74,7 @@ public class ChannelProviderConfigController {
@Operation(summary = "删除渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:delete')")
@Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> deleteChannelProvider(@RequestBody List<String> ids) {
public CommonResult<Boolean> deleteChannelProvider(@RequestParam("ids") List<String> ids) {
boolean res = channelProviderConfigService.removeBatchByIds(ids);
return success(res);
}

View File

@@ -15,10 +15,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@@ -36,7 +33,6 @@ public class MessageRecordController {
@PermitAll
@PostMapping("/send")
@Operation(summary = "消息推送")
@Parameter(name = "reqVOList", description = "消息列表")
public CommonResult<Boolean> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
Boolean result = messageRecordService.send(reqVOList);
return CommonResult.success(result);
@@ -45,7 +41,6 @@ public class MessageRecordController {
@PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:message:page')")
@Parameter(name = "reqVO", description = "分页查询参数", required = true)
public CommonResult<Page<MessageRecordDO>> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) {
Page<MessageRecordDO> res = messageRecordService.getPage(reqVO);
return success(res);
@@ -54,7 +49,6 @@ public class MessageRecordController {
@PostMapping("/add")
@Operation(summary = "添加消息记录")
@PreAuthorize("@ss.hasPermission('push:message:add')")
@Parameter(name = "reqVO", description = "添加参数", required = true)
public CommonResult<Boolean> add(@Validated @RequestBody MessageRecordReqVO reqVO) {
return messageRecordService.add(reqVO) ? success(true) : success(false);
}
@@ -62,7 +56,6 @@ public class MessageRecordController {
@PostMapping("/update")
@Operation(summary = "更新消息记录")
@PreAuthorize("@ss.hasPermission('push:message:update')")
@Parameter(name = "reqVO", description = "更新参数", required = true)
public CommonResult<Boolean> update(@Validated @RequestBody MessageRecordReqVO reqVO) {
return messageRecordService.update(reqVO) ? success(true) : success(false);
}
@@ -71,7 +64,7 @@ public class MessageRecordController {
@Operation(summary = "删除消息记录")
@PreAuthorize("@ss.hasPermission('push:message:delete')")
@Parameter(name = "ids", description = "编号", required = true)
public CommonResult<Boolean> delete(@RequestBody List<String> ids) {
public CommonResult<Boolean> delete(@RequestParam("ids") List<String> ids) {
return messageRecordService.delete(ids) ? success(true) : success(false);
}
}

View File

@@ -3,8 +3,11 @@ package com.njcn.msgpush.module.push.controller.admin.retry;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import com.njcn.msgpush.module.push.service.retry.RetryStrategyConfigService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -12,10 +15,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@@ -34,11 +34,12 @@ public class MessageRetryQueueController {
@Autowired
private MessageRetryQueueService messageRetryQueueService;
@Autowired
private RetryStrategyConfigService retryStrategyConfigService;
@PostMapping("/page")
@Operation(summary = "分页获得消息重试队列")
@PreAuthorize("@ss.hasPermission('push:retry:page')")
@Parameter(name = "reqVO", description = "分页查询参数", required = true)
public CommonResult<PageResult<MessageRetryQueueDO>> getRetryPage(@Validated @RequestBody MessageRetryQueueReqVO reqVO) {
PageResult<MessageRetryQueueDO> result = messageRetryQueueService.getPage(reqVO);
return success(result);
@@ -47,8 +48,31 @@ public class MessageRetryQueueController {
@PostMapping("/manual")
@Operation(summary = "批量手动重试消息")
@PreAuthorize("@ss.hasPermission('push:retry:manual')")
@Parameter(name = "messageIds", description = "消息ID列表", required = true)
public CommonResult<Void> manualRetry(@RequestBody List<String> messageIds) {
messageRetryQueueService.manualRetry(messageIds);
return success(null);
}
@PostMapping("/config/list")
@Operation(summary = "获得消息重试配置列表")
@PreAuthorize("@ss.hasPermission('push:retry:config:list')")
public CommonResult<List<RetryStrategyConfigDO>> listRetryConfig() {
return success(retryStrategyConfigService.listAll());
}
@PostMapping("/config/update")
@Operation(summary = "更新消息重试配置")
@PreAuthorize("@ss.hasPermission('push:retry:config:update')")
public CommonResult<Boolean> updateRetryConfig(@Validated @RequestBody RetryStrategyConfigVO retryStrategyConfigVO) {
return success(retryStrategyConfigService.updateStrategyConfig(retryStrategyConfigVO));
}
@PostMapping("/config/toggle")
@Operation(summary = "启用/禁用重试策略配置")
@PreAuthorize("@ss.hasPermission('push:retry:config:toggle')")
@Parameter(name = "id", description = "id", required = true)
public CommonResult<Boolean> toggleRetryConfig(@RequestParam("id") String id) {
return success(retryStrategyConfigService.toggleEnableField(id));
}
}

View File

@@ -3,6 +3,7 @@ package com.njcn.msgpush.module.push.controller.admin.retry.vo;
import com.njcn.msgpush.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
/**
@@ -16,9 +17,11 @@ public class MessageRetryQueueReqVO extends PageParam {
private String messageId;
@Schema(description = "渠道类型", example = "sms/email/app_push")
@NotBlank(message = "渠道类型不能为空")
private String channel;
@Schema(description = "接收者", example = "10086")
@NotBlank(message = "接收者不能为空")
private String receiver;
@Schema(description = "最小重试次数")

View File

@@ -0,0 +1,40 @@
package com.njcn.msgpush.module.push.controller.admin.retry.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author caozehui
* @data 2026-03-05
*/
@Data
@Schema(description = "管理后台 - 重试策略配置 Request VO")
public class RetryStrategyConfigVO {
/**
* 主键 ID
*/
@Schema(description = "主键 ID")
private Long id;
/**
* 渠道类型sms/email/app_push
*/
@Schema(description = "渠道类型sms/email/app_push")
@NotBlank(message = "渠道类型不能为空")
private String channel;
/**
* 最大重试次数
*/
@Schema(description = "最大重试次数")
@NotNull(message = "最大重试次数不能为空")
private Integer maxRetryCount;
/**
* 重试间隔逗号分隔300,600,1800
*/
@Schema(description = "重试间隔逗号分隔300,600,1800")
@NotBlank(message = "重试间隔不能为空")
private String retryIntervals;
}

View File

@@ -0,0 +1,40 @@
package com.njcn.msgpush.module.push.dal.dataobject.retry;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author caozehui
* @data 2026-03-05
*/
@Data
@TableName("push_retry_strategy_config")
@EqualsAndHashCode(callSuper = true)
public class RetryStrategyConfigDO extends BaseDO {
/**
* 主键 ID
*/
private Long id;
/**
* 渠道类型sms/email/app_push
*/
private String channel;
/**
* 最大重试次数
*/
private Integer maxRetryCount;
/**
* 重试间隔逗号分隔300,600,1800
*/
private String retryIntervals;
/**
* 是否启用0-否 1-是
*/
private Integer enabled;
}

View File

@@ -0,0 +1,13 @@
package com.njcn.msgpush.module.push.dal.mysql.retry;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import org.apache.ibatis.annotations.Mapper;
/**
* @author caozehui
* @data 2026-03-05
*/
@Mapper
public interface RetryStrategyConfigMapper extends BaseMapperX<RetryStrategyConfigDO> {
}

View File

@@ -1,70 +0,0 @@
package com.njcn.msgpush.module.push.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.time.LocalDateTime;
/**
* @author caozehui
* @date 2026-02-27
* @description 消息重试策略枚举
*/
@Getter
@AllArgsConstructor
public enum RetryStrategyEnum {
/**
* 固定间隔重试
*/
FIXED_INTERVAL(1, "固定间隔重试") {
@Override
public LocalDateTime calculateNextRetryTime(int retryCount, int intervalSeconds) {
return LocalDateTime.now().plusSeconds(intervalSeconds);
}
},
/**
* 指数退避重试
*/
EXPONENTIAL_BACKOFF(2, "指数退避重试") {
@Override
public LocalDateTime calculateNextRetryTime(int retryCount, int baseIntervalSeconds) {
// 基础间隔 * 2^重试次数最大不超过1小时
long delay = Math.min(baseIntervalSeconds * (1L << retryCount), 3600);
return LocalDateTime.now().plusSeconds(delay);
}
};
/**
* 自定义时间重试
*/
// CUSTOM(3, "自定义时间重试") {
// @Override
// public LocalDateTime calculateNextRetryTime(int retryCount, int unused) {
//
// return LocalDateTime.now().plusMinutes(5 * retryCount);
// }
// };
private Integer code;
private String description;
/**
* 计算下次重试时间
*
* @param retryCount 当前重试次数
* @param param 参数(根据策略不同含义不同)
* @return 下次重试时间
*/
public abstract LocalDateTime calculateNextRetryTime(int retryCount, int param);
public static RetryStrategyEnum fromCode(Integer code) {
for (RetryStrategyEnum strategy : values()) {
if (strategy.getCode().equals(code)) {
return strategy;
}
}
return FIXED_INTERVAL; // 默认返回固定间隔
}
}

View File

@@ -4,7 +4,6 @@ import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import java.util.List;
@@ -12,10 +11,9 @@ public interface MessageRetryQueueService {
/**
* 添加消息到重试队列
*
* @param message 消息
* @param strategy 重试策略
* @param message 消息
*/
void addRetryMessage(MessageRecordDO message, RetryStrategyEnum strategy);
void addRetryMessage(MessageRecordDO message);
/**
* 批量处理重试消息

View File

@@ -2,15 +2,17 @@ package com.njcn.msgpush.module.push.service.retry;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j;
@@ -35,6 +37,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
private MessageRecordService messageRecordService;
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
@Autowired
public RetryStrategyConfigService retryStrategyConfigService;
public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
@@ -68,7 +72,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override
@Transactional(rollbackFor = Exception.class)
public void addRetryMessage(MessageRecordDO message, RetryStrategyEnum strategy) {
public void addRetryMessage(MessageRecordDO message) {
// 检查是否已存在重试记录
MessageRetryQueueDO existing = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
@@ -88,7 +92,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
retryRecord.setLastErrorMsg(message.getErrorMsg());
// 计算下次重试时间
LocalDateTime nextRetryTime = strategy.calculateNextRetryTime(0, DEFAULT_RETRY_INTERVAL);
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), 1);
retryRecord.setNextRetryTime(nextRetryTime);
super.baseMapper.insert(retryRecord);
@@ -98,6 +102,66 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
}
}
/**
* 计算下次重试时间
*
* @param channel
* @param retryCount
* @return
*/
private LocalDateTime calculateNextRetryTime(String channel, int retryCount) {
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(channel);
long plusSeconds = 0;
if (ObjectUtil.isNull(strategyConfig)) {
// 默认策略
switch (channel) {
case MsgPushConstant.CHANNEL_SMS: {
if (retryCount == 1) {
plusSeconds = 60 * 5;
} else if (retryCount == 2) {
plusSeconds = 60 * 10;
} else {
plusSeconds = 60 * 30;
}
}
break;
case MsgPushConstant.CHANNEL_EMAIL: {
if (retryCount == 1) {
plusSeconds = 60 * 10;
} else if (retryCount == 2) {
plusSeconds = 60 * 30;
} else if (retryCount == 3) {
plusSeconds = 60 * 60;
} else if (retryCount == 4) {
plusSeconds = 60 * 60 * 2;
} else {
plusSeconds = 60 * 60 * 4;
}
}
break;
case MsgPushConstant.CHANNEL_APP_PUSH: {
if (retryCount == 1) {
plusSeconds = 60 * 1;
} else {
plusSeconds = 60 * 60;
}
}
break;
default:
plusSeconds = 60 * 5;
}
} else {
String retryIntervals = strategyConfig.getRetryIntervals();
String[] split = retryIntervals.split(String.valueOf(StrUtil.C_COMMA));
if (retryCount >= split.length) {
plusSeconds = Long.parseLong(split[split.length - 1]);
} else {
plusSeconds = Long.parseLong(split[retryCount - 1]);
}
}
return LocalDateTime.now().plusSeconds(plusSeconds);
}
@Override
public void processRetryBatch(String channel) {
// 从数据库查询需要重试的消息
@@ -224,8 +288,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
} else {
// 还可以继续重试,更新重试信息
RetryStrategyEnum strategy = RetryStrategyEnum.EXPONENTIAL_BACKOFF;
LocalDateTime nextRetryTime = strategy.calculateNextRetryTime(newRetryCount, DEFAULT_RETRY_INTERVAL);
LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount);
super.baseMapper.updateRetryInfo(
messageRecordDO.getMessageId(),

View File

@@ -0,0 +1,37 @@
package com.njcn.msgpush.module.push.service.retry;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import java.util.List;
/**
* @author caozehui
* @data 2026-03-05
*/
public interface RetryStrategyConfigService {
/**
* 获得指定渠道的重试策略
*
* @param channel
* @return
*/
RetryStrategyConfigDO getStrategyConfig(String channel);
/**
* 获得所有重试策略
* @return
*/
List<RetryStrategyConfigDO> listAll();
/**
* 修改重试策略配置
*
* @param strategyConfigVO
* @return
*/
boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO);
boolean toggleEnableField(String id);
}

View File

@@ -0,0 +1,46 @@
package com.njcn.msgpush.module.push.service.retry;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.RetryStrategyConfigMapper;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author caozehui
* @data 2026-03-05
*/
@Service
public class RetryStrategyConfigServiceImpl extends ServiceImpl<RetryStrategyConfigMapper, RetryStrategyConfigDO> implements RetryStrategyConfigService {
@Override
public RetryStrategyConfigDO getStrategyConfig(String channel) {
return this.lambdaQuery().eq(RetryStrategyConfigDO::getChannel, channel)
.eq(RetryStrategyConfigDO::getDeleted, false)
.eq(RetryStrategyConfigDO::getEnabled, true)
.one();
}
@Override
public List<RetryStrategyConfigDO> listAll() {
return this.lambdaQuery()
.eq(RetryStrategyConfigDO::getDeleted, false)
.eq(RetryStrategyConfigDO::getEnabled, true)
.list();
}
@Override
public boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO) {
RetryStrategyConfigDO retryStrategyConfigDO = BeanUtil.copyProperties(strategyConfigVO, RetryStrategyConfigDO.class);
return this.updateById(retryStrategyConfigDO);
}
@Override
public boolean toggleEnableField(String id) {
RetryStrategyConfigDO retryStrategyConfigDO = this.getById(id);
retryStrategyConfigDO.setEnabled(retryStrategyConfigDO.getEnabled() ^ 0X0001);
return this.updateById(retryStrategyConfigDO);
}
}