docs(detection): 删除CN_Gather Detection模块Netty通信架构分析文档
- 移除了详细的Netty客户端和服务端组件说明文档 - 删除了WebSocket通信组件的技术细节描述 - 移除了Socket响应处理器和管理工具类的详细分析 - 清理了通信数据对象和流程分析相关内容 - 移除了智能Socket通信机制的技术文档 - 删除了配置管理组件和Spring集成的相关说明
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -1,17 +1,27 @@
|
||||
package com.njcn.gather.detection.controller;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||
import com.njcn.common.pojo.constant.OperateType;
|
||||
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.common.utils.LogUtil;
|
||||
import com.njcn.gather.detection.lock.DetectionLock;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager.AcquireResult;
|
||||
import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum;
|
||||
import com.njcn.gather.detection.pojo.param.ContrastDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.param.SimulateDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.vo.DetectionLockHolderVO;
|
||||
import com.njcn.gather.detection.service.PreDetectionService;
|
||||
import com.njcn.gather.detection.util.socket.FormalTestManager;
|
||||
import com.njcn.gather.user.user.pojo.po.SysUser;
|
||||
import com.njcn.gather.user.user.service.ISysUserService;
|
||||
import com.njcn.web.controller.BaseController;
|
||||
import com.njcn.web.utils.HttpResultUtil;
|
||||
import com.njcn.web.utils.RequestUtil;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiImplicitParam;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
@@ -32,6 +42,7 @@ import org.springframework.web.bind.annotation.*;
|
||||
public class PreDetectionController extends BaseController {
|
||||
|
||||
private final PreDetectionService preDetectionService;
|
||||
private final ISysUserService sysUserService;
|
||||
|
||||
/**
|
||||
* 开始检测通用入口
|
||||
@@ -42,10 +53,27 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("开始检测")
|
||||
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
|
||||
public HttpResult<String> startPreTest(@RequestBody @Validated PreDetectionParam param) {
|
||||
public HttpResult<?> startPreTest(@RequestBody @Validated PreDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("startPreTest");
|
||||
preDetectionService.sourceCommunicationCheck(param);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
HttpResult<DetectionLockHolderVO> busy = tryAcquireLock(param.getUserPageId());
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
// 同步阶段抛异常时回滚锁(PLAN_AND_SOURCE_NOT / SOURCE_INFO_NOT 等业务异常会被全局处理器吞掉,
|
||||
// 锁会卡在用户手上直到 4 小时超时,故需 finally 兜底)
|
||||
boolean keepLock = false;
|
||||
try {
|
||||
// 重置 FormalTestManager 暂停计数残留,避免上次暂停残留计数误触发 R4
|
||||
FormalTestManager.stopTime = 0;
|
||||
FormalTestManager.hasStopFlag = false;
|
||||
preDetectionService.sourceCommunicationCheck(param);
|
||||
keepLock = true;
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
} finally {
|
||||
if (!keepLock) {
|
||||
releaseLockSelf("START_PRE_SYNC_FAILED");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -59,8 +87,12 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("源通讯校验")
|
||||
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
|
||||
public HttpResult<String> ytxCheckSimulate(@RequestBody @Validated SimulateDetectionParam param) {
|
||||
public HttpResult<?> ytxCheckSimulate(@RequestBody @Validated SimulateDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("ytxCheckSimulate");
|
||||
HttpResult<DetectionLockHolderVO> busy = requireFreeOrSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
preDetectionService.ytxCheckSimulate(param);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
@@ -72,8 +104,12 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("启动")
|
||||
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
|
||||
public HttpResult<String> startTestSimulate(@RequestBody @Validated SimulateDetectionParam param) {
|
||||
public HttpResult<?> startTestSimulate(@RequestBody @Validated SimulateDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("startTestSimulate");
|
||||
HttpResult<DetectionLockHolderVO> busy = requireHolderSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
preDetectionService.sendScript(param);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
@@ -85,9 +121,18 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("停止")
|
||||
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
|
||||
public HttpResult<String> closeSimulateTest(@RequestBody @Validated SimulateDetectionParam param) {
|
||||
public HttpResult<?> closeSimulateTest(@RequestBody @Validated SimulateDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("closeSimulateTest");
|
||||
preDetectionService.closeTestSimulate(param);
|
||||
HttpResult<DetectionLockHolderVO> busy = requireHolderSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
try {
|
||||
preDetectionService.closeTestSimulate(param);
|
||||
} finally {
|
||||
// 即使业务异常也要释放锁,避免锁残留导致他人无法接手
|
||||
releaseLockSelf("USER_STOP");
|
||||
}
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
@@ -99,8 +144,12 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("系数校验")
|
||||
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
|
||||
public HttpResult<String> coefficientCheck(@RequestBody PreDetectionParam param) {
|
||||
public HttpResult<?> coefficientCheck(@RequestBody PreDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("coefficientCheck");
|
||||
HttpResult<DetectionLockHolderVO> busy = requireHolderSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
preDetectionService.coefficientCheck(param);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
@@ -113,8 +162,13 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("暂停检测")
|
||||
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
||||
public HttpResult<String> temStopTest() {
|
||||
public HttpResult<?> temStopTest() {
|
||||
String methodDescribe = getMethodDescribe("temStopTest");
|
||||
HttpResult<DetectionLockHolderVO> busy = requireHolderSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
// 暂停保持锁(spec §2.3),不释放
|
||||
preDetectionService.temStopTest();
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
@@ -126,8 +180,12 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("重新开始检测")
|
||||
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
||||
public HttpResult<String> restartTemTest(@RequestBody PreDetectionParam param) {
|
||||
public HttpResult<?> restartTemTest(@RequestBody PreDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("restartTemTest");
|
||||
HttpResult<DetectionLockHolderVO> busy = requireHolderSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
preDetectionService.restartTemTest(param);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
@@ -140,10 +198,26 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo
|
||||
@ApiOperation("开始比对检测")
|
||||
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
|
||||
public HttpResult<String> startContrastTest(@RequestBody @Validated ContrastDetectionParam param) {
|
||||
public HttpResult<?> startContrastTest(@RequestBody @Validated ContrastDetectionParam param) {
|
||||
String methodDescribe = getMethodDescribe("startContrastTest");
|
||||
preDetectionService.startContrastTest(param);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
// ContrastDetectionParam 无 userPageId 字段,用 loginName 作为会话标识(与 WS 会话 key 一致)
|
||||
HttpResult<DetectionLockHolderVO> busy = tryAcquireLock(param.getLoginName());
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
// 同步阶段抛异常时回滚锁,理由同 startPreTest
|
||||
boolean keepLock = false;
|
||||
try {
|
||||
FormalTestManager.stopTime = 0;
|
||||
FormalTestManager.hasStopFlag = false;
|
||||
preDetectionService.startContrastTest(param);
|
||||
keepLock = true;
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
} finally {
|
||||
if (!keepLock) {
|
||||
releaseLockSelf("START_CONTRAST_SYNC_FAILED");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -169,11 +243,91 @@ public class PreDetectionController extends BaseController {
|
||||
@OperateInfo(info = LogEnum.SYSTEM_COMMON)
|
||||
@GetMapping("/startCoefficient")
|
||||
@ApiOperation("比对模式开启系数校验")
|
||||
public HttpResult<String> startCoefficient() {
|
||||
public HttpResult<?> startCoefficient() {
|
||||
String methodDescribe = getMethodDescribe("startCoefficient");
|
||||
LogUtil.njcnDebug(log, "{}", methodDescribe);
|
||||
|
||||
HttpResult<DetectionLockHolderVO> busy = requireHolderSelf();
|
||||
if (busy != null) {
|
||||
return busy;
|
||||
}
|
||||
preDetectionService.startCoefficient();
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
// ============ 检测互斥锁辅助方法 ============
|
||||
|
||||
/** 抢锁入口(startPreTest / startContrastTest 用)。
|
||||
* 抢到→null;被他人持有或竞态失败→返回 busy 响应;
|
||||
* 使用方:拿到非 null 返回值直接 return 给上层。 */
|
||||
private HttpResult<DetectionLockHolderVO> tryAcquireLock(String userPageId) {
|
||||
String userId = RequestUtil.getUserId();
|
||||
AcquireResult r = DetectionLockManager.getInstance()
|
||||
.tryAcquire(userId, resolveDisplayName(userId), userPageId);
|
||||
if (r.isOk()) {
|
||||
return null;
|
||||
}
|
||||
return HttpResultUtil.assembleResult(
|
||||
DetectionResponseEnum.DETECTION_BUSY.getCode(),
|
||||
r.getHolder(),
|
||||
DetectionResponseEnum.DETECTION_BUSY.getMessage());
|
||||
}
|
||||
|
||||
/** 中间接口校验:要求当前 holder == 自己。
|
||||
* 空闲 → 返回 busy data=null("请先开始检测"语义);
|
||||
* 他人持有 → 返回 busy + holder;
|
||||
* 自己持有 → 返回 null(放行)。 */
|
||||
private HttpResult<DetectionLockHolderVO> requireHolderSelf() {
|
||||
DetectionLock cur = DetectionLockManager.getInstance().getCurrent();
|
||||
String me = RequestUtil.getUserId();
|
||||
if (cur != null && me.equals(cur.getUserId())) {
|
||||
return null;
|
||||
}
|
||||
DetectionLockHolderVO holder = cur == null ? null : DetectionLockManager.toHolderVO(cur);
|
||||
return HttpResultUtil.assembleResult(
|
||||
DetectionResponseEnum.DETECTION_BUSY.getCode(),
|
||||
holder,
|
||||
DetectionResponseEnum.DETECTION_BUSY.getMessage());
|
||||
}
|
||||
|
||||
/** 辅助接口规则(ytxCheckSimulate):锁空闲 → 放行;他人持有 → busy;自己持有 → 放行。 */
|
||||
private HttpResult<DetectionLockHolderVO> requireFreeOrSelf() {
|
||||
DetectionLock cur = DetectionLockManager.getInstance().getCurrent();
|
||||
if (cur == null) {
|
||||
return null;
|
||||
}
|
||||
String me = RequestUtil.getUserId();
|
||||
if (me.equals(cur.getUserId())) {
|
||||
return null;
|
||||
}
|
||||
return HttpResultUtil.assembleResult(
|
||||
DetectionResponseEnum.DETECTION_BUSY.getCode(),
|
||||
DetectionLockManager.toHolderVO(cur),
|
||||
DetectionResponseEnum.DETECTION_BUSY.getMessage());
|
||||
}
|
||||
|
||||
/** 释放锁(用户主动终止)。 */
|
||||
private void releaseLockSelf(String reason) {
|
||||
DetectionLockManager.getInstance().releaseIfHeldBy(RequestUtil.getUserId(), reason);
|
||||
}
|
||||
|
||||
/** 解析展示给前端的用户名(昵称优先,loginName 兜底,避免 BUSY 弹窗显示 "unknown user")。 */
|
||||
private String resolveDisplayName(String userId) {
|
||||
if (StrUtil.isBlank(userId)) {
|
||||
return "";
|
||||
}
|
||||
try {
|
||||
SysUser user = sysUserService.getById(userId);
|
||||
if (user != null && StrUtil.isNotBlank(user.getName())) {
|
||||
return user.getName();
|
||||
}
|
||||
if (user != null && StrUtil.isNotBlank(user.getLoginName())) {
|
||||
return user.getLoginName();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("解析检测锁持有者昵称失败,userId={}", userId, e);
|
||||
}
|
||||
// 最终兜底:用 token 里的 loginName,不要返回 "unknown user"
|
||||
String loginName = RequestUtil.getLoginNameByToken();
|
||||
return StrUtil.isNotBlank(loginName) ? loginName : userId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.dto.DevXiNumData;
|
||||
import com.njcn.gather.detection.pojo.enums.DetectionCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
@@ -1379,6 +1380,9 @@ public class SocketDevResponseService {
|
||||
|
||||
iPqDevService.updateResult(param.getDevIds(), valueType, param.getCode(), param.getUserId(), param.getTemperature(), param.getHumidity(), true);
|
||||
CnSocketUtil.quitSend(param);
|
||||
// 数模式检测全部小项完成 → 释放锁,避免用户必须点"停止"才能让出
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "DEV_TEST_FINISHED");
|
||||
}
|
||||
successComm.clear();
|
||||
FormalTestManager.realDataXiList.clear();
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.njcn.gather.detection.lock;
|
||||
|
||||
/**
|
||||
* 检测互斥锁对象(不可变)。
|
||||
* 字段含义见 docs/superpowers/specs/2026-05-28-单用户检测互斥-design.md §2.1
|
||||
*/
|
||||
public final class DetectionLock {
|
||||
|
||||
private final String userId;
|
||||
private final String userName;
|
||||
private final String userPageId;
|
||||
private final long acquireTime;
|
||||
private final long expireAt;
|
||||
|
||||
public DetectionLock(String userId, String userName, String userPageId,
|
||||
long acquireTime, long expireAt) {
|
||||
this.userId = userId;
|
||||
this.userName = userName;
|
||||
this.userPageId = userPageId;
|
||||
this.acquireTime = acquireTime;
|
||||
this.expireAt = expireAt;
|
||||
}
|
||||
|
||||
public String getUserId() { return userId; }
|
||||
public String getUserName() { return userName; }
|
||||
public String getUserPageId() { return userPageId; }
|
||||
public long getAcquireTime() { return acquireTime; }
|
||||
public long getExpireAt() { return expireAt; }
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
package com.njcn.gather.detection.lock;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.njcn.gather.detection.pojo.vo.DetectionLockHolderVO;
|
||||
|
||||
/**
|
||||
* 检测互斥锁管理器(进程内单例)。
|
||||
* 详细设计:docs/superpowers/specs/2026-05-28-单用户检测互斥-design.md
|
||||
*/
|
||||
@Slf4j
|
||||
public final class DetectionLockManager {
|
||||
|
||||
private static final long LOCK_MAX_HOLD_MS = TimeUnit.HOURS.toMillis(4);
|
||||
|
||||
private static final DetectionLockManager INSTANCE = new DetectionLockManager();
|
||||
|
||||
public static DetectionLockManager getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private final AtomicReference<DetectionLock> current = new AtomicReference<>(null);
|
||||
|
||||
private DetectionLockManager() {}
|
||||
|
||||
/** 抢锁。同账号视为重入(刷新 page/expireAt)。 */
|
||||
public AcquireResult tryAcquire(String userId, String userName, String userPageId) {
|
||||
for (int attempt = 0; attempt < 2; attempt++) {
|
||||
DetectionLock cur = current.get();
|
||||
long now = System.currentTimeMillis();
|
||||
// 空闲 或 绝对超时已过 → 直接抢
|
||||
if (cur == null || now > cur.getExpireAt()) {
|
||||
DetectionLock fresh = new DetectionLock(userId, userName, userPageId, now, now + LOCK_MAX_HOLD_MS);
|
||||
if (current.compareAndSet(cur, fresh)) {
|
||||
log.info("DetectionLock acquired by userId={}, userName={}, userPageId={}", userId, userName, userPageId);
|
||||
return AcquireResult.ok();
|
||||
}
|
||||
continue; // CAS 失败重试
|
||||
}
|
||||
// 同账号重入 → 刷新
|
||||
if (userId.equals(cur.getUserId())) {
|
||||
DetectionLock refreshed = new DetectionLock(userId, userName, userPageId, now, now + LOCK_MAX_HOLD_MS);
|
||||
if (current.compareAndSet(cur, refreshed)) {
|
||||
log.debug("DetectionLock reentered by userId={}, new userPageId={}", userId, userPageId);
|
||||
return AcquireResult.ok();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// 被他人持有
|
||||
return AcquireResult.busy(toHolderVO(cur));
|
||||
}
|
||||
// 两次 CAS 都失败属于罕见高并发场景:绝不返回 ok()(那样会让调用方误以为持锁)。
|
||||
// 返回 busy,data 可能为 null(锁刚被释放);调用方/前端按"请重试"处理。
|
||||
DetectionLock cur = current.get();
|
||||
return AcquireResult.busy(cur == null ? null : toHolderVO(cur));
|
||||
}
|
||||
|
||||
/** 仅当 holder.userId == userId 才释放(幂等)。
|
||||
* 循环终止性:每轮 CAS 失败意味着 current 被其他线程改写;
|
||||
* 下一轮 get 后 cur 可能变成 null 或不再匹配 userId,命中前置 return 退出。
|
||||
* 唯一可能继续的情况是另一线程把它换成了同 userId 的新 lock,下一轮 CAS 会再次尝试;
|
||||
* 最坏情况下 CAS 成功,仍然终止。 */
|
||||
public void releaseIfHeldBy(String userId, String reason) {
|
||||
while (true) {
|
||||
DetectionLock cur = current.get();
|
||||
if (cur == null || !cur.getUserId().equals(userId)) return;
|
||||
if (current.compareAndSet(cur, null)) {
|
||||
log.info("DetectionLock released, reason={}, userId={}", reason, userId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** 仅当 holder.userPageId == userPageId 才释放(幂等)。终止性同 releaseIfHeldBy。 */
|
||||
public void releaseIfMatchPage(String userPageId, String reason) {
|
||||
while (true) {
|
||||
DetectionLock cur = current.get();
|
||||
if (cur == null || !cur.getUserPageId().equals(userPageId)) return;
|
||||
if (current.compareAndSet(cur, null)) {
|
||||
log.info("DetectionLock released, reason={}, userPageId={}", reason, userPageId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** 管理员强制释放,不校验 holder。 */
|
||||
public void forceRelease(String operatorUserId, String reason) {
|
||||
DetectionLock cur = current.getAndSet(null);
|
||||
if (cur != null) {
|
||||
log.warn("DetectionLock force-released by operator={}, victim userId={}, reason={}",
|
||||
operatorUserId, cur.getUserId(), reason);
|
||||
}
|
||||
}
|
||||
|
||||
/** 返回当前 holder 快照;返回 null 表示空闲。 */
|
||||
public DetectionLock getCurrent() {
|
||||
DetectionLock cur = current.get();
|
||||
// 顺手做惰性超时回收(spec R5)
|
||||
if (cur != null && System.currentTimeMillis() > cur.getExpireAt()) {
|
||||
current.compareAndSet(cur, null);
|
||||
return null;
|
||||
}
|
||||
return cur;
|
||||
}
|
||||
|
||||
/** 把 DetectionLock 转成给前端的 VO。 */
|
||||
public static DetectionLockHolderVO toHolderVO(DetectionLock lock) {
|
||||
DetectionLockHolderVO vo = new DetectionLockHolderVO();
|
||||
vo.setHolderUserId(lock.getUserId());
|
||||
vo.setHolderUserName(lock.getUserName());
|
||||
vo.setAcquireTime(new Date(lock.getAcquireTime()));
|
||||
vo.setExpireAt(new Date(lock.getExpireAt()));
|
||||
return vo;
|
||||
}
|
||||
|
||||
/** 抢锁结果。 */
|
||||
public static final class AcquireResult {
|
||||
private final boolean ok;
|
||||
private final DetectionLockHolderVO holder;
|
||||
|
||||
private AcquireResult(boolean ok, DetectionLockHolderVO holder) {
|
||||
this.ok = ok;
|
||||
this.holder = holder;
|
||||
}
|
||||
public static AcquireResult ok() { return new AcquireResult(true, null); }
|
||||
public static AcquireResult busy(DetectionLockHolderVO holder) { return new AcquireResult(false, holder); }
|
||||
public boolean isOk() { return ok; }
|
||||
public DetectionLockHolderVO getHolder() { return holder; }
|
||||
}
|
||||
}
|
||||
@@ -19,7 +19,8 @@ public enum DetectionResponseEnum {
|
||||
|
||||
|
||||
SCRIPT_CHECK_DATA_NOT_EXIST("A020040","测试脚本项暂无配置" ),
|
||||
EXCEED_MAX_TIME("A020041","检测次数超出最大限制!" );
|
||||
EXCEED_MAX_TIME("A020041","检测次数超出最大限制!" ),
|
||||
DETECTION_BUSY("A020042", "检测进行中");
|
||||
|
||||
private final String code;
|
||||
private final String message;
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.njcn.gather.detection.pojo.vo;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 检测锁持有者信息,用于在抢锁失败响应中返回给前端。
|
||||
*/
|
||||
@Data
|
||||
public class DetectionLockHolderVO {
|
||||
|
||||
private String holderUserId;
|
||||
private String holderUserName;
|
||||
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
|
||||
private Date acquireTime;
|
||||
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
|
||||
private Date expireAt;
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
|
||||
import com.njcn.gather.detection.handler.SocketContrastResponseService;
|
||||
import com.njcn.gather.detection.handler.SocketDevResponseService;
|
||||
import com.njcn.gather.detection.handler.SocketSourceResponseService;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.ContrastDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
@@ -26,6 +27,8 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@@ -42,6 +45,20 @@ import java.util.concurrent.TimeUnit;
|
||||
@Component
|
||||
public class NettyClient {
|
||||
|
||||
// ========== TODO TEST-ONLY: 联调 BUSY 弹窗用,测试完成后整段删除 ==========
|
||||
/**
|
||||
* 测试期:连接源/设备失败时,延迟若干秒再释放检测锁,方便手动测试 BUSY 弹窗。
|
||||
* 正式上线时把 RELEASE_DELAY_FOR_TEST_SECONDS 改回 0 或直接删除调度逻辑。
|
||||
*/
|
||||
private static final long RELEASE_DELAY_FOR_TEST_SECONDS = 300L;
|
||||
private static final ScheduledExecutorService DELAYED_RELEASER =
|
||||
Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r, "lock-release-delay-test");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
// ========== /TODO TEST-ONLY ==========
|
||||
|
||||
@Resource
|
||||
private SocketSourceResponseService socketSourceResponseService;
|
||||
|
||||
@@ -349,7 +366,7 @@ public class NettyClient {
|
||||
NioEventLoopGroup group, String msg) {
|
||||
channelFuture.addListener((ChannelFutureListener) ch -> {
|
||||
if (!ch.isSuccess()) {
|
||||
onConnectionFailure(handler, group);
|
||||
onConnectionFailure(param, handler, group);
|
||||
} else {
|
||||
onConnectionSuccess(channelFuture, param, handler, group, msg);
|
||||
}
|
||||
@@ -358,15 +375,30 @@ public class NettyClient {
|
||||
|
||||
/**
|
||||
* 连接失败处理
|
||||
* 输出失败信息并优雅关闭事件循环组
|
||||
* 输出失败信息、优雅关闭事件循环组、通知前端、释放检测锁
|
||||
*
|
||||
* @param param 检测参数,用于定位锁与前端通知
|
||||
* @param handler 业务处理器,用于区分设备类型
|
||||
* @param group 事件循环组
|
||||
*/
|
||||
private static void onConnectionFailure(SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
|
||||
private static void onConnectionFailure(PreDetectionParam param,
|
||||
SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
|
||||
String deviceType = getDeviceType(handler);
|
||||
log.info("连接{}服务端失败...", deviceType);
|
||||
group.shutdownGracefully();
|
||||
// 异步建连失败时前端原本静默,补一次通知避免用户黑屏等待
|
||||
notifyFrontendError(param, handler);
|
||||
// 释放检测锁:抢锁后由 controller 异步发起的建连若失败,无法走 controller 兜底
|
||||
// TODO TEST-ONLY: 测试 BUSY 弹窗期间延迟释放,正式部署改回立即释放
|
||||
if (param != null && param.getUserPageId() != null) {
|
||||
final String userPageId = param.getUserPageId();
|
||||
DELAYED_RELEASER.schedule(
|
||||
() -> DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(userPageId, "ASYNC_CONNECT_FAILED_DELAYED"),
|
||||
RELEASE_DELAY_FOR_TEST_SECONDS, TimeUnit.SECONDS);
|
||||
log.warn("[TEST-ONLY] 检测锁将在 {}s 后释放(连接失败延迟释放,便于测试 BUSY 弹窗),userPageId={}",
|
||||
RELEASE_DELAY_FOR_TEST_SECONDS, userPageId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -455,6 +487,18 @@ public class NettyClient {
|
||||
|
||||
// 通过WebSocket通知前端页面
|
||||
notifyFrontendError(param, handler);
|
||||
|
||||
// 释放检测锁,理由同 onConnectionFailure
|
||||
// TODO TEST-ONLY: 测试 BUSY 弹窗期间延迟释放,正式部署改回立即释放
|
||||
if (param != null && param.getUserPageId() != null) {
|
||||
final String userPageId = param.getUserPageId();
|
||||
DELAYED_RELEASER.schedule(
|
||||
() -> DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(userPageId, "ASYNC_CONNECT_EXCEPTION_DELAYED"),
|
||||
RELEASE_DELAY_FOR_TEST_SECONDS, TimeUnit.SECONDS);
|
||||
log.warn("[TEST-ONLY] 检测锁将在 {}s 后释放(连接异常延迟释放,便于测试 BUSY 弹窗),userPageId={}",
|
||||
RELEASE_DELAY_FOR_TEST_SECONDS, userPageId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.njcn.gather.detection.util.socket.cilent;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.njcn.gather.detection.handler.SocketContrastResponseService;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
@@ -68,6 +69,11 @@ public class NettyContrastClientHandler extends SimpleChannelInboundHandler<Stri
|
||||
CnSocketUtil.contrastSendquit(param.getUserPageId(), SourceOperateCodeEnum.QUIT_INIT_01, false);
|
||||
CnSocketUtil.contrastSendquit(param.getUserPageId(), SourceOperateCodeEnum.QUIT_INIT_02, false);
|
||||
CnSocketUtil.contrastSendquit(param.getUserPageId(), SourceOperateCodeEnum.QUIT_INIT_03, true);
|
||||
// 业务消息处理异常 → 释放检测锁
|
||||
if (param != null && param.getUserPageId() != null) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "CONTRAST_READ_EXCEPTION");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,6 +82,11 @@ public class NettyContrastClientHandler extends SimpleChannelInboundHandler<Stri
|
||||
System.out.println("与通信模块端断线");
|
||||
ctx.close();
|
||||
SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.CONTRAST_DEV_TAG);
|
||||
// 比对通信模块主动断开 → 本次检测视为结束,释放检测锁
|
||||
if (param != null && param.getUserPageId() != null) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "CONTRAST_CHANNEL_INACTIVE");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -150,11 +161,18 @@ public class NettyContrastClientHandler extends SimpleChannelInboundHandler<Stri
|
||||
CnSocketUtil.contrastSendquit(param.getUserPageId(), SourceOperateCodeEnum.QUIT_INIT_02, false);
|
||||
CnSocketUtil.contrastSendquit(param.getUserPageId(), SourceOperateCodeEnum.QUIT_INIT_03, true);
|
||||
ctx.close();
|
||||
// 比对通道异常 → 释放检测锁
|
||||
if (param != null && param.getUserPageId() != null) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "CONTRAST_EXCEPTION");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 接收数据超时处理
|
||||
* 比对模式读超时(10 分钟 Pst / 1 分钟实时 / 动态统计)都汇到这里,
|
||||
* 推前端的同时一并释放检测锁(避免锁卡到 4 小时绝对超时)。
|
||||
*/
|
||||
private void timeoutSend(SourceOperateCodeEnum sourceOperateCodeEnum) {
|
||||
System.out.println("超时处理-----》" + "统计数据已超时----------------关闭");
|
||||
@@ -164,6 +182,11 @@ public class NettyContrastClientHandler extends SimpleChannelInboundHandler<Stri
|
||||
webSend.setData(sourceOperateCodeEnum.getMsg() + SourceResponseCodeEnum.RECEIVE_DATA_TIME_OUT.getMessage());
|
||||
webSend.setCode(SourceResponseCodeEnum.RECEIVE_DATA_TIME_OUT.getCode());
|
||||
WebServiceManager.sendMsg(param.getUserPageId(), MsgUtil.msgToWebData(webSend, FormalTestManager.devNameMapComm, 0));
|
||||
// 比对模式读超时终态 → 释放检测锁
|
||||
if (param != null && param.getUserPageId() != null) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "CONTRAST_IDLE_TIMEOUT");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package com.njcn.gather.detection.util.socket.cilent;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.njcn.gather.detection.handler.SocketDevResponseService;
|
||||
import com.njcn.gather.detection.lock.DetectionLock;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.enums.ResultEnum;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
@@ -115,6 +117,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
ctx.close();
|
||||
SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG);
|
||||
CnSocketUtil.quitSendSource(param);
|
||||
// 设备主动断开 → 本次检测视为结束,释放检测锁
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "DEV_CHANNEL_INACTIVE");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -132,6 +137,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
} catch (Exception e) {
|
||||
log.error("处理服务端消息异常", e);
|
||||
CnSocketUtil.quitSend(param);
|
||||
// 业务消息处理异常 → 退出并释放检测锁
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "DEV_READ_EXCEPTION");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,6 +195,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
CnSocketUtil.quitSendSource(param);
|
||||
socketResponseService.backCheckState(param);
|
||||
ctx.close();
|
||||
// 通道异常 → 释放检测锁
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "DEV_EXCEPTION");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -231,6 +242,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
CnSocketUtil.quitSend(param);
|
||||
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.SOCKET_TIMEOUT);
|
||||
socketResponseService.backCheckState(param);
|
||||
// 常规步骤读超时兜底 → 释放检测锁
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "DEV_READ_TIMEOUT");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,6 +258,14 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
if (FormalTestManager.stopTime > STOP_TIMEOUT) {
|
||||
CnSocketUtil.quitSend(param);
|
||||
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.FORMAL_REAL.getValue(), SourceOperateCodeEnum.STOP_TIMEOUT);
|
||||
// R4 释放:暂停 10 分钟超时视同放弃本次检测
|
||||
DetectionLock cur = DetectionLockManager.getInstance().getCurrent();
|
||||
if (cur != null) {
|
||||
DetectionLockManager.getInstance().releaseIfHeldBy(cur.getUserId(), "PAUSE_TIMEOUT");
|
||||
}
|
||||
// 重置 FormalTestManager 状态,避免下次进入误判
|
||||
FormalTestManager.stopTime = 0;
|
||||
FormalTestManager.hasStopFlag = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,6 +324,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
CnSocketUtil.quitSend(param);
|
||||
timeoutSend(sourceIssue);
|
||||
socketResponseService.backCheckState(param);
|
||||
// 单项检测超时本质等于整轮中止(已 quitSend),释放检测锁
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(param.getUserPageId(), "DEV_ITEM_TIMEOUT");
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.njcn.gather.detection.util.socket.cilent;
|
||||
|
||||
import com.njcn.gather.detection.handler.SocketSourceResponseService;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
import com.njcn.gather.detection.util.socket.CnSocketUtil;
|
||||
@@ -70,6 +71,9 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<String
|
||||
// 从Socket管理器中移除用户通道映射
|
||||
if (webUser != null && StrUtil.isNotBlank(userId)) {
|
||||
SocketManager.removeUser(userId + CnSocketUtil.SOURCE_TAG);
|
||||
// 源端主动断开 → 本次检测视为结束,释放检测锁
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(userId, "SOURCE_CHANNEL_INACTIVE");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,6 +99,11 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<String
|
||||
log.error("源设备消息处理异常, userId: {}, message: {}", userId, msg, e);
|
||||
// 发生异常时退出发送,避免后续问题
|
||||
CnSocketUtil.quitSend(webUser);
|
||||
// 业务消息处理异常 → 释放检测锁
|
||||
if (StrUtil.isNotBlank(userId)) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(userId, "SOURCE_READ_EXCEPTION");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +145,13 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<String
|
||||
|
||||
// 发生异常时关闭通道
|
||||
ctx.close();
|
||||
|
||||
// 源通道异常 → 释放检测锁
|
||||
String userIdForRelease = webUser != null ? webUser.getUserPageId() : null;
|
||||
if (StrUtil.isNotBlank(userIdForRelease)) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(userIdForRelease, "SOURCE_EXCEPTION");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.njcn.gather.detection.util.socket.websocket;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
@@ -296,6 +297,8 @@ public class WebServiceManager {
|
||||
webSocketVO.setData(errorType.getMsg());
|
||||
webSocketVO.setOperateCode(errorType.getValue());
|
||||
sendMessage(userId, webSocketVO);
|
||||
// 守门员:错误推送即视为本次检测终态,释放检测锁(幂等,与显式释放点叠加双保险)
|
||||
releaseLockOnTerminal(userId, errorType);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -313,6 +316,25 @@ public class WebServiceManager {
|
||||
webSocketVO.setData(errorType.getMsg());
|
||||
webSocketVO.setOperateCode(errorType.getValue());
|
||||
sendMessage(userId, webSocketVO);
|
||||
// 守门员:理由同上
|
||||
releaseLockOnTerminal(userId, errorType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 守门员释放锁
|
||||
* <p>覆盖业务回调里所有走 {@code sendDetectionErrorMessage} 的失败路径,
|
||||
* 等价于在 detection/handler 全目录的错误终态点显式释放。与各 Netty handler
|
||||
* 内的显式释放幂等叠加,形成双保险。</p>
|
||||
*
|
||||
* <p>注:业务"正常完成"路径不走此方法(数模式 formalDeal 已在 Phase 1 显式释放;
|
||||
* 比对模式正常完成走 sendMsg 推 ERROR_FLOW_END,依赖 WS 断开后心跳超时兜底)。</p>
|
||||
*/
|
||||
private static void releaseLockOnTerminal(String userId, SourceOperateCodeEnum errorType) {
|
||||
if (userId == null || userId.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(userId, "WS_ERROR_PUSH:" + errorType.name());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.njcn.gather.detection.util.socket.websocket;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.njcn.gather.detection.lock.DetectionLockManager;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
import com.njcn.gather.detection.util.socket.CnSocketUtil;
|
||||
@@ -414,6 +415,11 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
|
||||
}
|
||||
// 清理完成后移除该用户的检测参数
|
||||
WebServiceManager.removePreDetectionParam(userId);
|
||||
// R3 释放:WS 断开 / 客户端关页面 / 心跳超时后顺手释放检测锁
|
||||
if (preDetectionParam.getUserPageId() != null) {
|
||||
DetectionLockManager.getInstance()
|
||||
.releaseIfMatchPage(preDetectionParam.getUserPageId(), "WS_DISCONNECTED");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("清理Socket连接时发生异常,userId: {}", userId, e);
|
||||
|
||||
Reference in New Issue
Block a user