""" tool_protocol.py - AI Agent 标准协议层 该模块定义 AI Agent 系统中 "计划" 和 "动作" 的标准数据结构, 用于连接 WorldModel → Planner → Executor → Capability。 设计原则: 1. 协议层只定义结构,不执行任何 Capability 2. 协议层不负责 ROS 通信,不依赖 rclpy 3. 完全通用,无场景硬编码 4. JSON 序列化友好,适合 LLM 直接输出 使用方式: from tool_protocol import Plan, PlanStep, ExecutionResult # 创建计划 plan = Plan(plan_id="plan_001", goal="执行任务") plan.add_step(PlanStep(step_id=1, action="action_name", parameters={})) # 序列化为 JSON json_data = plan.to_dict() # 从 JSON 恢复 plan = Plan.from_dict(json_data) """ from __future__ import annotations import time import uuid from dataclasses import dataclass, field from enum import Enum from typing import Any, Optional # ============================================================================= # 枚举定义 # ============================================================================= class RiskLevel(Enum): """计划风险等级枚举""" LOW = "low" MEDIUM = "medium" HIGH = "high" class StepStatus(Enum): """单个步骤状态枚举""" PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" SKIPPED = "skipped" WAIT_CONFIRMATION = "wait_confirmation" class PlanStatus(Enum): """整体计划状态枚举""" CREATED = "created" APPROVED = "approved" EXECUTING = "executing" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" WAIT_CONFIRMATION = "wait_confirmation" class ToolCallType(Enum): """步骤动作类型枚举 用于区分步骤的性质,帮助 Planner 和 Executor 理解步骤意图。 """ EXECUTE = "execute" # 执行具体动作 ASK_USER = "ask_user" # 询问用户确认 SPEAK = "speak" # 语音/文本输出 QUERY_WORLD = "query_world" # 查询世界状态 QUERY_KNOWLEDGE = "query_knowledge" # 查询知识库 NOOP = "noop" # 空操作 # ============================================================================= # 数据类定义 # ============================================================================= @dataclass class PlanStep: """计划步骤 表示计划中的单个执行步骤,包含动作、参数、前置条件等信息。 属性: step_id: 步骤唯一标识,在一个 Plan 内必须唯一 action: 动作名称,如 "execute_action", "speak", "query_world" tool_call_type: 动作类型,用于区分步骤性质 parameters: 动作参数字典 preconditions: 执行前检查条件 fallback: 失败后的回退动作名称,None 表示无回退 status: 当前步骤状态 description: 步骤描述说明 requires_confirmation: 是否需要用户确认 confirmation_message: 确认提示消息 metadata: 可扩展元数据 示例: >>> step = PlanStep( ... step_id=1, ... action="adjust_fan", ... tool_call_type=ToolCallType.EXECUTE, ... parameters={"fan_id": "fan_001", "speed": 3}, ... description="调整风扇速度" ... ) >>> step.update_status(StepStatus.RUNNING) """ step_id: int action: str parameters: dict = field(default_factory=dict) preconditions: dict = field(default_factory=dict) fallback: str | None = None status: StepStatus = StepStatus.PENDING description: str = "" tool_call_type: ToolCallType = ToolCallType.EXECUTE requires_confirmation: bool = False confirmation_message: str | None = None metadata: dict = field(default_factory=dict) def __post_init__(self) -> None: """数据校验""" if not self.action or not self.action.strip(): raise ValueError("action 不能为空") if self.step_id < 0: raise ValueError("step_id 必须为非负整数") def update_status(self, new_status: StepStatus) -> None: """更新步骤状态 Args: new_status: 新的状态值 """ self.status = new_status def to_dict(self) -> dict[str, Any]: """序列化为字典 Returns: 包含所有字段的字典,Enum 转为字符串值 """ return { "step_id": self.step_id, "action": self.action, "tool_call_type": self.tool_call_type.value, "parameters": self.parameters, "preconditions": self.preconditions, "fallback": self.fallback, "status": self.status.value, "description": self.description, "requires_confirmation": self.requires_confirmation, "confirmation_message": self.confirmation_message, "metadata": self.metadata, } @classmethod def from_dict(cls, data: dict[str, Any]) -> PlanStep: """从字典反序列化 Args: data: 包含步骤数据的字典 Returns: PlanStep 实例 """ return cls( step_id=data["step_id"], action=data["action"], tool_call_type=ToolCallType(data.get("tool_call_type", "execute")), parameters=data.get("parameters", {}), preconditions=data.get("preconditions", {}), fallback=data.get("fallback"), status=StepStatus(data.get("status", "pending")), description=data.get("description", ""), requires_confirmation=data.get("requires_confirmation", False), confirmation_message=data.get("confirmation_message"), metadata=data.get("metadata", {}), ) @dataclass class Plan: """计划 表示一个完整的执行计划,包含目标、步骤列表、风险等级等信息。 由 Planner 生成,由 Executor 执行。 属性: plan_id: 计划唯一标识 goal: 计划目标描述 reasoning: 推理过程说明 risk_level: 风险等级 requires_confirmation: 是否需要用户确认后执行 confirmation_message: 确认提示消息 steps: 步骤列表 status: 计划状态 created_at: 创建时间戳 source: 计划来源,如 "rule_engine", "llm", "hybrid" metadata: 可扩展元数据 示例: >>> plan = Plan( ... plan_id="plan_001", ... goal="完成任务A", ... reasoning="因为条件满足", ... risk_level=RiskLevel.LOW, ... source="llm" ... ) >>> plan.add_step(PlanStep(step_id=1, action="step1", parameters={})) >>> plan.add_step(PlanStep(step_id=2, action="step2", parameters={})) >>> pending = plan.get_pending_steps() """ plan_id: str goal: str reasoning: str = "" risk_level: RiskLevel = RiskLevel.LOW requires_confirmation: bool = False confirmation_message: str | None = None steps: list[PlanStep] = field(default_factory=list) status: PlanStatus = PlanStatus.CREATED created_at: float = field(default_factory=time.time) source: str = "llm" metadata: dict = field(default_factory=dict) def __post_init__(self) -> None: """数据校验""" if not self.plan_id or not self.plan_id.strip(): raise ValueError("plan_id 不能为空") if not self.goal or not self.goal.strip(): raise ValueError("goal 不能为空") def add_step(self, step: PlanStep) -> None: """添加步骤 Args: step: 要添加的 PlanStep Raises: ValueError: 如果 step_id 已存在 """ existing_ids = {s.step_id for s in self.steps} if step.step_id in existing_ids: raise ValueError(f"step_id {step.step_id} 已存在,请使用唯一ID") self.steps.append(step) def get_pending_steps(self) -> list[PlanStep]: """获取所有待执行的步骤 Returns: 状态为 PENDING 的步骤列表 """ return [s for s in self.steps if s.status == StepStatus.PENDING] def get_current_step(self) -> PlanStep | None: """获取当前正在执行的步骤 Returns: 第一个状态为 RUNNING 的步骤,如果没有则返回 None """ for step in self.steps: if step.status == StepStatus.RUNNING: return step return None def get_waiting_confirmation_steps(self) -> list[PlanStep]: """获取所有等待确认的步骤 Returns: 状态为 WAIT_CONFIRMATION 的步骤列表 """ return [s for s in self.steps if s.status == StepStatus.WAIT_CONFIRMATION] def is_finished(self) -> bool: """判断计划是否完成 完成条件:所有步骤都已执行(非 PENDING、RUNNING、WAIT_CONFIRMATION) Returns: 如果计划完成返回 True """ return all( s.status not in (StepStatus.PENDING, StepStatus.RUNNING, StepStatus.WAIT_CONFIRMATION) for s in self.steps ) def is_success(self) -> bool: """判断计划是否完全成功 成功条件: 1. 至少存在一个步骤 2. 所有步骤状态都为 SUCCESS Returns: 如果所有步骤都成功返回 True """ if not self.steps: return False return all(s.status == StepStatus.SUCCESS for s in self.steps) def get_failed_steps(self) -> list[PlanStep]: """获取所有失败的步骤 Returns: 状态为 FAILED 的步骤列表 """ return [s for s in self.steps if s.status == StepStatus.FAILED] def next_step_id(self) -> int: """获取下一个可用的步骤 ID Returns: 新的 step_id(当前最大 ID + 1) """ if not self.steps: return 1 return max(s.step_id for s in self.steps) + 1 def to_dict(self) -> dict[str, Any]: """序列化为字典 Returns: 包含所有字段的字典,Enum 转为字符串值 """ return { "plan_id": self.plan_id, "goal": self.goal, "reasoning": self.reasoning, "risk_level": self.risk_level.value, "requires_confirmation": self.requires_confirmation, "confirmation_message": self.confirmation_message, "steps": [s.to_dict() for s in self.steps], "status": self.status.value, "created_at": self.created_at, "source": self.source, "metadata": self.metadata, } @classmethod def from_dict(cls, data: dict[str, Any]) -> Plan: """从字典反序列化 Args: data: 包含计划数据的字典 Returns: Plan 实例 """ steps = [ PlanStep.from_dict(s) for s in data.get("steps", []) ] return cls( plan_id=data["plan_id"], goal=data["goal"], reasoning=data.get("reasoning", ""), risk_level=RiskLevel(data.get("risk_level", "low")), requires_confirmation=data.get("requires_confirmation", False), confirmation_message=data.get("confirmation_message"), steps=steps, status=PlanStatus(data.get("status", "created")), created_at=data.get("created_at", time.time()), source=data.get("source", "llm"), metadata=data.get("metadata", {}), ) @classmethod def create_new( cls, goal: str, reasoning: str = "", risk_level: RiskLevel = RiskLevel.LOW, source: str = "llm", ) -> Plan: """创建新计划的便捷工厂方法 Args: goal: 计划目标 reasoning: 推理过程 risk_level: 风险等级 source: 计划来源 Returns: 新的 Plan 实例 """ return cls( plan_id=f"plan_{uuid.uuid4().hex[:8]}", goal=goal, reasoning=reasoning, risk_level=risk_level, source=source, ) @dataclass class ExecutionResult: """执行结果 表示 Executor 执行某个步骤后的结果。 用于回传执行状态、输出数据等信息。 属性: plan_id: 关联的计划 ID step_id: 执行的步骤 ID success: 是否执行成功 status: 执行状态 message: 执行消息描述 output: 执行输出数据 timestamp: 执行时间戳 示例: >>> result = ExecutionResult( ... plan_id="plan_001", ... step_id=1, ... success=True, ... status=StepStatus.SUCCESS, ... message="执行成功", ... output={"result": "ok"} ... ) >>> data = result.to_dict() """ plan_id: str step_id: int success: bool status: StepStatus message: str = "" output: dict = field(default_factory=dict) timestamp: float = field(default_factory=time.time) def __post_init__(self) -> None: """数据校验""" if not self.plan_id or not self.plan_id.strip(): raise ValueError("plan_id 不能为空") if self.step_id < 0: raise ValueError("step_id 必须为非负整数") # success 与 status 的一致性校验 if self.status == StepStatus.SUCCESS and not self.success: raise ValueError("status=SUCCESS 时 success 必须为 True") if self.status == StepStatus.FAILED and self.success: raise ValueError("status=FAILED 时 success 必须为 False") def to_dict(self) -> dict[str, Any]: """序列化为字典 Returns: 包含所有字段的字典,Enum 转为字符串值 """ return { "plan_id": self.plan_id, "step_id": self.step_id, "success": self.success, "status": self.status.value, "message": self.message, "output": self.output, "timestamp": self.timestamp, } @classmethod def from_dict(cls, data: dict[str, Any]) -> ExecutionResult: """从字典反序列化 Args: data: 包含执行结果数据的字典 Returns: ExecutionResult 实例 """ return cls( plan_id=data["plan_id"], step_id=data["step_id"], success=data.get("success", False), status=StepStatus(data.get("status", "failed")), message=data.get("message", ""), output=data.get("output", {}), timestamp=data.get("timestamp", time.time()), ) @classmethod def success_result( cls, plan_id: str, step_id: int, message: str = "", output: dict | None = None, ) -> ExecutionResult: """创建成功执行结果的便捷方法 Args: plan_id: 计划 ID step_id: 步骤 ID message: 成功消息 output: 输出数据 Returns: 成功的 ExecutionResult 实例 """ return cls( plan_id=plan_id, step_id=step_id, success=True, status=StepStatus.SUCCESS, message=message or "执行成功", output=output or {}, ) @classmethod def failure_result( cls, plan_id: str, step_id: int, message: str, output: dict | None = None, ) -> ExecutionResult: """创建失败执行结果的便捷方法 Args: plan_id: 计划 ID step_id: 步骤 ID message: 失败原因 output: 输出数据 Returns: 失败的 ExecutionResult 实例 """ return cls( plan_id=plan_id, step_id=step_id, success=False, status=StepStatus.FAILED, message=message, output=output or {}, ) @classmethod def skipped_result( cls, plan_id: str, step_id: int, message: str = "步骤已跳过", ) -> ExecutionResult: """创建跳过执行结果的便捷方法 Args: plan_id: 计划 ID step_id: 步骤 ID message: 跳过原因 Returns: 跳过的 ExecutionResult 实例 """ return cls( plan_id=plan_id, step_id=step_id, success=True, status=StepStatus.SKIPPED, message=message, output={}, ) # ============================================================================= # 工具函数 # ============================================================================= def create_plan_from_llm_response( llm_output: dict[str, Any], source: str = "llm", ) -> Plan: """从 LLM 输出创建 Plan 的辅助函数 将 LLM 返回的 JSON 结构转换为 Plan 对象。 Args: llm_output: LLM 返回的字典数据 source: 计划来源 Returns: Plan 实例 示例: >>> llm_response = { ... "goal": "完成任务", ... "reasoning": "分析后决定", ... "risk_level": "low", ... "steps": [ ... {"step_id": 1, "action": "step1", "parameters": {}} ... ] ... } >>> plan = create_plan_from_llm_response(llm_response) """ plan = Plan( plan_id=llm_output.get("plan_id", f"plan_{uuid.uuid4().hex[:8]}"), goal=llm_output["goal"], reasoning=llm_output.get("reasoning", ""), risk_level=RiskLevel(llm_output.get("risk_level", "low")), requires_confirmation=llm_output.get("requires_confirmation", False), confirmation_message=llm_output.get("confirmation_message"), source=source, metadata=llm_output.get("metadata", {}), ) for step_data in llm_output.get("steps", []): step = PlanStep.from_dict(step_data) plan.add_step(step) return plan # ============================================================================= # 主程序入口(测试示例) # ============================================================================= if __name__ == "__main__": import json print("=" * 70) print("Tool Protocol 测试演示") print("=" * 70) # ------------------------------------------------------------------------- # 1. 创建 Plan # ------------------------------------------------------------------------- print("\n[1] 创建 Plan") print("-" * 40) plan = Plan.create_new( goal="执行环境监测任务", reasoning="检测到环境参数异常,需要执行降温操作", risk_level=RiskLevel.MEDIUM, source="llm", ) print(f"创建计划: {plan.plan_id}") print(f"目标: {plan.goal}") print(f"风险等级: {plan.risk_level.value}") print(f"来源: {plan.source}") # ------------------------------------------------------------------------- # 2. 添加 PlanStep # ------------------------------------------------------------------------- print("\n[2] 添加 PlanStep") print("-" * 40) # 步骤1:查询世界状态 step1 = PlanStep( step_id=1, action="query_world_state", tool_call_type=ToolCallType.QUERY_WORLD, parameters={"query": "temperature"}, description="查询当前温度", metadata={"source": "world_model"}, ) plan.add_step(step1) # 步骤2:询问用户确认 step2 = PlanStep( step_id=2, action="confirm_action", tool_call_type=ToolCallType.ASK_USER, parameters={"prompt": "是否启动降温设备?"}, description="等待用户确认", requires_confirmation=True, confirmation_message="当前温度过高,是否启动降温?", ) plan.add_step(step2) # 步骤3:执行动作 step3 = PlanStep( step_id=3, action="adjust_cooling", tool_call_type=ToolCallType.EXECUTE, parameters={"device_id": "cooler_001", "level": 3}, preconditions={"temperature_above": 30}, fallback="alert_operator", description="调整降温设备", metadata={"device": "cooler", "priority": "high"}, ) plan.add_step(step3) # 步骤4:语音通知 step4 = PlanStep( step_id=4, action="speak_notification", tool_call_type=ToolCallType.SPEAK, parameters={"message": "降温设备已启动", "volume": 0.8}, description="通知操作员", ) plan.add_step(step4) print(f"添加了 {len(plan.steps)} 个步骤") for step in plan.steps: print(f" - Step {step.step_id}: {step.action} ({step.tool_call_type.value})") # ------------------------------------------------------------------------- # 3. 序列化为 Dict/JSON # ------------------------------------------------------------------------- print("\n[3] 序列化为 Dict/JSON") print("-" * 40) plan_dict = plan.to_dict() plan_json = json.dumps(plan_dict, indent=2, ensure_ascii=False) print("Plan JSON:") print(plan_json[:500] + "..." if len(plan_json) > 500 else plan_json) # ------------------------------------------------------------------------- # 4. 从 Dict 恢复 # ------------------------------------------------------------------------- print("\n[4] 从 Dict 恢复 Plan") print("-" * 40) restored_plan = Plan.from_dict(plan_dict) print(f"恢复计划 ID: {restored_plan.plan_id}") print(f"恢复步骤数: {len(restored_plan.steps)}") print(f"恢复风险等级: {restored_plan.risk_level.value}") # 验证恢复正确性 assert plan.plan_id == restored_plan.plan_id assert len(plan.steps) == len(restored_plan.steps) print("✓ 序列化/反序列化验证通过") # ------------------------------------------------------------------------- # 5. Plan 操作演示 # ------------------------------------------------------------------------- print("\n[5] Plan 操作演示") print("-" * 40) # 更新步骤状态 print(f"初始待执行步骤数: {len(plan.get_pending_steps())}") step1.update_status(StepStatus.RUNNING) print(f"Step 1 状态更新为: {step1.status.value}") step1.update_status(StepStatus.SUCCESS) print(f"Step 1 状态更新为: {step1.status.value}") step2.update_status(StepStatus.WAIT_CONFIRMATION) waiting = plan.get_waiting_confirmation_steps() print(f"等待确认的步骤: {[s.step_id for s in waiting]}") print(f"计划是否完成: {plan.is_finished()}") print(f"计划是否成功: {plan.is_success()}") # ------------------------------------------------------------------------- # 6. ExecutionResult 演示 # ------------------------------------------------------------------------- print("\n[6] ExecutionResult 演示") print("-" * 40) # 创建成功结果 result1 = ExecutionResult.success_result( plan_id=plan.plan_id, step_id=1, message="温度查询成功", output={"temperature": 32.5, "humidity": 65}, ) print(f"成功结果: {result1.to_dict()}") # 创建失败结果 result2 = ExecutionResult.failure_result( plan_id=plan.plan_id, step_id=3, message="设备通信失败", output={"error_code": "E503", "device": "cooler_001"}, ) print(f"失败结果: {result2.to_dict()}") # 创建跳过结果 result3 = ExecutionResult.skipped_result( plan_id=plan.plan_id, step_id=4, message="条件不满足,跳过通知", ) print(f"跳过结果: {result3.to_dict()}") # ------------------------------------------------------------------------- # 7. 从 LLM 输出创建 Plan # ------------------------------------------------------------------------- print("\n[7] 从 LLM 输出创建 Plan") print("-" * 40) llm_response = { "goal": "自动巡检任务", "reasoning": "定时任务触发,执行标准巡检流程", "risk_level": "low", "requires_confirmation": False, "steps": [ { "step_id": 1, "action": "move_to_location", "tool_call_type": "execute", "parameters": {"target": "zone_A"}, "description": "移动到巡检区域A", }, { "step_id": 2, "action": "capture_sensor_data", "tool_call_type": "execute", "parameters": {"sensors": ["temp", "humidity"]}, "description": "采集传感器数据", }, { "step_id": 3, "action": "speak_report", "tool_call_type": "speak", "parameters": {"content": "巡检完成"}, "description": "报告巡检结果", }, ], } llm_plan = create_plan_from_llm_response(llm_response, source="llm") print(f"从 LLM 创建计划: {llm_plan.plan_id}") print(f"目标: {llm_plan.goal}") print(f"步骤数: {len(llm_plan.steps)}") # ------------------------------------------------------------------------- # 8. 枚举使用演示 # ------------------------------------------------------------------------- print("\n[8] 枚举使用演示") print("-" * 40) print("RiskLevel 枚举:") for level in RiskLevel: print(f" - {level.name} = {level.value}") print("\nStepStatus 枚举:") for status in StepStatus: print(f" - {status.name} = {status.value}") print("\nPlanStatus 枚举:") for status in PlanStatus: print(f" - {status.name} = {status.value}") print("\nToolCallType 枚举:") for tool_type in ToolCallType: print(f" - {tool_type.name} = {tool_type.value}") print("\n" + "=" * 70) print("测试演示完成") print("=" * 70)