| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621 |
- """
- planner_config_loader.py - Planner 配置加载模块
- 该模块负责从统一 config.yaml 中读取 Planner 相关配置,
- 并将其转换为 planner.py 可直接使用的配置对象和运行参数。
- 职责边界:
- - 读取、校验、转换配置
- - 不做规划逻辑
- - 不做 ROS 通信
- - 不包含场景硬编码
- 使用方式:
- from planner_config_loader import load_planner_runtime_config_as_dict
- runtime_config = load_planner_runtime_config_as_dict("planner_config.yaml")
- planner = Planner(runtime_config["planner_config"])
- plan = planner.generate_plan(
- world_snapshot={...},
- user_intent="降温",
- available_tools=runtime_config["available_tools"],
- domain_rules=runtime_config["domain_rules"],
- planner_mode=runtime_config["planner_mode"],
- )
- """
- from __future__ import annotations
- import os
- from dataclasses import dataclass, field
- from typing import Any
- try:
- import yaml
- YAML_AVAILABLE = True
- except ImportError:
- YAML_AVAILABLE = False
- from planner import PlannerConfig
- from tool_protocol import RiskLevel
- # =============================================================================
- # 配置校验异常
- # =============================================================================
class ConfigValidationError(ValueError):
    """Raised when a planner configuration value fails validation.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    also catch configuration validation failures.
    """
- # =============================================================================
- # 运行时配置数据类
- # =============================================================================
@dataclass
class PlannerRuntimeConfig:
    """Bundle of every configuration item a Planner run needs.

    Attributes:
        planner_mode: Planning mode name (defaults to ``"hybrid"``).
        planner_config: The ``PlannerConfig`` object handed to the planner.
        available_tools: Names of the tools the planner may use.
        tool_descriptions: Semantic descriptions of the tools, one dict each.
        intent_to_action: Mapping from an intent keyword to an action name.
        domain_rules: Domain rule dictionary.

    Example:
        >>> config = PlannerRuntimeConfig(
        ...     planner_mode="hybrid",
        ...     planner_config=PlannerConfig(),
        ...     available_tools=["feed", "speak"],
        ...     tool_descriptions=[{"name": "feed", "description": "..."}],
        ...     intent_to_action={"降温": "adjust_fan"},
        ...     domain_rules={"high_risk_actions": []},
        ... )
    """

    planner_mode: str = "hybrid"
    planner_config: PlannerConfig = field(default_factory=PlannerConfig)
    available_tools: list[str] = field(default_factory=list)
    tool_descriptions: list[dict[str, Any]] = field(default_factory=list)
    intent_to_action: dict[str, str] = field(default_factory=dict)
    domain_rules: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the configuration as a plain dictionary.

        ``planner_config`` is flattened into a nested dict and its risk level
        is emitted as the enum's ``.value``. The remaining fields are passed
        through as the same objects (no copies are made).

        Returns:
            A dictionary containing every configuration item.
        """
        pc = self.planner_config
        planner_config_view = {
            "default_risk_level": pc.default_risk_level.value,
            "require_confirmation_on_medium_risk": pc.require_confirmation_on_medium_risk,
            "require_confirmation_on_high_risk": pc.require_confirmation_on_high_risk,
            "max_plan_steps": pc.max_plan_steps,
            "default_source": pc.default_source,
        }

        result: dict[str, Any] = {"planner_mode": self.planner_mode}
        result["planner_config"] = planner_config_view
        result["available_tools"] = self.available_tools
        result["tool_descriptions"] = self.tool_descriptions
        result["intent_to_action"] = self.intent_to_action
        result["domain_rules"] = self.domain_rules
        return result
- # =============================================================================
- # 配置加载函数
- # =============================================================================
def load_yaml_config(config_path: str) -> dict[str, Any]:
    """Load a YAML configuration file and return its top-level mapping.

    Args:
        config_path: Path of the configuration file.

    Returns:
        The parsed configuration dictionary, or an empty dict when the
        document is empty.

    Raises:
        FileNotFoundError: The configuration file does not exist.
        ImportError: PyYAML is not installed.
        ValueError: The file is not valid YAML, or its top level is not a
            mapping.
    """
    # The existence check runs first so a missing file is reported even when
    # PyYAML is unavailable.
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"配置文件不存在: {config_path}")

    if not YAML_AVAILABLE:
        raise ImportError(
            "PyYAML 未安装,请运行: pip install pyyaml"
        )

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        # Chain the parser error so the original traceback stays visible.
        raise ValueError(f"YAML 格式错误 in {config_path}: {e}") from e

    # safe_load returns None for an empty document.
    if config is None:
        return {}

    # A list or scalar at the top level would break every caller that uses
    # ``.get``; reject it here with a clear message instead.
    if not isinstance(config, dict):
        raise ValueError(
            f"YAML 格式错误 in {config_path}: 顶层必须是映射(dict),"
            f"实际类型为 {type(config).__name__}"
        )

    return config
def load_planner_section(config: dict[str, Any]) -> dict[str, Any]:
    """Extract the ``planner`` section from the full configuration.

    Args:
        config: The complete configuration dictionary.

    Returns:
        The ``planner`` section. An empty dict is returned when the key is
        missing or its value is null (e.g. a bare ``planner:`` line in YAML).

    Raises:
        ConfigValidationError: The ``planner`` value is present but is not a
            mapping.
    """
    section = config.get("planner")

    # A key that is present with a null value must behave like a missing key;
    # otherwise callers would receive None and fail on ``.get``.
    if section is None:
        return {}

    if not isinstance(section, dict):
        raise ConfigValidationError(
            f"planner 配置段必须是映射(dict),实际类型为 {type(section).__name__}"
        )

    return section
def build_planner_config(planner_section: dict[str, Any]) -> PlannerConfig:
    """Build a ``PlannerConfig`` from the planner configuration section.

    Missing keys fall back to these defaults: ``default_risk_level="low"``,
    both ``require_confirmation_*`` flags ``True``, ``max_plan_steps=10`` and
    ``default_source="hybrid"``.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The constructed ``PlannerConfig`` object.

    Raises:
        ConfigValidationError: ``default_risk_level``, ``max_plan_steps`` or
            ``default_source`` holds an invalid value.
    """
    risk_level_str = planner_section.get("default_risk_level", "low")
    require_medium = planner_section.get("require_confirmation_on_medium_risk", True)
    require_high = planner_section.get("require_confirmation_on_high_risk", True)
    max_steps = planner_section.get("max_plan_steps", 10)
    default_source = planner_section.get("default_source", "hybrid")

    # Risk level is compared case-insensitively.
    risk_level_str = str(risk_level_str).lower()
    if risk_level_str not in ("low", "medium", "high"):
        raise ConfigValidationError(
            f"无效的 default_risk_level: '{risk_level_str}',有效值为: low/medium/high"
        )

    risk_level = RiskLevel(risk_level_str)

    # bool is a subclass of int, so ``max_plan_steps: true`` would otherwise
    # pass the integer check; reject it explicitly.
    if isinstance(max_steps, bool) or not isinstance(max_steps, int) or max_steps <= 0:
        raise ConfigValidationError(
            f"无效的 max_plan_steps: {max_steps},必须为大于 0 的整数"
        )

    valid_sources = ("rule_engine", "llm", "hybrid")
    if default_source not in valid_sources:
        raise ConfigValidationError(
            f"无效的 default_source: '{default_source}',有效值为: {valid_sources}"
        )

    return PlannerConfig(
        default_risk_level=risk_level,
        require_confirmation_on_medium_risk=require_medium,
        require_confirmation_on_high_risk=require_high,
        max_plan_steps=max_steps,
        default_source=default_source,
    )
def load_available_tools(planner_section: dict[str, Any]) -> list[str]:
    """Read the list of available tool names from the planner section.

    Entries that are not non-empty strings are dropped, and duplicates are
    removed while keeping the first occurrence's position.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The de-duplicated tool names, or an empty list when the key is
        missing or its value is not a list.
    """
    configured = planner_section.get("available_tools", [])
    if not isinstance(configured, list):
        return []

    usable_names = (
        entry for entry in configured if isinstance(entry, str) and entry
    )
    # dict keys keep insertion order, which gives an order-preserving dedup.
    return list(dict.fromkeys(usable_names))
def load_tool_descriptions(planner_section: dict[str, Any]) -> list[dict[str, Any]]:
    """Read the list of semantic tool descriptions from the planner section.

    Each returned item has the keys ``name``, ``description``,
    ``tool_call_type`` and ``category``, all whitespace-stripped. Invalid
    entries are skipped and a warning is printed for each one.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The valid tool descriptions. An empty list is returned when the key
        is missing or its value is not a list.
    """

    def _optional_text(value: Any, default: str) -> str:
        # Missing, null or blank optional fields fall back to the default
        # instead of becoming the literal string "None" or an empty string.
        if value is None:
            return default
        text = str(value).strip()
        return text or default

    raw_descriptions = planner_section.get("tool_descriptions", [])

    if not isinstance(raw_descriptions, list):
        print("[PlannerConfigLoader] Warning: tool_descriptions 应为列表类型,忽略")
        return []

    valid_descriptions = []
    for i, item in enumerate(raw_descriptions):
        if not isinstance(item, dict):
            print(f"[PlannerConfigLoader] Warning: tool_descriptions[{i}] 应为字典,忽略")
            continue

        # Strip before validating so whitespace-only values are rejected
        # rather than stored as empty strings.
        raw_name = item.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        if not name:
            print(f"[PlannerConfigLoader] Warning: tool_descriptions[{i}] 缺少有效 name 字段,忽略")
            continue

        raw_description = item.get("description")
        description = raw_description.strip() if isinstance(raw_description, str) else ""
        if not description:
            print(f"[PlannerConfigLoader] Warning: tool_descriptions[{i}] 缺少有效 description 字段,忽略")
            continue

        valid_descriptions.append(
            {
                "name": name,
                "description": description,
                "tool_call_type": _optional_text(item.get("tool_call_type"), "execute"),
                "category": _optional_text(item.get("category"), "action"),
            }
        )

    if len(valid_descriptions) < len(raw_descriptions):
        print(f"[PlannerConfigLoader] Warning: 过滤了 {len(raw_descriptions) - len(valid_descriptions)} 个无效 tool_descriptions 项")

    return valid_descriptions
def load_intent_to_action(planner_section: dict[str, Any]) -> dict[str, str]:
    """Read the intent-keyword to action mapping from the planner section.

    Keys and values are converted to strings and whitespace-stripped.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The keyword-to-action mapping. An empty dict is returned when the key
        is missing or its value is not a dict.

    Raises:
        ConfigValidationError: A key or value is null, empty or blank.
    """
    raw_mapping = planner_section.get("intent_to_action", {})

    if not isinstance(raw_mapping, dict):
        return {}

    result = {}
    for key, value in raw_mapping.items():
        # A null entry must count as empty; str(None) would otherwise yield
        # the literal "None" and slip past the checks below.
        key_str = "" if key is None else str(key).strip()
        value_str = "" if value is None else str(value).strip()

        if not key_str:
            raise ConfigValidationError("intent_to_action 的 key 不能为空字符串")
        if not value_str:
            raise ConfigValidationError(
                f"intent_to_action 的 value 不能为空字符串,key='{key_str}'"
            )

        result[key_str] = value_str

    return result
def load_high_risk_actions(planner_section: dict[str, Any]) -> list[str]:
    """Read the high-risk action names from the planner section.

    Falsy entries are dropped and the remaining entries are converted to
    strings.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The high-risk action names, or an empty list when the key is missing
        or its value is not a list.
    """
    configured = planner_section.get("high_risk_actions", [])
    if isinstance(configured, list):
        # filter(None, ...) keeps only truthy entries.
        return list(map(str, filter(None, configured)))
    return []
def load_medium_risk_actions(planner_section: dict[str, Any]) -> list[str]:
    """Read the medium-risk action names from the planner section.

    Falsy entries are dropped and the remaining entries are converted to
    strings.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The medium-risk action names, or an empty list when the key is
        missing or its value is not a list.
    """
    configured = planner_section.get("medium_risk_actions", [])
    if isinstance(configured, list):
        # filter(None, ...) keeps only truthy entries.
        return list(map(str, filter(None, configured)))
    return []
def load_confirmation_rules(planner_section: dict[str, Any]) -> dict[str, bool]:
    """Read the confirmation rule flags from the planner section.

    Exactly four flags are returned. Each one defaults to ``True`` when it is
    absent and is coerced with ``bool()`` when present. Unknown keys in the
    configuration are ignored.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The confirmation rule dictionary.
    """
    rule_names = (
        "repeated_action_requires_confirmation",
        "unavailable_actuator_requires_confirmation",
        "high_risk_requires_confirmation",
        "medium_risk_requires_confirmation",
    )

    configured = planner_section.get("confirmation_rules", {})
    # A non-dict value is treated as if no rules were configured.
    source = configured if isinstance(configured, dict) else {}

    return {rule: bool(source.get(rule, True)) for rule in rule_names}
def load_domain_rules(planner_section: dict[str, Any]) -> dict[str, Any]:
    """Assemble the domain rules from the planner section.

    Combines the high-risk action list, the medium-risk action list and the
    confirmation rules into one dictionary.

    Args:
        planner_section: The planner configuration section.

    Returns:
        The domain rule dictionary with the keys ``high_risk_actions``,
        ``medium_risk_actions`` and ``confirmation_rules``.
    """
    rules: dict[str, Any] = {}
    rules["high_risk_actions"] = load_high_risk_actions(planner_section)
    rules["medium_risk_actions"] = load_medium_risk_actions(planner_section)
    rules["confirmation_rules"] = load_confirmation_rules(planner_section)
    return rules
def validate_planner_mode(mode: str) -> str:
    """Validate a planner mode string and return its normalized form.

    Normalization lower-cases the value and strips surrounding whitespace.
    The accepted modes are ``rule``, ``llm`` and ``hybrid``.

    Args:
        mode: The raw mode string.

    Returns:
        The normalized mode string.

    Raises:
        ConfigValidationError: The mode is not one of the accepted values.
    """
    valid_modes = ("rule", "llm", "hybrid")
    normalized = str(mode).lower().strip()

    if normalized in valid_modes:
        return normalized

    # The message reports the value exactly as the caller supplied it.
    raise ConfigValidationError(
        f"无效的 planner mode: '{mode}',有效值为: {valid_modes}"
    )
def load_planner_runtime_config(config_path: str) -> PlannerRuntimeConfig:
    """Load the Planner runtime configuration (single entry point).

    Reads the YAML file, extracts the ``planner`` section, validates and
    converts its values, and returns a ready-to-use configuration object.
    The mode defaults to ``"hybrid"`` when the section has no ``mode`` key.

    Args:
        config_path: Path of the configuration file.

    Returns:
        The populated ``PlannerRuntimeConfig`` object.

    Raises:
        FileNotFoundError: The configuration file does not exist.
        ValueError: The file is not valid YAML.
        ConfigValidationError: A configuration value is invalid.
        ImportError: PyYAML is not installed.
    """
    section = load_planner_section(load_yaml_config(config_path))

    # Keyword arguments are evaluated top to bottom, so the mode is validated
    # first and the remaining loaders run in the order listed.
    return PlannerRuntimeConfig(
        planner_mode=validate_planner_mode(section.get("mode", "hybrid")),
        planner_config=build_planner_config(section),
        available_tools=load_available_tools(section),
        tool_descriptions=load_tool_descriptions(section),
        intent_to_action=load_intent_to_action(section),
        domain_rules=load_domain_rules(section),
    )
def load_planner_runtime_config_as_dict(config_path: str) -> dict[str, Any]:
    """Load the Planner runtime configuration and return it as a dictionary.

    Does the same work as ``load_planner_runtime_config`` but returns a plain
    dict, for callers that do not want the dataclass. ``planner_config`` is
    kept as the ``PlannerConfig`` object (it is not flattened).

    Args:
        config_path: Path of the configuration file.

    Returns:
        A dictionary with the keys ``planner_mode``, ``planner_config``,
        ``available_tools``, ``tool_descriptions``, ``intent_to_action`` and
        ``domain_rules``.
    """
    runtime_config = load_planner_runtime_config(config_path)

    return {
        "planner_mode": runtime_config.planner_mode,
        "planner_config": runtime_config.planner_config,
        "available_tools": runtime_config.available_tools,
        # Previously omitted, which made the dict form lose loaded data.
        "tool_descriptions": runtime_config.tool_descriptions,
        "intent_to_action": runtime_config.intent_to_action,
        "domain_rules": runtime_config.domain_rules,
    }
- # =============================================================================
- # 主程序入口(测试示例)
- # =============================================================================
if __name__ == "__main__":
    # Manual smoke test: load (or synthesize) a runtime configuration, print
    # each part, exercise the validators, then dump the whole thing as JSON.
    # ``os`` and ``PlannerConfig`` come from the module-level imports.
    import json

    print("=" * 70)
    print("Planner 配置加载测试")
    print("=" * 70)

    # Look for planner_config.yaml next to this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_dir, "planner_config.yaml")

    if not os.path.exists(config_path):
        print(f"\n配置文件不存在: {config_path}")
        print("创建示例配置文件...")

        # No file on disk: fall back to an in-memory sample configuration.
        runtime_config = PlannerRuntimeConfig(
            planner_mode="hybrid",
            planner_config=PlannerConfig(),
            available_tools=["feed", "adjust_fan", "speak"],
            intent_to_action={"降温": "adjust_fan"},
            domain_rules={
                "high_risk_actions": ["turn_off"],
                "medium_risk_actions": ["adjust_fan"],
                "confirmation_rules": {},
            },
        )
    else:
        print(f"\n加载配置文件: {config_path}")
        runtime_config = load_planner_runtime_config(config_path)

    # -------------------------------------------------------------------------
    # Print the loaded configuration
    # -------------------------------------------------------------------------
    print("\n[1] planner_mode")
    print("-" * 40)
    print(f" {runtime_config.planner_mode}")

    print("\n[2] PlannerConfig")
    print("-" * 40)
    pc = runtime_config.planner_config
    print(f" default_risk_level: {pc.default_risk_level.value}")
    print(f" require_confirmation_on_medium_risk: {pc.require_confirmation_on_medium_risk}")
    print(f" require_confirmation_on_high_risk: {pc.require_confirmation_on_high_risk}")
    print(f" max_plan_steps: {pc.max_plan_steps}")
    print(f" default_source: {pc.default_source}")

    print("\n[3] available_tools")
    print("-" * 40)
    tools = runtime_config.available_tools
    print(f" 数量: {len(tools)}")
    for i, tool in enumerate(tools, 1):
        print(f" {i}. {tool}")

    print("\n[4] tool_descriptions")
    print("-" * 40)
    tool_descs = runtime_config.tool_descriptions
    print(f" 数量: {len(tool_descs)}")
    for i, desc in enumerate(tool_descs, 1):
        print(f" {i}. name={desc.get('name')}, type={desc.get('tool_call_type')}, category={desc.get('category')}")
        # Only append an ellipsis when the description was actually cut.
        full_text = desc.get('description', '')
        preview = full_text if len(full_text) <= 50 else f"{full_text[:50]}..."
        print(f" description: {preview}")

    print("\n[5] intent_to_action")
    print("-" * 40)
    mapping = runtime_config.intent_to_action
    print(f" 数量: {len(mapping)}")
    for keyword, action in mapping.items():
        print(f" '{keyword}' → '{action}'")

    print("\n[6] domain_rules")
    print("-" * 40)
    rules = runtime_config.domain_rules
    print(f" high_risk_actions: {rules.get('high_risk_actions', [])}")
    print(f" medium_risk_actions: {rules.get('medium_risk_actions', [])}")
    print(" confirmation_rules:")
    for key, value in rules.get("confirmation_rules", {}).items():
        print(f" {key}: {value}")

    # -------------------------------------------------------------------------
    # Exercise the validators
    # -------------------------------------------------------------------------
    print("\n[7] 配置校验测试")
    print("-" * 40)

    # An unknown risk level must be rejected.
    try:
        bad_section = {"default_risk_level": "invalid"}
        build_planner_config(bad_section)
        print(" ❌ 无效 risk_level 未被捕获")
    except ConfigValidationError as e:
        print(f" ✓ 无效 risk_level 正确捕获: {e}")

    # A non-positive max_plan_steps must be rejected.
    try:
        bad_section = {"max_plan_steps": 0}
        build_planner_config(bad_section)
        print(" ❌ 无效 max_plan_steps 未被捕获")
    except ConfigValidationError as e:
        print(f" ✓ 无效 max_plan_steps 正确捕获: {e}")

    # An empty intent key must be rejected.
    try:
        bad_section = {"intent_to_action": {"": "feed"}}
        load_intent_to_action(bad_section)
        print(" ❌ 空的 intent key 未被捕获")
    except ConfigValidationError:
        print(" ✓ 空的 intent key 正确捕获")

    # -------------------------------------------------------------------------
    # Full JSON dump
    # -------------------------------------------------------------------------
    print("\n[8] 完整配置 JSON")
    print("-" * 40)
    print(json.dumps(runtime_config.to_dict(), indent=2, ensure_ascii=False))

    print("\n" + "=" * 70)
    print("Planner 配置加载测试完成")
    print("=" * 70)
|