| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846 |
- """
- tool_protocol.py - AI Agent 标准协议层
- 该模块定义 AI Agent 系统中 "计划" 和 "动作" 的标准数据结构,
- 用于连接 WorldModel → Planner → Executor → Capability。
- 设计原则:
- 1. 协议层只定义结构,不执行任何 Capability
- 2. 协议层不负责 ROS 通信,不依赖 rclpy
- 3. 完全通用,无场景硬编码
- 4. JSON 序列化友好,适合 LLM 直接输出
- 使用方式:
- from tool_protocol import Plan, PlanStep, ExecutionResult
- # 创建计划
- plan = Plan(plan_id="plan_001", goal="执行任务")
- plan.add_step(PlanStep(step_id=1, action="action_name", parameters={}))
-
- # 序列化为 JSON
- json_data = plan.to_dict()
-
- # 从 JSON 恢复
- plan = Plan.from_dict(json_data)
- """
- from __future__ import annotations
- import time
- import uuid
- from dataclasses import dataclass, field
- from enum import Enum
- from typing import Any, Optional
- # =============================================================================
- # 枚举定义
- # =============================================================================
- class RiskLevel(Enum):
- """计划风险等级枚举"""
- LOW = "low"
- MEDIUM = "medium"
- HIGH = "high"
- class StepStatus(Enum):
- """单个步骤状态枚举"""
- PENDING = "pending"
- RUNNING = "running"
- SUCCESS = "success"
- FAILED = "failed"
- SKIPPED = "skipped"
- WAIT_CONFIRMATION = "wait_confirmation"
- class PlanStatus(Enum):
- """整体计划状态枚举"""
- CREATED = "created"
- APPROVED = "approved"
- EXECUTING = "executing"
- COMPLETED = "completed"
- FAILED = "failed"
- CANCELLED = "cancelled"
- WAIT_CONFIRMATION = "wait_confirmation"
- class ToolCallType(Enum):
- """步骤动作类型枚举
-
- 用于区分步骤的性质,帮助 Planner 和 Executor 理解步骤意图。
- """
- EXECUTE = "execute" # 执行具体动作
- ASK_USER = "ask_user" # 询问用户确认
- SPEAK = "speak" # 语音/文本输出
- QUERY_WORLD = "query_world" # 查询世界状态
- QUERY_KNOWLEDGE = "query_knowledge" # 查询知识库
- NOOP = "noop" # 空操作
- # =============================================================================
- # 数据类定义
- # =============================================================================
- @dataclass
- class PlanStep:
- """计划步骤
-
- 表示计划中的单个执行步骤,包含动作、参数、前置条件等信息。
-
- 属性:
- step_id: 步骤唯一标识,在一个 Plan 内必须唯一
- action: 动作名称,如 "execute_action", "speak", "query_world"
- tool_call_type: 动作类型,用于区分步骤性质
- parameters: 动作参数字典
- preconditions: 执行前检查条件
- fallback: 失败后的回退动作名称,None 表示无回退
- status: 当前步骤状态
- description: 步骤描述说明
- requires_confirmation: 是否需要用户确认
- confirmation_message: 确认提示消息
- metadata: 可扩展元数据
-
- 示例:
- >>> step = PlanStep(
- ... step_id=1,
- ... action="adjust_fan",
- ... tool_call_type=ToolCallType.EXECUTE,
- ... parameters={"fan_id": "fan_001", "speed": 3},
- ... description="调整风扇速度"
- ... )
- >>> step.update_status(StepStatus.RUNNING)
- """
-
- step_id: int
- action: str
- parameters: dict = field(default_factory=dict)
- preconditions: dict = field(default_factory=dict)
- fallback: str | None = None
- status: StepStatus = StepStatus.PENDING
- description: str = ""
- tool_call_type: ToolCallType = ToolCallType.EXECUTE
- requires_confirmation: bool = False
- confirmation_message: str | None = None
- metadata: dict = field(default_factory=dict)
-
- def __post_init__(self) -> None:
- """数据校验"""
- if not self.action or not self.action.strip():
- raise ValueError("action 不能为空")
- if self.step_id < 0:
- raise ValueError("step_id 必须为非负整数")
-
- def update_status(self, new_status: StepStatus) -> None:
- """更新步骤状态
-
- Args:
- new_status: 新的状态值
- """
- self.status = new_status
-
- def to_dict(self) -> dict[str, Any]:
- """序列化为字典
-
- Returns:
- 包含所有字段的字典,Enum 转为字符串值
- """
- return {
- "step_id": self.step_id,
- "action": self.action,
- "tool_call_type": self.tool_call_type.value,
- "parameters": self.parameters,
- "preconditions": self.preconditions,
- "fallback": self.fallback,
- "status": self.status.value,
- "description": self.description,
- "requires_confirmation": self.requires_confirmation,
- "confirmation_message": self.confirmation_message,
- "metadata": self.metadata,
- }
-
- @classmethod
- def from_dict(cls, data: dict[str, Any]) -> PlanStep:
- """从字典反序列化
-
- Args:
- data: 包含步骤数据的字典
-
- Returns:
- PlanStep 实例
- """
- return cls(
- step_id=data["step_id"],
- action=data["action"],
- tool_call_type=ToolCallType(data.get("tool_call_type", "execute")),
- parameters=data.get("parameters", {}),
- preconditions=data.get("preconditions", {}),
- fallback=data.get("fallback"),
- status=StepStatus(data.get("status", "pending")),
- description=data.get("description", ""),
- requires_confirmation=data.get("requires_confirmation", False),
- confirmation_message=data.get("confirmation_message"),
- metadata=data.get("metadata", {}),
- )
- @dataclass
- class Plan:
- """计划
-
- 表示一个完整的执行计划,包含目标、步骤列表、风险等级等信息。
- 由 Planner 生成,由 Executor 执行。
-
- 属性:
- plan_id: 计划唯一标识
- goal: 计划目标描述
- reasoning: 推理过程说明
- risk_level: 风险等级
- requires_confirmation: 是否需要用户确认后执行
- confirmation_message: 确认提示消息
- steps: 步骤列表
- status: 计划状态
- created_at: 创建时间戳
- source: 计划来源,如 "rule_engine", "llm", "hybrid"
- metadata: 可扩展元数据
-
- 示例:
- >>> plan = Plan(
- ... plan_id="plan_001",
- ... goal="完成任务A",
- ... reasoning="因为条件满足",
- ... risk_level=RiskLevel.LOW,
- ... source="llm"
- ... )
- >>> plan.add_step(PlanStep(step_id=1, action="step1", parameters={}))
- >>> plan.add_step(PlanStep(step_id=2, action="step2", parameters={}))
- >>> pending = plan.get_pending_steps()
- """
-
- plan_id: str
- goal: str
- reasoning: str = ""
- risk_level: RiskLevel = RiskLevel.LOW
- requires_confirmation: bool = False
- confirmation_message: str | None = None
- steps: list[PlanStep] = field(default_factory=list)
- status: PlanStatus = PlanStatus.CREATED
- created_at: float = field(default_factory=time.time)
- source: str = "llm"
- metadata: dict = field(default_factory=dict)
-
- def __post_init__(self) -> None:
- """数据校验"""
- if not self.plan_id or not self.plan_id.strip():
- raise ValueError("plan_id 不能为空")
- if not self.goal or not self.goal.strip():
- raise ValueError("goal 不能为空")
-
- def add_step(self, step: PlanStep) -> None:
- """添加步骤
-
- Args:
- step: 要添加的 PlanStep
-
- Raises:
- ValueError: 如果 step_id 已存在
- """
- existing_ids = {s.step_id for s in self.steps}
- if step.step_id in existing_ids:
- raise ValueError(f"step_id {step.step_id} 已存在,请使用唯一ID")
- self.steps.append(step)
-
- def get_pending_steps(self) -> list[PlanStep]:
- """获取所有待执行的步骤
-
- Returns:
- 状态为 PENDING 的步骤列表
- """
- return [s for s in self.steps if s.status == StepStatus.PENDING]
-
- def get_current_step(self) -> PlanStep | None:
- """获取当前正在执行的步骤
-
- Returns:
- 第一个状态为 RUNNING 的步骤,如果没有则返回 None
- """
- for step in self.steps:
- if step.status == StepStatus.RUNNING:
- return step
- return None
-
- def get_waiting_confirmation_steps(self) -> list[PlanStep]:
- """获取所有等待确认的步骤
-
- Returns:
- 状态为 WAIT_CONFIRMATION 的步骤列表
- """
- return [s for s in self.steps if s.status == StepStatus.WAIT_CONFIRMATION]
-
- def is_finished(self) -> bool:
- """判断计划是否完成
-
- 完成条件:所有步骤都已执行(非 PENDING、RUNNING、WAIT_CONFIRMATION)
-
- Returns:
- 如果计划完成返回 True
- """
- return all(
- s.status not in (StepStatus.PENDING, StepStatus.RUNNING, StepStatus.WAIT_CONFIRMATION)
- for s in self.steps
- )
-
- def is_success(self) -> bool:
- """判断计划是否完全成功
-
- 成功条件:
- 1. 至少存在一个步骤
- 2. 所有步骤状态都为 SUCCESS
-
- Returns:
- 如果所有步骤都成功返回 True
- """
- if not self.steps:
- return False
- return all(s.status == StepStatus.SUCCESS for s in self.steps)
-
- def get_failed_steps(self) -> list[PlanStep]:
- """获取所有失败的步骤
-
- Returns:
- 状态为 FAILED 的步骤列表
- """
- return [s for s in self.steps if s.status == StepStatus.FAILED]
-
- def next_step_id(self) -> int:
- """获取下一个可用的步骤 ID
-
- Returns:
- 新的 step_id(当前最大 ID + 1)
- """
- if not self.steps:
- return 1
- return max(s.step_id for s in self.steps) + 1
-
- def to_dict(self) -> dict[str, Any]:
- """序列化为字典
-
- Returns:
- 包含所有字段的字典,Enum 转为字符串值
- """
- return {
- "plan_id": self.plan_id,
- "goal": self.goal,
- "reasoning": self.reasoning,
- "risk_level": self.risk_level.value,
- "requires_confirmation": self.requires_confirmation,
- "confirmation_message": self.confirmation_message,
- "steps": [s.to_dict() for s in self.steps],
- "status": self.status.value,
- "created_at": self.created_at,
- "source": self.source,
- "metadata": self.metadata,
- }
-
- @classmethod
- def from_dict(cls, data: dict[str, Any]) -> Plan:
- """从字典反序列化
-
- Args:
- data: 包含计划数据的字典
-
- Returns:
- Plan 实例
- """
- steps = [
- PlanStep.from_dict(s) for s in data.get("steps", [])
- ]
- return cls(
- plan_id=data["plan_id"],
- goal=data["goal"],
- reasoning=data.get("reasoning", ""),
- risk_level=RiskLevel(data.get("risk_level", "low")),
- requires_confirmation=data.get("requires_confirmation", False),
- confirmation_message=data.get("confirmation_message"),
- steps=steps,
- status=PlanStatus(data.get("status", "created")),
- created_at=data.get("created_at", time.time()),
- source=data.get("source", "llm"),
- metadata=data.get("metadata", {}),
- )
-
- @classmethod
- def create_new(
- cls,
- goal: str,
- reasoning: str = "",
- risk_level: RiskLevel = RiskLevel.LOW,
- source: str = "llm",
- ) -> Plan:
- """创建新计划的便捷工厂方法
-
- Args:
- goal: 计划目标
- reasoning: 推理过程
- risk_level: 风险等级
- source: 计划来源
-
- Returns:
- 新的 Plan 实例
- """
- return cls(
- plan_id=f"plan_{uuid.uuid4().hex[:8]}",
- goal=goal,
- reasoning=reasoning,
- risk_level=risk_level,
- source=source,
- )
- @dataclass
- class ExecutionResult:
- """执行结果
-
- 表示 Executor 执行某个步骤后的结果。
- 用于回传执行状态、输出数据等信息。
-
- 属性:
- plan_id: 关联的计划 ID
- step_id: 执行的步骤 ID
- success: 是否执行成功
- status: 执行状态
- message: 执行消息描述
- output: 执行输出数据
- timestamp: 执行时间戳
-
- 示例:
- >>> result = ExecutionResult(
- ... plan_id="plan_001",
- ... step_id=1,
- ... success=True,
- ... status=StepStatus.SUCCESS,
- ... message="执行成功",
- ... output={"result": "ok"}
- ... )
- >>> data = result.to_dict()
- """
-
- plan_id: str
- step_id: int
- success: bool
- status: StepStatus
- message: str = ""
- output: dict = field(default_factory=dict)
- timestamp: float = field(default_factory=time.time)
-
- def __post_init__(self) -> None:
- """数据校验"""
- if not self.plan_id or not self.plan_id.strip():
- raise ValueError("plan_id 不能为空")
- if self.step_id < 0:
- raise ValueError("step_id 必须为非负整数")
- # success 与 status 的一致性校验
- if self.status == StepStatus.SUCCESS and not self.success:
- raise ValueError("status=SUCCESS 时 success 必须为 True")
- if self.status == StepStatus.FAILED and self.success:
- raise ValueError("status=FAILED 时 success 必须为 False")
-
- def to_dict(self) -> dict[str, Any]:
- """序列化为字典
-
- Returns:
- 包含所有字段的字典,Enum 转为字符串值
- """
- return {
- "plan_id": self.plan_id,
- "step_id": self.step_id,
- "success": self.success,
- "status": self.status.value,
- "message": self.message,
- "output": self.output,
- "timestamp": self.timestamp,
- }
-
- @classmethod
- def from_dict(cls, data: dict[str, Any]) -> ExecutionResult:
- """从字典反序列化
-
- Args:
- data: 包含执行结果数据的字典
-
- Returns:
- ExecutionResult 实例
- """
- return cls(
- plan_id=data["plan_id"],
- step_id=data["step_id"],
- success=data.get("success", False),
- status=StepStatus(data.get("status", "failed")),
- message=data.get("message", ""),
- output=data.get("output", {}),
- timestamp=data.get("timestamp", time.time()),
- )
-
- @classmethod
- def success_result(
- cls,
- plan_id: str,
- step_id: int,
- message: str = "",
- output: dict | None = None,
- ) -> ExecutionResult:
- """创建成功执行结果的便捷方法
-
- Args:
- plan_id: 计划 ID
- step_id: 步骤 ID
- message: 成功消息
- output: 输出数据
-
- Returns:
- 成功的 ExecutionResult 实例
- """
- return cls(
- plan_id=plan_id,
- step_id=step_id,
- success=True,
- status=StepStatus.SUCCESS,
- message=message or "执行成功",
- output=output or {},
- )
-
- @classmethod
- def failure_result(
- cls,
- plan_id: str,
- step_id: int,
- message: str,
- output: dict | None = None,
- ) -> ExecutionResult:
- """创建失败执行结果的便捷方法
-
- Args:
- plan_id: 计划 ID
- step_id: 步骤 ID
- message: 失败原因
- output: 输出数据
-
- Returns:
- 失败的 ExecutionResult 实例
- """
- return cls(
- plan_id=plan_id,
- step_id=step_id,
- success=False,
- status=StepStatus.FAILED,
- message=message,
- output=output or {},
- )
-
- @classmethod
- def skipped_result(
- cls,
- plan_id: str,
- step_id: int,
- message: str = "步骤已跳过",
- ) -> ExecutionResult:
- """创建跳过执行结果的便捷方法
-
- Args:
- plan_id: 计划 ID
- step_id: 步骤 ID
- message: 跳过原因
-
- Returns:
- 跳过的 ExecutionResult 实例
- """
- return cls(
- plan_id=plan_id,
- step_id=step_id,
- success=True,
- status=StepStatus.SKIPPED,
- message=message,
- output={},
- )
- # =============================================================================
- # 工具函数
- # =============================================================================
- def create_plan_from_llm_response(
- llm_output: dict[str, Any],
- source: str = "llm",
- ) -> Plan:
- """从 LLM 输出创建 Plan 的辅助函数
-
- 将 LLM 返回的 JSON 结构转换为 Plan 对象。
-
- Args:
- llm_output: LLM 返回的字典数据
- source: 计划来源
-
- Returns:
- Plan 实例
-
- 示例:
- >>> llm_response = {
- ... "goal": "完成任务",
- ... "reasoning": "分析后决定",
- ... "risk_level": "low",
- ... "steps": [
- ... {"step_id": 1, "action": "step1", "parameters": {}}
- ... ]
- ... }
- >>> plan = create_plan_from_llm_response(llm_response)
- """
- plan = Plan(
- plan_id=llm_output.get("plan_id", f"plan_{uuid.uuid4().hex[:8]}"),
- goal=llm_output["goal"],
- reasoning=llm_output.get("reasoning", ""),
- risk_level=RiskLevel(llm_output.get("risk_level", "low")),
- requires_confirmation=llm_output.get("requires_confirmation", False),
- confirmation_message=llm_output.get("confirmation_message"),
- source=source,
- metadata=llm_output.get("metadata", {}),
- )
-
- for step_data in llm_output.get("steps", []):
- step = PlanStep.from_dict(step_data)
- plan.add_step(step)
-
- return plan
- # =============================================================================
- # 主程序入口(测试示例)
- # =============================================================================
- if __name__ == "__main__":
- import json
-
- print("=" * 70)
- print("Tool Protocol 测试演示")
- print("=" * 70)
-
- # -------------------------------------------------------------------------
- # 1. 创建 Plan
- # -------------------------------------------------------------------------
- print("\n[1] 创建 Plan")
- print("-" * 40)
-
- plan = Plan.create_new(
- goal="执行环境监测任务",
- reasoning="检测到环境参数异常,需要执行降温操作",
- risk_level=RiskLevel.MEDIUM,
- source="llm",
- )
- print(f"创建计划: {plan.plan_id}")
- print(f"目标: {plan.goal}")
- print(f"风险等级: {plan.risk_level.value}")
- print(f"来源: {plan.source}")
-
- # -------------------------------------------------------------------------
- # 2. 添加 PlanStep
- # -------------------------------------------------------------------------
- print("\n[2] 添加 PlanStep")
- print("-" * 40)
-
- # 步骤1:查询世界状态
- step1 = PlanStep(
- step_id=1,
- action="query_world_state",
- tool_call_type=ToolCallType.QUERY_WORLD,
- parameters={"query": "temperature"},
- description="查询当前温度",
- metadata={"source": "world_model"},
- )
- plan.add_step(step1)
-
- # 步骤2:询问用户确认
- step2 = PlanStep(
- step_id=2,
- action="confirm_action",
- tool_call_type=ToolCallType.ASK_USER,
- parameters={"prompt": "是否启动降温设备?"},
- description="等待用户确认",
- requires_confirmation=True,
- confirmation_message="当前温度过高,是否启动降温?",
- )
- plan.add_step(step2)
-
- # 步骤3:执行动作
- step3 = PlanStep(
- step_id=3,
- action="adjust_cooling",
- tool_call_type=ToolCallType.EXECUTE,
- parameters={"device_id": "cooler_001", "level": 3},
- preconditions={"temperature_above": 30},
- fallback="alert_operator",
- description="调整降温设备",
- metadata={"device": "cooler", "priority": "high"},
- )
- plan.add_step(step3)
-
- # 步骤4:语音通知
- step4 = PlanStep(
- step_id=4,
- action="speak_notification",
- tool_call_type=ToolCallType.SPEAK,
- parameters={"message": "降温设备已启动", "volume": 0.8},
- description="通知操作员",
- )
- plan.add_step(step4)
-
- print(f"添加了 {len(plan.steps)} 个步骤")
- for step in plan.steps:
- print(f" - Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
-
- # -------------------------------------------------------------------------
- # 3. 序列化为 Dict/JSON
- # -------------------------------------------------------------------------
- print("\n[3] 序列化为 Dict/JSON")
- print("-" * 40)
-
- plan_dict = plan.to_dict()
- plan_json = json.dumps(plan_dict, indent=2, ensure_ascii=False)
- print("Plan JSON:")
- print(plan_json[:500] + "..." if len(plan_json) > 500 else plan_json)
-
- # -------------------------------------------------------------------------
- # 4. 从 Dict 恢复
- # -------------------------------------------------------------------------
- print("\n[4] 从 Dict 恢复 Plan")
- print("-" * 40)
-
- restored_plan = Plan.from_dict(plan_dict)
- print(f"恢复计划 ID: {restored_plan.plan_id}")
- print(f"恢复步骤数: {len(restored_plan.steps)}")
- print(f"恢复风险等级: {restored_plan.risk_level.value}")
-
- # 验证恢复正确性
- assert plan.plan_id == restored_plan.plan_id
- assert len(plan.steps) == len(restored_plan.steps)
- print("✓ 序列化/反序列化验证通过")
-
- # -------------------------------------------------------------------------
- # 5. Plan 操作演示
- # -------------------------------------------------------------------------
- print("\n[5] Plan 操作演示")
- print("-" * 40)
-
- # 更新步骤状态
- print(f"初始待执行步骤数: {len(plan.get_pending_steps())}")
-
- step1.update_status(StepStatus.RUNNING)
- print(f"Step 1 状态更新为: {step1.status.value}")
-
- step1.update_status(StepStatus.SUCCESS)
- print(f"Step 1 状态更新为: {step1.status.value}")
-
- step2.update_status(StepStatus.WAIT_CONFIRMATION)
- waiting = plan.get_waiting_confirmation_steps()
- print(f"等待确认的步骤: {[s.step_id for s in waiting]}")
-
- print(f"计划是否完成: {plan.is_finished()}")
- print(f"计划是否成功: {plan.is_success()}")
-
- # -------------------------------------------------------------------------
- # 6. ExecutionResult 演示
- # -------------------------------------------------------------------------
- print("\n[6] ExecutionResult 演示")
- print("-" * 40)
-
- # 创建成功结果
- result1 = ExecutionResult.success_result(
- plan_id=plan.plan_id,
- step_id=1,
- message="温度查询成功",
- output={"temperature": 32.5, "humidity": 65},
- )
- print(f"成功结果: {result1.to_dict()}")
-
- # 创建失败结果
- result2 = ExecutionResult.failure_result(
- plan_id=plan.plan_id,
- step_id=3,
- message="设备通信失败",
- output={"error_code": "E503", "device": "cooler_001"},
- )
- print(f"失败结果: {result2.to_dict()}")
-
- # 创建跳过结果
- result3 = ExecutionResult.skipped_result(
- plan_id=plan.plan_id,
- step_id=4,
- message="条件不满足,跳过通知",
- )
- print(f"跳过结果: {result3.to_dict()}")
-
- # -------------------------------------------------------------------------
- # 7. 从 LLM 输出创建 Plan
- # -------------------------------------------------------------------------
- print("\n[7] 从 LLM 输出创建 Plan")
- print("-" * 40)
-
- llm_response = {
- "goal": "自动巡检任务",
- "reasoning": "定时任务触发,执行标准巡检流程",
- "risk_level": "low",
- "requires_confirmation": False,
- "steps": [
- {
- "step_id": 1,
- "action": "move_to_location",
- "tool_call_type": "execute",
- "parameters": {"target": "zone_A"},
- "description": "移动到巡检区域A",
- },
- {
- "step_id": 2,
- "action": "capture_sensor_data",
- "tool_call_type": "execute",
- "parameters": {"sensors": ["temp", "humidity"]},
- "description": "采集传感器数据",
- },
- {
- "step_id": 3,
- "action": "speak_report",
- "tool_call_type": "speak",
- "parameters": {"content": "巡检完成"},
- "description": "报告巡检结果",
- },
- ],
- }
-
- llm_plan = create_plan_from_llm_response(llm_response, source="llm")
- print(f"从 LLM 创建计划: {llm_plan.plan_id}")
- print(f"目标: {llm_plan.goal}")
- print(f"步骤数: {len(llm_plan.steps)}")
-
- # -------------------------------------------------------------------------
- # 8. 枚举使用演示
- # -------------------------------------------------------------------------
- print("\n[8] 枚举使用演示")
- print("-" * 40)
-
- print("RiskLevel 枚举:")
- for level in RiskLevel:
- print(f" - {level.name} = {level.value}")
-
- print("\nStepStatus 枚举:")
- for status in StepStatus:
- print(f" - {status.name} = {status.value}")
-
- print("\nPlanStatus 枚举:")
- for status in PlanStatus:
- print(f" - {status.name} = {status.value}")
-
- print("\nToolCallType 枚举:")
- for tool_type in ToolCallType:
- print(f" - {tool_type.name} = {tool_type.value}")
-
- print("\n" + "=" * 70)
- print("测试演示完成")
- print("=" * 70)
|