|
|
@@ -1,61 +1,71 @@
|
|
|
package com.ruoyi.mqtt.listener;
|
|
|
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson2.JSONArray;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.ruoyi.common.core.redis.RedisCache;
|
|
|
+import com.ruoyi.common.utils.DateUtils;
|
|
|
import com.ruoyi.common.utils.StringUtils;
|
|
|
+import com.ruoyi.mqtt.constant.MqttTopicConstants;
|
|
|
+import com.ruoyi.mqtt.dto.localization.RobotPoseInfo;
|
|
|
+import com.ruoyi.mqtt.dto.task.ArriveEventInfo;
|
|
|
+import com.ruoyi.mqtt.dto.task.TaskRealtimeInfo;
|
|
|
+import com.ruoyi.mqtt.service.MqttSendService;
|
|
|
+import com.ruoyi.mqtt.util.MqttTopicUtil;
|
|
|
+import com.ruoyi.robot.domain.LdTask;
|
|
|
+import com.ruoyi.robot.domain.LdTaskExecutionLog;
|
|
|
+import com.ruoyi.robot.mapper.LdMapProjectMapper;
|
|
|
+import com.ruoyi.robot.service.ILdTaskExecutionLogService;
|
|
|
+import com.ruoyi.robot.service.ILdTaskService;
|
|
|
+import com.ruoyi.websocket.service.WebSocketPushService;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.http.HttpEntity;
|
|
|
-import org.springframework.http.HttpHeaders;
|
|
|
-import org.springframework.http.MediaType;
|
|
|
-import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.messaging.Message;
|
|
|
import org.springframework.messaging.MessageHandler;
|
|
|
import org.springframework.messaging.MessagingException;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.PreDestroy;
|
|
|
-import java.math.BigDecimal;
|
|
|
-import java.time.LocalDateTime;
|
|
|
-import java.time.format.DateTimeFormatter;
|
|
|
-import java.time.format.DateTimeParseException;
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.regex.Matcher;
|
|
|
-import java.util.regex.Pattern;
|
|
|
|
|
|
/**
|
|
|
* MQTT 消息监听器
|
|
|
* 处理设备上报的作业相关消息、状态及日志事件
|
|
|
+ * 集成WebSocket推送服务,将数据实时推送给前端
|
|
|
*/
|
|
|
@Component
|
|
|
public class MqttMessageListener implements MessageHandler {
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
|
|
|
|
|
|
- private static final Pattern DEVICE_TOPIC_PATTERN = Pattern.compile("device/(\\w+)/(cmd|report)/job/(\\w+)");
|
|
|
- private static final Pattern LOG_TOPIC_PATTERN = Pattern.compile("log/(\\w+)/event");
|
|
|
- private static final Pattern DISEASE_RECOGNITION_TOPIC_PATTERN =
|
|
|
- Pattern.compile("device/([a-zA-Z0-9_-]+)/cmd/jc/execute");
|
|
|
-
|
|
|
- private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
-
|
|
|
-// private final Queue<VehicleRealtimeData> buffer = new ConcurrentLinkedQueue<>();
|
|
|
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
|
|
|
|
@Autowired
|
|
|
private RedisCache redisCache;
|
|
|
|
|
|
+ @Autowired(required = false)
|
|
|
+ private WebSocketPushService webSocketPushService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private LdMapProjectMapper mapProjectMapper;
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ private MqttSendService mqttSendService;
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ private ILdTaskService ldTaskService;
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ private ILdTaskExecutionLogService ldTaskExecutionLogService;
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
-// scheduler.scheduleAtFixedRate(this::flushBuffer, 30, 30, TimeUnit.SECONDS);
|
|
|
+ log.info("MQTT消息监听器初始化完成, WebSocket推送服务: {}",
|
|
|
+ webSocketPushService != null ? "已启用" : "未启用(请检查websocket模块)");
|
|
|
}
|
|
|
|
|
|
@PreDestroy
|
|
|
@@ -69,20 +79,481 @@ public class MqttMessageListener implements MessageHandler {
|
|
|
String payload = message.getPayload().toString();
|
|
|
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
|
|
|
int qos = (int) message.getHeaders().get("mqtt_receivedQos");
|
|
|
- log.info("接收MQTT消息 -> 主题:{},QoS:{},内容:{}", topic, qos, payload);
|
|
|
+
|
|
|
+ log.info("接收MQTT消息 -> 主题:{},QoS:{}", topic, qos);
|
|
|
+
|
|
|
if (StringUtils.isEmpty(payload)) {
|
|
|
log.warn("消息体为空,主题:{}", topic);
|
|
|
return;
|
|
|
}
|
|
|
- if (topic.startsWith("")) {
|
|
|
+
|
|
|
+ // 优先处理LD导航系统消息
|
|
|
+ if (MqttTopicUtil.isValidTopic(topic)) {
|
|
|
+ handleLdNavigationMessage(topic, payload);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理MQTT消息失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理LD导航系统消息
|
|
|
+ */
|
|
|
+ private void handleLdNavigationMessage(String topic, String payload) {
|
|
|
+ String deviceId = MqttTopicUtil.extractDeviceId(topic);
|
|
|
+ String shortTopic = MqttTopicUtil.extractShortTopic(topic);
|
|
|
+
|
|
|
+ if (deviceId == null || shortTopic == null) {
|
|
|
+ log.warn("无法解析主题:{}", topic);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("处理LD导航系统消息 -> 设备ID:{},短主题:{}", deviceId, shortTopic);
|
|
|
+
|
|
|
+ try {
|
|
|
+ switch (shortTopic) {
|
|
|
+ // 地图相关
|
|
|
+ case MqttTopicConstants.MAP_SERVICE_MAPLIST_RESPONSE:
|
|
|
+ handleMapListResponse(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.MAP_PROPERTY_CURRENT_MAP:
|
|
|
+ handleCurrentMap(deviceId, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ // 定位相关
|
|
|
+ case MqttTopicConstants.LOCALIZATION_POSE:
|
|
|
+ handleLocalizationPose(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.LOCALIZATION_ACTION_INIT_REPLY:
|
|
|
+ handleInitPoseReply(deviceId, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ // 导航相关
|
|
|
+ case MqttTopicConstants.NAVIGATION_STACK_ACTION_START_REPLY:
|
|
|
+ case MqttTopicConstants.NAVIGATION_STACK_ACTION_STOP_REPLY:
|
|
|
+ case MqttTopicConstants.NAVIGATION_STACK_ACTION_RESTART_REPLY:
|
|
|
+ handleNavigationReply(deviceId, shortTopic, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ // 规划相关
|
|
|
+ case MqttTopicConstants.PLANNING_SERVICE_PLAN_RESPONSE:
|
|
|
+ handlePlanningResponse(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.PLANNING_TRAJECTORY:
|
|
|
+ case MqttTopicConstants.PLANNING_TRAJECTORY_2D_COMPACT:
|
|
|
+ handleTrajectory(deviceId, shortTopic, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.PLANNING_ACTION_REPLAN_REPLY:
|
|
|
+ handleReplanReply(deviceId, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ // 任务相关
|
|
|
+ case MqttTopicConstants.TASK_TARGET_ACTION_GOTO_REPLY:
|
|
|
+ handleGotoReply(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.TASK_TARGET_EVENT_ARRIVE:
|
|
|
+ handleArriveEvent(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.TASK_REALTIME_INFO:
|
|
|
+ handleTaskRealtimeInfo(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.TASK_PROCEDURE_ACTION_PAUSE_REPLY:
|
|
|
+ case MqttTopicConstants.TASK_PROCEDURE_ACTION_RESUME_REPLY:
|
|
|
+ case MqttTopicConstants.TASK_PROCEDURE_ACTION_CANCEL_REPLY:
|
|
|
+ handleTaskProcedureReply(deviceId, shortTopic, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ // ASM远程功能调用相关
|
|
|
+ case MqttTopicConstants.ABILITY_FUNCTION_ACTION_EXEC_REPLY:
|
|
|
+ handleAsmExecReply(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.ABILITY_FUNCTION_ACTION_EXEC_PROGRESS:
|
|
|
+ handleAsmExecProgress(deviceId, payload);
|
|
|
+ break;
|
|
|
+ case MqttTopicConstants.ABILITY_FUNCTION_ACTION_EXEC_STATE:
|
|
|
+ handleAsmExecState(deviceId, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ // 传感器相关
|
|
|
+ case MqttTopicConstants.SENSOR_BATTERY:
|
|
|
+ handleBatteryInfo(deviceId, payload);
|
|
|
+ break;
|
|
|
+
|
|
|
+ default:
|
|
|
+ log.warn("未处理的LD导航系统主题:{}", shortTopic);
|
|
|
+ // 推送原始消息到WebSocket,便于前端处理未预定义的消息
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushRawMessage(deviceId, topic, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理LD导航系统消息异常,设备ID:{},主题:{}", deviceId, shortTopic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ==================== LD导航系统消息处理方法 ====================
|
|
|
+
|
|
|
+ private void handleMapListResponse(String deviceId, String payload) {
|
|
|
+ log.info("收到地图列表响应 -> 设备ID:{}", deviceId);
|
|
|
+ // 推送WebSocket
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushMapListResponse(deviceId, payload);
|
|
|
+ }
|
|
|
+ // 缓存到Redis
|
|
|
+ String cacheKey = "ld:maplist:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, payload);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleCurrentMap(String deviceId, String payload) {
|
|
|
+ log.info("收到当前地图 -> 设备ID:{}", deviceId);
|
|
|
+ // 推送WebSocket
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushCurrentMap(deviceId, payload);
|
|
|
+ }
|
|
|
+ // 缓存到Redis
|
|
|
+ String cacheKey = "ld:currentmap:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, payload);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleLocalizationPose(String deviceId, String payload) {
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ JSONArray argsArray = json.getJSONArray("args");
|
|
|
+ if (argsArray != null && !argsArray.isEmpty()) {
|
|
|
+ RobotPoseInfo poseInfo = argsArray.getObject(0, RobotPoseInfo.class);
|
|
|
+ log.debug("机器人位姿 -> 设备ID:{},坐标:{},置信度:{},状态:{}",
|
|
|
+ deviceId,
|
|
|
+ poseInfo.getPose() != null ? poseInfo.getPose().getXyz() : null,
|
|
|
+ poseInfo.getConfidence(),
|
|
|
+ poseInfo.getRunState());
|
|
|
+
|
|
|
+ // 缓存到Redis
|
|
|
+ String cacheKey = "ld:pose:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, poseInfo);
|
|
|
+
|
|
|
+ // 推送WebSocket(关键:实时推送位姿数据给前端)
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushRobotPose(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析机器人位姿失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleInitPoseReply(String deviceId, String payload) {
|
|
|
+ log.info("收到位姿初始化响应 -> 设备ID:{}", deviceId);
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushNavigationReply(deviceId, "init_reply", payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleNavigationReply(String deviceId, String shortTopic, String payload) {
|
|
|
+ log.info("收到导航操作响应 -> 设备ID:{},操作:{}", deviceId, shortTopic);
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushNavigationReply(deviceId, shortTopic, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handlePlanningResponse(String deviceId, String payload) {
|
|
|
+ log.info("收到路径规划响应 -> 设备ID:{}", deviceId);
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushPlanningResponse(deviceId, "plan_response", payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleTrajectory(String deviceId, String shortTopic, String payload) {
|
|
|
+ log.debug("收到轨迹数据 -> 设备ID:{},类型:{}", deviceId, shortTopic);
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushTrajectory(deviceId, shortTopic, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleReplanReply(String deviceId, String payload) {
|
|
|
+ log.info("收到重规划响应 -> 设备ID:{}", deviceId);
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushPlanningResponse(deviceId, "replan_reply", payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleGotoReply(String deviceId, String payload) {
|
|
|
+ log.info("收到前往目标点响应 -> 设备ID:{}", deviceId);
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushPlanningResponse(deviceId, "goto_reply", payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleArriveEvent(String deviceId, String payload) {
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ JSONArray argsArray = json.getJSONArray("args");
|
|
|
+ if (argsArray != null && !argsArray.isEmpty()) {
|
|
|
+ ArriveEventInfo arriveInfo = argsArray.getObject(0, ArriveEventInfo.class);
|
|
|
+ log.info("机器人到达目标点 -> 设备ID:{},状态:{},错误码:{}",
|
|
|
+ deviceId, arriveInfo.getStatus(), arriveInfo.getError());
|
|
|
+
|
|
|
+ // 推送WebSocket
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushArriveEvent(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析到达事件失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleTaskRealtimeInfo(String deviceId, String payload) {
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ JSONArray argsArray = json.getJSONArray("args");
|
|
|
+ if (argsArray != null && !argsArray.isEmpty()) {
|
|
|
+ TaskRealtimeInfo taskInfo = argsArray.getObject(0, TaskRealtimeInfo.class);
|
|
|
+ log.debug("任务实时信息 -> 设备ID:{},驾驶模式:{},剩余里程:{}m",
|
|
|
+ deviceId,
|
|
|
+ taskInfo.getDriveMode(),
|
|
|
+ taskInfo.getOdom() != null ? taskInfo.getOdom().getRemain() : null);
|
|
|
+
|
|
|
+ // 缓存到Redis
|
|
|
+ String cacheKey = "ld:task:realtime:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, taskInfo);
|
|
|
+
|
|
|
+ // 推送WebSocket(关键:实时推送任务信息给前端)
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushTaskRealtimeInfo(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析任务实时信息失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理任务控制响应(暂停/继续/取消)
|
|
|
+ * 根据响应结果更新数据库中对应任务的状态
|
|
|
+ */
|
|
|
+ private void handleTaskProcedureReply(String deviceId, String shortTopic, String payload) {
|
|
|
+ log.info("收到任务控制响应 -> 设备ID:{},操作:{}", deviceId, shortTopic);
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ String status = json.getString("status");
|
|
|
+
|
|
|
+ // 判断响应是否成功
|
|
|
+ boolean success = "ok".equalsIgnoreCase(status);
|
|
|
+
|
|
|
+ // 根据操作类型确定目标状态
|
|
|
+ String targetStatus = determineTargetStatus(shortTopic, success);
|
|
|
+
|
|
|
+ // 更新数据库中该设备对应的任务状态
|
|
|
+ if (ldTaskService != null && StringUtils.isNotEmpty(targetStatus)) {
|
|
|
+ int updatedCount = ldTaskService.updateTaskStatusByDeviceId(deviceId, targetStatus);
|
|
|
+ if (updatedCount > 0) {
|
|
|
+ log.info("任务状态已更新 -> 设备ID:{},操作:{},新状态:{},更新记录数:{}",
|
|
|
+ deviceId, shortTopic, targetStatus, updatedCount);
|
|
|
|
|
|
- } else if (topic.startsWith("")) {
|
|
|
+ // 如果是取消操作,记录执行日志
|
|
|
+ if (success && MqttTopicConstants.TASK_PROCEDURE_ACTION_CANCEL_REPLY.equals(shortTopic)) {
|
|
|
+ recordTaskExecutionLog(deviceId, "cancelled", shortTopic);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("未找到该设备对应的运行中任务 -> 设备ID:{}", deviceId);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- } else {
|
|
|
- log.warn("未匹配的消息主题:{}", topic);
|
|
|
+ // 推送WebSocket通知前端
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushTaskProcedureReply(deviceId, shortTopic, payload);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("处理MQTT消息失败", e);
|
|
|
+ log.error("处理任务控制响应失败 -> 设备ID:{},操作:{}", deviceId, shortTopic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据操作类型确定目标状态
|
|
|
+ *
|
|
|
+ * @param shortTopic 操作主题
|
|
|
+ * @param success 操作是否成功
|
|
|
+ * @return 目标状态,null表示不需要更新
|
|
|
+ */
|
|
|
+ private String determineTargetStatus(String shortTopic, boolean success) {
|
|
|
+ if (!success) {
|
|
|
+ // 操作失败时保持原状态
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (MqttTopicConstants.TASK_PROCEDURE_ACTION_PAUSE_REPLY.equals(shortTopic)) {
|
|
|
+ return "paused";
|
|
|
+ } else if (MqttTopicConstants.TASK_PROCEDURE_ACTION_RESUME_REPLY.equals(shortTopic)) {
|
|
|
+ return "running";
|
|
|
+ } else if (MqttTopicConstants.TASK_PROCEDURE_ACTION_CANCEL_REPLY.equals(shortTopic)) {
|
|
|
+ return "idle";
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 记录任务执行日志
|
|
|
+ */
|
|
|
+ private void recordTaskExecutionLog(String deviceId, String status, String operation) {
|
|
|
+ try {
|
|
|
+ if (ldTaskExecutionLogService == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取该设备正在运行的任务
|
|
|
+ if (ldTaskService != null) {
|
|
|
+ List<LdTask> runningTasks = ldTaskService.selectRunningTasksByDeviceId(deviceId);
|
|
|
+ if (runningTasks != null && !runningTasks.isEmpty()) {
|
|
|
+ for (LdTask task : runningTasks) {
|
|
|
+ LdTaskExecutionLog log = new LdTaskExecutionLog();
|
|
|
+ log.setTaskId(task.getId());
|
|
|
+ log.setTaskName(task.getTaskName());
|
|
|
+ log.setMapName(task.getMapName());
|
|
|
+ log.setExecuteTime(task.getLastExecuteTime());
|
|
|
+ log.setEndTime(DateUtils.getNowDate());
|
|
|
+ log.setStatus(status);
|
|
|
+ log.setDeviceId(deviceId);
|
|
|
+ log.setRemark("任务被取消,操作类型:" + operation);
|
|
|
+
|
|
|
+ ldTaskExecutionLogService.insertLdTaskExecutionLog(log);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("记录任务执行日志失败 -> 设备ID:{}", deviceId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ==================== ASM远程功能调用消息处理方法 ====================
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理ASM远程调用响应
|
|
|
+ */
|
|
|
+ private void handleAsmExecReply(String deviceId, String payload) {
|
|
|
+ log.info("收到ASM远程调用响应 -> 设备ID:{}", deviceId);
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ String function = json.getString("function");
|
|
|
+ String status = json.getString("status");
|
|
|
+ Long pubTimestamp = json.getLong("pub_timestamp");
|
|
|
+ log.info("ASM响应 -> 功能:{},状态:{},请求时间:{}", function, status, pubTimestamp);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析ASM响应失败", e);
|
|
|
+ }
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushAsmReply(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理ASM远程调用进度反馈
|
|
|
+ */
|
|
|
+ private void handleAsmExecProgress(String deviceId, String payload) {
|
|
|
+ log.debug("收到ASM远程调用进度反馈 -> 设备ID:{}", deviceId);
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ JSONArray argsArray = json.getJSONArray("args");
|
|
|
+ if (argsArray != null && !argsArray.isEmpty()) {
|
|
|
+ JSONObject argObj = argsArray.getJSONObject(0);
|
|
|
+ String function = argObj.getString("function");
|
|
|
+ Object progress = argObj.get("progress");
|
|
|
+ log.debug("ASM进度 -> 功能:{},进度:{}", function, progress);
|
|
|
+
|
|
|
+ // 缓存ASM进度到Redis
|
|
|
+ String cacheKey = "ld:asm:progress:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, argObj);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析ASM进度反馈失败", e);
|
|
|
+ }
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushAsmProgress(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理ASM远程调用状态反馈
|
|
|
+ * state值:0-准备,1-进行中,2-执行完毕,3-执行失败,4-取消
|
|
|
+ */
|
|
|
+ private void handleAsmExecState(String deviceId, String payload) {
|
|
|
+ log.info("收到ASM远程调用状态反馈 -> 设备ID:{}", deviceId);
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ JSONArray argsArray = json.getJSONArray("args");
|
|
|
+ if (argsArray != null && !argsArray.isEmpty()) {
|
|
|
+ JSONObject argObj = argsArray.getJSONObject(0);
|
|
|
+ String function = argObj.getString("function");
|
|
|
+ Integer state = argObj.getInteger("state");
|
|
|
+
|
|
|
+ String stateDesc;
|
|
|
+ switch (state != null ? state : -1) {
|
|
|
+ case 0: stateDesc = "准备"; break;
|
|
|
+ case 1: stateDesc = "进行中"; break;
|
|
|
+ case 2: stateDesc = "执行完毕";break;
|
|
|
+ case 3: stateDesc = "执行失败"; break;
|
|
|
+ case 4: stateDesc = "取消"; break;
|
|
|
+ default: stateDesc = "未知状态(" + state + ")";
|
|
|
+ }
|
|
|
+ log.info("ASM状态 -> 功能:{},状态:{},描述:{}", function, state, stateDesc);
|
|
|
+
|
|
|
+ // 缓存ASM状态到Redis
|
|
|
+ String cacheKey = "ld:asm:state:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, argObj);
|
|
|
+
|
|
|
+ // 更新地图列表(ASM状态变化时需要刷新地图列表)
|
|
|
+ if (function != null && (function.startsWith("ASM.sensor_record") ||
|
|
|
+ function.startsWith("ASM.map_build") || function.startsWith("ASM.map_slam"))) {
|
|
|
+ log.info("ASM地图相关操作状态变化,触发地图列表刷新 -> 功能:{},状态:{}", function, stateDesc);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析ASM状态反馈失败", e);
|
|
|
+ }
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushAsmState(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理电池信息
|
|
|
+ * capacity: 电量百分比,范围0.0 ~ 1.0,对应电量0% ~ 100%
|
|
|
+ * voltage: 电池电压,单位V
|
|
|
+ * current: 电池电流,单位A,一般充电时候为正值,非充电状态下为负值
|
|
|
+ * charging: 充电状态,true表示正在充电,false表示未充电
|
|
|
+ * other: 其他电量百分比,范围0~100,用于上传车上其他设备电量
|
|
|
+ */
|
|
|
+ private void handleBatteryInfo(String deviceId, String payload) {
|
|
|
+ try {
|
|
|
+ JSONObject json = JSON.parseObject(payload);
|
|
|
+ JSONArray argsArray = json.getJSONArray("args");
|
|
|
+ if (argsArray != null && !argsArray.isEmpty()) {
|
|
|
+ JSONObject batteryData = argsArray.getJSONObject(0);
|
|
|
+
|
|
|
+ Double capacity = batteryData.getDouble("capacity");
|
|
|
+ Double temperature = batteryData.getDouble("temperature");
|
|
|
+ Double voltage = batteryData.getDouble("voltage");
|
|
|
+ Double current = batteryData.getDouble("current");
|
|
|
+ Boolean charging = batteryData.getBoolean("charging");
|
|
|
+ JSONArray otherArray = batteryData.getJSONArray("other");
|
|
|
+
|
|
|
+ log.debug("电池信息 -> 设备ID:{},电量:{},电压:{}V,电流:{}A,温度:{}°C,充电:{},其他设备电量:{}",
|
|
|
+ deviceId, capacity != null ? (capacity * 100) + "%" : "N/A",
|
|
|
+ voltage, current, temperature, charging,
|
|
|
+ otherArray != null ? otherArray.toString() : "N/A");
|
|
|
+
|
|
|
+ // 缓存到Redis
|
|
|
+ String cacheKey = "ld:battery:" + deviceId;
|
|
|
+ redisCache.setCacheObject(cacheKey, batteryData);
|
|
|
+
|
|
|
+ // 推送WebSocket
|
|
|
+ if (webSocketPushService != null) {
|
|
|
+ webSocketPushService.pushBatteryInfo(deviceId, payload);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析电池信息失败", e);
|
|
|
}
|
|
|
}
|
|
|
}
|