""" planner.py - AI Agent 通用 Planner 核心模块 该模块负责根据世界状态、用户意图、可用能力生成标准化的 Plan 对象。 职责边界: - 接收输入:world_snapshot, user_intent, available_tools, domain_rules, planner_mode - 输出:标准 Plan 对象 - 不执行 capability - 不依赖 ROS2 - 不包含场景硬编码逻辑 Planner 支持三种模式: - rule:纯规则生成 - llm:LLM 生成(调用 llm_client) - hybrid:优先规则,复杂情况调用 LLM 使用方式: from planner import Planner, PlannerConfig from tool_protocol import RiskLevel config = PlannerConfig(default_risk_level=RiskLevel.LOW) planner = Planner(config) plan = planner.generate_plan( world_snapshot={"temperature": 35}, user_intent="降温", available_tools=["adjust_fan", "speak"], ) # 使用 LLM 模式 plan = planner.generate_plan( world_snapshot={"temperature": 35}, user_intent="降温", available_tools=["adjust_fan", "speak"], planner_mode="llm", ) """ from __future__ import annotations import time import uuid from dataclasses import dataclass, field from typing import Any from tool_protocol import ( Plan, PlanStep, RiskLevel, PlanStatus, StepStatus, ToolCallType, ) # ============================================================================= # 通用意图映射层 # ============================================================================= # 通用意图关键词到 action 名称的映射 # 用途:当用户意图包含某个关键词时,映射到对应的 action # 可扩展:后续可通过配置文件或知识库动态加载 INTENT_TO_ACTION: dict[str, str] = { # 动作类 "喂": "feed", "喂食": "feed", "喂养": "feed", "打开风扇": "adjust_fan", "关闭风扇": "adjust_fan", "调整风扇": "adjust_fan", "降温": "adjust_fan", "升温": "adjust_fan", "开灯": "control_light", "关灯": "control_light", "调整灯光": "control_light", "打开": "turn_on", "关闭": "turn_off", "启动": "turn_on", "停止": "turn_off", "调节": "adjust", "移动": "move", "前往": "move", "走到": "move", # 查询类 "查询": "query", "查看": "query", "检查": "inspect", "巡检": "inspect", "监测": "monitor", "监控": "monitor", # 通信类 "播报": "speak", "报告": "speak", "说": "speak", "告诉": "speak", "通知": "notify", "提醒": "remind", "警告": "warn", "询问": "ask", } # action 到 ToolCallType 的默认映射 ACTION_TO_TOOL_CALL_TYPE: dict[str, ToolCallType] = { "feed": ToolCallType.EXECUTE, "adjust_fan": ToolCallType.EXECUTE, "control_light": ToolCallType.EXECUTE, "turn_on": ToolCallType.EXECUTE, "turn_off": ToolCallType.EXECUTE, "adjust": ToolCallType.EXECUTE, "move": ToolCallType.EXECUTE, "query": ToolCallType.QUERY_WORLD, "inspect": ToolCallType.QUERY_WORLD, "monitor": ToolCallType.QUERY_WORLD, "speak": ToolCallType.SPEAK, "notify": ToolCallType.SPEAK, "remind": ToolCallType.SPEAK, "warn": ToolCallType.SPEAK, "ask": ToolCallType.ASK_USER, } # 高风险关键词列表(通用) HIGH_RISK_KEYWORDS: list[str] = [ "紧急", "危险", "停止", "关机", "断电", "全部", "清空", "emergency", "danger", "stop", "shutdown", "all", "clear", ] # 中风险关键词列表(通用) MEDIUM_RISK_KEYWORDS: list[str] = [ "调整", "改变", "修改", "设置", "配置", "adjust", "change", "modify", "set", "configure", ] # ============================================================================= # Planner 配置 # ============================================================================= @dataclass class PlannerConfig: """Planner 配置类 用于配置 Planner 的行为参数。 属性: default_risk_level: 默认风险等级 require_confirmation_on_medium_risk: 中风险是否需要确认 require_confirmation_on_high_risk: 高风险是否需要确认 max_plan_steps: 单个 Plan 最大步骤数 default_source: 默认计划来源 示例: >>> config = PlannerConfig( ... default_risk_level=RiskLevel.LOW, ... require_confirmation_on_medium_risk=True, ... require_confirmation_on_high_risk=True, ... max_plan_steps=10, ... default_source="hybrid" ... ) """ default_risk_level: RiskLevel = RiskLevel.LOW require_confirmation_on_medium_risk: bool = True require_confirmation_on_high_risk: bool = True max_plan_steps: int = 10 default_source: str = "hybrid" # ============================================================================= # Planner 核心类 # ============================================================================= class Planner: """通用 Planner 类 根据世界状态、用户意图、可用能力生成标准化 Plan 对象。 属性: config: Planner 配置 intent_to_action: 意图到 action 的映射字典 action_to_tool_type: action 到 ToolCallType 的映射字典 示例: >>> planner = Planner() >>> plan = planner.generate_plan( ... world_snapshot={"temperature": 30}, ... user_intent="降温", ... available_tools=["adjust_fan", "speak"] ... ) """ def __init__( self, config: PlannerConfig | None = None, intent_to_action: dict[str, str] | None = None, action_to_tool_type: dict[str, ToolCallType] | None = None, ) -> None: """初始化 Planner Args: config: Planner 配置,None 时使用默认配置 intent_to_action: 自定义意图映射,None 时使用内置映射 action_to_tool_type: 自定义 action 类型映射,None 时使用内置映射 """ self.config = config or PlannerConfig() self.intent_to_action = intent_to_action or INTENT_TO_ACTION.copy() self.action_to_tool_type = action_to_tool_type or ACTION_TO_TOOL_CALL_TYPE.copy() self._cached_normalized_snapshot: dict[str, Any] | None = None self._cached_snapshot_raw: dict[str, Any] | None = None def generate_plan( self, world_snapshot: dict[str, Any], user_intent: str, available_tools: list[str], domain_rules: dict[str, Any] | None = None, planner_mode: str = "hybrid", ) -> Plan: """生成执行计划 根据输入信息生成标准化的 Plan 对象。 Args: world_snapshot: 世界状态快照 user_intent: 用户意图描述 available_tools: 可用能力列表 domain_rules: 领域规则约束,None 表示无约束 planner_mode: 规划模式,可选 "rule" / "llm" / "hybrid" Returns: 标准化的 Plan 对象 Raises: ValueError: 当 planner_mode 不合法 """ if not user_intent or not user_intent.strip(): return self._generate_empty_intent_plan(world_snapshot, available_tools) mode = planner_mode.lower() if mode == "rule": return self._generate_rule_based_plan( world_snapshot, user_intent, available_tools, domain_rules ) elif mode == "llm": return self._generate_llm_plan( world_snapshot, user_intent, available_tools, domain_rules ) elif mode == "hybrid": return self._generate_hybrid_plan( world_snapshot, user_intent, available_tools, domain_rules ) else: raise ValueError(f"无效的 planner_mode: {planner_mode},有效值为: rule/llm/hybrid") def _generate_rule_based_plan( self, world_snapshot: dict[str, Any], user_intent: str, available_tools: list[str], domain_rules: dict[str, Any] | None, ) -> Plan: """基于规则生成 Plan 内部方法,使用规则引擎生成计划。 Args: world_snapshot: 世界状态快照 user_intent: 用户意图描述 available_tools: 可用能力列表 domain_rules: 领域规则约束 Returns: Plan 对象 """ normalized = self._normalize_world_snapshot(world_snapshot) action = self._map_intent_to_action(user_intent) context = self._extract_recent_context(normalized) requires_confirm, confirm_msg = self._needs_confirmation( normalized, user_intent, action, domain_rules ) risk_level = self._assess_risk(user_intent, action, normalized, domain_rules) plan_id = f"plan_{uuid.uuid4().hex[:8]}" if action is None: return self._generate_unknown_intent_plan( plan_id, user_intent, normalized, available_tools, risk_level, context ) if not self._validate_tool_available(action, available_tools): return self._generate_tool_unavailable_plan( plan_id, user_intent, action, normalized, risk_level, context ) return self._generate_executable_plan( plan_id, user_intent, action, normalized, risk_level, requires_confirm, confirm_msg, context ) def _generate_llm_plan( self, world_snapshot: dict[str, Any], user_intent: str, available_tools: list[str], domain_rules: dict[str, Any] | None, ) -> Plan: """LLM 规划器实现 内部方法,调用 LLM 生成计划。 Args: world_snapshot: 世界状态快照 user_intent: 用户意图描述 available_tools: 可用能力列表 domain_rules: 领域规则约束 Returns: Plan 对象 """ # 导入 LLM 客户端(延迟导入避免循环依赖) try: from llm_client import LLMPlannerClient llm_client = LLMPlannerClient() except ImportError: # LLM 客户端不可用,返回安全 fallback return self._generate_safe_fallback_plan( user_intent=user_intent, reason="llm_client_unavailable", ) # 提取上下文(使用 normalize 后的结果) normalized = self._normalize_world_snapshot(world_snapshot) context = self._extract_recent_context(normalized) # 构造 Planner 配置 planner_config = { "default_risk_level": self.config.default_risk_level.value, "require_confirmation_on_medium_risk": self.config.require_confirmation_on_medium_risk, "require_confirmation_on_high_risk": self.config.require_confirmation_on_high_risk, "max_plan_steps": self.config.max_plan_steps, } try: # 调用 LLM 生成 Plan JSON plan_json = llm_client.generate_plan_json( user_intent=user_intent, world_snapshot=normalized, available_tools=available_tools, domain_rules=domain_rules, planner_config=planner_config, ) # 使用 Plan.from_dict() 转换 plan = Plan.from_dict(plan_json) return plan except Exception as e: # LLM 调用失败,返回安全 fallback return self._generate_safe_fallback_plan( user_intent=user_intent, reason=f"llm_error: {e}", ) def _generate_hybrid_plan( self, world_snapshot: dict[str, Any], user_intent: str, available_tools: list[str], domain_rules: dict[str, Any] | None, ) -> Plan: """混合模式生成 Plan 内部方法,优先使用规则,简单情况直接生成,复杂情况调用 LLM。 Args: world_snapshot: 世界状态快照 user_intent: 用户意图描述 available_tools: 可用能力列表 domain_rules: 领域规则约束 Returns: Plan 对象 """ normalized = self._normalize_world_snapshot(world_snapshot) action = self._map_intent_to_action(user_intent) context = self._extract_recent_context(normalized) # 判断是否需要委托给 LLM if self._should_delegate_to_llm( normalized, user_intent, available_tools, domain_rules, action ): return self._generate_llm_plan( normalized, user_intent, available_tools, domain_rules ) # 规则可处理,直接生成 requires_confirm, confirm_msg = self._needs_confirmation( normalized, user_intent, action, domain_rules ) risk_level = self._assess_risk(user_intent, action, normalized, domain_rules) plan_id = f"plan_{uuid.uuid4().hex[:8]}" if action is None: return self._generate_unknown_intent_plan( plan_id, user_intent, normalized, available_tools, risk_level, context ) if not self._validate_tool_available(action, available_tools): return self._generate_tool_unavailable_plan( plan_id, user_intent, action, normalized, risk_level, context ) return self._generate_executable_plan( plan_id, user_intent, action, normalized, risk_level, requires_confirm, confirm_msg, context ) def _generate_executable_plan( self, plan_id: str, user_intent: str, action: str, world_snapshot: dict[str, Any], risk_level: RiskLevel, requires_confirm: bool, confirm_msg: str | None, context: dict[str, Any], ) -> Plan: """生成可执行的 Plan 内部方法,生成包含具体执行步骤的 Plan。 当 requires_confirm == True 时,生成 ASK_USER/SPEAK 询问步骤而非 EXECUTE 执行步骤, 以确保 Executor 不会在用户确认前误执行风险动作。 Args: plan_id: 计划 ID user_intent: 用户意图 action: 动作名称 world_snapshot: 世界状态(应传入 normalized 后的结果) risk_level: 风险等级 requires_confirm: 是否需要确认 confirm_msg: 确认消息 context: 提取的上下文 Returns: 可执行的 Plan 对象 """ reasoning_parts = [ f"用户意图: {user_intent}", f"匹配动作: {action}", f"风险等级: {risk_level.value}", ] if context: reasoning_parts.append(f"相关上下文: {context}") if requires_confirm and confirm_msg: reasoning_parts.append(f"需要确认: {confirm_msg}") plan = Plan( plan_id=plan_id, goal=user_intent, reasoning="; ".join(reasoning_parts), risk_level=risk_level, requires_confirmation=requires_confirm, confirmation_message=confirm_msg, status=PlanStatus.CREATED, source=self.config.default_source, ) # 当需要确认时,生成询问/告知步骤而非执行步骤 if requires_confirm: step = PlanStep( step_id=1, action="ask_user", tool_call_type=ToolCallType.ASK_USER, parameters={ "user_intent": user_intent, "context": context, "suggested_action": action, "question": confirm_msg or f"您确定要执行 '{action}' 吗?", }, description=f"询问用户确认: {action}", requires_confirmation=True, confirmation_message=confirm_msg, ) else: # 不需要确认时,生成正常的执行步骤 step = PlanStep( step_id=1, action=action, tool_call_type=self.action_to_tool_type.get(action, ToolCallType.EXECUTE), parameters={"user_intent": user_intent, "context": context}, description=f"执行动作: {action}", ) plan.add_step(step) return plan def _generate_tool_unavailable_plan( self, plan_id: str, user_intent: str, action: str, world_snapshot: dict[str, Any], risk_level: RiskLevel, context: dict[str, Any], ) -> Plan: """生成工具不可用时的 Plan 内部方法,当请求的能力不在可用列表中时调用。 Args: plan_id: 计划 ID user_intent: 用户意图 action: 尝试的动作名称 world_snapshot: 世界状态 risk_level: 风险等级 context: 提取的上下文 Returns: 提示能力不可用的 Plan 对象 """ plan = Plan( plan_id=plan_id, goal=f"处理不可用能力请求: {user_intent}", reasoning=( f"用户请求执行动作 '{action}',但该能力当前不可用。" f"用户意图: {user_intent}" ), risk_level=RiskLevel.LOW, requires_confirmation=False, status=PlanStatus.CREATED, source=self.config.default_source, ) step = PlanStep( step_id=1, action="speak", tool_call_type=ToolCallType.SPEAK, parameters={ "message": f"抱歉,当前系统不支持 '{action}' 操作。" f"您的请求 '{user_intent}' 无法完成。", "level": "info", }, description=f"通知用户能力不可用: {action}", ) plan.add_step(step) return plan def _generate_unknown_intent_plan( self, plan_id: str, user_intent: str, world_snapshot: dict[str, Any], available_tools: list[str], risk_level: RiskLevel, context: dict[str, Any], ) -> Plan: """生成意图不清晰时的 Plan 内部方法,当无法从用户意图中识别出具体动作时调用。 Args: plan_id: 计划 ID user_intent: 用户意图 world_snapshot: 世界状态 available_tools: 可用能力列表 risk_level: 风险等级 context: 提取的上下文 Returns: 需要进一步确认意图的 Plan 对象 """ plan = Plan( plan_id=plan_id, goal=f"澄清用户意图: {user_intent}", reasoning=( f"无法从用户意图 '{user_intent}' 中识别出具体动作。" f"可用能力: {available_tools}" ), risk_level=RiskLevel.LOW, requires_confirmation=False, status=PlanStatus.CREATED, source=self.config.default_source, ) query_step = PlanStep( step_id=1, action="clarify_intent", tool_call_type=ToolCallType.ASK_USER, parameters={ "original_intent": user_intent, "available_actions": available_tools, }, description=f"询问用户确认具体意图: {user_intent}", requires_confirmation=True, confirmation_message=f"您是想要执行以下哪个操作?{', '.join(available_tools[:5])}", ) plan.add_step(query_step) return plan def _generate_empty_intent_plan( self, world_snapshot: dict[str, Any], available_tools: list[str], ) -> Plan: """生成空意图的 Plan 内部方法,当用户意图为空时调用。 Args: world_snapshot: 世界状态 available_tools: 可用能力列表 Returns: 提示输入意图的 Plan 对象 """ plan = Plan( plan_id=f"plan_{uuid.uuid4().hex[:8]}", goal="等待用户输入", reasoning="用户未提供有效意图", risk_level=RiskLevel.LOW, requires_confirmation=False, status=PlanStatus.CREATED, source=self.config.default_source, ) step = PlanStep( step_id=1, action="speak", tool_call_type=ToolCallType.SPEAK, parameters={"message": "您好,请告诉我您想要执行的操作。"}, description="等待用户输入有效意图", ) plan.add_step(step) return plan def _needs_confirmation( self, world_snapshot: dict[str, Any], user_intent: str, action: str | None, domain_rules: dict[str, Any] | None, ) -> tuple[bool, str | None]: """判断是否需要用户确认 根据世界状态、用户意图、动作类型和领域规则判断。 优先使用 _normalize_world_snapshot() 转换后的结果。 Args: world_snapshot: 世界状态快照 user_intent: 用户意图 action: 识别的动作名称 domain_rules: 领域规则 Returns: (是否需要确认, 确认消息) """ if action is None: return True, "无法识别您的意图,请确认您想要的操作。" normalized = self._normalize_world_snapshot(world_snapshot) if self._has_world_conflict(normalized, action, domain_rules): return True, f"检测到与当前状态的潜在冲突,请确认是否继续执行 '{action}'?" risk_level = self._assess_risk(user_intent, action, normalized, domain_rules) if risk_level == RiskLevel.HIGH and self.config.require_confirmation_on_high_risk: return True, f"该操作风险较高 '{action}',是否确认执行?" if risk_level == RiskLevel.MEDIUM and self.config.require_confirmation_on_medium_risk: return True, f"执行操作 '{action}' 前,请确认。" return False, None def _has_world_conflict( self, world_snapshot: dict[str, Any], action: str, domain_rules: dict[str, Any] | None, ) -> bool: """检查世界状态是否存在冲突 内部方法,检测可能导致执行冲突的状态。 优先使用 _normalize_world_snapshot() 转换后的结果。 Args: world_snapshot: 世界状态快照 action: 动作名称 domain_rules: 领域规则 Returns: 是否存在冲突 """ normalized = self._normalize_world_snapshot(world_snapshot) if normalized.get("actuator_status"): actuator = normalized.get("actuator_status", {}) if isinstance(actuator, dict): for key in ["unavailable", "error", "offline"]: if key in actuator and actuator[key]: return True if normalized.get("recent_actions"): recent = normalized.get("recent_actions", []) if isinstance(recent, list) and len(recent) > 0: last_action = recent[-1] if recent else None if last_action and last_action.get("action") == action: return True return False def _assess_risk( self, user_intent: str, action: str | None, world_snapshot: dict[str, Any], domain_rules: dict[str, Any] | None, ) -> RiskLevel: """评估风险等级 内部方法,根据多个因素综合评估风险。 优先使用 _normalize_world_snapshot() 转换后的结果。 Args: user_intent: 用户意图 action: 动作名称 world_snapshot: 世界状态 domain_rules: 领域规则 Returns: 风险等级 """ normalized = self._normalize_world_snapshot(world_snapshot) if action is None: return RiskLevel.LOW for keyword in HIGH_RISK_KEYWORDS: if keyword in user_intent: return RiskLevel.HIGH for keyword in MEDIUM_RISK_KEYWORDS: if keyword in user_intent: return RiskLevel.MEDIUM if domain_rules and "high_risk_actions" in domain_rules: if action in domain_rules["high_risk_actions"]: return RiskLevel.HIGH return self.config.default_risk_level def _should_delegate_to_llm( self, world_snapshot: dict[str, Any], user_intent: str, available_tools: list[str], domain_rules: dict[str, Any] | None, action: str | None, ) -> bool: """判断是否应该委托给 LLM 内部方法,根据多个因素判断是否需要 LLM 介入。 委托条件: 1. action 无法明确映射(意图复杂) 2. 世界状态存在冲突 3. 需要结合领域规则做高层判断 4. 多步骤规划需求 5. 意图语义模糊但可理解 Args: world_snapshot: 世界状态快照 user_intent: 用户意图描述 available_tools: 可用能力列表 domain_rules: 领域规则约束 action: 识别的动作(None 表示未识别) Returns: 是否应该委托给 LLM """ domain_rules = domain_rules or {} # 条件1:action 无法明确映射 if action is None: # 检查是否有任何关键词匹配 has_partial_match = False for keyword in self.intent_to_action.keys(): if keyword in user_intent: has_partial_match = True break # 有部分匹配但无法确定具体 action,需要 LLM if has_partial_match: return True # 条件2:世界状态存在冲突 if self._has_world_conflict(world_snapshot, action, domain_rules): # 检查冲突是否严重到需要 LLM if self._is_complex_conflict(world_snapshot, action): return True # 条件3:涉及领域规则限制 if domain_rules: high_risk_actions = domain_rules.get("high_risk_actions", []) medium_risk_actions = domain_rules.get("medium_risk_actions", []) if action in high_risk_actions or action in medium_risk_actions: # 风险动作需要更复杂判断 return True # 条件4:多步骤意图(包含连接词) multi_step_keywords = ["然后", "接下来", "之后", "再", "和", "以及", "并且", "再然后"] if any(kw in user_intent for kw in multi_step_keywords): return True # 条件5:意图包含多个动作词 action_count = 0 for keyword in self.intent_to_action.keys(): if keyword in user_intent: action_count += 1 if action_count > 1: return True # 条件6:意图不完整但包含上下文暗示 incomplete_indicators = ["它", "那个", "这个", "刚才", "上次", "再"] if any(ind in user_intent for ind in incomplete_indicators): if action is None: return True return False def _is_complex_conflict( self, world_snapshot: dict[str, Any], action: str | None, ) -> bool: """判断是否为复杂冲突 复杂冲突需要 LLM 介入判断。 优先使用 _normalize_world_snapshot() 转换后的结果。 Args: world_snapshot: 世界状态 action: 动作名称 Returns: 是否为复杂冲突 """ normalized = self._normalize_world_snapshot(world_snapshot) # 检查 recent_actions 中是否有相同动作 recent_actions = normalized.get("recent_actions", []) if isinstance(recent_actions, list) and len(recent_actions) > 0: last_action = recent_actions[-1] if recent_actions else {} if isinstance(last_action, dict) and last_action.get("action") == action: # 相同动作,检查时间间隔 if "timestamp" in last_action: time_diff = time.time() - last_action.get("timestamp", 0) # 短时间内重复动作视为复杂冲突 if time_diff < 600: # 10分钟内 return True # 检查是否有多个警告 warnings = normalized.get("warnings", []) if isinstance(warnings, list) and len(warnings) > 1: return True return False def _generate_safe_fallback_plan( self, user_intent: str, reason: str = "", ) -> Plan: """生成安全的 fallback Plan 当 LLM 调用失败时,返回一个安全的 fallback plan。 优先使用 ASK_USER 或 SPEAK,不直接执行风险动作。 Args: user_intent: 用户原始意图 reason: 回退原因 Returns: 安全的 Plan 对象 """ plan = Plan( plan_id=f"plan_fallback_{uuid.uuid4().hex[:8]}", goal=f"无法处理的请求: {user_intent}", reasoning=f"LLM 处理失败或响应无效 ({reason}),返回安全 fallback", risk_level=RiskLevel.LOW, requires_confirmation=False, confirmation_message=None, status=PlanStatus.CREATED, source="llm_fallback", metadata={"fallback": True, "reason": reason}, ) step = PlanStep( step_id=1, action="ask_user", tool_call_type=ToolCallType.ASK_USER, parameters={ "question": f"抱歉,我无法理解或处理您的请求: {user_intent}。请重新描述您的需求。", }, preconditions={}, fallback=None, status=StepStatus.PENDING, description="询问用户澄清意图", requires_confirmation=False, confirmation_message=None, metadata={"fallback_reason": reason}, ) plan.add_step(step) return plan def _validate_tool_available(self, tool_name: str, available_tools: list[str]) -> bool: """验证工具是否可用 内部方法,检查请求的动作是否在可用能力列表中。 Args: tool_name: 工具/动作名称 available_tools: 可用能力列表 Returns: 是否可用 """ if not available_tools: return False return tool_name in available_tools def _normalize_world_snapshot(self, world_snapshot: dict[str, Any]) -> dict[str, Any]: """将真实 /world/snapshot 结构统一转换为 Planner 扁平上下文 真实 world_snapshot 结构可能包含嵌套的 environment、system、entities 等字段, 此方法将其转换为扁平的、易于访问的格式。 对齐真实结构: - environment: temperature, humidity, light_level, warnings, errors - system: device_status, actuator_status, mode - entities: 与 actuator/device/recent action 相关的信息 Args: world_snapshot: 原始世界状态快照 Returns: 扁平的 normalized dict """ # 如果快照没有变化,使用缓存 if self._cached_snapshot_raw is world_snapshot and self._cached_normalized_snapshot is not None: return self._cached_normalized_snapshot normalized: dict[str, Any] = { "temperature": None, "humidity": None, "light_level": None, "warnings": [], "errors": [], "device_status": {}, "actuator_status": {}, "mode": None, "recent_actions": [], "raw_entities": {}, } # 处理 environment 字段(常见于 /world/snapshot) environment = world_snapshot.get("environment", {}) if isinstance(environment, dict): normalized["temperature"] = environment.get("temperature") or environment.get("temperature_ambient") normalized["humidity"] = environment.get("humidity") or environment.get("humidity_ambient") normalized["light_level"] = environment.get("light_level") or environment.get("illuminance") normalized["warnings"] = environment.get("warnings", []) normalized["errors"] = environment.get("errors", []) # 处理 system 字段 system = world_snapshot.get("system", {}) if isinstance(system, dict): normalized["device_status"] = system.get("device_status", {}) normalized["actuator_status"] = system.get("actuator_status", {}) normalized["mode"] = system.get("mode") or system.get("operating_mode") # 直接字段(已经是扁平格式) for key in ["temperature", "humidity", "light_level", "warnings", "errors", "device_status", "actuator_status", "mode"]: if key not in environment and key in world_snapshot: if normalized.get(key) is None or normalized.get(key) == [] or normalized.get(key) == {}: normalized[key] = world_snapshot.get(key) # 处理 entities 字段(提取与 actuator/device 相关信息) entities = world_snapshot.get("entities", {}) if isinstance(entities, dict): actuators = entities.get("actuator", []) devices = entities.get("device", []) if isinstance(actuators, list): for act in actuators: if isinstance(act, dict): device_id = act.get("entity_id") or act.get("device_id", "unknown") normalized["raw_entities"][f"actuator_{device_id}"] = act if isinstance(devices, list): for dev in devices: if isinstance(dev, dict): device_id = dev.get("entity_id") or dev.get("device_id", "unknown") normalized["raw_entities"][f"device_{device_id}"] = dev # 处理 recent_actions(可能在顶层或 recent_context 下) recent_context = world_snapshot.get("recent_context", {}) or world_snapshot.get("recent_actions", []) if isinstance(recent_context, dict): normalized["recent_actions"] = recent_context.get("recent_actions", []) elif isinstance(recent_context, list): normalized["recent_actions"] = recent_context # 如果顶层直接有 recent_actions if not normalized["recent_actions"] and "recent_actions" in world_snapshot: normalized["recent_actions"] = world_snapshot.get("recent_actions", []) # 缓存结果 self._cached_snapshot_raw = world_snapshot self._cached_normalized_snapshot = normalized return normalized def _extract_recent_context(self, world_snapshot: dict[str, Any]) -> dict[str, Any]: """提取与当前决策相关的上下文 内部方法,从世界状态中提取关键信息。 优先使用 _normalize_world_snapshot() 转换后的结果。 Args: world_snapshot: 世界状态快照 Returns: 相关的上下文字典 """ normalized = self._normalize_world_snapshot(world_snapshot) context_keys = [ "temperature", "humidity", "light_level", "device_status", "actuator_status", "recent_actions", "warnings", "errors", ] context = {} for key in context_keys: if key in normalized and normalized[key] is not None: context[key] = normalized[key] if normalized.get("raw_entities"): context["raw_entities"] = normalized["raw_entities"] return context def _map_intent_to_action(self, user_intent: str) -> str | None: """将用户意图映射到 action 内部方法,通过关键词匹配查找对应的 action。 按关键词长度从长到短排序后再匹配,避免短关键词优先匹配导致误判。 Args: user_intent: 用户意图描述 Returns: 匹配到的 action 名称,未匹配到返回 None """ if not user_intent: return None intent_lower = user_intent.lower() # 按关键词长度从长到短排序,避免"喂"比"喂食"先匹配 for keyword in sorted(self.intent_to_action.keys(), key=len, reverse=True): if keyword in intent_lower or keyword in user_intent: return self.intent_to_action[keyword] return None def register_intent_mapping(self, keyword: str, action: str) -> None: """注册新的意图映射 公共方法,运行时添加新的意图到 action 的映射。 Args: keyword: 意图关键词 action: 对应的 action 名称 """ self.intent_to_action[keyword] = action def register_tool_type(self, action: str, tool_type: ToolCallType) -> None: """注册 action 到 ToolCallType 的映射 公共方法,运行时添加新的映射。 Args: action: action 名称 tool_type: 对应的 ToolCallType """ self.action_to_tool_type[action] = tool_type # ============================================================================= # 主程序入口(测试示例) # ============================================================================= if __name__ == "__main__": import json print("=" * 70) print("Planner 测试演示") print("=" * 70) # 初始化 Planner planner = Planner() # ------------------------------------------------------------------------- # 构造测试数据 # ------------------------------------------------------------------------- world_snapshot = { "temperature": 32.5, "humidity": 70, "actuator_status": { "fan": "running", "light": "on", }, "recent_actions": [ {"action": "feed", "timestamp": time.time() - 300}, ], "warnings": [], "errors": [], } available_tools = [ "feed", "adjust_fan", "speak", "query", "move", "inspect", ] # ------------------------------------------------------------------------- # 场景 1:直接可执行(工具存在 + 风险低) # ------------------------------------------------------------------------- print("\n[场景 1] 直接可执行") print("-" * 40) user_intent_1 = "打开风扇降温" plan_1 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_1, available_tools=available_tools, planner_mode="rule", ) print(f"用户意图: {user_intent_1}") print(f"Plan ID: {plan_1.plan_id}") print(f"目标: {plan_1.goal}") print(f"风险等级: {plan_1.risk_level.value}") print(f"需要确认: {plan_1.requires_confirmation}") print(f"步骤数: {len(plan_1.steps)}") if plan_1.steps: step = plan_1.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") # ------------------------------------------------------------------------- # 场景 2:需要确认(风险中高或世界冲突) # ------------------------------------------------------------------------- print("\n[场景 2] 需要确认") print("-" * 40) world_snapshot_conflict = { "temperature": 32.5, "actuator_status": { "fan": "error", }, "recent_actions": [ {"action": "adjust_fan", "timestamp": time.time() - 60}, ], } user_intent_2 = "再次降温" plan_2 = planner.generate_plan( world_snapshot=world_snapshot_conflict, user_intent=user_intent_2, available_tools=available_tools, planner_mode="rule", ) print(f"用户意图: {user_intent_2}") print(f"Plan ID: {plan_2.plan_id}") print(f"需要确认: {plan_2.requires_confirmation}") print(f"确认消息: {plan_2.confirmation_message}") print(f"步骤数: {len(plan_2.steps)}") if plan_2.steps: step = plan_2.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") print(f" 需要确认: {step.requires_confirmation}") # ------------------------------------------------------------------------- # 场景 3:工具不存在 # ------------------------------------------------------------------------- print("\n[场景 3] 工具不存在") print("-" * 40) user_intent_3 = "打开空调" available_tools_3 = ["fan", "light", "speak"] plan_3 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_3, available_tools=available_tools_3, planner_mode="rule", ) print(f"用户意图: {user_intent_3}") print(f"可用工具: {available_tools_3}") print(f"Plan ID: {plan_3.plan_id}") print(f"目标: {plan_3.goal}") print(f"步骤数: {len(plan_3.steps)}") if plan_3.steps: step = plan_3.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") print(f" 消息: {step.parameters.get('message', '')}") # ------------------------------------------------------------------------- # 场景 4:LLM 模式(真实实现) # ------------------------------------------------------------------------- print("\n[场景 4] LLM 模式") print("-" * 40) user_intent_4 = "打开风扇降温" plan_4 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_4, available_tools=available_tools, planner_mode="llm", ) print(f"用户意图: {user_intent_4}") print(f"Plan ID: {plan_4.plan_id}") print(f"来源: {plan_4.source}") print(f"目标: {plan_4.goal}") print(f"风险等级: {plan_4.risk_level.value}") print(f"步骤数: {len(plan_4.steps)}") if plan_4.steps: step = plan_4.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") # ------------------------------------------------------------------------- # 场景 5:LLM 模式 - 喂食询问 # ------------------------------------------------------------------------- print("\n[场景 5] LLM 模式 - 喂食询问") print("-" * 40) user_intent_5 = "再喂一次" world_snapshot_recent_feed = { "temperature": 28, "last_feed_time": "2026-04-10T10:00:00", "recent_actions": [ {"action": "feed", "timestamp": time.time() - 300}, ], } plan_5 = planner.generate_plan( world_snapshot=world_snapshot_recent_feed, user_intent=user_intent_5, available_tools=["feed", "speak", "query"], planner_mode="llm", ) print(f"用户意图: {user_intent_5}") print(f"Plan ID: {plan_5.plan_id}") print(f"来源: {plan_5.source}") print(f"目标: {plan_5.goal}") print(f"风险等级: {plan_5.risk_level.value}") print(f"步骤数: {len(plan_5.steps)}") if plan_5.steps: step = plan_5.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") if step.tool_call_type == ToolCallType.ASK_USER: print(f" 询问: {step.parameters.get('question', 'N/A')}") # ------------------------------------------------------------------------- # 场景 6:Hybrid 模式 - 简单场景走规则 # ------------------------------------------------------------------------- print("\n[场景 6] Hybrid 模式 - 简单场景") print("-" * 40) user_intent_6 = "查询温度" plan_6 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_6, available_tools=available_tools, planner_mode="hybrid", ) print(f"用户意图: {user_intent_6}") print(f"Plan ID: {plan_6.plan_id}") print(f"来源: {plan_6.source}") print(f"风险等级: {plan_6.risk_level.value}") # ------------------------------------------------------------------------- # 场景 7:Hybrid 模式 - 复杂场景走 LLM # ------------------------------------------------------------------------- print("\n[场景 7] Hybrid 模式 - 复杂场景") print("-" * 40) user_intent_7 = "先降温然后喂食" # 多步骤意图 plan_7 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_7, available_tools=available_tools, planner_mode="hybrid", ) print(f"用户意图: {user_intent_7}") print(f"Plan ID: {plan_7.plan_id}") print(f"来源: {plan_7.source}") print(f"目标: {plan_7.goal}") print(f"风险等级: {plan_7.risk_level.value}") print(f"步骤数: {len(plan_7.steps)}") for step in plan_7.steps: print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") # ------------------------------------------------------------------------- # 场景 8:意图不清晰 - 走规则未知意图 # ------------------------------------------------------------------------- print("\n[场景 8] 意图不清晰") print("-" * 40) user_intent_8 = "随便弄一下" plan_8 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_8, available_tools=available_tools, planner_mode="rule", ) print(f"用户意图: {user_intent_8}") print(f"Plan ID: {plan_8.plan_id}") print(f"目标: {plan_8.goal}") print(f"步骤数: {len(plan_8.steps)}") if plan_8.steps: step = plan_8.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") # ------------------------------------------------------------------------- # 场景 9:工具不存在 # ------------------------------------------------------------------------- print("\n[场景 9] 工具不存在") print("-" * 40) user_intent_9 = "打开空调" available_tools_9 = ["fan", "light", "speak"] plan_9 = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_9, available_tools=available_tools_9, planner_mode="rule", ) print(f"用户意图: {user_intent_9}") print(f"可用工具: {available_tools_9}") print(f"Plan ID: {plan_9.plan_id}") print(f"目标: {plan_9.goal}") print(f"步骤数: {len(plan_9.steps)}") if plan_9.steps: step = plan_9.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") print(f" 消息: {step.parameters.get('message', '')}") # ------------------------------------------------------------------------- # 场景 10:LLM 工具不存在场景 # ------------------------------------------------------------------------- print("\n[场景 10] LLM 模式 - 工具不存在") print("-" * 40) user_intent_10 = "打开空调" plan_10 = planner.generate_plan( world_snapshot={"temperature": 30}, user_intent=user_intent_10, available_tools=["adjust_fan", "speak"], planner_mode="llm", ) print(f"用户意图: {user_intent_10}") print(f"Plan ID: {plan_10.plan_id}") print(f"来源: {plan_10.source}") print(f"目标: {plan_10.goal}") print(f"步骤数: {len(plan_10.steps)}") if plan_10.steps: step = plan_10.steps[0] print(f" → Step {step.step_id}: {step.action} ({step.tool_call_type.value})") # ------------------------------------------------------------------------- # 输出完整 JSON # ------------------------------------------------------------------------- print("\n[完整 JSON 输出]") print("-" * 40) print("\n场景 1 Plan JSON (rule 模式):") print(json.dumps(plan_1.to_dict(), indent=2, ensure_ascii=False)) print("\n场景 4 Plan JSON (llm 模式):") print(json.dumps(plan_4.to_dict(), indent=2, ensure_ascii=False)) # ------------------------------------------------------------------------- # 测试自定义意图映射 # ------------------------------------------------------------------------- print("\n[自定义意图映射测试]") print("-" * 40) planner.register_intent_mapping("打扫", "clean") planner.register_tool_type("clean", ToolCallType.EXECUTE) user_intent_custom = "帮我打扫一下" plan_custom = planner.generate_plan( world_snapshot=world_snapshot, user_intent=user_intent_custom, available_tools=["clean", "speak"], planner_mode="rule", ) print(f"用户意图: {user_intent_custom}") print(f"识别动作: {plan_custom.steps[0].action if plan_custom.steps else 'None'}") print("\n" + "=" * 70) print("Planner 测试演示完成") print("=" * 70)