|
|
@@ -0,0 +1,476 @@
|
|
|
+"""
|
|
|
+MQTT 模拟器节点
|
|
|
+模拟外部模块响应大模型发出的 MQTT 命令
|
|
|
+
|
|
|
+功能:
|
|
|
+- 订阅 /ai/agent/command 话题(MQTT),接收大模型的命令
|
|
|
+- 根据命令类型模拟处理延迟
|
|
|
+- 向 /ai/module/status 和 /ai/module/event 话题(MQTT)发布模拟回复
|
|
|
+
|
|
|
+Author: sunrise
|
|
|
+"""
|
|
|
+
|
|
|
+import rclpy
|
|
|
+from rclpy.node import Node
|
|
|
+import paho.mqtt.client as mqtt
|
|
|
+import json
|
|
|
+import time
|
|
|
+import random
|
|
|
+import threading
|
|
|
+from ament_index_python.packages import get_package_share_directory
|
|
|
+import os
|
|
|
+import yaml
|
|
|
+
|
|
|
+
|
|
|
+class MQTTSimulatorNode(Node):
|
|
|
+ """
|
|
|
+ MQTT 模拟器节点
|
|
|
+
|
|
|
+ 订阅大模型发出的 MQTT 命令,模拟外部模块的响应行为
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__('mqtt_simulator_node')
|
|
|
+
|
|
|
+ # ========== 声明参数 ==========
|
|
|
+ self.declare_parameter('config_file', '')
|
|
|
+ self.declare_parameter('mqtt.broker_host', '127.0.0.1')
|
|
|
+ self.declare_parameter('mqtt.broker_port', 1883)
|
|
|
+ self.declare_parameter('mqtt.client_id', 'mqtt_simulator')
|
|
|
+ self.declare_parameter('mqtt.username', '')
|
|
|
+ self.declare_parameter('mqtt.password', '')
|
|
|
+ self.declare_parameter('mqtt.keepalive', 60)
|
|
|
+ self.declare_parameter('topics.command_topic', '/ai/agent/command')
|
|
|
+ self.declare_parameter('topics.status_topic', '/ai/module/status')
|
|
|
+ self.declare_parameter('topics.event_topic', '/ai/module/event')
|
|
|
+ self.declare_parameter('simulator.enable', True)
|
|
|
+ self.declare_parameter('simulator.log_all_messages', True)
|
|
|
+ self.declare_parameter('simulator.enable_random_delay', True)
|
|
|
+ self.declare_parameter('delay.page_load', [1.0, 2.0])
|
|
|
+ self.declare_parameter('delay.face_recognition', [2.0, 4.0])
|
|
|
+ self.declare_parameter('delay.navigation', [1.0, 2.0])
|
|
|
+ self.declare_parameter('delay.customer_service', [1.0, 2.0])
|
|
|
+ self.declare_parameter('delay.visitor_register', [2.0, 3.0])
|
|
|
+ self.declare_parameter('results.face_recognition_success_rate', 1.0)
|
|
|
+ self.declare_parameter('results.always_success', True)
|
|
|
+
|
|
|
+ # ========== 加载配置文件 ==========
|
|
|
+ self._load_config()
|
|
|
+
|
|
|
+ # MQTT 客户端
|
|
|
+ self.mqtt_client = None
|
|
|
+ self.mqtt_connected = False
|
|
|
+
|
|
|
+ # 日志输出
|
|
|
+ self.get_logger().info('MQTT Simulator Node 初始化完成')
|
|
|
+ self.get_logger().info(f'Broker: {self.broker_host}:{self.broker_port}')
|
|
|
+ self.get_logger().info(f'订阅话题: {self.command_topic}')
|
|
|
+ self.get_logger().info(f'发布话题: {self.status_topic}, {self.event_topic}')
|
|
|
+
|
|
|
+ # 如果启用,初始化 MQTT 连接
|
|
|
+ if self.enable:
|
|
|
+ self.init_mqtt()
|
|
|
+ else:
|
|
|
+ self.get_logger().warn('模拟器已禁用')
|
|
|
+
|
|
|
+ def _load_config(self):
|
|
|
+ """
|
|
|
+ 从 YAML 配置文件加载参数
|
|
|
+ """
|
|
|
+ config_file = self.get_parameter('config_file').value
|
|
|
+ if not config_file:
|
|
|
+ pkg_share = get_package_share_directory('mqtt_simulator')
|
|
|
+ config_file = os.path.join(pkg_share, 'config', 'simulator.yaml')
|
|
|
+
|
|
|
+ self.get_logger().info(f'加载配置文件: {config_file}')
|
|
|
+
|
|
|
+ if os.path.exists(config_file):
|
|
|
+ try:
|
|
|
+ with open(config_file, 'r') as f:
|
|
|
+ yaml_config = yaml.safe_load(f)
|
|
|
+
|
|
|
+ mqtt_cfg = yaml_config.get('mqtt', {})
|
|
|
+ topics_cfg = yaml_config.get('topics', {})
|
|
|
+ sim_cfg = yaml_config.get('simulator', {})
|
|
|
+ delay_cfg = yaml_config.get('delay', {})
|
|
|
+ results_cfg = yaml_config.get('results', {})
|
|
|
+
|
|
|
+ self.broker_host = mqtt_cfg.get('broker_host', '127.0.0.1')
|
|
|
+ self.broker_port = mqtt_cfg.get('broker_port', 1883)
|
|
|
+ self.client_id = mqtt_cfg.get('client_id', 'mqtt_simulator')
|
|
|
+ self.username = mqtt_cfg.get('username', '')
|
|
|
+ self.password = mqtt_cfg.get('password', '')
|
|
|
+ self.keepalive = mqtt_cfg.get('keepalive', 60)
|
|
|
+ self.command_topic = topics_cfg.get('command_topic', '/ai/agent/command')
|
|
|
+ self.status_topic = topics_cfg.get('status_topic', '/ai/module/status')
|
|
|
+ self.event_topic = topics_cfg.get('event_topic', '/ai/module/event')
|
|
|
+ self.enable = sim_cfg.get('enable', True)
|
|
|
+ self.log_all_messages = sim_cfg.get('log_all_messages', True)
|
|
|
+ self.enable_random_delay = sim_cfg.get('enable_random_delay', True)
|
|
|
+ self.page_load_delay = delay_cfg.get('page_load', [1.0, 2.0])
|
|
|
+ self.face_recognition_delay = delay_cfg.get('face_recognition', [2.0, 4.0])
|
|
|
+ self.navigation_delay = delay_cfg.get('navigation', [1.0, 2.0])
|
|
|
+ self.customer_service_delay = delay_cfg.get('customer_service', [1.0, 2.0])
|
|
|
+ self.visitor_register_delay = delay_cfg.get('visitor_register', [2.0, 3.0])
|
|
|
+ self.face_success_rate = results_cfg.get('face_recognition_success_rate', 1.0)
|
|
|
+ self.always_success = results_cfg.get('always_success', True)
|
|
|
+
|
|
|
+ self.get_logger().info('配置文件加载成功')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().warn(f'配置文件加载失败: {e},使用默认值')
|
|
|
+ self._use_default_config()
|
|
|
+ else:
|
|
|
+ self.get_logger().warn(f'配置文件不存在: {config_file},使用默认值')
|
|
|
+ self._use_default_config()
|
|
|
+
|
|
|
+ def _use_default_config(self):
|
|
|
+ """
|
|
|
+ 使用默认配置
|
|
|
+ """
|
|
|
+ self.broker_host = self.get_parameter('mqtt.broker_host').value
|
|
|
+ self.broker_port = self.get_parameter('mqtt.broker_port').value
|
|
|
+ self.client_id = self.get_parameter('mqtt.client_id').value
|
|
|
+ self.username = self.get_parameter('mqtt.username').value
|
|
|
+ self.password = self.get_parameter('mqtt.password').value
|
|
|
+ self.keepalive = self.get_parameter('mqtt.keepalive').value
|
|
|
+ self.command_topic = self.get_parameter('topics.command_topic').value
|
|
|
+ self.status_topic = self.get_parameter('topics.status_topic').value
|
|
|
+ self.event_topic = self.get_parameter('topics.event_topic').value
|
|
|
+ self.enable = self.get_parameter('simulator.enable').value
|
|
|
+ self.log_all_messages = self.get_parameter('simulator.log_all_messages').value
|
|
|
+ self.enable_random_delay = self.get_parameter('simulator.enable_random_delay').value
|
|
|
+ self.page_load_delay = self.get_parameter('delay.page_load').value
|
|
|
+ self.face_recognition_delay = self.get_parameter('delay.face_recognition').value
|
|
|
+ self.navigation_delay = self.get_parameter('delay.navigation').value
|
|
|
+ self.customer_service_delay = self.get_parameter('delay.customer_service').value
|
|
|
+ self.visitor_register_delay = self.get_parameter('delay.visitor_register').value
|
|
|
+ self.face_success_rate = self.get_parameter('results.face_recognition_success_rate').value
|
|
|
+ self.always_success = self.get_parameter('results.always_success').value
|
|
|
+
|
|
|
+ def init_mqtt(self):
|
|
|
+ """
|
|
|
+ 初始化 MQTT 连接
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ self.mqtt_client = mqtt.Client(client_id=self.client_id)
|
|
|
+
|
|
|
+ if self.username:
|
|
|
+ self.mqtt_client.username_pw_set(self.username, self.password)
|
|
|
+
|
|
|
+ self.mqtt_client.on_connect = self._on_connect
|
|
|
+ self.mqtt_client.on_disconnect = self._on_disconnect
|
|
|
+ self.mqtt_client.on_message = self._on_mqtt_message
|
|
|
+
|
|
|
+ self.get_logger().info(f'正在连接到 MQTT Broker: {self.broker_host}:{self.broker_port}')
|
|
|
+ self.mqtt_client.connect(self.broker_host, self.broker_port, self.keepalive)
|
|
|
+ self.mqtt_client.loop_start()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().error(f'MQTT 初始化失败: {e}')
|
|
|
+
|
|
|
+ def _on_connect(self, client, userdata, flags, rc):
|
|
|
+ """
|
|
|
+ MQTT 连接回调
|
|
|
+ """
|
|
|
+ if rc == 0:
|
|
|
+ self.mqtt_connected = True
|
|
|
+ self.get_logger().info('MQTT 连接成功')
|
|
|
+
|
|
|
+ try:
|
|
|
+ client.subscribe(self.command_topic)
|
|
|
+ self.get_logger().info(f'已订阅话题: {self.command_topic}')
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().error(f'订阅话题失败: {e}')
|
|
|
+ else:
|
|
|
+ self.mqtt_connected = False
|
|
|
+ self.get_logger().error(f'MQTT 连接失败, rc={rc}')
|
|
|
+
|
|
|
+ def _on_disconnect(self, client, userdata, rc):
|
|
|
+ """
|
|
|
+ MQTT 断开连接回调
|
|
|
+ """
|
|
|
+ self.mqtt_connected = False
|
|
|
+ if rc != 0:
|
|
|
+ self.get_logger().warn(f'MQTT 意外断开, rc={rc}')
|
|
|
+ else:
|
|
|
+ self.get_logger().info('MQTT 连接已断开')
|
|
|
+
|
|
|
+ def _on_mqtt_message(self, client, userdata, msg):
|
|
|
+ """
|
|
|
+ MQTT 消息回调
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ topic = msg.topic
|
|
|
+ payload_str = msg.payload.decode('utf-8')
|
|
|
+
|
|
|
+ if self.log_all_messages:
|
|
|
+ self.get_logger().info(f'收到 MQTT 消息 [{topic}]: {payload_str}')
|
|
|
+
|
|
|
+ data = json.loads(payload_str)
|
|
|
+
|
|
|
+ thread = threading.Thread(target=self._handle_command, args=(data,))
|
|
|
+ thread.daemon = True
|
|
|
+ thread.start()
|
|
|
+
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ self.get_logger().warn(f'JSON 解析失败: {e}')
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().error(f'处理消息异常: {e}')
|
|
|
+
|
|
|
+ def _get_delay(self, delay_config):
|
|
|
+ """
|
|
|
+ 获取延迟时间
|
|
|
+
|
|
|
+ Args:
|
|
|
+ delay_config: 延迟配置 [min, max]
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ float: 延迟时间(秒)
|
|
|
+ """
|
|
|
+ if self.enable_random_delay and isinstance(delay_config, list) and len(delay_config) == 2:
|
|
|
+ return random.uniform(delay_config[0], delay_config[1])
|
|
|
+ elif isinstance(delay_config, list) and len(delay_config) == 2:
|
|
|
+ return (delay_config[0] + delay_config[1]) / 2
|
|
|
+ else:
|
|
|
+ return float(delay_config)
|
|
|
+
|
|
|
+ def _handle_command(self, data):
|
|
|
+ """
|
|
|
+ 处理命令(在新线程中执行)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ data: 命令数据字典
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ target = data.get('target', '')
|
|
|
+ module = data.get('module', '')
|
|
|
+ action = data.get('action', '')
|
|
|
+ page = data.get('page', '')
|
|
|
+ original_msg_id = data.get('msg_id', '')
|
|
|
+
|
|
|
+ self.get_logger().info(f'处理命令: target={target}, module={module}, action={action}, page={page}')
|
|
|
+
|
|
|
+ if action == 'open_page':
|
|
|
+ self._handle_open_page(target, module, page, original_msg_id)
|
|
|
+ elif action == 'start':
|
|
|
+ self._handle_start(target, module, original_msg_id, data.get('payload', {}))
|
|
|
+ elif action == 'pause':
|
|
|
+ self._handle_pause(original_msg_id)
|
|
|
+ elif action == 'navigate':
|
|
|
+ self._handle_navigate(target, original_msg_id, data.get('payload', {}))
|
|
|
+ elif action == 'call':
|
|
|
+ self._handle_call_service(original_msg_id)
|
|
|
+ elif action == 'register':
|
|
|
+ self._handle_visitor_register(original_msg_id)
|
|
|
+ else:
|
|
|
+ self.get_logger().warn(f'未知动作类型: {action}')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().error(f'处理命令异常: {e}')
|
|
|
+
|
|
|
+ def _handle_open_page(self, target, module, page, original_msg_id):
|
|
|
+ """
|
|
|
+ 处理打开页面命令
|
|
|
+ """
|
|
|
+ delay = self._get_delay(self.page_load_delay)
|
|
|
+ self.get_logger().info(f'模拟页面加载,延迟 {delay:.2f} 秒')
|
|
|
+ time.sleep(delay)
|
|
|
+
|
|
|
+ status_msg = {
|
|
|
+ 'msg_id': f'sim_{int(time.time() * 1000)}',
|
|
|
+ 'source': 'simulator',
|
|
|
+ 'target': 'agent',
|
|
|
+ 'module': module,
|
|
|
+ 'status': 'page_opened',
|
|
|
+ 'page': page,
|
|
|
+ 'original_msg_id': original_msg_id,
|
|
|
+ 'timestamp': int(time.time())
|
|
|
+ }
|
|
|
+
|
|
|
+ self._publish_mqtt(self.status_topic, status_msg)
|
|
|
+ self.get_logger().info(f'已回复页面打开状态: {page}')
|
|
|
+
|
|
|
+ def _handle_start(self, target, module, original_msg_id, payload):
|
|
|
+ """
|
|
|
+ 处理启动命令(主要用于人脸识别)
|
|
|
+ """
|
|
|
+ if target == 'face':
|
|
|
+ self._handle_face_recognition(original_msg_id, payload)
|
|
|
+ else:
|
|
|
+ self.get_logger().warn(f'未知启动目标: {target}')
|
|
|
+
|
|
|
+ def _handle_face_recognition(self, original_msg_id, payload):
|
|
|
+ """
|
|
|
+ 处理人脸识别
|
|
|
+ """
|
|
|
+ delay = self._get_delay(self.face_recognition_delay)
|
|
|
+ self.get_logger().info(f'模拟人脸识别,延迟 {delay:.2f} 秒')
|
|
|
+ time.sleep(delay)
|
|
|
+
|
|
|
+ if self.always_success:
|
|
|
+ success = True
|
|
|
+ else:
|
|
|
+ success = random.random() < self.face_success_rate
|
|
|
+
|
|
|
+ if success:
|
|
|
+ event_data = {
|
|
|
+ 'msg_id': f'sim_{int(time.time() * 1000)}',
|
|
|
+ 'source': 'simulator',
|
|
|
+ 'event': 'face_recognition_result',
|
|
|
+ 'original_msg_id': original_msg_id,
|
|
|
+ 'data': {
|
|
|
+ 'success': True,
|
|
|
+ 'person_id': 'visitor_001',
|
|
|
+ 'person_name': '测试用户',
|
|
|
+ 'confidence': round(random.uniform(0.85, 0.99), 2)
|
|
|
+ },
|
|
|
+ 'timestamp': int(time.time())
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ event_data = {
|
|
|
+ 'msg_id': f'sim_{int(time.time() * 1000)}',
|
|
|
+ 'source': 'simulator',
|
|
|
+ 'event': 'face_recognition_result',
|
|
|
+ 'original_msg_id': original_msg_id,
|
|
|
+ 'data': {
|
|
|
+ 'success': False,
|
|
|
+ 'reason': 'not_found',
|
|
|
+ 'confidence': 0.0
|
|
|
+ },
|
|
|
+ 'timestamp': int(time.time())
|
|
|
+ }
|
|
|
+
|
|
|
+ self._publish_mqtt(self.event_topic, event_data)
|
|
|
+ self.get_logger().info(f'已回复人脸识别结果: success={success}')
|
|
|
+
|
|
|
+ def _handle_pause(self, original_msg_id):
|
|
|
+ """
|
|
|
+ 处理暂停命令
|
|
|
+ """
|
|
|
+ self.get_logger().info('收到全局暂停命令,已记录')
|
|
|
+
|
|
|
+ def _handle_navigate(self, target, original_msg_id, payload):
|
|
|
+ """
|
|
|
+ 处理导航命令
|
|
|
+ """
|
|
|
+ delay = self._get_delay(self.navigation_delay)
|
|
|
+ self.get_logger().info(f'模拟导航操作,延迟 {delay:.2f} 秒')
|
|
|
+ time.sleep(delay)
|
|
|
+
|
|
|
+ status_msg = {
|
|
|
+ 'msg_id': f'sim_{int(time.time() * 1000)}',
|
|
|
+ 'source': 'simulator',
|
|
|
+ 'target': 'agent',
|
|
|
+ 'module': 'nav',
|
|
|
+ 'status': 'navigation_completed',
|
|
|
+ 'original_msg_id': original_msg_id,
|
|
|
+ 'timestamp': int(time.time())
|
|
|
+ }
|
|
|
+
|
|
|
+ self._publish_mqtt(self.status_topic, status_msg)
|
|
|
+ self.get_logger().info('已回复导航完成状态')
|
|
|
+
|
|
|
+ def _handle_call_service(self, original_msg_id):
|
|
|
+ """
|
|
|
+ 处理呼叫客服命令
|
|
|
+ """
|
|
|
+ delay = self._get_delay(self.customer_service_delay)
|
|
|
+ self.get_logger().info(f'模拟呼叫客服,延迟 {delay:.2f} 秒')
|
|
|
+ time.sleep(delay)
|
|
|
+
|
|
|
+ status_msg = {
|
|
|
+ 'msg_id': f'sim_{int(time.time() * 1000)}',
|
|
|
+ 'source': 'simulator',
|
|
|
+ 'target': 'agent',
|
|
|
+ 'module': 'customer_service',
|
|
|
+ 'status': 'call_connected',
|
|
|
+ 'original_msg_id': original_msg_id,
|
|
|
+ 'timestamp': int(time.time())
|
|
|
+ }
|
|
|
+
|
|
|
+ self._publish_mqtt(self.status_topic, status_msg)
|
|
|
+ self.get_logger().info('已回复客服呼叫成功')
|
|
|
+
|
|
|
+ def _handle_visitor_register(self, original_msg_id):
|
|
|
+ """
|
|
|
+ 处理访客登记命令
|
|
|
+ """
|
|
|
+ delay = self._get_delay(self.visitor_register_delay)
|
|
|
+ self.get_logger().info(f'模拟访客登记,延迟 {delay:.2f} 秒')
|
|
|
+ time.sleep(delay)
|
|
|
+
|
|
|
+ status_msg = {
|
|
|
+ 'msg_id': f'sim_{int(time.time() * 1000)}',
|
|
|
+ 'source': 'simulator',
|
|
|
+ 'target': 'agent',
|
|
|
+ 'module': 'visitor',
|
|
|
+ 'status': 'registration_completed',
|
|
|
+ 'original_msg_id': original_msg_id,
|
|
|
+ 'data': {
|
|
|
+ 'visitor_id': f'V{int(time.time())}',
|
|
|
+ 'visitor_name': '访客'
|
|
|
+ },
|
|
|
+ 'timestamp': int(time.time())
|
|
|
+ }
|
|
|
+
|
|
|
+ self._publish_mqtt(self.status_topic, status_msg)
|
|
|
+ self.get_logger().info('已回复访客登记完成')
|
|
|
+
|
|
|
+ def _publish_mqtt(self, topic, data):
|
|
|
+ """
|
|
|
+ 通过 MQTT 发布消息
|
|
|
+
|
|
|
+ Args:
|
|
|
+ topic: MQTT topic
|
|
|
+ data: 要发布的数据字典
|
|
|
+ """
|
|
|
+ if self.mqtt_client is None or not self.mqtt_connected:
|
|
|
+ self.get_logger().warn(f'MQTT 未连接,跳过发布: topic={topic}')
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ payload = json.dumps(data, ensure_ascii=False)
|
|
|
+ result = self.mqtt_client.publish(topic, payload)
|
|
|
+
|
|
|
+ if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
|
|
+ if self.log_all_messages:
|
|
|
+ self.get_logger().info(f'[MQTT发布] topic={topic}, payload={payload}')
|
|
|
+ else:
|
|
|
+ self.get_logger().error(f'[MQTT发布] 失败, rc={result.rc}')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().error(f'[MQTT发布] 异常: {e}')
|
|
|
+
|
|
|
+ def destroy_node(self):
|
|
|
+ """
|
|
|
+ 销毁节点
|
|
|
+ """
|
|
|
+ if self.mqtt_client is not None:
|
|
|
+ try:
|
|
|
+ self.mqtt_client.loop_stop()
|
|
|
+ self.mqtt_client.disconnect()
|
|
|
+ self.get_logger().info('MQTT 连接已关闭')
|
|
|
+ except Exception as e:
|
|
|
+ self.get_logger().warn(f'关闭 MQTT 连接时出错: {e}')
|
|
|
+
|
|
|
+ super().destroy_node()
|
|
|
+
|
|
|
+
|
|
|
+def main(args=None):
|
|
|
+ """
|
|
|
+ 入口函数
|
|
|
+ """
|
|
|
+ rclpy.init(args=args)
|
|
|
+
|
|
|
+ try:
|
|
|
+ simulator_node = MQTTSimulatorNode()
|
|
|
+ rclpy.spin(simulator_node)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ pass
|
|
|
+ finally:
|
|
|
+ rclpy.shutdown()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|