| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440 |
- """
- planner.py - AI Agent 通用 Planner 核心模块
- 该模块负责根据世界状态、用户意图、可用能力生成标准化的 Plan 对象。
- 职责边界:
- - 接收输入:world_snapshot, user_intent, available_tools, domain_rules, planner_mode
- - 输出:标准 Plan 对象
- - 不执行 capability
- - 不依赖 ROS2
- - 不包含场景硬编码逻辑
- Planner 支持三种模式:
- - rule:纯规则生成
- - llm:LLM 生成(调用 llm_client)
- - hybrid:优先规则,复杂情况调用 LLM
- 使用方式:
- from planner import Planner, PlannerConfig
- from tool_protocol import RiskLevel
- config = PlannerConfig(default_risk_level=RiskLevel.LOW)
- planner = Planner(config)
- plan = planner.generate_plan(
- world_snapshot={"temperature": 35},
- user_intent="降温",
- available_tools=["adjust_fan", "speak"],
- )
-
- # 使用 LLM 模式
- plan = planner.generate_plan(
- world_snapshot={"temperature": 35},
- user_intent="降温",
- available_tools=["adjust_fan", "speak"],
- planner_mode="llm",
- )
- """
- from __future__ import annotations
- import time
- import uuid
- from dataclasses import dataclass, field
- from typing import Any
- from tool_protocol import (
- Plan,
- PlanStep,
- RiskLevel,
- PlanStatus,
- StepStatus,
- ToolCallType,
- )
- # =============================================================================
- # 通用意图映射层
- # =============================================================================
- # 通用意图关键词到 action 名称的映射
- # 用途:当用户意图包含某个关键词时,映射到对应的 action
- # 可扩展:后续可通过配置文件或知识库动态加载
- INTENT_TO_ACTION: dict[str, str] = {
- # 动作类
- "喂": "feed",
- "喂食": "feed",
- "喂养": "feed",
- "打开风扇": "adjust_fan",
- "关闭风扇": "adjust_fan",
- "调整风扇": "adjust_fan",
- "降温": "adjust_fan",
- "升温": "adjust_fan",
- "开灯": "control_light",
- "关灯": "control_light",
- "调整灯光": "control_light",
- "打开": "turn_on",
- "关闭": "turn_off",
- "启动": "turn_on",
- "停止": "turn_off",
- "调节": "adjust",
- "移动": "move",
- "前往": "move",
- "走到": "move",
- # 查询类
- "查询": "query",
- "查看": "query",
- "检查": "inspect",
- "巡检": "inspect",
- "监测": "monitor",
- "监控": "monitor",
- # 通信类
- "播报": "speak",
- "报告": "speak",
- "说": "speak",
- "告诉": "speak",
- "通知": "notify",
- "提醒": "remind",
- "警告": "warn",
- "询问": "ask",
- }
- # action 到 ToolCallType 的默认映射
- ACTION_TO_TOOL_CALL_TYPE: dict[str, ToolCallType] = {
- "feed": ToolCallType.EXECUTE,
- "adjust_fan": ToolCallType.EXECUTE,
- "control_light": ToolCallType.EXECUTE,
- "turn_on": ToolCallType.EXECUTE,
- "turn_off": ToolCallType.EXECUTE,
- "adjust": ToolCallType.EXECUTE,
- "move": ToolCallType.EXECUTE,
- "query": ToolCallType.QUERY_WORLD,
- "inspect": ToolCallType.QUERY_WORLD,
- "monitor": ToolCallType.QUERY_WORLD,
- "speak": ToolCallType.SPEAK,
- "notify": ToolCallType.SPEAK,
- "remind": ToolCallType.SPEAK,
- "warn": ToolCallType.SPEAK,
- "ask": ToolCallType.ASK_USER,
- }
- # 高风险关键词列表(通用)
- HIGH_RISK_KEYWORDS: list[str] = [
- "紧急", "危险", "停止", "关机", "断电", "全部", "清空",
- "emergency", "danger", "stop", "shutdown", "all", "clear",
- ]
- # 中风险关键词列表(通用)
- MEDIUM_RISK_KEYWORDS: list[str] = [
- "调整", "改变", "修改", "设置", "配置",
- "adjust", "change", "modify", "set", "configure",
- ]
- # =============================================================================
- # Planner 配置
- # =============================================================================
- @dataclass
- class PlannerConfig:
- """Planner 配置类
-
- 用于配置 Planner 的行为参数。
-
- 属性:
- default_risk_level: 默认风险等级
- require_confirmation_on_medium_risk: 中风险是否需要确认
- require_confirmation_on_high_risk: 高风险是否需要确认
- max_plan_steps: 单个 Plan 最大步骤数
- default_source: 默认计划来源
-
- 示例:
- >>> config = PlannerConfig(
- ... default_risk_level=RiskLevel.LOW,
- ... require_confirmation_on_medium_risk=True,
- ... require_confirmation_on_high_risk=True,
- ... max_plan_steps=10,
- ... default_source="hybrid"
- ... )
- """
-
- default_risk_level: RiskLevel = RiskLevel.LOW
- require_confirmation_on_medium_risk: bool = True
- require_confirmation_on_high_risk: bool = True
- max_plan_steps: int = 10
- default_source: str = "hybrid"
- # =============================================================================
- # Planner 核心类
- # =============================================================================
- class Planner:
- """通用 Planner 类
-
- 根据世界状态、用户意图、可用能力生成标准化 Plan 对象。
-
- 属性:
- config: Planner 配置
- intent_to_action: 意图到 action 的映射字典
- action_to_tool_type: action 到 ToolCallType 的映射字典
-
- 示例:
- >>> planner = Planner()
- >>> plan = planner.generate_plan(
- ... world_snapshot={"temperature": 30},
- ... user_intent="降温",
- ... available_tools=["adjust_fan", "speak"]
- ... )
- """
-
- def __init__(
- self,
- config: PlannerConfig | None = None,
- intent_to_action: dict[str, str] | None = None,
- action_to_tool_type: dict[str, ToolCallType] | None = None,
- ) -> None:
- """初始化 Planner
-
- Args:
- config: Planner 配置,None 时使用默认配置
- intent_to_action: 自定义意图映射,None 时使用内置映射
- action_to_tool_type: 自定义 action 类型映射,None 时使用内置映射
- """
- self.config = config or PlannerConfig()
- self.intent_to_action = intent_to_action or INTENT_TO_ACTION.copy()
- self.action_to_tool_type = action_to_tool_type or ACTION_TO_TOOL_CALL_TYPE.copy()
- self._cached_normalized_snapshot: dict[str, Any] | None = None
- self._cached_snapshot_raw: dict[str, Any] | None = None
-
- def generate_plan(
- self,
- world_snapshot: dict[str, Any],
- user_intent: str,
- available_tools: list[str],
- domain_rules: dict[str, Any] | None = None,
- planner_mode: str = "hybrid",
- ) -> Plan:
- """生成执行计划
-
- 根据输入信息生成标准化的 Plan 对象。
-
- Args:
- world_snapshot: 世界状态快照
- user_intent: 用户意图描述
- available_tools: 可用能力列表
- domain_rules: 领域规则约束,None 表示无约束
- planner_mode: 规划模式,可选 "rule" / "llm" / "hybrid"
-
- Returns:
- 标准化的 Plan 对象
-
- Raises:
- ValueError: 当 planner_mode 不合法
- """
- if not user_intent or not user_intent.strip():
- return self._generate_empty_intent_plan(world_snapshot, available_tools)
-
- mode = planner_mode.lower()
- if mode == "rule":
- return self._generate_rule_based_plan(
- world_snapshot, user_intent, available_tools, domain_rules
- )
- elif mode == "llm":
- return self._generate_llm_plan(
- world_snapshot, user_intent, available_tools, domain_rules
- )
- elif mode == "hybrid":
- return self._generate_hybrid_plan(
- world_snapshot, user_intent, available_tools, domain_rules
- )
- else:
- raise ValueError(f"无效的 planner_mode: {planner_mode},有效值为: rule/llm/hybrid")
-
- def _generate_rule_based_plan(
- self,
- world_snapshot: dict[str, Any],
- user_intent: str,
- available_tools: list[str],
- domain_rules: dict[str, Any] | None,
- ) -> Plan:
- """基于规则生成 Plan
-
- 内部方法,使用规则引擎生成计划。
- Args:
- world_snapshot: 世界状态快照
- user_intent: 用户意图描述
- available_tools: 可用能力列表
- domain_rules: 领域规则约束
-
- Returns:
- Plan 对象
- """
- normalized = self._normalize_world_snapshot(world_snapshot)
- action = self._map_intent_to_action(user_intent)
- context = self._extract_recent_context(normalized)
- requires_confirm, confirm_msg = self._needs_confirmation(
- normalized, user_intent, action, domain_rules
- )
- risk_level = self._assess_risk(user_intent, action, normalized, domain_rules)
- plan_id = f"plan_{uuid.uuid4().hex[:8]}"
- if action is None:
- return self._generate_unknown_intent_plan(
- plan_id, user_intent, normalized, available_tools, risk_level, context
- )
- if not self._validate_tool_available(action, available_tools):
- return self._generate_tool_unavailable_plan(
- plan_id, user_intent, action, normalized, risk_level, context
- )
- return self._generate_executable_plan(
- plan_id, user_intent, action, normalized,
- risk_level, requires_confirm, confirm_msg, context
- )
-
- def _generate_llm_plan(
- self,
- world_snapshot: dict[str, Any],
- user_intent: str,
- available_tools: list[str],
- domain_rules: dict[str, Any] | None,
- ) -> Plan:
- """LLM 规划器实现
-
- 内部方法,调用 LLM 生成计划。
- Args:
- world_snapshot: 世界状态快照
- user_intent: 用户意图描述
- available_tools: 可用能力列表
- domain_rules: 领域规则约束
-
- Returns:
- Plan 对象
- """
- # 导入 LLM 客户端(延迟导入避免循环依赖)
- try:
- from llm_client import LLMPlannerClient
- llm_client = LLMPlannerClient()
- except ImportError:
- # LLM 客户端不可用,返回安全 fallback
- return self._generate_safe_fallback_plan(
- user_intent=user_intent,
- reason="llm_client_unavailable",
- )
- # 提取上下文(使用 normalize 后的结果)
- normalized = self._normalize_world_snapshot(world_snapshot)
- context = self._extract_recent_context(normalized)
- # 构造 Planner 配置
- planner_config = {
- "default_risk_level": self.config.default_risk_level.value,
- "require_confirmation_on_medium_risk": self.config.require_confirmation_on_medium_risk,
- "require_confirmation_on_high_risk": self.config.require_confirmation_on_high_risk,
- "max_plan_steps": self.config.max_plan_steps,
- }
- try:
- # 调用 LLM 生成 Plan JSON
- plan_json = llm_client.generate_plan_json(
- user_intent=user_intent,
- world_snapshot=normalized,
- available_tools=available_tools,
- domain_rules=domain_rules,
- planner_config=planner_config,
- )
- # 使用 Plan.from_dict() 转换
- plan = Plan.from_dict(plan_json)
- return plan
- except Exception as e:
- # LLM 调用失败,返回安全 fallback
- return self._generate_safe_fallback_plan(
- user_intent=user_intent,
- reason=f"llm_error: {e}",
- )
-
- def _generate_hybrid_plan(
- self,
- world_snapshot: dict[str, Any],
- user_intent: str,
- available_tools: list[str],
- domain_rules: dict[str, Any] | None,
- ) -> Plan:
- """混合模式生成 Plan
-
- 内部方法,优先使用规则,简单情况直接生成,复杂情况调用 LLM。
- Args:
- world_snapshot: 世界状态快照
- user_intent: 用户意图描述
- available_tools: 可用能力列表
- domain_rules: 领域规则约束
-
- Returns:
- Plan 对象
- """
- normalized = self._normalize_world_snapshot(world_snapshot)
- action = self._map_intent_to_action(user_intent)
- context = self._extract_recent_context(normalized)
- # 判断是否需要委托给 LLM
- if self._should_delegate_to_llm(
- normalized, user_intent, available_tools, domain_rules, action
- ):
- return self._generate_llm_plan(
- normalized, user_intent, available_tools, domain_rules
- )
- # 规则可处理,直接生成
- requires_confirm, confirm_msg = self._needs_confirmation(
- normalized, user_intent, action, domain_rules
- )
- risk_level = self._assess_risk(user_intent, action, normalized, domain_rules)
- plan_id = f"plan_{uuid.uuid4().hex[:8]}"
- if action is None:
- return self._generate_unknown_intent_plan(
- plan_id, user_intent, normalized, available_tools, risk_level, context
- )
- if not self._validate_tool_available(action, available_tools):
- return self._generate_tool_unavailable_plan(
- plan_id, user_intent, action, normalized, risk_level, context
- )
- return self._generate_executable_plan(
- plan_id, user_intent, action, normalized,
- risk_level, requires_confirm, confirm_msg, context
- )
-
- def _generate_executable_plan(
- self,
- plan_id: str,
- user_intent: str,
- action: str,
- world_snapshot: dict[str, Any],
- risk_level: RiskLevel,
- requires_confirm: bool,
- confirm_msg: str | None,
- context: dict[str, Any],
- ) -> Plan:
- """生成可执行的 Plan
-
- 内部方法,生成包含具体执行步骤的 Plan。
- 当 requires_confirm == True 时,生成 ASK_USER/SPEAK 询问步骤而非 EXECUTE 执行步骤,
- 以确保 Executor 不会在用户确认前误执行风险动作。
- Args:
- plan_id: 计划 ID
- user_intent: 用户意图
- action: 动作名称
- world_snapshot: 世界状态(应传入 normalized 后的结果)
- risk_level: 风险等级
- requires_confirm: 是否需要确认
- confirm_msg: 确认消息
- context: 提取的上下文
-
- Returns:
- 可执行的 Plan 对象
- """
- reasoning_parts = [
- f"用户意图: {user_intent}",
- f"匹配动作: {action}",
- f"风险等级: {risk_level.value}",
- ]
- if context:
- reasoning_parts.append(f"相关上下文: {context}")
- if requires_confirm and confirm_msg:
- reasoning_parts.append(f"需要确认: {confirm_msg}")
- plan = Plan(
- plan_id=plan_id,
- goal=user_intent,
- reasoning="; ".join(reasoning_parts),
- risk_level=risk_level,
- requires_confirmation=requires_confirm,
- confirmation_message=confirm_msg,
- status=PlanStatus.CREATED,
- source=self.config.default_source,
- )
- # 当需要确认时,生成询问/告知步骤而非执行步骤
- if requires_confirm:
- step = PlanStep(
- step_id=1,
- action="ask_user",
- tool_call_type=ToolCallType.ASK_USER,
- parameters={
- "user_intent": user_intent,
- "context": context,
- "suggested_action": action,
- "question": confirm_msg or f"您确定要执行 '{action}' 吗?",
- },
- description=f"询问用户确认: {action}",
- requires_confirmation=True,
- confirmation_message=confirm_msg,
- )
- else:
- # 不需要确认时,生成正常的执行步骤
- step = PlanStep(
- step_id=1,
- action=action,
- tool_call_type=self.action_to_tool_type.get(action, ToolCallType.EXECUTE),
- parameters={"user_intent": user_intent, "context": context},
- description=f"执行动作: {action}",
- )
- plan.add_step(step)
- return plan
-
- def _generate_tool_unavailable_plan(
- self,
- plan_id: str,
- user_intent: str,
- action: str,
- world_snapshot: dict[str, Any],
- risk_level: RiskLevel,
- context: dict[str, Any],
- ) -> Plan:
- """生成工具不可用时的 Plan
-
- 内部方法,当请求的能力不在可用列表中时调用。
-
- Args:
- plan_id: 计划 ID
- user_intent: 用户意图
- action: 尝试的动作名称
- world_snapshot: 世界状态
- risk_level: 风险等级
- context: 提取的上下文
-
- Returns:
- 提示能力不可用的 Plan 对象
- """
- plan = Plan(
- plan_id=plan_id,
- goal=f"处理不可用能力请求: {user_intent}",
- reasoning=(
- f"用户请求执行动作 '{action}',但该能力当前不可用。"
- f"用户意图: {user_intent}"
- ),
- risk_level=RiskLevel.LOW,
- requires_confirmation=False,
- status=PlanStatus.CREATED,
- source=self.config.default_source,
- )
-
- step = PlanStep(
- step_id=1,
- action="speak",
- tool_call_type=ToolCallType.SPEAK,
- parameters={
- "message": f"抱歉,当前系统不支持 '{action}' 操作。"
- f"您的请求 '{user_intent}' 无法完成。",
- "level": "info",
- },
- description=f"通知用户能力不可用: {action}",
- )
- plan.add_step(step)
-
- return plan
-
- def _generate_unknown_intent_plan(
- self,
- plan_id: str,
- user_intent: str,
- world_snapshot: dict[str, Any],
- available_tools: list[str],
- risk_level: RiskLevel,
- context: dict[str, Any],
- ) -> Plan:
- """生成意图不清晰时的 Plan
-
- 内部方法,当无法从用户意图中识别出具体动作时调用。
-
- Args:
- plan_id: 计划 ID
- user_intent: 用户意图
- world_snapshot: 世界状态
- available_tools: 可用能力列表
- risk_level: 风险等级
- context: 提取的上下文
-
- Returns:
- 需要进一步确认意图的 Plan 对象
- """
- plan = Plan(
- plan_id=plan_id,
- goal=f"澄清用户意图: {user_intent}",
- reasoning=(
- f"无法从用户意图 '{user_intent}' 中识别出具体动作。"
- f"可用能力: {available_tools}"
- ),
- risk_level=RiskLevel.LOW,
- requires_confirmation=False,
- status=PlanStatus.CREATED,
- source=self.config.default_source,
- )
-
- query_step = PlanStep(
- step_id=1,
- action="clarify_intent",
- tool_call_type=ToolCallType.ASK_USER,
- parameters={
- "original_intent": user_intent,
- "available_actions": available_tools,
- },
- description=f"询问用户确认具体意图: {user_intent}",
- requires_confirmation=True,
- confirmation_message=f"您是想要执行以下哪个操作?{', '.join(available_tools[:5])}",
- )
- plan.add_step(query_step)
-
- return plan
-
- def _generate_empty_intent_plan(
- self,
- world_snapshot: dict[str, Any],
- available_tools: list[str],
- ) -> Plan:
- """生成空意图的 Plan
-
- 内部方法,当用户意图为空时调用。
-
- Args:
- world_snapshot: 世界状态
- available_tools: 可用能力列表
-
- Returns:
- 提示输入意图的 Plan 对象
- """
- plan = Plan(
- plan_id=f"plan_{uuid.uuid4().hex[:8]}",
- goal="等待用户输入",
- reasoning="用户未提供有效意图",
- risk_level=RiskLevel.LOW,
- requires_confirmation=False,
- status=PlanStatus.CREATED,
- source=self.config.default_source,
- )
-
- step = PlanStep(
- step_id=1,
- action="speak",
- tool_call_type=ToolCallType.SPEAK,
- parameters={"message": "您好,请告诉我您想要执行的操作。"},
- description="等待用户输入有效意图",
- )
- plan.add_step(step)
-
- return plan
-
- def _needs_confirmation(
- self,
- world_snapshot: dict[str, Any],
- user_intent: str,
- action: str | None,
- domain_rules: dict[str, Any] | None,
- ) -> tuple[bool, str | None]:
- """判断是否需要用户确认
-
- 根据世界状态、用户意图、动作类型和领域规则判断。
- 优先使用 _normalize_world_snapshot() 转换后的结果。
- Args:
- world_snapshot: 世界状态快照
- user_intent: 用户意图
- action: 识别的动作名称
- domain_rules: 领域规则
-
- Returns:
- (是否需要确认, 确认消息)
- """
- if action is None:
- return True, "无法识别您的意图,请确认您想要的操作。"
- normalized = self._normalize_world_snapshot(world_snapshot)
- if self._has_world_conflict(normalized, action, domain_rules):
- return True, f"检测到与当前状态的潜在冲突,请确认是否继续执行 '{action}'?"
- risk_level = self._assess_risk(user_intent, action, normalized, domain_rules)
- if risk_level == RiskLevel.HIGH and self.config.require_confirmation_on_high_risk:
- return True, f"该操作风险较高 '{action}',是否确认执行?"
- if risk_level == RiskLevel.MEDIUM and self.config.require_confirmation_on_medium_risk:
- return True, f"执行操作 '{action}' 前,请确认。"
- return False, None
-
- def _has_world_conflict(
- self,
- world_snapshot: dict[str, Any],
- action: str,
- domain_rules: dict[str, Any] | None,
- ) -> bool:
- """检查世界状态是否存在冲突
-
- 内部方法,检测可能导致执行冲突的状态。
- 优先使用 _normalize_world_snapshot() 转换后的结果。
- Args:
- world_snapshot: 世界状态快照
- action: 动作名称
- domain_rules: 领域规则
-
- Returns:
- 是否存在冲突
- """
- normalized = self._normalize_world_snapshot(world_snapshot)
- if normalized.get("actuator_status"):
- actuator = normalized.get("actuator_status", {})
- if isinstance(actuator, dict):
- for key in ["unavailable", "error", "offline"]:
- if key in actuator and actuator[key]:
- return True
- if normalized.get("recent_actions"):
- recent = normalized.get("recent_actions", [])
- if isinstance(recent, list) and len(recent) > 0:
- last_action = recent[-1] if recent else None
- if last_action and last_action.get("action") == action:
- return True
- return False
-
- def _assess_risk(
- self,
- user_intent: str,
- action: str | None,
- world_snapshot: dict[str, Any],
- domain_rules: dict[str, Any] | None,
- ) -> RiskLevel:
- """评估风险等级
-
- 内部方法,根据多个因素综合评估风险。
- 优先使用 _normalize_world_snapshot() 转换后的结果。
- Args:
- user_intent: 用户意图
- action: 动作名称
- world_snapshot: 世界状态
- domain_rules: 领域规则
-
- Returns:
- 风险等级
- """
- normalized = self._normalize_world_snapshot(world_snapshot)
- if action is None:
- return RiskLevel.LOW
- for keyword in HIGH_RISK_KEYWORDS:
- if keyword in user_intent:
- return RiskLevel.HIGH
- for keyword in MEDIUM_RISK_KEYWORDS:
- if keyword in user_intent:
- return RiskLevel.MEDIUM
- if domain_rules and "high_risk_actions" in domain_rules:
- if action in domain_rules["high_risk_actions"]:
- return RiskLevel.HIGH
- return self.config.default_risk_level
-
- def _should_delegate_to_llm(
- self,
- world_snapshot: dict[str, Any],
- user_intent: str,
- available_tools: list[str],
- domain_rules: dict[str, Any] | None,
- action: str | None,
- ) -> bool:
- """判断是否应该委托给 LLM
-
- 内部方法,根据多个因素判断是否需要 LLM 介入。
-
- 委托条件:
- 1. action 无法明确映射(意图复杂)
- 2. 世界状态存在冲突
- 3. 需要结合领域规则做高层判断
- 4. 多步骤规划需求
- 5. 意图语义模糊但可理解
-
- Args:
- world_snapshot: 世界状态快照
- user_intent: 用户意图描述
- available_tools: 可用能力列表
- domain_rules: 领域规则约束
- action: 识别的动作(None 表示未识别)
-
- Returns:
- 是否应该委托给 LLM
- """
- domain_rules = domain_rules or {}
-
- # 条件1:action 无法明确映射
- if action is None:
- # 检查是否有任何关键词匹配
- has_partial_match = False
- for keyword in self.intent_to_action.keys():
- if keyword in user_intent:
- has_partial_match = True
- break
- # 有部分匹配但无法确定具体 action,需要 LLM
- if has_partial_match:
- return True
-
- # 条件2:世界状态存在冲突
- if self._has_world_conflict(world_snapshot, action, domain_rules):
- # 检查冲突是否严重到需要 LLM
- if self._is_complex_conflict(world_snapshot, action):
- return True
-
- # 条件3:涉及领域规则限制
- if domain_rules:
- high_risk_actions = domain_rules.get("high_risk_actions", [])
- medium_risk_actions = domain_rules.get("medium_risk_actions", [])
- if action in high_risk_actions or action in medium_risk_actions:
- # 风险动作需要更复杂判断
- return True
-
- # 条件4:多步骤意图(包含连接词)
- multi_step_keywords = ["然后", "接下来", "之后", "再", "和", "以及", "并且", "再然后"]
- if any(kw in user_intent for kw in multi_step_keywords):
- return True
-
- # 条件5:意图包含多个动作词
- action_count = 0
- for keyword in self.intent_to_action.keys():
- if keyword in user_intent:
- action_count += 1
- if action_count > 1:
- return True
-
- # 条件6:意图不完整但包含上下文暗示
- incomplete_indicators = ["它", "那个", "这个", "刚才", "上次", "再"]
- if any(ind in user_intent for ind in incomplete_indicators):
- if action is None:
- return True
-
- return False
-
- def _is_complex_conflict(
- self,
- world_snapshot: dict[str, Any],
- action: str | None,
- ) -> bool:
- """判断是否为复杂冲突
-
- 复杂冲突需要 LLM 介入判断。
- 优先使用 _normalize_world_snapshot() 转换后的结果。
- Args:
- world_snapshot: 世界状态
- action: 动作名称
-
- Returns:
- 是否为复杂冲突
- """
- normalized = self._normalize_world_snapshot(world_snapshot)
- # 检查 recent_actions 中是否有相同动作
- recent_actions = normalized.get("recent_actions", [])
- if isinstance(recent_actions, list) and len(recent_actions) > 0:
- last_action = recent_actions[-1] if recent_actions else {}
- if isinstance(last_action, dict) and last_action.get("action") == action:
- # 相同动作,检查时间间隔
- if "timestamp" in last_action:
- time_diff = time.time() - last_action.get("timestamp", 0)
- # 短时间内重复动作视为复杂冲突
- if time_diff < 600: # 10分钟内
- return True
- # 检查是否有多个警告
- warnings = normalized.get("warnings", [])
- if isinstance(warnings, list) and len(warnings) > 1:
- return True
- return False
-
- def _generate_safe_fallback_plan(
- self,
- user_intent: str,
- reason: str = "",
- ) -> Plan:
- """生成安全的 fallback Plan
-
- 当 LLM 调用失败时,返回一个安全的 fallback plan。
- 优先使用 ASK_USER 或 SPEAK,不直接执行风险动作。
-
- Args:
- user_intent: 用户原始意图
- reason: 回退原因
-
- Returns:
- 安全的 Plan 对象
- """
- plan = Plan(
- plan_id=f"plan_fallback_{uuid.uuid4().hex[:8]}",
- goal=f"无法处理的请求: {user_intent}",
- reasoning=f"LLM 处理失败或响应无效 ({reason}),返回安全 fallback",
- risk_level=RiskLevel.LOW,
- requires_confirmation=False,
- confirmation_message=None,
- status=PlanStatus.CREATED,
- source="llm_fallback",
- metadata={"fallback": True, "reason": reason},
- )
-
- step = PlanStep(
- step_id=1,
- action="ask_user",
- tool_call_type=ToolCallType.ASK_USER,
- parameters={
- "question": f"抱歉,我无法理解或处理您的请求: {user_intent}。请重新描述您的需求。",
- },
- preconditions={},
- fallback=None,
- status=StepStatus.PENDING,
- description="询问用户澄清意图",
- requires_confirmation=False,
- confirmation_message=None,
- metadata={"fallback_reason": reason},
- )
- plan.add_step(step)
-
- return plan
-
- def _validate_tool_available(self, tool_name: str, available_tools: list[str]) -> bool:
- """验证工具是否可用
-
- 内部方法,检查请求的动作是否在可用能力列表中。
-
- Args:
- tool_name: 工具/动作名称
- available_tools: 可用能力列表
-
- Returns:
- 是否可用
- """
- if not available_tools:
- return False
- return tool_name in available_tools
-
- def _normalize_world_snapshot(self, world_snapshot: dict[str, Any]) -> dict[str, Any]:
- """将真实 /world/snapshot 结构统一转换为 Planner 扁平上下文
- 真实 world_snapshot 结构可能包含嵌套的 environment、system、entities 等字段,
- 此方法将其转换为扁平的、易于访问的格式。
- 对齐真实结构:
- - environment: temperature, humidity, light_level, warnings, errors
- - system: device_status, actuator_status, mode
- - entities: 与 actuator/device/recent action 相关的信息
- Args:
- world_snapshot: 原始世界状态快照
- Returns:
- 扁平的 normalized dict
- """
- # 如果快照没有变化,使用缓存
- if self._cached_snapshot_raw is world_snapshot and self._cached_normalized_snapshot is not None:
- return self._cached_normalized_snapshot
- normalized: dict[str, Any] = {
- "temperature": None,
- "humidity": None,
- "light_level": None,
- "warnings": [],
- "errors": [],
- "device_status": {},
- "actuator_status": {},
- "mode": None,
- "recent_actions": [],
- "raw_entities": {},
- }
- # 处理 environment 字段(常见于 /world/snapshot)
- environment = world_snapshot.get("environment", {})
- if isinstance(environment, dict):
- normalized["temperature"] = environment.get("temperature") or environment.get("temperature_ambient")
- normalized["humidity"] = environment.get("humidity") or environment.get("humidity_ambient")
- normalized["light_level"] = environment.get("light_level") or environment.get("illuminance")
- normalized["warnings"] = environment.get("warnings", [])
- normalized["errors"] = environment.get("errors", [])
- # 处理 system 字段
- system = world_snapshot.get("system", {})
- if isinstance(system, dict):
- normalized["device_status"] = system.get("device_status", {})
- normalized["actuator_status"] = system.get("actuator_status", {})
- normalized["mode"] = system.get("mode") or system.get("operating_mode")
- # 直接字段(已经是扁平格式)
- for key in ["temperature", "humidity", "light_level", "warnings", "errors",
- "device_status", "actuator_status", "mode"]:
- if key not in environment and key in world_snapshot:
- if normalized.get(key) is None or normalized.get(key) == [] or normalized.get(key) == {}:
- normalized[key] = world_snapshot.get(key)
- # 处理 entities 字段(提取与 actuator/device 相关信息)
- entities = world_snapshot.get("entities", {})
- if isinstance(entities, dict):
- actuators = entities.get("actuator", [])
- devices = entities.get("device", [])
- if isinstance(actuators, list):
- for act in actuators:
- if isinstance(act, dict):
- device_id = act.get("entity_id") or act.get("device_id", "unknown")
- normalized["raw_entities"][f"actuator_{device_id}"] = act
- if isinstance(devices, list):
- for dev in devices:
- if isinstance(dev, dict):
- device_id = dev.get("entity_id") or dev.get("device_id", "unknown")
- normalized["raw_entities"][f"device_{device_id}"] = dev
- # 处理 recent_actions(可能在顶层或 recent_context 下)
- recent_context = world_snapshot.get("recent_context", {}) or world_snapshot.get("recent_actions", [])
- if isinstance(recent_context, dict):
- normalized["recent_actions"] = recent_context.get("recent_actions", [])
- elif isinstance(recent_context, list):
- normalized["recent_actions"] = recent_context
- # 如果顶层直接有 recent_actions
- if not normalized["recent_actions"] and "recent_actions" in world_snapshot:
- normalized["recent_actions"] = world_snapshot.get("recent_actions", [])
- # 缓存结果
- self._cached_snapshot_raw = world_snapshot
- self._cached_normalized_snapshot = normalized
- return normalized
- def _extract_recent_context(self, world_snapshot: dict[str, Any]) -> dict[str, Any]:
- """提取与当前决策相关的上下文
-
- 内部方法,从世界状态中提取关键信息。
- 优先使用 _normalize_world_snapshot() 转换后的结果。
-
- Args:
- world_snapshot: 世界状态快照
-
- Returns:
- 相关的上下文字典
- """
- normalized = self._normalize_world_snapshot(world_snapshot)
- context_keys = [
- "temperature",
- "humidity",
- "light_level",
- "device_status",
- "actuator_status",
- "recent_actions",
- "warnings",
- "errors",
- ]
- context = {}
- for key in context_keys:
- if key in normalized and normalized[key] is not None:
- context[key] = normalized[key]
- if normalized.get("raw_entities"):
- context["raw_entities"] = normalized["raw_entities"]
- return context
-
- def _map_intent_to_action(self, user_intent: str) -> str | None:
- """将用户意图映射到 action
-
- 内部方法,通过关键词匹配查找对应的 action。
- 按关键词长度从长到短排序后再匹配,避免短关键词优先匹配导致误判。
- Args:
- user_intent: 用户意图描述
-
- Returns:
- 匹配到的 action 名称,未匹配到返回 None
- """
- if not user_intent:
- return None
- intent_lower = user_intent.lower()
- # 按关键词长度从长到短排序,避免"喂"比"喂食"先匹配
- for keyword in sorted(self.intent_to_action.keys(), key=len, reverse=True):
- if keyword in intent_lower or keyword in user_intent:
- return self.intent_to_action[keyword]
- return None
-
- def register_intent_mapping(self, keyword: str, action: str) -> None:
- """注册新的意图映射
-
- 公共方法,运行时添加新的意图到 action 的映射。
-
- Args:
- keyword: 意图关键词
- action: 对应的 action 名称
- """
- self.intent_to_action[keyword] = action
-
- def register_tool_type(self, action: str, tool_type: ToolCallType) -> None:
- """注册 action 到 ToolCallType 的映射
-
- 公共方法,运行时添加新的映射。
-
- Args:
- action: action 名称
- tool_type: 对应的 ToolCallType
- """
- self.action_to_tool_type[action] = tool_type
- # =============================================================================
- # 主程序入口(测试示例)
- # =============================================================================
- if __name__ == "__main__":
- import json
-
- print("=" * 70)
- print("Planner 测试演示")
- print("=" * 70)
-
- # 初始化 Planner
- planner = Planner()
-
- # -------------------------------------------------------------------------
- # 构造测试数据
- # -------------------------------------------------------------------------
- world_snapshot = {
- "temperature": 32.5,
- "humidity": 70,
- "actuator_status": {
- "fan": "running",
- "light": "on",
- },
- "recent_actions": [
- {"action": "feed", "timestamp": time.time() - 300},
- ],
- "warnings": [],
- "errors": [],
- }
-
- available_tools = [
- "feed",
- "adjust_fan",
- "speak",
- "query",
- "move",
- "inspect",
- ]
-
- # -------------------------------------------------------------------------
- # 场景 1:直接可执行(工具存在 + 风险低)
- # -------------------------------------------------------------------------
- print("\n[场景 1] 直接可执行")
- print("-" * 40)
-
- user_intent_1 = "打开风扇降温"
- plan_1 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_1,
- available_tools=available_tools,
- planner_mode="rule",
- )
-
- print(f"用户意图: {user_intent_1}")
- print(f"Plan ID: {plan_1.plan_id}")
- print(f"目标: {plan_1.goal}")
- print(f"风险等级: {plan_1.risk_level.value}")
- print(f"需要确认: {plan_1.requires_confirmation}")
- print(f"步骤数: {len(plan_1.steps)}")
- if plan_1.steps:
- step = plan_1.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
-
- # -------------------------------------------------------------------------
- # 场景 2:需要确认(风险中高或世界冲突)
- # -------------------------------------------------------------------------
- print("\n[场景 2] 需要确认")
- print("-" * 40)
-
- world_snapshot_conflict = {
- "temperature": 32.5,
- "actuator_status": {
- "fan": "error",
- },
- "recent_actions": [
- {"action": "adjust_fan", "timestamp": time.time() - 60},
- ],
- }
-
- user_intent_2 = "再次降温"
- plan_2 = planner.generate_plan(
- world_snapshot=world_snapshot_conflict,
- user_intent=user_intent_2,
- available_tools=available_tools,
- planner_mode="rule",
- )
-
- print(f"用户意图: {user_intent_2}")
- print(f"Plan ID: {plan_2.plan_id}")
- print(f"需要确认: {plan_2.requires_confirmation}")
- print(f"确认消息: {plan_2.confirmation_message}")
- print(f"步骤数: {len(plan_2.steps)}")
- if plan_2.steps:
- step = plan_2.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
- print(f" 需要确认: {step.requires_confirmation}")
-
- # -------------------------------------------------------------------------
- # 场景 3:工具不存在
- # -------------------------------------------------------------------------
- print("\n[场景 3] 工具不存在")
- print("-" * 40)
-
- user_intent_3 = "打开空调"
- available_tools_3 = ["fan", "light", "speak"]
-
- plan_3 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_3,
- available_tools=available_tools_3,
- planner_mode="rule",
- )
-
- print(f"用户意图: {user_intent_3}")
- print(f"可用工具: {available_tools_3}")
- print(f"Plan ID: {plan_3.plan_id}")
- print(f"目标: {plan_3.goal}")
- print(f"步骤数: {len(plan_3.steps)}")
- if plan_3.steps:
- step = plan_3.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
- print(f" 消息: {step.parameters.get('message', '')}")
-
- # -------------------------------------------------------------------------
- # 场景 4:LLM 模式(真实实现)
- # -------------------------------------------------------------------------
- print("\n[场景 4] LLM 模式")
- print("-" * 40)
-
- user_intent_4 = "打开风扇降温"
- plan_4 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_4,
- available_tools=available_tools,
- planner_mode="llm",
- )
-
- print(f"用户意图: {user_intent_4}")
- print(f"Plan ID: {plan_4.plan_id}")
- print(f"来源: {plan_4.source}")
- print(f"目标: {plan_4.goal}")
- print(f"风险等级: {plan_4.risk_level.value}")
- print(f"步骤数: {len(plan_4.steps)}")
- if plan_4.steps:
- step = plan_4.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
-
- # -------------------------------------------------------------------------
- # 场景 5:LLM 模式 - 喂食询问
- # -------------------------------------------------------------------------
- print("\n[场景 5] LLM 模式 - 喂食询问")
- print("-" * 40)
-
- user_intent_5 = "再喂一次"
- world_snapshot_recent_feed = {
- "temperature": 28,
- "last_feed_time": "2026-04-10T10:00:00",
- "recent_actions": [
- {"action": "feed", "timestamp": time.time() - 300},
- ],
- }
- plan_5 = planner.generate_plan(
- world_snapshot=world_snapshot_recent_feed,
- user_intent=user_intent_5,
- available_tools=["feed", "speak", "query"],
- planner_mode="llm",
- )
-
- print(f"用户意图: {user_intent_5}")
- print(f"Plan ID: {plan_5.plan_id}")
- print(f"来源: {plan_5.source}")
- print(f"目标: {plan_5.goal}")
- print(f"风险等级: {plan_5.risk_level.value}")
- print(f"步骤数: {len(plan_5.steps)}")
- if plan_5.steps:
- step = plan_5.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
- if step.tool_call_type == ToolCallType.ASK_USER:
- print(f" 询问: {step.parameters.get('question', 'N/A')}")
-
- # -------------------------------------------------------------------------
- # 场景 6:Hybrid 模式 - 简单场景走规则
- # -------------------------------------------------------------------------
- print("\n[场景 6] Hybrid 模式 - 简单场景")
- print("-" * 40)
-
- user_intent_6 = "查询温度"
- plan_6 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_6,
- available_tools=available_tools,
- planner_mode="hybrid",
- )
-
- print(f"用户意图: {user_intent_6}")
- print(f"Plan ID: {plan_6.plan_id}")
- print(f"来源: {plan_6.source}")
- print(f"风险等级: {plan_6.risk_level.value}")
-
- # -------------------------------------------------------------------------
- # 场景 7:Hybrid 模式 - 复杂场景走 LLM
- # -------------------------------------------------------------------------
- print("\n[场景 7] Hybrid 模式 - 复杂场景")
- print("-" * 40)
-
- user_intent_7 = "先降温然后喂食" # 多步骤意图
- plan_7 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_7,
- available_tools=available_tools,
- planner_mode="hybrid",
- )
-
- print(f"用户意图: {user_intent_7}")
- print(f"Plan ID: {plan_7.plan_id}")
- print(f"来源: {plan_7.source}")
- print(f"目标: {plan_7.goal}")
- print(f"风险等级: {plan_7.risk_level.value}")
- print(f"步骤数: {len(plan_7.steps)}")
- for step in plan_7.steps:
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
-
- # -------------------------------------------------------------------------
- # 场景 8:意图不清晰 - 走规则未知意图
- # -------------------------------------------------------------------------
- print("\n[场景 8] 意图不清晰")
- print("-" * 40)
-
- user_intent_8 = "随便弄一下"
- plan_8 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_8,
- available_tools=available_tools,
- planner_mode="rule",
- )
-
- print(f"用户意图: {user_intent_8}")
- print(f"Plan ID: {plan_8.plan_id}")
- print(f"目标: {plan_8.goal}")
- print(f"步骤数: {len(plan_8.steps)}")
- if plan_8.steps:
- step = plan_8.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
-
- # -------------------------------------------------------------------------
- # 场景 9:工具不存在
- # -------------------------------------------------------------------------
- print("\n[场景 9] 工具不存在")
- print("-" * 40)
-
- user_intent_9 = "打开空调"
- available_tools_9 = ["fan", "light", "speak"]
-
- plan_9 = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_9,
- available_tools=available_tools_9,
- planner_mode="rule",
- )
-
- print(f"用户意图: {user_intent_9}")
- print(f"可用工具: {available_tools_9}")
- print(f"Plan ID: {plan_9.plan_id}")
- print(f"目标: {plan_9.goal}")
- print(f"步骤数: {len(plan_9.steps)}")
- if plan_9.steps:
- step = plan_9.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
- print(f" 消息: {step.parameters.get('message', '')}")
-
- # -------------------------------------------------------------------------
- # 场景 10:LLM 工具不存在场景
- # -------------------------------------------------------------------------
- print("\n[场景 10] LLM 模式 - 工具不存在")
- print("-" * 40)
-
- user_intent_10 = "打开空调"
- plan_10 = planner.generate_plan(
- world_snapshot={"temperature": 30},
- user_intent=user_intent_10,
- available_tools=["adjust_fan", "speak"],
- planner_mode="llm",
- )
-
- print(f"用户意图: {user_intent_10}")
- print(f"Plan ID: {plan_10.plan_id}")
- print(f"来源: {plan_10.source}")
- print(f"目标: {plan_10.goal}")
- print(f"步骤数: {len(plan_10.steps)}")
- if plan_10.steps:
- step = plan_10.steps[0]
- print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})")
-
- # -------------------------------------------------------------------------
- # 输出完整 JSON
- # -------------------------------------------------------------------------
- print("\n[完整 JSON 输出]")
- print("-" * 40)
-
- print("\n场景 1 Plan JSON (rule 模式):")
- print(json.dumps(plan_1.to_dict(), indent=2, ensure_ascii=False))
-
- print("\n场景 4 Plan JSON (llm 模式):")
- print(json.dumps(plan_4.to_dict(), indent=2, ensure_ascii=False))
-
- # -------------------------------------------------------------------------
- # 测试自定义意图映射
- # -------------------------------------------------------------------------
- print("\n[自定义意图映射测试]")
- print("-" * 40)
-
- planner.register_intent_mapping("打扫", "clean")
- planner.register_tool_type("clean", ToolCallType.EXECUTE)
-
- user_intent_custom = "帮我打扫一下"
- plan_custom = planner.generate_plan(
- world_snapshot=world_snapshot,
- user_intent=user_intent_custom,
- available_tools=["clean", "speak"],
- planner_mode="rule",
- )
-
- print(f"用户意图: {user_intent_custom}")
- print(f"识别动作: {plan_custom.steps[0].action if plan_custom.steps else 'None'}")
-
- print("\n" + "=" * 70)
- print("Planner 测试演示完成")
- print("=" * 70)
|