#!/usr/bin/env python3 """ agint_brain.planner.launch - Planner Node 启动文件 功能: - 启动 planner_node - 加载 planner_config.yaml - 配置 ROS2 参数 - 输出到 screen 使用方式: ros2 launch agint_brain planner.launch.py 可选参数: planner_config_path: 配置文件路径 input_world_topic: 世界状态 topic input_user_intent_topic: 用户意图 topic output_plan_topic: Plan 输出 topic llm_request_topic: LLM 请求 topic llm_response_topic: LLM 响应 topic use_llm: 是否启用 LLM llm_timeout_sec: LLM 超时时间 debug_log: 是否输出调试日志 """ import os from pathlib import Path from ament_index_python.packages import get_package_share_directory from launch import LaunchDescription from launch.actions import DeclareLaunchArgument, LogInfo, RegisterEventHandler from launch.event_handlers import OnProcessExit from launch.substitutions import LaunchConfiguration, PathJoinSubstitution from launch_ros.actions import Node def generate_launch_description() -> LaunchDescription: """生成 launch description""" # 获取包共享目录 pkg_share = get_package_share_directory("agint_brain") # 默认配置文件路径 default_config_path = PathJoinSubstitution([ pkg_share, "config", "planner_config.yaml" ]) # ========================================================================= # Launch Arguments # ========================================================================= planner_config_path_arg = DeclareLaunchArgument( "planner_config_path", default_value=default_config_path, description="planner_config.yaml 配置文件路径", ) gate_config_path_arg = DeclareLaunchArgument( "gate_config_path", default_value=PathJoinSubstitution([ pkg_share, "config", "planner_gate_config.yaml" ]), description="planner_gate_config.yaml 配置文件路径(Gate 前置治理配置)", ) input_world_topic_arg = DeclareLaunchArgument( "input_world_topic", default_value="/world/snapshot", description="World snapshot 输入 topic", ) input_user_intent_topic_arg = DeclareLaunchArgument( "input_user_intent_topic", default_value="/planner/user_intent", description="用户意图输入 topic(已废弃,使用 input_asr_topic)", ) input_asr_topic_arg = DeclareLaunchArgument( "input_asr_topic", default_value="/asr", description="ASR 语音识别输入 topic", ) output_plan_topic_arg = DeclareLaunchArgument( "output_plan_topic", default_value="/plan", description="Plan 输出 topic", ) llm_request_topic_arg = DeclareLaunchArgument( "llm_request_topic", default_value="/planner/llm_request", description="LLM 请求 topic", ) llm_response_topic_arg = DeclareLaunchArgument( "llm_response_topic", default_value="/planner/llm_response", description="LLM 响应 topic", ) use_llm_arg = DeclareLaunchArgument( "use_llm", default_value="true", description="是否启用 LLM", ) llm_timeout_sec_arg = DeclareLaunchArgument( "llm_timeout_sec", default_value="10.0", description="LLM 请求超时时间(秒)", ) debug_log_arg = DeclareLaunchArgument( "debug_log", default_value="true", description="是否输出调试日志", ) default_session_id_arg = DeclareLaunchArgument( "default_session_id", default_value="default", description="默认会话 ID", ) # ========================================================================= # Planner Node # ========================================================================= planner_node = Node( package="agint_brain", executable="planner_node", name="planner_node", output="screen", emulate_tty=True, parameters=[ { "planner_config_path": LaunchConfiguration("planner_config_path"), "gate_config_path": LaunchConfiguration("gate_config_path"), "input_world_topic": LaunchConfiguration("input_world_topic"), "input_asr_topic": LaunchConfiguration("input_asr_topic"), "output_plan_topic": LaunchConfiguration("output_plan_topic"), "llm_request_topic": LaunchConfiguration("llm_request_topic"), "llm_response_topic": LaunchConfiguration("llm_response_topic"), "use_llm": LaunchConfiguration("use_llm"), "llm_timeout_sec": LaunchConfiguration("llm_timeout_sec"), "debug_log": LaunchConfiguration("debug_log"), "default_session_id": LaunchConfiguration("default_session_id", default="default"), } ], ) # ========================================================================= # Event Handlers # ========================================================================= exit_event = RegisterEventHandler( event_handler=OnProcessExit( target_action=planner_node, on_exit=[LogInfo(msg="Planner Node 已退出")], ) ) # ========================================================================= # Launch Description # ========================================================================= ld = LaunchDescription([ LogInfo(msg="========================================"), LogInfo(msg="正在启动 Planner Node (含 Gate 前置治理层)..."), LogInfo(msg="========================================"), planner_config_path_arg, gate_config_path_arg, input_world_topic_arg, input_asr_topic_arg, output_plan_topic_arg, llm_request_topic_arg, llm_response_topic_arg, use_llm_arg, llm_timeout_sec_arg, debug_log_arg, default_session_id_arg, planner_node, exit_event, ]) return ld