PlannerNode2 机器人决策系统使用 MQTT 协议实现智能体与外部业务模块(屏幕、机械臂、人脸识别等)之间的通信。
依赖安装:
pip install paho-mqtt
| Topic 名称 | 方向 | 说明 |
|---|---|---|
/ai/agent/command |
智能体 → 外部模块 | 发布命令消息 |
| Topic 名称 | 方向 | 说明 |
|---|---|---|
/ai/module/status |
外部模块 → 智能体 | 接收外部模块状态更新 |
/ai/module/event |
外部模块 → 智能体 | 接收外部模块事件通知(如人脸识别结果) |
所有通过 MQTT 发送的命令消息统一使用以下 JSON 格式:
{
"msg_id": "agent_<timestamp_ms>",
"source": "agent",
"target": "<目标模块>",
"module": "<模块类型>",
"action": "<动作名称>",
"page": "<页面标识>",
"payload": {
// 额外参数
},
"timestamp": <unix_timestamp>
}
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
msg_id |
string | 是 | 消息唯一 ID,格式:agent_<毫秒时间戳> |
source |
string | 是 | 消息来源,固定为 "agent" |
target |
string | 是 | 目标模块(见下方模块列表) |
module |
string | 是 | 模块类型 |
action |
string | 是 | 动作名称 |
page |
string | 否 | 页面标识,用于 UI 相关操作 |
payload |
object | 否 | 额外参数 |
timestamp |
int | 是 | Unix 时间戳(秒) |
| target 值 | 说明 | 接收方 |
|---|---|---|
all |
所有模块(广播) | 全系统 |
screen |
屏幕/UI 模块 | 屏幕控制系统 |
face |
人脸识别模块 | 人脸识别服务 |
nav |
导航模块 | 导航控制系统 |
visitor |
访客管理模块 | 访客登记系统 |
customer_service |
客服模块 | 客服呼叫系统 |
advertisement |
广告模块 | 广告播放系统 |
arm |
机械臂模块 | 机械臂控制系统 |
功能:通知所有外部模块暂停当前业务。
使用场景:
调用方式:
self.mqtt_client.publish_command(
target="all",
module="system",
action="pause",
page="",
payload={
"reason": "wakeup",
"source_behavior": "agent_wakeup"
}
)
示例消息:
{
"msg_id": "agent_1747734123000",
"source": "agent",
"target": "all",
"module": "system",
"action": "pause",
"page": "",
"payload": {
"reason": "wakeup",
"source_behavior": "agent_wakeup"
},
"timestamp": 1747734123
}
功能:唤醒访客登记页面。
调用方式:
self.mqtt_client.publish_command(
target="screen",
module="visitor",
action="open_page",
page="visitor_register",
payload={
"title": "访客登记",
"input_mode": "touch_and_voice"
}
)
示例消息:
{
"msg_id": "agent_1747734123001",
"source": "agent",
"target": "screen",
"module": "visitor",
"action": "open_page",
"page": "visitor_register",
"payload": {
"title": "访客登记",
"input_mode": "touch_and_voice"
},
"timestamp": 1747734123
}
功能:唤醒预约确认页面。
调用方式:
self.mqtt_client.publish_command(
target="screen",
module="visitor",
action="open_page",
page="appointment_confirm",
payload={
"title": "预约确认",
"input_mode": "touch"
}
)
示例消息:
{
"msg_id": "agent_1747734123002",
"source": "agent",
"target": "screen",
"module": "visitor",
"action": "open_page",
"page": "appointment_confirm",
"payload": {
"title": "预约确认",
"input_mode": "touch"
},
"timestamp": 1747734123
}
功能:唤醒导航页面。
调用方式:
self.mqtt_client.publish_command(
target="screen",
module="nav",
action="open_page",
page="navigation",
payload={
"title": "导航服务",
"mode": "select_destination"
}
)
示例消息:
{
"msg_id": "agent_1747734123003",
"source": "agent",
"target": "screen",
"module": "nav",
"action": "open_page",
"page": "navigation",
"payload": {
"title": "导航服务",
"mode": "select_destination"
},
"timestamp": 1747734123
}
功能:唤醒呼叫客服页面。
调用方式:
self.mqtt_client.publish_command(
target="screen",
module="customer_service",
action="open_page",
page="customer_service",
payload={
"title": "呼叫客服",
"mode": "call_service"
}
)
示例消息:
{
"msg_id": "agent_1747734123004",
"source": "agent",
"target": "screen",
"module": "customer_service",
"action": "open_page",
"page": "customer_service",
"payload": {
"title": "呼叫客服",
"mode": "call_service"
},
"timestamp": 1747734123
}
功能:唤醒人脸识别页面并启动识别。此函数发送两条 MQTT 消息。
调用方式:
# 第一条 - 打开页面
self.mqtt_client.publish_command(
target="screen",
module="face",
action="open_page",
page="face_recognition",
payload={
"title": "人脸识别",
"tips": "请面向摄像头",
"mode": "whitelist"
}
)
# 第二条 - 启动识别
self.mqtt_client.publish_command(
target="face",
module="face",
action="start",
page="",
payload={
"mode": "whitelist",
"timeout_ms": 10000,
"threshold": 0.65
}
)
示例消息 1(打开页面):
{
"msg_id": "agent_1747734123005",
"source": "agent",
"target": "screen",
"module": "face",
"action": "open_page",
"page": "face_recognition",
"payload": {
"title": "人脸识别",
"tips": "请面向摄像头",
"mode": "whitelist"
},
"timestamp": 1747734123
}
示例消息 2(启动识别):
{
"msg_id": "agent_1747734123006",
"source": "agent",
"target": "face",
"module": "face",
"action": "start",
"page": "",
"payload": {
"mode": "whitelist",
"timeout_ms": 10000,
"threshold": 0.65
},
"timestamp": 1747734123
}
订阅 /ai/module/status 接收外部模块的状态更新。
回调注册:
mqtt_client.on_status_callback = self.on_status_received
def on_status_received(self, data):
# 处理状态数据
pass
订阅 /ai/module/event 接收外部模块的事件通知(如人脸识别结果)。
回调注册:
mqtt_client.on_event_callback = self.on_event_received
def on_event_received(self, data):
# 处理事件数据
pass
MQTT 配置来自 /ai/config 中的 mqtt 字段:
{
"mqtt": {
"enable": true,
"broker_host": "192.168.0.30",
"broker_port": 1883,
"username": "",
"password": "",
"client_id": "rdk_agent",
"keepalive": 60,
"command_topic": "/ai/agent/command",
"status_topic": "/ai/module/status",
"event_topic": "/ai/module/event"
}
}
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable |
bool | false |
是否启用 MQTT |
broker_host |
string | "127.0.0.1" |
MQTT Broker 地址 |
broker_port |
int | 1883 |
MQTT Broker 端口 |
username |
string | "" |
用户名(可选) |
password |
string | "" |
密码(可选) |
client_id |
string | "rdk_agent" |
客户端唯一标识 |
keepalive |
int | 60 |
保活时间(秒) |
command_topic |
string | /ai/agent/command |
命令发布话题 |
status_topic |
string | /ai/module/status |
状态订阅话题 |
event_topic |
string | /ai/module/event |
事件订阅话题 |
from utils.mqttclient import MQTTClient
# 创建 MQTT 客户端
self.mqtt_client = MQTTClient(logger=self.get_logger())
# 从配置初始化
mqtt_config = {
"enable": True,
"broker_host": "192.168.0.30",
"broker_port": 1883,
"client_id": "rdk_agent"
}
self.mqtt_client.init(mqtt_config)
# 注册回调
self.mqtt_client.on_status_callback = self.on_status_received
self.mqtt_client.on_event_callback = self.on_event_received
# 使用 publish_command 发送命令
ok = self.mqtt_client.publish_command(
target="screen",
module="nav",
action="open_page",
page="navigation",
payload={
"title": "导航服务",
"mode": "select_destination"
}
)
if ok:
self.get_logger().info("命令发送成功")
else:
self.get_logger().warn("命令发送失败")
在 CustomActionServer 类中新增成员函数即可被大模型调用(通过 hasattr 判断)。
def open_xxx_page(self):
"""
打开XXX页面
"""
try:
ok = self.mqtt_client.publish_command(
target="screen",
module="xxx",
action="open_page",
page="xxx_page",
payload={
"title": "XXX",
"mode": "xxx"
}
)
self.get_logger().info(
f"[MQTT业务] open_xxx_page sent, ok={ok}"
)
except Exception as e:
self.get_logger().warn(
f"[MQTT业务] open_xxx_page failed: {e}"
)
MQTT 初始化时机:MQTT 客户端在收到 /ai/config 配置后初始化,仅初始化一次,后续配置更新不会重新初始化。
发布失败不影响主流程:所有业务函数内部已做异常捕获,MQTT 发布失败只会打印 warning 日志,不影响原有 ROS2 逻辑。
日志前缀:所有 MQTT 业务日志使用 [MQTT业务] 前缀,方便排查。
唤醒时自动暂停:wakeup_callback 收到唤醒信号后会自动调用 global_pause("wakeup"),通知全车暂停。
| 日期 | 版本 | 更新内容 |
|---|---|---|
| 2026-05-20 | 1.0 | 初始版本,支持 MQTT 客户端模块及基础业务函数 |