tool_protocol.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846
  1. """
  2. tool_protocol.py - AI Agent 标准协议层
  3. 该模块定义 AI Agent 系统中 "计划" 和 "动作" 的标准数据结构,
  4. 用于连接 WorldModel → Planner → Executor → Capability。
  5. 设计原则:
  6. 1. 协议层只定义结构,不执行任何 Capability
  7. 2. 协议层不负责 ROS 通信,不依赖 rclpy
  8. 3. 完全通用,无场景硬编码
  9. 4. JSON 序列化友好,适合 LLM 直接输出
  10. 使用方式:
  11. from tool_protocol import Plan, PlanStep, ExecutionResult
  12. # 创建计划
  13. plan = Plan(plan_id="plan_001", goal="执行任务")
  14. plan.add_step(PlanStep(step_id=1, action="action_name", parameters={}))
  15. # 序列化为 JSON
  16. json_data = plan.to_dict()
  17. # 从 JSON 恢复
  18. plan = Plan.from_dict(json_data)
  19. """
  20. from __future__ import annotations
  21. import time
  22. import uuid
  23. from dataclasses import dataclass, field
  24. from enum import Enum
  25. from typing import Any, Optional
  26. # =============================================================================
  27. # 枚举定义
  28. # =============================================================================
  29. class RiskLevel(Enum):
  30. """计划风险等级枚举"""
  31. LOW = "low"
  32. MEDIUM = "medium"
  33. HIGH = "high"
  34. class StepStatus(Enum):
  35. """单个步骤状态枚举"""
  36. PENDING = "pending"
  37. RUNNING = "running"
  38. SUCCESS = "success"
  39. FAILED = "failed"
  40. SKIPPED = "skipped"
  41. WAIT_CONFIRMATION = "wait_confirmation"
  42. class PlanStatus(Enum):
  43. """整体计划状态枚举"""
  44. CREATED = "created"
  45. APPROVED = "approved"
  46. EXECUTING = "executing"
  47. COMPLETED = "completed"
  48. FAILED = "failed"
  49. CANCELLED = "cancelled"
  50. WAIT_CONFIRMATION = "wait_confirmation"
  51. class ToolCallType(Enum):
  52. """步骤动作类型枚举
  53. 用于区分步骤的性质,帮助 Planner 和 Executor 理解步骤意图。
  54. """
  55. EXECUTE = "execute" # 执行具体动作
  56. ASK_USER = "ask_user" # 询问用户确认
  57. SPEAK = "speak" # 语音/文本输出
  58. QUERY_WORLD = "query_world" # 查询世界状态
  59. QUERY_KNOWLEDGE = "query_knowledge" # 查询知识库
  60. NOOP = "noop" # 空操作
  61. # =============================================================================
  62. # 数据类定义
  63. # =============================================================================
  64. @dataclass
  65. class PlanStep:
  66. """计划步骤
  67. 表示计划中的单个执行步骤,包含动作、参数、前置条件等信息。
  68. 属性:
  69. step_id: 步骤唯一标识,在一个 Plan 内必须唯一
  70. action: 动作名称,如 "execute_action", "speak", "query_world"
  71. tool_call_type: 动作类型,用于区分步骤性质
  72. parameters: 动作参数字典
  73. preconditions: 执行前检查条件
  74. fallback: 失败后的回退动作名称,None 表示无回退
  75. status: 当前步骤状态
  76. description: 步骤描述说明
  77. requires_confirmation: 是否需要用户确认
  78. confirmation_message: 确认提示消息
  79. metadata: 可扩展元数据
  80. 示例:
  81. >>> step = PlanStep(
  82. ... step_id=1,
  83. ... action="adjust_fan",
  84. ... tool_call_type=ToolCallType.EXECUTE,
  85. ... parameters={"fan_id": "fan_001", "speed": 3},
  86. ... description="调整风扇速度"
  87. ... )
  88. >>> step.update_status(StepStatus.RUNNING)
  89. """
  90. step_id: int
  91. action: str
  92. parameters: dict = field(default_factory=dict)
  93. preconditions: dict = field(default_factory=dict)
  94. fallback: str | None = None
  95. status: StepStatus = StepStatus.PENDING
  96. description: str = ""
  97. tool_call_type: ToolCallType = ToolCallType.EXECUTE
  98. requires_confirmation: bool = False
  99. confirmation_message: str | None = None
  100. metadata: dict = field(default_factory=dict)
  101. def __post_init__(self) -> None:
  102. """数据校验"""
  103. if not self.action or not self.action.strip():
  104. raise ValueError("action 不能为空")
  105. if self.step_id < 0:
  106. raise ValueError("step_id 必须为非负整数")
  107. def update_status(self, new_status: StepStatus) -> None:
  108. """更新步骤状态
  109. Args:
  110. new_status: 新的状态值
  111. """
  112. self.status = new_status
  113. def to_dict(self) -> dict[str, Any]:
  114. """序列化为字典
  115. Returns:
  116. 包含所有字段的字典,Enum 转为字符串值
  117. """
  118. return {
  119. "step_id": self.step_id,
  120. "action": self.action,
  121. "tool_call_type": self.tool_call_type.value,
  122. "parameters": self.parameters,
  123. "preconditions": self.preconditions,
  124. "fallback": self.fallback,
  125. "status": self.status.value,
  126. "description": self.description,
  127. "requires_confirmation": self.requires_confirmation,
  128. "confirmation_message": self.confirmation_message,
  129. "metadata": self.metadata,
  130. }
  131. @classmethod
  132. def from_dict(cls, data: dict[str, Any]) -> PlanStep:
  133. """从字典反序列化
  134. Args:
  135. data: 包含步骤数据的字典
  136. Returns:
  137. PlanStep 实例
  138. """
  139. return cls(
  140. step_id=data["step_id"],
  141. action=data["action"],
  142. tool_call_type=ToolCallType(data.get("tool_call_type", "execute")),
  143. parameters=data.get("parameters", {}),
  144. preconditions=data.get("preconditions", {}),
  145. fallback=data.get("fallback"),
  146. status=StepStatus(data.get("status", "pending")),
  147. description=data.get("description", ""),
  148. requires_confirmation=data.get("requires_confirmation", False),
  149. confirmation_message=data.get("confirmation_message"),
  150. metadata=data.get("metadata", {}),
  151. )
  152. @dataclass
  153. class Plan:
  154. """计划
  155. 表示一个完整的执行计划,包含目标、步骤列表、风险等级等信息。
  156. 由 Planner 生成,由 Executor 执行。
  157. 属性:
  158. plan_id: 计划唯一标识
  159. goal: 计划目标描述
  160. reasoning: 推理过程说明
  161. risk_level: 风险等级
  162. requires_confirmation: 是否需要用户确认后执行
  163. confirmation_message: 确认提示消息
  164. steps: 步骤列表
  165. status: 计划状态
  166. created_at: 创建时间戳
  167. source: 计划来源,如 "rule_engine", "llm", "hybrid"
  168. metadata: 可扩展元数据
  169. 示例:
  170. >>> plan = Plan(
  171. ... plan_id="plan_001",
  172. ... goal="完成任务A",
  173. ... reasoning="因为条件满足",
  174. ... risk_level=RiskLevel.LOW,
  175. ... source="llm"
  176. ... )
  177. >>> plan.add_step(PlanStep(step_id=1, action="step1", parameters={}))
  178. >>> plan.add_step(PlanStep(step_id=2, action="step2", parameters={}))
  179. >>> pending = plan.get_pending_steps()
  180. """
  181. plan_id: str
  182. goal: str
  183. reasoning: str = ""
  184. risk_level: RiskLevel = RiskLevel.LOW
  185. requires_confirmation: bool = False
  186. confirmation_message: str | None = None
  187. steps: list[PlanStep] = field(default_factory=list)
  188. status: PlanStatus = PlanStatus.CREATED
  189. created_at: float = field(default_factory=time.time)
  190. source: str = "llm"
  191. metadata: dict = field(default_factory=dict)
  192. def __post_init__(self) -> None:
  193. """数据校验"""
  194. if not self.plan_id or not self.plan_id.strip():
  195. raise ValueError("plan_id 不能为空")
  196. if not self.goal or not self.goal.strip():
  197. raise ValueError("goal 不能为空")
  198. def add_step(self, step: PlanStep) -> None:
  199. """添加步骤
  200. Args:
  201. step: 要添加的 PlanStep
  202. Raises:
  203. ValueError: 如果 step_id 已存在
  204. """
  205. existing_ids = {s.step_id for s in self.steps}
  206. if step.step_id in existing_ids:
  207. raise ValueError(f"step_id {step.step_id} 已存在,请使用唯一ID")
  208. self.steps.append(step)
  209. def get_pending_steps(self) -> list[PlanStep]:
  210. """获取所有待执行的步骤
  211. Returns:
  212. 状态为 PENDING 的步骤列表
  213. """
  214. return [s for s in self.steps if s.status == StepStatus.PENDING]
  215. def get_current_step(self) -> PlanStep | None:
  216. """获取当前正在执行的步骤
  217. Returns:
  218. 第一个状态为 RUNNING 的步骤,如果没有则返回 None
  219. """
  220. for step in self.steps:
  221. if step.status == StepStatus.RUNNING:
  222. return step
  223. return None
  224. def get_waiting_confirmation_steps(self) -> list[PlanStep]:
  225. """获取所有等待确认的步骤
  226. Returns:
  227. 状态为 WAIT_CONFIRMATION 的步骤列表
  228. """
  229. return [s for s in self.steps if s.status == StepStatus.WAIT_CONFIRMATION]
  230. def is_finished(self) -> bool:
  231. """判断计划是否完成
  232. 完成条件:所有步骤都已执行(非 PENDING、RUNNING、WAIT_CONFIRMATION)
  233. Returns:
  234. 如果计划完成返回 True
  235. """
  236. return all(
  237. s.status not in (StepStatus.PENDING, StepStatus.RUNNING, StepStatus.WAIT_CONFIRMATION)
  238. for s in self.steps
  239. )
  240. def is_success(self) -> bool:
  241. """判断计划是否完全成功
  242. 成功条件:
  243. 1. 至少存在一个步骤
  244. 2. 所有步骤状态都为 SUCCESS
  245. Returns:
  246. 如果所有步骤都成功返回 True
  247. """
  248. if not self.steps:
  249. return False
  250. return all(s.status == StepStatus.SUCCESS for s in self.steps)
  251. def get_failed_steps(self) -> list[PlanStep]:
  252. """获取所有失败的步骤
  253. Returns:
  254. 状态为 FAILED 的步骤列表
  255. """
  256. return [s for s in self.steps if s.status == StepStatus.FAILED]
  257. def next_step_id(self) -> int:
  258. """获取下一个可用的步骤 ID
  259. Returns:
  260. 新的 step_id(当前最大 ID + 1)
  261. """
  262. if not self.steps:
  263. return 1
  264. return max(s.step_id for s in self.steps) + 1
  265. def to_dict(self) -> dict[str, Any]:
  266. """序列化为字典
  267. Returns:
  268. 包含所有字段的字典,Enum 转为字符串值
  269. """
  270. return {
  271. "plan_id": self.plan_id,
  272. "goal": self.goal,
  273. "reasoning": self.reasoning,
  274. "risk_level": self.risk_level.value,
  275. "requires_confirmation": self.requires_confirmation,
  276. "confirmation_message": self.confirmation_message,
  277. "steps": [s.to_dict() for s in self.steps],
  278. "status": self.status.value,
  279. "created_at": self.created_at,
  280. "source": self.source,
  281. "metadata": self.metadata,
  282. }
  283. @classmethod
  284. def from_dict(cls, data: dict[str, Any]) -> Plan:
  285. """从字典反序列化
  286. Args:
  287. data: 包含计划数据的字典
  288. Returns:
  289. Plan 实例
  290. """
  291. steps = [
  292. PlanStep.from_dict(s) for s in data.get("steps", [])
  293. ]
  294. return cls(
  295. plan_id=data["plan_id"],
  296. goal=data["goal"],
  297. reasoning=data.get("reasoning", ""),
  298. risk_level=RiskLevel(data.get("risk_level", "low")),
  299. requires_confirmation=data.get("requires_confirmation", False),
  300. confirmation_message=data.get("confirmation_message"),
  301. steps=steps,
  302. status=PlanStatus(data.get("status", "created")),
  303. created_at=data.get("created_at", time.time()),
  304. source=data.get("source", "llm"),
  305. metadata=data.get("metadata", {}),
  306. )
  307. @classmethod
  308. def create_new(
  309. cls,
  310. goal: str,
  311. reasoning: str = "",
  312. risk_level: RiskLevel = RiskLevel.LOW,
  313. source: str = "llm",
  314. ) -> Plan:
  315. """创建新计划的便捷工厂方法
  316. Args:
  317. goal: 计划目标
  318. reasoning: 推理过程
  319. risk_level: 风险等级
  320. source: 计划来源
  321. Returns:
  322. 新的 Plan 实例
  323. """
  324. return cls(
  325. plan_id=f"plan_{uuid.uuid4().hex[:8]}",
  326. goal=goal,
  327. reasoning=reasoning,
  328. risk_level=risk_level,
  329. source=source,
  330. )
  331. @dataclass
  332. class ExecutionResult:
  333. """执行结果
  334. 表示 Executor 执行某个步骤后的结果。
  335. 用于回传执行状态、输出数据等信息。
  336. 属性:
  337. plan_id: 关联的计划 ID
  338. step_id: 执行的步骤 ID
  339. success: 是否执行成功
  340. status: 执行状态
  341. message: 执行消息描述
  342. output: 执行输出数据
  343. timestamp: 执行时间戳
  344. 示例:
  345. >>> result = ExecutionResult(
  346. ... plan_id="plan_001",
  347. ... step_id=1,
  348. ... success=True,
  349. ... status=StepStatus.SUCCESS,
  350. ... message="执行成功",
  351. ... output={"result": "ok"}
  352. ... )
  353. >>> data = result.to_dict()
  354. """
  355. plan_id: str
  356. step_id: int
  357. success: bool
  358. status: StepStatus
  359. message: str = ""
  360. output: dict = field(default_factory=dict)
  361. timestamp: float = field(default_factory=time.time)
  362. def __post_init__(self) -> None:
  363. """数据校验"""
  364. if not self.plan_id or not self.plan_id.strip():
  365. raise ValueError("plan_id 不能为空")
  366. if self.step_id < 0:
  367. raise ValueError("step_id 必须为非负整数")
  368. # success 与 status 的一致性校验
  369. if self.status == StepStatus.SUCCESS and not self.success:
  370. raise ValueError("status=SUCCESS 时 success 必须为 True")
  371. if self.status == StepStatus.FAILED and self.success:
  372. raise ValueError("status=FAILED 时 success 必须为 False")
  373. def to_dict(self) -> dict[str, Any]:
  374. """序列化为字典
  375. Returns:
  376. 包含所有字段的字典,Enum 转为字符串值
  377. """
  378. return {
  379. "plan_id": self.plan_id,
  380. "step_id": self.step_id,
  381. "success": self.success,
  382. "status": self.status.value,
  383. "message": self.message,
  384. "output": self.output,
  385. "timestamp": self.timestamp,
  386. }
  387. @classmethod
  388. def from_dict(cls, data: dict[str, Any]) -> ExecutionResult:
  389. """从字典反序列化
  390. Args:
  391. data: 包含执行结果数据的字典
  392. Returns:
  393. ExecutionResult 实例
  394. """
  395. return cls(
  396. plan_id=data["plan_id"],
  397. step_id=data["step_id"],
  398. success=data.get("success", False),
  399. status=StepStatus(data.get("status", "failed")),
  400. message=data.get("message", ""),
  401. output=data.get("output", {}),
  402. timestamp=data.get("timestamp", time.time()),
  403. )
  404. @classmethod
  405. def success_result(
  406. cls,
  407. plan_id: str,
  408. step_id: int,
  409. message: str = "",
  410. output: dict | None = None,
  411. ) -> ExecutionResult:
  412. """创建成功执行结果的便捷方法
  413. Args:
  414. plan_id: 计划 ID
  415. step_id: 步骤 ID
  416. message: 成功消息
  417. output: 输出数据
  418. Returns:
  419. 成功的 ExecutionResult 实例
  420. """
  421. return cls(
  422. plan_id=plan_id,
  423. step_id=step_id,
  424. success=True,
  425. status=StepStatus.SUCCESS,
  426. message=message or "执行成功",
  427. output=output or {},
  428. )
  429. @classmethod
  430. def failure_result(
  431. cls,
  432. plan_id: str,
  433. step_id: int,
  434. message: str,
  435. output: dict | None = None,
  436. ) -> ExecutionResult:
  437. """创建失败执行结果的便捷方法
  438. Args:
  439. plan_id: 计划 ID
  440. step_id: 步骤 ID
  441. message: 失败原因
  442. output: 输出数据
  443. Returns:
  444. 失败的 ExecutionResult 实例
  445. """
  446. return cls(
  447. plan_id=plan_id,
  448. step_id=step_id,
  449. success=False,
  450. status=StepStatus.FAILED,
  451. message=message,
  452. output=output or {},
  453. )
  454. @classmethod
  455. def skipped_result(
  456. cls,
  457. plan_id: str,
  458. step_id: int,
  459. message: str = "步骤已跳过",
  460. ) -> ExecutionResult:
  461. """创建跳过执行结果的便捷方法
  462. Args:
  463. plan_id: 计划 ID
  464. step_id: 步骤 ID
  465. message: 跳过原因
  466. Returns:
  467. 跳过的 ExecutionResult 实例
  468. """
  469. return cls(
  470. plan_id=plan_id,
  471. step_id=step_id,
  472. success=True,
  473. status=StepStatus.SKIPPED,
  474. message=message,
  475. output={},
  476. )
  477. # =============================================================================
  478. # 工具函数
  479. # =============================================================================
  480. def create_plan_from_llm_response(
  481. llm_output: dict[str, Any],
  482. source: str = "llm",
  483. ) -> Plan:
  484. """从 LLM 输出创建 Plan 的辅助函数
  485. 将 LLM 返回的 JSON 结构转换为 Plan 对象。
  486. Args:
  487. llm_output: LLM 返回的字典数据
  488. source: 计划来源
  489. Returns:
  490. Plan 实例
  491. 示例:
  492. >>> llm_response = {
  493. ... "goal": "完成任务",
  494. ... "reasoning": "分析后决定",
  495. ... "risk_level": "low",
  496. ... "steps": [
  497. ... {"step_id": 1, "action": "step1", "parameters": {}}
  498. ... ]
  499. ... }
  500. >>> plan = create_plan_from_llm_response(llm_response)
  501. """
  502. plan = Plan(
  503. plan_id=llm_output.get("plan_id", f"plan_{uuid.uuid4().hex[:8]}"),
  504. goal=llm_output["goal"],
  505. reasoning=llm_output.get("reasoning", ""),
  506. risk_level=RiskLevel(llm_output.get("risk_level", "low")),
  507. requires_confirmation=llm_output.get("requires_confirmation", False),
  508. confirmation_message=llm_output.get("confirmation_message"),
  509. source=source,
  510. metadata=llm_output.get("metadata", {}),
  511. )
  512. for step_data in llm_output.get("steps", []):
  513. step = PlanStep.from_dict(step_data)
  514. plan.add_step(step)
  515. return plan
  516. # =============================================================================
  517. # 主程序入口(测试示例)
  518. # =============================================================================
  519. if __name__ == "__main__":
  520. import json
  521. print("=" * 70)
  522. print("Tool Protocol 测试演示")
  523. print("=" * 70)
  524. # -------------------------------------------------------------------------
  525. # 1. 创建 Plan
  526. # -------------------------------------------------------------------------
  527. print("\n[1] 创建 Plan")
  528. print("-" * 40)
  529. plan = Plan.create_new(
  530. goal="执行环境监测任务",
  531. reasoning="检测到环境参数异常,需要执行降温操作",
  532. risk_level=RiskLevel.MEDIUM,
  533. source="llm",
  534. )
  535. print(f"创建计划: {plan.plan_id}")
  536. print(f"目标: {plan.goal}")
  537. print(f"风险等级: {plan.risk_level.value}")
  538. print(f"来源: {plan.source}")
  539. # -------------------------------------------------------------------------
  540. # 2. 添加 PlanStep
  541. # -------------------------------------------------------------------------
  542. print("\n[2] 添加 PlanStep")
  543. print("-" * 40)
  544. # 步骤1:查询世界状态
  545. step1 = PlanStep(
  546. step_id=1,
  547. action="query_world_state",
  548. tool_call_type=ToolCallType.QUERY_WORLD,
  549. parameters={"query": "temperature"},
  550. description="查询当前温度",
  551. metadata={"source": "world_model"},
  552. )
  553. plan.add_step(step1)
  554. # 步骤2:询问用户确认
  555. step2 = PlanStep(
  556. step_id=2,
  557. action="confirm_action",
  558. tool_call_type=ToolCallType.ASK_USER,
  559. parameters={"prompt": "是否启动降温设备?"},
  560. description="等待用户确认",
  561. requires_confirmation=True,
  562. confirmation_message="当前温度过高,是否启动降温?",
  563. )
  564. plan.add_step(step2)
  565. # 步骤3:执行动作
  566. step3 = PlanStep(
  567. step_id=3,
  568. action="adjust_cooling",
  569. tool_call_type=ToolCallType.EXECUTE,
  570. parameters={"device_id": "cooler_001", "level": 3},
  571. preconditions={"temperature_above": 30},
  572. fallback="alert_operator",
  573. description="调整降温设备",
  574. metadata={"device": "cooler", "priority": "high"},
  575. )
  576. plan.add_step(step3)
  577. # 步骤4:语音通知
  578. step4 = PlanStep(
  579. step_id=4,
  580. action="speak_notification",
  581. tool_call_type=ToolCallType.SPEAK,
  582. parameters={"message": "降温设备已启动", "volume": 0.8},
  583. description="通知操作员",
  584. )
  585. plan.add_step(step4)
  586. print(f"添加了 {len(plan.steps)} 个步骤")
  587. for step in plan.steps:
  588. print(f" - Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
  589. # -------------------------------------------------------------------------
  590. # 3. 序列化为 Dict/JSON
  591. # -------------------------------------------------------------------------
  592. print("\n[3] 序列化为 Dict/JSON")
  593. print("-" * 40)
  594. plan_dict = plan.to_dict()
  595. plan_json = json.dumps(plan_dict, indent=2, ensure_ascii=False)
  596. print("Plan JSON:")
  597. print(plan_json[:500] + "..." if len(plan_json) > 500 else plan_json)
  598. # -------------------------------------------------------------------------
  599. # 4. 从 Dict 恢复
  600. # -------------------------------------------------------------------------
  601. print("\n[4] 从 Dict 恢复 Plan")
  602. print("-" * 40)
  603. restored_plan = Plan.from_dict(plan_dict)
  604. print(f"恢复计划 ID: {restored_plan.plan_id}")
  605. print(f"恢复步骤数: {len(restored_plan.steps)}")
  606. print(f"恢复风险等级: {restored_plan.risk_level.value}")
  607. # 验证恢复正确性
  608. assert plan.plan_id == restored_plan.plan_id
  609. assert len(plan.steps) == len(restored_plan.steps)
  610. print("✓ 序列化/反序列化验证通过")
  611. # -------------------------------------------------------------------------
  612. # 5. Plan 操作演示
  613. # -------------------------------------------------------------------------
  614. print("\n[5] Plan 操作演示")
  615. print("-" * 40)
  616. # 更新步骤状态
  617. print(f"初始待执行步骤数: {len(plan.get_pending_steps())}")
  618. step1.update_status(StepStatus.RUNNING)
  619. print(f"Step 1 状态更新为: {step1.status.value}")
  620. step1.update_status(StepStatus.SUCCESS)
  621. print(f"Step 1 状态更新为: {step1.status.value}")
  622. step2.update_status(StepStatus.WAIT_CONFIRMATION)
  623. waiting = plan.get_waiting_confirmation_steps()
  624. print(f"等待确认的步骤: {[s.step_id for s in waiting]}")
  625. print(f"计划是否完成: {plan.is_finished()}")
  626. print(f"计划是否成功: {plan.is_success()}")
  627. # -------------------------------------------------------------------------
  628. # 6. ExecutionResult 演示
  629. # -------------------------------------------------------------------------
  630. print("\n[6] ExecutionResult 演示")
  631. print("-" * 40)
  632. # 创建成功结果
  633. result1 = ExecutionResult.success_result(
  634. plan_id=plan.plan_id,
  635. step_id=1,
  636. message="温度查询成功",
  637. output={"temperature": 32.5, "humidity": 65},
  638. )
  639. print(f"成功结果: {result1.to_dict()}")
  640. # 创建失败结果
  641. result2 = ExecutionResult.failure_result(
  642. plan_id=plan.plan_id,
  643. step_id=3,
  644. message="设备通信失败",
  645. output={"error_code": "E503", "device": "cooler_001"},
  646. )
  647. print(f"失败结果: {result2.to_dict()}")
  648. # 创建跳过结果
  649. result3 = ExecutionResult.skipped_result(
  650. plan_id=plan.plan_id,
  651. step_id=4,
  652. message="条件不满足,跳过通知",
  653. )
  654. print(f"跳过结果: {result3.to_dict()}")
  655. # -------------------------------------------------------------------------
  656. # 7. 从 LLM 输出创建 Plan
  657. # -------------------------------------------------------------------------
  658. print("\n[7] 从 LLM 输出创建 Plan")
  659. print("-" * 40)
  660. llm_response = {
  661. "goal": "自动巡检任务",
  662. "reasoning": "定时任务触发,执行标准巡检流程",
  663. "risk_level": "low",
  664. "requires_confirmation": False,
  665. "steps": [
  666. {
  667. "step_id": 1,
  668. "action": "move_to_location",
  669. "tool_call_type": "execute",
  670. "parameters": {"target": "zone_A"},
  671. "description": "移动到巡检区域A",
  672. },
  673. {
  674. "step_id": 2,
  675. "action": "capture_sensor_data",
  676. "tool_call_type": "execute",
  677. "parameters": {"sensors": ["temp", "humidity"]},
  678. "description": "采集传感器数据",
  679. },
  680. {
  681. "step_id": 3,
  682. "action": "speak_report",
  683. "tool_call_type": "speak",
  684. "parameters": {"content": "巡检完成"},
  685. "description": "报告巡检结果",
  686. },
  687. ],
  688. }
  689. llm_plan = create_plan_from_llm_response(llm_response, source="llm")
  690. print(f"从 LLM 创建计划: {llm_plan.plan_id}")
  691. print(f"目标: {llm_plan.goal}")
  692. print(f"步骤数: {len(llm_plan.steps)}")
  693. # -------------------------------------------------------------------------
  694. # 8. 枚举使用演示
  695. # -------------------------------------------------------------------------
  696. print("\n[8] 枚举使用演示")
  697. print("-" * 40)
  698. print("RiskLevel 枚举:")
  699. for level in RiskLevel:
  700. print(f" - {level.name} = {level.value}")
  701. print("\nStepStatus 枚举:")
  702. for status in StepStatus:
  703. print(f" - {status.name} = {status.value}")
  704. print("\nPlanStatus 枚举:")
  705. for status in PlanStatus:
  706. print(f" - {status.name} = {status.value}")
  707. print("\nToolCallType 枚举:")
  708. for tool_type in ToolCallType:
  709. print(f" - {tool_type.name} = {tool_type.value}")
  710. print("\n" + "=" * 70)
  711. print("测试演示完成")
  712. print("=" * 70)