""" planner_config_loader.py - Planner 配置加载模块 该模块负责从统一 config.yaml 中读取 Planner 相关配置, 并将其转换为 planner.py 可直接使用的配置对象和运行参数。 职责边界: - 读取、校验、转换配置 - 不做规划逻辑 - 不做 ROS 通信 - 不包含场景硬编码 使用方式: from planner_config_loader import load_planner_runtime_config runtime_config = load_planner_runtime_config("planner_config.yaml") planner = Planner(runtime_config["planner_config"]) plan = planner.generate_plan( world_snapshot={...}, user_intent="降温", available_tools=runtime_config["available_tools"], domain_rules=runtime_config["domain_rules"], planner_mode=runtime_config["planner_mode"], ) """ from __future__ import annotations import os from dataclasses import dataclass, field from typing import Any try: import yaml YAML_AVAILABLE = True except ImportError: YAML_AVAILABLE = False from planner import PlannerConfig from tool_protocol import RiskLevel # ============================================================================= # 配置校验异常 # ============================================================================= class ConfigValidationError(ValueError): """配置校验异常""" pass # ============================================================================= # 运行时配置数据类 # ============================================================================= @dataclass class PlannerRuntimeConfig: """Planner 运行时配置 聚合所有 Planner 运行所需的配置项。 属性: planner_mode: 规划模式 planner_config: PlannerConfig 对象 available_tools: 可用能力列表 tool_descriptions: 工具语义描述列表 intent_to_action: 意图到 action 的映射 domain_rules: 领域规则 示例: >>> config = PlannerRuntimeConfig( ... planner_mode="hybrid", ... planner_config=PlannerConfig(), ... available_tools=["feed", "speak"], ... tool_descriptions=[{"name": "feed", "description": "..."}], ... intent_to_action={"降温": "adjust_fan"}, ... domain_rules={"high_risk_actions": []}, ... ) """ planner_mode: str = "hybrid" planner_config: PlannerConfig = field(default_factory=PlannerConfig) available_tools: list[str] = field(default_factory=list) tool_descriptions: list[dict[str, Any]] = field(default_factory=list) intent_to_action: dict[str, str] = field(default_factory=dict) domain_rules: dict[str, Any] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: """转换为字典 Returns: 包含所有配置项的字典 """ return { "planner_mode": self.planner_mode, "planner_config": { "default_risk_level": self.planner_config.default_risk_level.value, "require_confirmation_on_medium_risk": self.planner_config.require_confirmation_on_medium_risk, "require_confirmation_on_high_risk": self.planner_config.require_confirmation_on_high_risk, "max_plan_steps": self.planner_config.max_plan_steps, "default_source": self.planner_config.default_source, }, "available_tools": self.available_tools, "tool_descriptions": self.tool_descriptions, "intent_to_action": self.intent_to_action, "domain_rules": self.domain_rules, } # ============================================================================= # 配置加载函数 # ============================================================================= def load_yaml_config(config_path: str) -> dict[str, Any]: """加载 YAML 配置文件 Args: config_path: 配置文件路径 Returns: 解析后的配置字典 Raises: FileNotFoundError: 配置文件不存在 ValueError: YAML 格式错误 ImportError: PyYAML 未安装 """ if not os.path.exists(config_path): raise FileNotFoundError(f"配置文件不存在: {config_path}") if not YAML_AVAILABLE: raise ImportError( "PyYAML 未安装,请运行: pip install pyyaml" ) try: with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) except yaml.YAMLError as e: raise ValueError(f"YAML 格式错误 in {config_path}: {e}") if config is None: return {} return config def load_planner_section(config: dict[str, Any]) -> dict[str, Any]: """提取 planner 配置段 Args: config: 完整配置字典 Returns: planner 配置段,缺省时返回空字典 """ return config.get("planner", {}) def build_planner_config(planner_section: dict[str, Any]) -> PlannerConfig: """构造 PlannerConfig 对象 Args: planner_section: planner 配置段 Returns: PlannerConfig 对象 Raises: ConfigValidationError: 配置值非法 """ # 获取各字段,缺省时使用默认值 risk_level_str = planner_section.get("default_risk_level", "low") require_medium = planner_section.get("require_confirmation_on_medium_risk", True) require_high = planner_section.get("require_confirmation_on_high_risk", True) max_steps = planner_section.get("max_plan_steps", 10) default_source = planner_section.get("default_source", "hybrid") # 校验并转换 risk_level risk_level_str = str(risk_level_str).lower() if risk_level_str not in ("low", "medium", "high"): raise ConfigValidationError( f"无效的 default_risk_level: '{risk_level_str}',有效值为: low/medium/high" ) risk_level = RiskLevel(risk_level_str) # 校验 max_plan_steps if not isinstance(max_steps, int) or max_steps <= 0: raise ConfigValidationError( f"无效的 max_plan_steps: {max_steps},必须为大于 0 的整数" ) # 校验 default_source valid_sources = ("rule_engine", "llm", "hybrid") if default_source not in valid_sources: raise ConfigValidationError( f"无效的 default_source: '{default_source}',有效值为: {valid_sources}" ) return PlannerConfig( default_risk_level=risk_level, require_confirmation_on_medium_risk=require_medium, require_confirmation_on_high_risk=require_high, max_plan_steps=max_steps, default_source=default_source, ) def load_available_tools(planner_section: dict[str, Any]) -> list[str]: """加载可用能力列表 Args: planner_section: planner 配置段 Returns: 去重后的工具列表,缺省时返回空列表 """ raw_tools = planner_section.get("available_tools", []) if not isinstance(raw_tools, list): return [] # 去重,保持顺序 seen = set() tools = [] for tool in raw_tools: if isinstance(tool, str) and tool and tool not in seen: seen.add(tool) tools.append(tool) return tools def load_tool_descriptions(planner_section: dict[str, Any]) -> list[dict[str, Any]]: """加载工具语义描述列表 Args: planner_section: planner 配置段 Returns: 工具描述列表,每项包含 name, description, tool_call_type, category 缺失时返回空列表 配置非法时过滤无效项并打印 warning """ raw_descriptions = planner_section.get("tool_descriptions", []) if not isinstance(raw_descriptions, list): print(f"[PlannerConfigLoader] Warning: tool_descriptions 应为列表类型,忽略") return [] valid_descriptions = [] for i, item in enumerate(raw_descriptions): if not isinstance(item, dict): print(f"[PlannerConfigLoader] Warning: tool_descriptions[{i}] 应为字典,忽略") continue # 必须包含 name 和 description name = item.get("name") description = item.get("description") if not name or not isinstance(name, str): print(f"[PlannerConfigLoader] Warning: tool_descriptions[{i}] 缺少有效 name 字段,忽略") continue if not description or not isinstance(description, str): print(f"[PlannerConfigLoader] Warning: tool_descriptions[{i}] 缺少有效 description 字段,忽略") continue # 构建有效描述,补充默认值 valid_item = { "name": str(name).strip(), "description": str(description).strip(), "tool_call_type": str(item.get("tool_call_type", "execute")).strip(), "category": str(item.get("category", "action")).strip(), } valid_descriptions.append(valid_item) if len(valid_descriptions) < len(raw_descriptions): print(f"[PlannerConfigLoader] Warning: 过滤了 {len(raw_descriptions) - len(valid_descriptions)} 个无效 tool_descriptions 项") return valid_descriptions def load_intent_to_action(planner_section: dict[str, Any]) -> dict[str, str]: """加载意图到 action 的映射 Args: planner_section: planner 配置段 Returns: 关键词到 action 的映射字典 Raises: ConfigValidationError: key 或 value 为空 """ raw_mapping = planner_section.get("intent_to_action", {}) if not isinstance(raw_mapping, dict): return {} result = {} for key, value in raw_mapping.items(): key_str = str(key).strip() value_str = str(value).strip() if not key_str: raise ConfigValidationError("intent_to_action 的 key 不能为空字符串") if not value_str: raise ConfigValidationError( f"intent_to_action 的 value 不能为空字符串,key='{key_str}'" ) result[key_str] = value_str return result def load_high_risk_actions(planner_section: dict[str, Any]) -> list[str]: """加载高风险动作列表 Args: planner_section: planner 配置段 Returns: 高风险动作列表 """ raw_list = planner_section.get("high_risk_actions", []) if not isinstance(raw_list, list): return [] return [str(item) for item in raw_list if item] def load_medium_risk_actions(planner_section: dict[str, Any]) -> list[str]: """加载中风险动作列表 Args: planner_section: planner 配置段 Returns: 中风险动作列表 """ raw_list = planner_section.get("medium_risk_actions", []) if not isinstance(raw_list, list): return [] return [str(item) for item in raw_list if item] def load_confirmation_rules(planner_section: dict[str, Any]) -> dict[str, bool]: """加载确认规则配置 Args: planner_section: planner 配置段 Returns: 确认规则字典 """ raw_rules = planner_section.get("confirmation_rules", {}) if not isinstance(raw_rules, dict): raw_rules = {} return { "repeated_action_requires_confirmation": bool( raw_rules.get("repeated_action_requires_confirmation", True) ), "unavailable_actuator_requires_confirmation": bool( raw_rules.get("unavailable_actuator_requires_confirmation", True) ), "high_risk_requires_confirmation": bool( raw_rules.get("high_risk_requires_confirmation", True) ), "medium_risk_requires_confirmation": bool( raw_rules.get("medium_risk_requires_confirmation", True) ), } def load_domain_rules(planner_section: dict[str, Any]) -> dict[str, Any]: """加载领域规则 Args: planner_section: planner 配置段 Returns: 领域规则字典 """ return { "high_risk_actions": load_high_risk_actions(planner_section), "medium_risk_actions": load_medium_risk_actions(planner_section), "confirmation_rules": load_confirmation_rules(planner_section), } def validate_planner_mode(mode: str) -> str: """校验并规范化 planner 模式 Args: mode: 原始模式字符串 Returns: 规范化后的模式字符串 Raises: ConfigValidationError: 模式非法 """ mode_lower = str(mode).lower().strip() valid_modes = ("rule", "llm", "hybrid") if mode_lower not in valid_modes: raise ConfigValidationError( f"无效的 planner mode: '{mode}',有效值为: {valid_modes}" ) return mode_lower def load_planner_runtime_config(config_path: str) -> PlannerRuntimeConfig: """加载 Planner 运行时配置(统一入口) 读取 YAML 文件,提取 planner 配置段, 进行校验和转换,返回可直接使用的配置对象。 Args: config_path: 配置文件路径 Returns: PlannerRuntimeConfig 对象 Raises: FileNotFoundError: 配置文件不存在 ValueError: YAML 格式错误 ConfigValidationError: 配置值非法 ImportError: PyYAML 未安装 """ # 加载并解析 YAML full_config = load_yaml_config(config_path) # 提取 planner 配置段 planner_section = load_planner_section(full_config) # 校验并加载 planner_mode planner_mode = planner_section.get("mode", "hybrid") planner_mode = validate_planner_mode(planner_mode) # 构造 PlannerConfig planner_config = build_planner_config(planner_section) # 加载其他配置项 available_tools = load_available_tools(planner_section) tool_descriptions = load_tool_descriptions(planner_section) intent_to_action = load_intent_to_action(planner_section) domain_rules = load_domain_rules(planner_section) # 构建并返回运行时配置 return PlannerRuntimeConfig( planner_mode=planner_mode, planner_config=planner_config, available_tools=available_tools, tool_descriptions=tool_descriptions, intent_to_action=intent_to_action, domain_rules=domain_rules, ) def load_planner_runtime_config_as_dict(config_path: str) -> dict[str, Any]: """加载 Planner 运行时配置(返回字典形式) 与 load_planner_runtime_config 功能相同,但返回字典格式。 兼容不需要 dataclass 的场景。 Args: config_path: 配置文件路径 Returns: 包含所有配置项的字典 """ runtime_config = load_planner_runtime_config(config_path) return { "planner_mode": runtime_config.planner_mode, "planner_config": runtime_config.planner_config, "available_tools": runtime_config.available_tools, "intent_to_action": runtime_config.intent_to_action, "domain_rules": runtime_config.domain_rules, } # ============================================================================= # 主程序入口(测试示例) # ============================================================================= if __name__ == "__main__": import json import os print("=" * 70) print("Planner 配置加载测试") print("=" * 70) # 获取当前脚本所在目录 script_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(script_dir, "planner_config.yaml") # 检查配置文件是否存在 if not os.path.exists(config_path): print(f"\n配置文件不存在: {config_path}") print("创建示例配置文件...") # 创建一个临时配置用于测试 from planner import PlannerConfig from tool_protocol import RiskLevel runtime_config = PlannerRuntimeConfig( planner_mode="hybrid", planner_config=PlannerConfig(), available_tools=["feed", "adjust_fan", "speak"], intent_to_action={"降温": "adjust_fan"}, domain_rules={ "high_risk_actions": ["turn_off"], "medium_risk_actions": ["adjust_fan"], "confirmation_rules": {}, }, ) else: # 加载配置 print(f"\n加载配置文件: {config_path}") runtime_config = load_planner_runtime_config(config_path) # ------------------------------------------------------------------------- # 打印配置信息 # ------------------------------------------------------------------------- print("\n[1] planner_mode") print("-" * 40) print(f" {runtime_config.planner_mode}") print("\n[2] PlannerConfig") print("-" * 40) pc = runtime_config.planner_config print(f" default_risk_level: {pc.default_risk_level.value}") print(f" require_confirmation_on_medium_risk: {pc.require_confirmation_on_medium_risk}") print(f" require_confirmation_on_high_risk: {pc.require_confirmation_on_high_risk}") print(f" max_plan_steps: {pc.max_plan_steps}") print(f" default_source: {pc.default_source}") print("\n[3] available_tools") print("-" * 40) tools = runtime_config.available_tools print(f" 数量: {len(tools)}") for i, tool in enumerate(tools, 1): print(f" {i}. {tool}") print("\n[4] tool_descriptions") print("-" * 40) tool_descs = runtime_config.tool_descriptions print(f" 数量: {len(tool_descs)}") for i, desc in enumerate(tool_descs, 1): print(f" {i}. name={desc.get('name')}, type={desc.get('tool_call_type')}, category={desc.get('category')}") print(f" description: {desc.get('description', '')[:50]}...") print("\n[5] intent_to_action") print("-" * 40) mapping = runtime_config.intent_to_action print(f" 数量: {len(mapping)}") for keyword, action in mapping.items(): print(f" '{keyword}' → '{action}'") print("\n[6] domain_rules") print("-" * 40) rules = runtime_config.domain_rules print(f" high_risk_actions: {rules.get('high_risk_actions', [])}") print(f" medium_risk_actions: {rules.get('medium_risk_actions', [])}") print(f" confirmation_rules:") for key, value in rules.get("confirmation_rules", {}).items(): print(f" {key}: {value}") # ------------------------------------------------------------------------- # 测试校验功能 # ------------------------------------------------------------------------- print("\n[7] 配置校验测试") print("-" * 40) # 测试无效 risk_level try: bad_section = {"default_risk_level": "invalid"} build_planner_config(bad_section) print(" ❌ 无效 risk_level 未被捕获") except ConfigValidationError as e: print(f" ✓ 无效 risk_level 正确捕获: {e}") # 测试无效 max_plan_steps try: bad_section = {"max_plan_steps": 0} build_planner_config(bad_section) print(" ❌ 无效 max_plan_steps 未被捕获") except ConfigValidationError as e: print(f" ✓ 无效 max_plan_steps 正确捕获: {e}") # 测试空的 intent key try: bad_section = {"intent_to_action": {"": "feed"}} load_intent_to_action(bad_section) print(" ❌ 空的 intent key 未被捕获") except ConfigValidationError as e: print(f" ✓ 空的 intent key 正确捕获") # ------------------------------------------------------------------------- # 完整 JSON 输出 # ------------------------------------------------------------------------- print("\n[8] 完整配置 JSON") print("-" * 40) print(json.dumps(runtime_config.to_dict(), indent=2, ensure_ascii=False)) print("\n" + "=" * 70) print("Planner 配置加载测试完成") print("=" * 70)