4
0

mqttclient.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. """
  2. MQTT 客户端封装模块
  3. 提供 MQTT 连接、发布、订阅的基础能力
  4. 依赖: pip install paho-mqtt
  5. Author: sunrise
  6. """
  7. import paho.mqtt.client as mqtt
  8. import json
  9. import time
  10. class MQTTClient:
  11. """
  12. MQTT 客户端封装类
  13. 功能:
  14. - MQTT 连接管理
  15. - JSON 消息发布
  16. - 命令消息封装发布
  17. - 状态/事件消息订阅回调
  18. 使用方式:
  19. mqtt_client = MQTTClient(logger=self.get_logger())
  20. mqtt_client.init(config)
  21. mqtt_client.publish_command(target="screen", module="system", action="ping")
  22. mqtt_client.shutdown()
  23. """
  24. def __init__(self, logger=None):
  25. """
  26. 初始化 MQTT 客户端
  27. Args:
  28. logger: ROS2 logger 对象,用于日志输出
  29. """
  30. self.logger = logger
  31. self.mqtt_enable = False
  32. self.mqtt_client = None
  33. self.mqtt_connected = False
  34. self._initialized = False # 防止重复初始化
  35. # Topic 配置
  36. self.command_topic = "/ai/agent/command"
  37. self.status_topic = "/ai/module/status"
  38. self.event_topic = "/ai/module/event"
  39. # 回调处理函数(由外部设置)
  40. self.on_status_callback = None
  41. self.on_event_callback = None
  42. def _log_info(self, msg):
  43. """日志输出 - Info"""
  44. if self.logger:
  45. self.logger.info(f"[MQTT] {msg}")
  46. else:
  47. print(f"[MQTT] {msg}")
  48. def _log_warn(self, msg):
  49. """日志输出 - Warning"""
  50. if self.logger:
  51. self.logger.warning(f"[MQTT] {msg}")
  52. else:
  53. print(f"[MQTT] {msg}")
  54. def _log_error(self, msg):
  55. """日志输出 - Error"""
  56. if self.logger:
  57. self.logger.error(f"[MQTT] {msg}")
  58. else:
  59. print(f"[MQTT] {msg}")
  60. def init(self, config):
  61. """
  62. 从配置初始化 MQTT 客户端
  63. Args:
  64. config: dict, MQTT 配置字典
  65. - enable: bool, 是否启用 MQTT
  66. - broker_host: str, Broker 地址
  67. - broker_port: int, Broker 端口
  68. - username: str, 用户名
  69. - password: str, 密码
  70. - client_id: str, 客户端 ID
  71. - keepalive: int, 保活时间(秒)
  72. - command_topic: str, 命令 topic
  73. - status_topic: str, 状态 topic
  74. - event_topic: str, 事件 topic
  75. """
  76. # 初始化完成后忽略后续调用
  77. if self._initialized:
  78. return
  79. try:
  80. # 读取配置
  81. self.mqtt_enable = config.get("enable", False)
  82. if not self.mqtt_enable:
  83. self._log_info("MQTT disabled")
  84. self._initialized = True
  85. return
  86. # Broker 配置
  87. broker_host = config.get("broker_host", "127.0.0.1")
  88. broker_port = int(config.get("broker_port", 1883))
  89. username = config.get("username", "")
  90. password = config.get("password", "")
  91. client_id = config.get("client_id", "rdk_agent")
  92. keepalive = int(config.get("keepalive", 60))
  93. # Topic 配置
  94. self.command_topic = config.get("command_topic", "/ai/agent/command")
  95. self.status_topic = config.get("status_topic", "/ai/module/status")
  96. self.event_topic = config.get("event_topic", "/ai/module/event")
  97. # 创建 MQTT 客户端
  98. self.mqtt_client = mqtt.Client(client_id=client_id)
  99. # 设置用户名密码
  100. if username:
  101. self.mqtt_client.username_pw_set(username, password)
  102. # 注册回调
  103. self.mqtt_client.on_connect = self._on_connect
  104. self.mqtt_client.on_disconnect = self._on_disconnect
  105. self.mqtt_client.on_message = self._on_message
  106. # 连接 Broker
  107. self.mqtt_client.connect(broker_host, broker_port, keepalive)
  108. self.mqtt_client.loop_start()
  109. self._log_info(f"Connecting to broker {broker_host}:{broker_port}")
  110. self._initialized = True
  111. except Exception as e:
  112. self.mqtt_connected = False
  113. self._log_warn(f"Init failed: {e}")
  114. def _on_connect(self, client, userdata, flags, rc):
  115. """
  116. MQTT 连接回调
  117. Args:
  118. client: MQTT 客户端实例
  119. userdata: 用户数据
  120. flags: 连接标志
  121. rc: 连接结果码,0 表示成功
  122. """
  123. if rc == 0:
  124. self.mqtt_connected = True
  125. self._log_info("Connected")
  126. try:
  127. # 订阅状态和事件 topic
  128. client.subscribe(self.status_topic)
  129. client.subscribe(self.event_topic)
  130. self._log_info(f"Subscribed: {self.status_topic}, {self.event_topic}")
  131. except Exception as e:
  132. self._log_warn(f"Subscribe failed: {e}")
  133. else:
  134. self.mqtt_connected = False
  135. self._log_warn(f"Connect failed, rc={rc}")
  136. def _on_disconnect(self, client, userdata, rc):
  137. """
  138. MQTT 断开连接回调
  139. Args:
  140. client: MQTT 客户端实例
  141. userdata: 用户数据
  142. rc: 断开结果码
  143. """
  144. self.mqtt_connected = False
  145. self._log_warn(f"Disconnected, rc={rc}")
  146. def _on_message(self, client, userdata, msg):
  147. """
  148. MQTT 消息回调
  149. Args:
  150. client: MQTT 客户端实例
  151. userdata: 用户数据
  152. msg: MQTT 消息对象
  153. """
  154. try:
  155. topic = msg.topic
  156. payload_str = msg.payload.decode("utf-8")
  157. data = json.loads(payload_str)
  158. self._log_info(f"Received topic={topic}, data={data}")
  159. # 根据 topic 分发消息
  160. if topic == self.status_topic:
  161. if self.on_status_callback:
  162. self.on_status_callback(data)
  163. elif topic == self.event_topic:
  164. if self.on_event_callback:
  165. self.on_event_callback(data)
  166. else:
  167. self._log_info(f"Ignored topic: {topic}")
  168. except json.JSONDecodeError:
  169. self._log_warn(f"JSON parse failed for payload: {msg.payload}")
  170. except Exception as e:
  171. self._log_warn(f"Message parse failed: {e}")
  172. def publish_json(self, topic, data):
  173. """
  174. 通用 JSON 消息发布
  175. Args:
  176. topic: str, 消息 topic
  177. data: dict, 要发布的 JSON 数据
  178. Returns:
  179. bool: True 表示发布成功,False 表示失败
  180. """
  181. if not self.mqtt_enable:
  182. self._log_info(f"Disabled, skip publish: topic={topic}")
  183. return False
  184. if self.mqtt_client is None:
  185. self._log_warn("Client is None, skip publish")
  186. return False
  187. try:
  188. payload = json.dumps(data, ensure_ascii=False)
  189. if not self.mqtt_connected:
  190. self._log_warn("Not connected, try publish anyway")
  191. result = self.mqtt_client.publish(topic, payload)
  192. if result.rc == mqtt.MQTT_ERR_SUCCESS:
  193. self._log_info(f"Published topic={topic}, payload={payload}")
  194. return True
  195. else:
  196. self._log_warn(f"Publish failed, rc={result.rc}")
  197. return False
  198. except Exception as e:
  199. self._log_warn(f"Publish exception: {e}")
  200. return False
  201. def publish_command(self, target, module, action, page="", payload=None):
  202. """
  203. 发布命令消息(封装好的通用格式)
  204. Args:
  205. target: str, 目标模块(如 "screen", "advertisement", "face")
  206. module: str, 模块类型(如 "system", "navigation", "visitor")
  207. action: str, 动作名称(如 "ping", "open_page", "start")
  208. page: str, 页面标识(可选)
  209. payload: dict, 额外载荷数据(可选)
  210. Returns:
  211. bool: True 表示发布成功,False 表示失败
  212. 消息格式:
  213. {
  214. "msg_id": "agent_<timestamp_ms>",
  215. "source": "agent",
  216. "target": target,
  217. "module": module,
  218. "action": action,
  219. "page": page,
  220. "payload": payload,
  221. "timestamp": <timestamp_s>
  222. }
  223. """
  224. if payload is None:
  225. payload = {}
  226. msg = {
  227. "msg_id": f"agent_{int(time.time() * 1000)}",
  228. "source": "agent",
  229. "target": target,
  230. "module": module,
  231. "action": action,
  232. "page": page,
  233. "payload": payload,
  234. "timestamp": int(time.time())
  235. }
  236. return self.publish_json(self.command_topic, msg)
  237. def shutdown(self):
  238. """
  239. 关闭 MQTT 客户端连接
  240. 注意:
  241. - 调用此函数后,MQTT 客户端将被停止
  242. - 不会抛出异常
  243. """
  244. try:
  245. if self.mqtt_client is not None:
  246. self.mqtt_client.loop_stop()
  247. self.mqtt_client.disconnect()
  248. self._log_info("Client stopped")
  249. except Exception as e:
  250. self._log_warn(f"Shutdown failed: {e}")
  251. finally:
  252. self.mqtt_client = None
  253. self.mqtt_connected = False