MQTT协议.md 11 KB

MQTT 通信协议文档

1. 概述

PlannerNode2 机器人决策系统使用 MQTT 协议实现智能体与外部业务模块(屏幕、机械臂、人脸识别等)之间的通信。

依赖安装

pip install paho-mqtt

2. Topic 定义

2.1 发布 (Publish) Topic

Topic 名称 方向 说明
/ai/agent/command 智能体 → 外部模块 发布命令消息

2.2 订阅 (Subscribe) Topic

Topic 名称 方向 说明
/ai/module/status 外部模块 → 智能体 接收外部模块状态更新
/ai/module/event 外部模块 → 智能体 接收外部模块事件通知(如人脸识别结果)

3. 消息格式

3.1 命令消息结构 (Command Message)

所有通过 MQTT 发送的命令消息统一使用以下 JSON 格式:

{
  "msg_id": "agent_<timestamp_ms>",
  "source": "agent",
  "target": "<目标模块>",
  "module": "<模块类型>",
  "action": "<动作名称>",
  "page": "<页面标识>",
  "payload": {
    // 额外参数
  },
  "timestamp": <unix_timestamp>
}

3.2 字段说明

字段 类型 必填 说明
msg_id string 消息唯一 ID,格式:agent_<毫秒时间戳>
source string 消息来源,固定为 "agent"
target string 目标模块(见下方模块列表)
module string 模块类型
action string 动作名称
page string 页面标识,用于 UI 相关操作
payload object 额外参数
timestamp int Unix 时间戳(秒)

4. 目标模块 (target)

target 值 说明 接收方
all 所有模块(广播) 全系统
screen 屏幕/UI 模块 屏幕控制系统
face 人脸识别模块 人脸识别服务
nav 导航模块 导航控制系统
visitor 访客管理模块 访客登记系统
customer_service 客服模块 客服呼叫系统
advertisement 广告模块 广告播放系统
arm 机械臂模块 机械臂控制系统

5. 已实现的业务函数

5.1 global_pause - 全局暂停

功能:通知所有外部模块暂停当前业务。

使用场景

  • 机器人被唤醒时
  • 进入语音交互前
  • 欢迎语播放前
  • 业务页面切换时

调用方式

self.mqtt_client.publish_command(
    target="all",
    module="system",
    action="pause",
    page="",
    payload={
        "reason": "wakeup",
        "source_behavior": "agent_wakeup"
    }
)

示例消息

{
  "msg_id": "agent_1747734123000",
  "source": "agent",
  "target": "all",
  "module": "system",
  "action": "pause",
  "page": "",
  "payload": {
    "reason": "wakeup",
    "source_behavior": "agent_wakeup"
  },
  "timestamp": 1747734123
}

5.2 open_visitor_register_page - 访客登记页面

功能:唤醒访客登记页面。

调用方式

self.mqtt_client.publish_command(
    target="screen",
    module="visitor",
    action="open_page",
    page="visitor_register",
    payload={
        "title": "访客登记",
        "input_mode": "touch_and_voice"
    }
)

示例消息

{
  "msg_id": "agent_1747734123001",
  "source": "agent",
  "target": "screen",
  "module": "visitor",
  "action": "open_page",
  "page": "visitor_register",
  "payload": {
    "title": "访客登记",
    "input_mode": "touch_and_voice"
  },
  "timestamp": 1747734123
}

5.3 open_appointment_confirm_page - 预约确认页面

功能:唤醒预约确认页面。

调用方式

self.mqtt_client.publish_command(
    target="screen",
    module="visitor",
    action="open_page",
    page="appointment_confirm",
    payload={
        "title": "预约确认",
        "input_mode": "touch"
    }
)

示例消息

{
  "msg_id": "agent_1747734123002",
  "source": "agent",
  "target": "screen",
  "module": "visitor",
  "action": "open_page",
  "page": "appointment_confirm",
  "payload": {
    "title": "预约确认",
    "input_mode": "touch"
  },
  "timestamp": 1747734123
}

5.4 open_navigation_page - 导航页面

功能:唤醒导航页面。

调用方式

self.mqtt_client.publish_command(
    target="screen",
    module="nav",
    action="open_page",
    page="navigation",
    payload={
        "title": "导航服务",
        "mode": "select_destination"
    }
)

示例消息

{
  "msg_id": "agent_1747734123003",
  "source": "agent",
  "target": "screen",
  "module": "nav",
  "action": "open_page",
  "page": "navigation",
  "payload": {
    "title": "导航服务",
    "mode": "select_destination"
  },
  "timestamp": 1747734123
}

5.5 open_customer_service_page - 呼叫客服页面

功能:唤醒呼叫客服页面。

调用方式

self.mqtt_client.publish_command(
    target="screen",
    module="customer_service",
    action="open_page",
    page="customer_service",
    payload={
        "title": "呼叫客服",
        "mode": "call_service"
    }
)

示例消息

{
  "msg_id": "agent_1747734123004",
  "source": "agent",
  "target": "screen",
  "module": "customer_service",
  "action": "open_page",
  "page": "customer_service",
  "payload": {
    "title": "呼叫客服",
    "mode": "call_service"
  },
  "timestamp": 1747734123
}

5.6 open_face_recognition_page - 人脸识别页面

功能:唤醒人脸识别页面并启动识别。此函数发送两条 MQTT 消息。

调用方式

# 第一条 - 打开页面
self.mqtt_client.publish_command(
    target="screen",
    module="face",
    action="open_page",
    page="face_recognition",
    payload={
        "title": "人脸识别",
        "tips": "请面向摄像头",
        "mode": "whitelist"
    }
)

# 第二条 - 启动识别
self.mqtt_client.publish_command(
    target="face",
    module="face",
    action="start",
    page="",
    payload={
        "mode": "whitelist",
        "timeout_ms": 10000,
        "threshold": 0.65
    }
)

示例消息 1(打开页面)

{
  "msg_id": "agent_1747734123005",
  "source": "agent",
  "target": "screen",
  "module": "face",
  "action": "open_page",
  "page": "face_recognition",
  "payload": {
    "title": "人脸识别",
    "tips": "请面向摄像头",
    "mode": "whitelist"
  },
  "timestamp": 1747734123
}

示例消息 2(启动识别)

{
  "msg_id": "agent_1747734123006",
  "source": "agent",
  "target": "face",
  "module": "face",
  "action": "start",
  "page": "",
  "payload": {
    "mode": "whitelist",
    "timeout_ms": 10000,
    "threshold": 0.65
  },
  "timestamp": 1747734123
}

6. 订阅消息处理

6.1 状态消息 (status)

订阅 /ai/module/status 接收外部模块的状态更新。

回调注册

mqtt_client.on_status_callback = self.on_status_received

def on_status_received(self, data):
    # 处理状态数据
    pass

6.2 事件消息 (event)

订阅 /ai/module/event 接收外部模块的事件通知(如人脸识别结果)。

回调注册

mqtt_client.on_event_callback = self.on_event_received

def on_event_received(self, data):
    # 处理事件数据
    pass

7. MQTT 配置格式

MQTT 配置来自 /ai/config 中的 mqtt 字段:

{
  "mqtt": {
    "enable": true,
    "broker_host": "192.168.0.30",
    "broker_port": 1883,
    "username": "",
    "password": "",
    "client_id": "rdk_agent",
    "keepalive": 60,
    "command_topic": "/ai/agent/command",
    "status_topic": "/ai/module/status",
    "event_topic": "/ai/module/event"
  }
}

7.1 配置参数说明

参数 类型 默认值 说明
enable bool false 是否启用 MQTT
broker_host string "127.0.0.1" MQTT Broker 地址
broker_port int 1883 MQTT Broker 端口
username string "" 用户名(可选)
password string "" 密码(可选)
client_id string "rdk_agent" 客户端唯一标识
keepalive int 60 保活时间(秒)
command_topic string /ai/agent/command 命令发布话题
status_topic string /ai/module/status 状态订阅话题
event_topic string /ai/module/event 事件订阅话题

8. 使用示例

8.1 MQTT 客户端初始化

from utils.mqttclient import MQTTClient

# 创建 MQTT 客户端
self.mqtt_client = MQTTClient(logger=self.get_logger())

# 从配置初始化
mqtt_config = {
    "enable": True,
    "broker_host": "192.168.0.30",
    "broker_port": 1883,
    "client_id": "rdk_agent"
}
self.mqtt_client.init(mqtt_config)

# 注册回调
self.mqtt_client.on_status_callback = self.on_status_received
self.mqtt_client.on_event_callback = self.on_event_received

8.2 发送命令

# 使用 publish_command 发送命令
ok = self.mqtt_client.publish_command(
    target="screen",
    module="nav",
    action="open_page",
    page="navigation",
    payload={
        "title": "导航服务",
        "mode": "select_destination"
    }
)

if ok:
    self.get_logger().info("命令发送成功")
else:
    self.get_logger().warn("命令发送失败")

8.3 扩展新的 MQTT 业务函数

CustomActionServer 类中新增成员函数即可被大模型调用(通过 hasattr 判断)。

def open_xxx_page(self):
    """
    打开XXX页面
    """
    try:
        ok = self.mqtt_client.publish_command(
            target="screen",
            module="xxx",
            action="open_page",
            page="xxx_page",
            payload={
                "title": "XXX",
                "mode": "xxx"
            }
        )
        self.get_logger().info(
            f"[MQTT业务] open_xxx_page sent, ok={ok}"
        )
    except Exception as e:
        self.get_logger().warn(
            f"[MQTT业务] open_xxx_page failed: {e}"
        )

9. 注意事项

  1. MQTT 初始化时机:MQTT 客户端在收到 /ai/config 配置后初始化,仅初始化一次,后续配置更新不会重新初始化。

  2. 发布失败不影响主流程:所有业务函数内部已做异常捕获,MQTT 发布失败只会打印 warning 日志,不影响原有 ROS2 逻辑。

  3. 日志前缀:所有 MQTT 业务日志使用 [MQTT业务] 前缀,方便排查。

  4. 唤醒时自动暂停wakeup_callback 收到唤醒信号后会自动调用 global_pause("wakeup"),通知全车暂停。


10. 更新日志

日期 版本 更新内容
2026-05-20 1.0 初始版本,支持 MQTT 客户端模块及基础业务函数