| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- """
- MQTT 客户端封装模块
- 提供 MQTT 连接、发布、订阅的基础能力
- 依赖: pip install paho-mqtt
- Author: sunrise
- """
- import paho.mqtt.client as mqtt
- import json
- import time
- class MQTTClient:
- """
- MQTT 客户端封装类
- 功能:
- - MQTT 连接管理
- - JSON 消息发布
- - 命令消息封装发布
- - 状态/事件消息订阅回调
- 使用方式:
- mqtt_client = MQTTClient(logger=self.get_logger())
- mqtt_client.init(config)
- mqtt_client.publish_command(target="screen", module="system", action="ping")
- mqtt_client.shutdown()
- """
- def __init__(self, logger=None):
- """
- 初始化 MQTT 客户端
- Args:
- logger: ROS2 logger 对象,用于日志输出
- """
- self.logger = logger
- self.mqtt_enable = False
- self.mqtt_client = None
- self.mqtt_connected = False
- self._initialized = False # 防止重复初始化
- # Topic 配置
- self.command_topic = "/ai/agent/command"
- self.status_topic = "/ai/module/status"
- self.event_topic = "/ai/module/event"
- # 回调处理函数(由外部设置)
- self.on_status_callback = None
- self.on_event_callback = None
- def _log_info(self, msg):
- """日志输出 - Info"""
- if self.logger:
- self.logger.info(f"[MQTT] {msg}")
- else:
- print(f"[MQTT] {msg}")
- def _log_warn(self, msg):
- """日志输出 - Warning"""
- if self.logger:
- self.logger.warning(f"[MQTT] {msg}")
- else:
- print(f"[MQTT] {msg}")
- def _log_error(self, msg):
- """日志输出 - Error"""
- if self.logger:
- self.logger.error(f"[MQTT] {msg}")
- else:
- print(f"[MQTT] {msg}")
- def init(self, config):
- """
- 从配置初始化 MQTT 客户端
- Args:
- config: dict, MQTT 配置字典
- - enable: bool, 是否启用 MQTT
- - broker_host: str, Broker 地址
- - broker_port: int, Broker 端口
- - username: str, 用户名
- - password: str, 密码
- - client_id: str, 客户端 ID
- - keepalive: int, 保活时间(秒)
- - command_topic: str, 命令 topic
- - status_topic: str, 状态 topic
- - event_topic: str, 事件 topic
- """
- # 初始化完成后忽略后续调用
- if self._initialized:
- return
- try:
- # 读取配置
- self.mqtt_enable = config.get("enable", False)
- if not self.mqtt_enable:
- self._log_info("MQTT disabled")
- self._initialized = True
- return
- # Broker 配置
- broker_host = config.get("broker_host", "127.0.0.1")
- broker_port = int(config.get("broker_port", 1883))
- username = config.get("username", "")
- password = config.get("password", "")
- client_id = config.get("client_id", "rdk_agent")
- keepalive = int(config.get("keepalive", 60))
- # Topic 配置
- self.command_topic = config.get("command_topic", "/ai/agent/command")
- self.status_topic = config.get("status_topic", "/ai/module/status")
- self.event_topic = config.get("event_topic", "/ai/module/event")
- # 创建 MQTT 客户端
- self.mqtt_client = mqtt.Client(client_id=client_id)
- # 设置用户名密码
- if username:
- self.mqtt_client.username_pw_set(username, password)
- # 注册回调
- self.mqtt_client.on_connect = self._on_connect
- self.mqtt_client.on_disconnect = self._on_disconnect
- self.mqtt_client.on_message = self._on_message
- # 连接 Broker
- self.mqtt_client.connect(broker_host, broker_port, keepalive)
- self.mqtt_client.loop_start()
- self._log_info(f"Connecting to broker {broker_host}:{broker_port}")
- self._initialized = True
- except Exception as e:
- self.mqtt_connected = False
- self._log_warn(f"Init failed: {e}")
- def _on_connect(self, client, userdata, flags, rc):
- """
- MQTT 连接回调
- Args:
- client: MQTT 客户端实例
- userdata: 用户数据
- flags: 连接标志
- rc: 连接结果码,0 表示成功
- """
- if rc == 0:
- self.mqtt_connected = True
- self._log_info("Connected")
- try:
- # 订阅状态和事件 topic
- client.subscribe(self.status_topic)
- client.subscribe(self.event_topic)
- self._log_info(f"Subscribed: {self.status_topic}, {self.event_topic}")
- except Exception as e:
- self._log_warn(f"Subscribe failed: {e}")
- else:
- self.mqtt_connected = False
- self._log_warn(f"Connect failed, rc={rc}")
- def _on_disconnect(self, client, userdata, rc):
- """
- MQTT 断开连接回调
- Args:
- client: MQTT 客户端实例
- userdata: 用户数据
- rc: 断开结果码
- """
- self.mqtt_connected = False
- self._log_warn(f"Disconnected, rc={rc}")
- def _on_message(self, client, userdata, msg):
- """
- MQTT 消息回调
- Args:
- client: MQTT 客户端实例
- userdata: 用户数据
- msg: MQTT 消息对象
- """
- try:
- topic = msg.topic
- payload_str = msg.payload.decode("utf-8")
- data = json.loads(payload_str)
- self._log_info(f"Received topic={topic}, data={data}")
- # 根据 topic 分发消息
- if topic == self.status_topic:
- if self.on_status_callback:
- self.on_status_callback(data)
- elif topic == self.event_topic:
- if self.on_event_callback:
- self.on_event_callback(data)
- else:
- self._log_info(f"Ignored topic: {topic}")
- except json.JSONDecodeError:
- self._log_warn(f"JSON parse failed for payload: {msg.payload}")
- except Exception as e:
- self._log_warn(f"Message parse failed: {e}")
- def publish_json(self, topic, data):
- """
- 通用 JSON 消息发布
- Args:
- topic: str, 消息 topic
- data: dict, 要发布的 JSON 数据
- Returns:
- bool: True 表示发布成功,False 表示失败
- """
- if not self.mqtt_enable:
- self._log_info(f"Disabled, skip publish: topic={topic}")
- return False
- if self.mqtt_client is None:
- self._log_warn("Client is None, skip publish")
- return False
- try:
- payload = json.dumps(data, ensure_ascii=False)
- if not self.mqtt_connected:
- self._log_warn("Not connected, try publish anyway")
- result = self.mqtt_client.publish(topic, payload)
- if result.rc == mqtt.MQTT_ERR_SUCCESS:
- self._log_info(f"Published topic={topic}, payload={payload}")
- return True
- else:
- self._log_warn(f"Publish failed, rc={result.rc}")
- return False
- except Exception as e:
- self._log_warn(f"Publish exception: {e}")
- return False
- def publish_command(self, target, module, action, page="", payload=None):
- """
- 发布命令消息(封装好的通用格式)
- Args:
- target: str, 目标模块(如 "screen", "advertisement", "face")
- module: str, 模块类型(如 "system", "navigation", "visitor")
- action: str, 动作名称(如 "ping", "open_page", "start")
- page: str, 页面标识(可选)
- payload: dict, 额外载荷数据(可选)
- Returns:
- bool: True 表示发布成功,False 表示失败
- 消息格式:
- {
- "msg_id": "agent_<timestamp_ms>",
- "source": "agent",
- "target": target,
- "module": module,
- "action": action,
- "page": page,
- "payload": payload,
- "timestamp": <timestamp_s>
- }
- """
- if payload is None:
- payload = {}
- msg = {
- "msg_id": f"agent_{int(time.time() * 1000)}",
- "source": "agent",
- "target": target,
- "module": module,
- "action": action,
- "page": page,
- "payload": payload,
- "timestamp": int(time.time())
- }
- return self.publish_json(self.command_topic, msg)
- def shutdown(self):
- """
- 关闭 MQTT 客户端连接
- 注意:
- - 调用此函数后,MQTT 客户端将被停止
- - 不会抛出异常
- """
- try:
- if self.mqtt_client is not None:
- self.mqtt_client.loop_stop()
- self.mqtt_client.disconnect()
- self._log_info("Client stopped")
- except Exception as e:
- self._log_warn(f"Shutdown failed: {e}")
- finally:
- self.mqtt_client = None
- self.mqtt_connected = False
|