""" MQTT 客户端封装模块 提供 MQTT 连接、发布、订阅的基础能力 依赖: pip install paho-mqtt Author: sunrise """ import paho.mqtt.client as mqtt import json import time class MQTTClient: """ MQTT 客户端封装类 功能: - MQTT 连接管理 - JSON 消息发布 - 命令消息封装发布 - 状态/事件消息订阅回调 使用方式: mqtt_client = MQTTClient(logger=self.get_logger()) mqtt_client.init(config) mqtt_client.publish_command(target="screen", module="system", action="ping") mqtt_client.shutdown() """ def __init__(self, logger=None): """ 初始化 MQTT 客户端 Args: logger: ROS2 logger 对象,用于日志输出 """ self.logger = logger self.mqtt_enable = False self.mqtt_client = None self.mqtt_connected = False self._initialized = False # 防止重复初始化 # Topic 配置 self.command_topic = "/ai/agent/command" self.status_topic = "/ai/module/status" self.event_topic = "/ai/module/event" # 回调处理函数(由外部设置) self.on_status_callback = None self.on_event_callback = None def _log_info(self, msg): """日志输出 - Info""" if self.logger: self.logger.info(f"[MQTT] {msg}") else: print(f"[MQTT] {msg}") def _log_warn(self, msg): """日志输出 - Warning""" if self.logger: self.logger.warning(f"[MQTT] {msg}") else: print(f"[MQTT] {msg}") def _log_error(self, msg): """日志输出 - Error""" if self.logger: self.logger.error(f"[MQTT] {msg}") else: print(f"[MQTT] {msg}") def init(self, config): """ 从配置初始化 MQTT 客户端 Args: config: dict, MQTT 配置字典 - enable: bool, 是否启用 MQTT - broker_host: str, Broker 地址 - broker_port: int, Broker 端口 - username: str, 用户名 - password: str, 密码 - client_id: str, 客户端 ID - keepalive: int, 保活时间(秒) - command_topic: str, 命令 topic - status_topic: str, 状态 topic - event_topic: str, 事件 topic """ # 初始化完成后忽略后续调用 if self._initialized: return try: # 读取配置 self.mqtt_enable = config.get("enable", False) if not self.mqtt_enable: self._log_info("MQTT disabled") self._initialized = True return # Broker 配置 broker_host = config.get("broker_host", "127.0.0.1") broker_port = int(config.get("broker_port", 1883)) username = config.get("username", "") password = config.get("password", "") client_id = config.get("client_id", "rdk_agent") keepalive = int(config.get("keepalive", 60)) # Topic 配置 self.command_topic = config.get("command_topic", "/ai/agent/command") self.status_topic = config.get("status_topic", "/ai/module/status") self.event_topic = config.get("event_topic", "/ai/module/event") # 创建 MQTT 客户端 self.mqtt_client = mqtt.Client(client_id=client_id) # 设置用户名密码 if username: self.mqtt_client.username_pw_set(username, password) # 注册回调 self.mqtt_client.on_connect = self._on_connect self.mqtt_client.on_disconnect = self._on_disconnect self.mqtt_client.on_message = self._on_message # 连接 Broker self.mqtt_client.connect(broker_host, broker_port, keepalive) self.mqtt_client.loop_start() self._log_info(f"Connecting to broker {broker_host}:{broker_port}") self._initialized = True except Exception as e: self.mqtt_connected = False self._log_warn(f"Init failed: {e}") def _on_connect(self, client, userdata, flags, rc): """ MQTT 连接回调 Args: client: MQTT 客户端实例 userdata: 用户数据 flags: 连接标志 rc: 连接结果码,0 表示成功 """ if rc == 0: self.mqtt_connected = True self._log_info("Connected") try: # 订阅状态和事件 topic client.subscribe(self.status_topic) client.subscribe(self.event_topic) self._log_info(f"Subscribed: {self.status_topic}, {self.event_topic}") except Exception as e: self._log_warn(f"Subscribe failed: {e}") else: self.mqtt_connected = False self._log_warn(f"Connect failed, rc={rc}") def _on_disconnect(self, client, userdata, rc): """ MQTT 断开连接回调 Args: client: MQTT 客户端实例 userdata: 用户数据 rc: 断开结果码 """ self.mqtt_connected = False self._log_warn(f"Disconnected, rc={rc}") def _on_message(self, client, userdata, msg): """ MQTT 消息回调 Args: client: MQTT 客户端实例 userdata: 用户数据 msg: MQTT 消息对象 """ try: topic = msg.topic payload_str = msg.payload.decode("utf-8") data = json.loads(payload_str) self._log_info(f"Received topic={topic}, data={data}") # 根据 topic 分发消息 if topic == self.status_topic: if self.on_status_callback: self.on_status_callback(data) elif topic == self.event_topic: if self.on_event_callback: self.on_event_callback(data) else: self._log_info(f"Ignored topic: {topic}") except json.JSONDecodeError: self._log_warn(f"JSON parse failed for payload: {msg.payload}") except Exception as e: self._log_warn(f"Message parse failed: {e}") def publish_json(self, topic, data): """ 通用 JSON 消息发布 Args: topic: str, 消息 topic data: dict, 要发布的 JSON 数据 Returns: bool: True 表示发布成功,False 表示失败 """ if not self.mqtt_enable: self._log_info(f"Disabled, skip publish: topic={topic}") return False if self.mqtt_client is None: self._log_warn("Client is None, skip publish") return False try: payload = json.dumps(data, ensure_ascii=False) if not self.mqtt_connected: self._log_warn("Not connected, try publish anyway") result = self.mqtt_client.publish(topic, payload) if result.rc == mqtt.MQTT_ERR_SUCCESS: self._log_info(f"Published topic={topic}, payload={payload}") return True else: self._log_warn(f"Publish failed, rc={result.rc}") return False except Exception as e: self._log_warn(f"Publish exception: {e}") return False def publish_command(self, target, module, action, page="", payload=None): """ 发布命令消息(封装好的通用格式) Args: target: str, 目标模块(如 "screen", "advertisement", "face") module: str, 模块类型(如 "system", "navigation", "visitor") action: str, 动作名称(如 "ping", "open_page", "start") page: str, 页面标识(可选) payload: dict, 额外载荷数据(可选) Returns: bool: True 表示发布成功,False 表示失败 消息格式: { "msg_id": "agent_", "source": "agent", "target": target, "module": module, "action": action, "page": page, "payload": payload, "timestamp": } """ if payload is None: payload = {} msg = { "msg_id": f"agent_{int(time.time() * 1000)}", "source": "agent", "target": target, "module": module, "action": action, "page": page, "payload": payload, "timestamp": int(time.time()) } return self.publish_json(self.command_topic, msg) def shutdown(self): """ 关闭 MQTT 客户端连接 注意: - 调用此函数后,MQTT 客户端将被停止 - 不会抛出异常 """ try: if self.mqtt_client is not None: self.mqtt_client.loop_stop() self.mqtt_client.disconnect() self._log_info("Client stopped") except Exception as e: self._log_warn(f"Shutdown failed: {e}") finally: self.mqtt_client = None self.mqtt_connected = False