From 87757b352c75c1758ebeec5dd123adc43fed7e40 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Mon, 30 Mar 2026 13:58:03 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E9=A2=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../push/checker/MsgPushGuardChain.java | 1 + .../push/checker/impl/RateLimitChecker.java | 8 ++- .../sender/impl/UniPushAppPushSender.java | 4 +- .../message/MessageRecordController.java | 2 +- .../push/dal/redis/MessageRetryRedisDAO.java | 2 +- .../push/dal/redis/RateLimitRedisDAO.java | 49 +++++++++++++++++++ .../push/dal/redis/SystemQuotaRedisDAO.java | 34 ++++++------- .../module/push/job/MessageRetryJob.java | 12 ++--- .../message/MessageRecordServiceImpl.java | 8 ++- .../quota/SystemQuotaConfigServiceImpl.java | 10 +--- .../ratelimit/RateLimitConfigService.java | 5 ++ .../ratelimit/RateLimitConfigServiceImpl.java | 34 +++++++++++++ 12 files changed, 132 insertions(+), 37 deletions(-) create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/RateLimitRedisDAO.java diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java index 3c452bd..7a75957 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java @@ -27,5 +27,6 @@ public class MsgPushGuardChain { public void checkAll(List messageRecordList) { blacklistChecker.check(messageRecordList); quotaChecker.check(messageRecordList); + rateLimitChecker.check(messageRecordList); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java index 915f726..ef8f241 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java @@ -4,6 +4,8 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import com.njcn.msgpush.module.push.service.ratelimit.RateLimitConfigService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; @@ -15,8 +17,12 @@ import java.util.List; */ @Component public class RateLimitChecker implements IChecker { + + @Autowired + private RateLimitConfigService rateLimitConfigService; + @Override public void check(List messageRecordList) { - + rateLimitConfigService.check(messageRecordList); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/UniPushAppPushSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/UniPushAppPushSender.java index 0f846af..c395aee 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/UniPushAppPushSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/UniPushAppPushSender.java @@ -25,6 +25,7 @@ public class UniPushAppPushSender implements AppPushSender { private Sender sender; private ApiHelper apiHelper; + private PushApi pushApi; public UniPushAppPushSender(UniPushAppPushSetting uniPushAppPushSetting, Sender sender) { // this.uniPushAppPushSetting = uniPushAppPushSetting; @@ -36,13 +37,14 @@ public class UniPushAppPushSender implements AppPushSender { gtApiConfiguration.setMasterSecret(uniPushAppPushSetting.getMasterSecret()); gtApiConfiguration.setDomain("https://restapi.getui.com/v2/"); this.apiHelper = ApiHelper.build(gtApiConfiguration); + this.pushApi = apiHelper.creatApi(PushApi.class); } @Override public boolean appPush(MessageRecordDO message) { PushDTO pushDTO = this.buildPushDTO(message.getTitle(), message.getContent()); // 进行cid单推 - PushApi pushApi = apiHelper.creatApi(PushApi.class); + // 设置接收人信息 Audience audience = new Audience(); audience.addCid(message.getReceiver()); diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java index f61c820..5e595b1 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java @@ -33,7 +33,7 @@ public class MessageRecordController { @PermitAll @PostMapping("/send") @Operation(summary = "消息推送") - @Idempotent(timeout = 60) + @Idempotent(timeout = 2) public CommonResult> send(@Valid @RequestBody List reqVOList) { return success(messageRecordService.send(reqVOList)); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java index 693163a..ea0be3a 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java @@ -24,7 +24,7 @@ public class MessageRetryRedisDAO { /** * Redis中消息重试队列的Key前缀 */ - private static final String RETRY_QUEUE_KEY_PREFIX = "msgpush:retry_queue:"; + private static final String RETRY_QUEUE_KEY_PREFIX = "msPpush:retry_queue:"; /** * 获取指定渠道的重试队列Key diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/RateLimitRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/RateLimitRedisDAO.java new file mode 100644 index 0000000..86aab22 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/RateLimitRedisDAO.java @@ -0,0 +1,49 @@ +package com.njcn.msgpush.module.push.dal.redis; + +import cn.hutool.core.util.ObjectUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.TimeUnit; + +/** + * @author caozehui + * @data 2026-03-25 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RateLimitRedisDAO { + private final RedisTemplate redisTemplate; + + private final String RATE_LIMIT_KEY_PREFIX = "msgPush:rate_limit"; + + private String buildKey(String channel, String appName, String receiver) { + DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd"); + return RATE_LIMIT_KEY_PREFIX + channel + ":" + appName + ":" + receiver + ":" + LocalDate.now().format(dtf); + } + + public void set(String channel, String appName, String receiver) { + String key = buildKey(channel, appName, receiver); + + long now = System.currentTimeMillis(); + long tomorrowZero = LocalDate.now().plusDays(1).atStartOfDay().toInstant(ZoneOffset.of("+8")).toEpochMilli(); + long ttl = tomorrowZero - now; + Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, "1", ttl, TimeUnit.MILLISECONDS); + if (!absent) { + Integer oldCount = this.get(channel, appName, receiver); + redisTemplate.opsForValue().set(key, (oldCount + 1) + "", ttl, TimeUnit.MILLISECONDS); + } + } + + public Integer get(String channel, String appName, String receiver) { + String key = buildKey(channel, appName, receiver); + String countStr = redisTemplate.opsForValue().get(key); + return ObjectUtil.isNull(countStr) ? 0 : Integer.parseInt(countStr); + } +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java index aa0a91c..f2df2cd 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java @@ -6,29 +6,32 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.concurrent.TimeUnit; + @Slf4j @Component @RequiredArgsConstructor public class SystemQuotaRedisDAO { private final RedisTemplate redisTemplate; - private static final String QUOTA_KEY_PREFIX = "msgpush:quota:"; + private static final String QUOTA_KEY_PREFIX = "msgPush:quota:"; - public void set(String channel, String appName, boolean isSchedule) { + private String buildKey(String channel, String appName) { + return QUOTA_KEY_PREFIX + channel + ":" + appName; + } + + public void set(String channel, String appName) { String key = buildKey(channel, appName); - if (isSchedule) { - redisTemplate.opsForValue().set(key, "0"); - } else { - String countStr = redisTemplate.opsForValue().get(key); - Integer count = 0; - if (ObjectUtil.isNull(countStr)) { - count = 1; - } else { - count = Integer.parseInt(countStr) + 1; - } - - redisTemplate.opsForValue().set(key, String.valueOf(count)); + long now = System.currentTimeMillis(); + long tomorrowZero = LocalDate.now().plusDays(1).atStartOfDay().toInstant(ZoneOffset.of("+8")).toEpochMilli(); + long ttl = tomorrowZero - now; + Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, "1", ttl, TimeUnit.MILLISECONDS); + if (!absent) { + Integer oldCount = this.get(channel, appName); + redisTemplate.opsForValue().set(key, (oldCount + 1) + "", ttl, TimeUnit.MILLISECONDS); } } @@ -38,7 +41,4 @@ public class SystemQuotaRedisDAO { return ObjectUtil.isNull(countStr) ? 0 : Integer.parseInt(countStr); } - private String buildKey(String channel, String appName) { - return QUOTA_KEY_PREFIX + channel + ":" + appName; - } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java index 955886c..2790332 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java @@ -21,26 +21,26 @@ public class MessageRetryJob { private final MessageRetryQueueService messageRetryQueueService; /** - * 定时处理短信重试队列(每3秒执行一次) + * 定时处理短信重试队列(每10秒执行一次) */ - @Scheduled(fixedRate = 3000) + @Scheduled(fixedRate = 10000) public void processSmsRetryQueue() { log.info("开始处理短信重试队列:{}", LocalDateTime.now()); messageRetryQueueService.processRetryBatch("sms"); } /** - * 定时处理邮件重试队列(每3秒执行一次) + * 定时处理邮件重试队列(每10秒执行一次) */ - @Scheduled(fixedRate = 3000) + @Scheduled(fixedRate = 10000) public void processEmailRetryQueue() { messageRetryQueueService.processRetryBatch("email"); } /** - * 定时处理APP推送重试队列(每3秒执行一次) + * 定时处理APP推送重试队列(每10秒执行一次) */ - @Scheduled(fixedRate = 3000) + @Scheduled(fixedRate = 10000) public void processAppPushRetryQueue() { messageRetryQueueService.processRetryBatch("app_push"); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java index 06fb2b6..c5e38a7 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java @@ -18,6 +18,7 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryHistoryDO; import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper; +import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO; import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService; @@ -52,6 +53,9 @@ public class MessageRecordServiceImpl extends ServiceImpl send(List reqVOList) { @@ -81,6 +85,7 @@ public class MessageRecordServiceImpl extends ServiceImpl list = this.lambdaQuery().eq(SystemQuotaConfigDO::getEnabled, true).eq(SystemQuotaConfigDO::getDeleted, false).list(); - for (SystemQuotaConfigDO systemQuotaConfigDO : list) { - systemQuotaRedisDAO.set(systemQuotaConfigDO.getChannel(), systemQuotaConfigDO.getAppName(), true); - } - } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java index 1fd6bc8..c7cdd03 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java @@ -15,5 +15,10 @@ import java.util.List; public interface RateLimitConfigService extends IService { Page getPage(RateLimitConfigReqVO reqVO); + RateLimitConfigDO getByChannelAndAppName(String channel, String appName); + boolean delete(List ids); + + void check(List messageRecordList); + } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java index 7bf0c18..30dfdd0 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java @@ -1,13 +1,18 @@ package com.njcn.msgpush.module.push.service.ratelimit; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.util.object.PageUtils; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO; import com.njcn.msgpush.module.push.dal.mysql.ratelimit.RateLimitConfigMapper; +import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; @@ -18,6 +23,10 @@ import java.util.List; */ @Service public class RateLimitConfigServiceImpl extends ServiceImpl implements RateLimitConfigService { + + @Autowired + private RateLimitRedisDAO rateLimitRedisDAO; + @Override public Page getPage(RateLimitConfigReqVO reqVO) { QueryWrapper wrapper = new QueryWrapper<>(); @@ -25,6 +34,15 @@ public class RateLimitConfigServiceImpl extends ServiceImpl(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); } + + @Override + public RateLimitConfigDO getByChannelAndAppName(String channel, String appName) { + return this.lambdaQuery().eq(RateLimitConfigDO::getChannel, channel) + .eq(RateLimitConfigDO::getAppName, appName) + .eq(RateLimitConfigDO::getEnabled, true) + .eq(RateLimitConfigDO::getDeleted, false).one(); + } + @Override public boolean delete(List ids) { return this.lambdaUpdate() @@ -32,4 +50,20 @@ public class RateLimitConfigServiceImpl extends ServiceImpl messageRecordList) { + for (int i = 0; i < messageRecordList.size(); i++) { + MessageRecordDO messageRecordDO = messageRecordList.get(i); + RateLimitConfigDO rateLimitConfigDO = this.getByChannelAndAppName(messageRecordDO.getChannel(), messageRecordDO.getAppName()); + + if (ObjectUtil.isNotNull(rateLimitConfigDO)) { + Integer dailyQuota = rateLimitConfigDO.getDailyLimit(); + Integer count = rateLimitRedisDAO.get(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver()); + if (count >= dailyQuota) { + messageRecordDO.setStatus(MsgStatusConstant.QUOTAEXCEEDED); + } + } + } + } }