| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- import os
- import json
- import rclpy
- from rclpy.node import Node
- from interfaces.action import Rot
- from std_msgs.msg import String
- from utils import large_model_interface
- from rclpy.action import ActionClient
- from ament_index_python.packages import get_package_share_directory
- from utils.promot import get_prompt, get_map_mapping, set_map_mapping, set_large_model_config, set_model_paths, set_system_config, set_environment_data
- import time
- import re
- import functools
def measure_execution_time(func):
    """Decorator that measures how long a method takes and logs the duration.

    The wrapped callable must be a method: its first positional argument is
    treated as ``self``. When ``self`` has a ``get_logger`` attribute (as an
    rclpy ``Node`` does), the duration is reported through that logger's
    ``info``; otherwise it is written to stdout with ``print``.

    The duration is reported even when the wrapped method raises; the
    exception then propagates unchanged.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time() follows the
        # wall clock and can jump (NTP, manual changes), corrupting durations.
        start_time = time.perf_counter()
        try:
            return func(self, *args, **kwargs)
        finally:
            execution_time = time.perf_counter() - start_time
            message = f"[性能统计] {func.__name__} 函数执行时间: {execution_time:.4f} 秒"
            # Prefer the ROS logging system when the instance provides one.
            if hasattr(self, 'get_logger'):
                self.get_logger().info(message)
            else:
                print(message)
    return wrapper
class LargeModelService(Node):
    """ROS 2 node that feeds user input into a two-layer large-model pipeline.

    Inputs arrive on the ``asr`` topic (user text), the ``seewhat_handle``
    topic (image requests) and the ``actionstatus`` topic (robot feedback).
    At the start of an instruction cycle a decision-layer model plans the
    task; an execution-layer model then produces JSON with ``action`` and
    ``response`` fields, which is sent to the ``action_service`` action
    server. A cycle ends when ``"finish"`` arrives on ``actionstatus``.
    """

    def __init__(self):
        super().__init__("LargeModelService")
        self.init_param_config()  # Initialize parameter configuration
        self.init_largemodel()  # Initialize the large model client
        self.init_ros_comunication()  # Initialize ROS communication
        self.init_language()  # Validate language and build prompt templates
        self.get_logger().info(
            "LargeModelService node Initialization completed..."
        )

    def init_largemodel(self):
        """Create the model interface client and initialize it for the region.

        An unsupported ``regional_setting`` is treated as fatal: the node logs
        the problem once per second forever and never finishes initializing.
        """
        # Pass the node logger so the interface can emit debug logs.
        self.model_client = large_model_interface.model_interface(logger=self.get_logger())
        self.new_order_cycle = True  # True => the next input starts a new instruction cycle
        if self.regional_setting == "China":
            # Only the execution-layer model needs initializing; the decision layer does not.
            self.model_client.init_Multimodal()
        elif self.regional_setting == "international":
            self.model_client.init_dify_client()
        else:
            while True:
                # Fixed: the original called info() with no message and then
                # called its return value, raising TypeError instead of logging.
                self.get_logger().info(
                    'Please check the regional_setting parameter in yahboom.yaml file, it should be either "China" or "international".'
                )
                time.sleep(1)

    def init_param_config(self):
        """Declare/read node parameters and initialize subscription-fed state."""
        self.pkg_path = get_package_share_directory("largemodel")
        # Image consumed by the execution-layer model for "image" requests.
        self.image_save_path = os.path.join(
            self.pkg_path, "resources_file", "image.png"
        )
        # Parameter declaration
        self.declare_parameter("language", "zh")
        self.declare_parameter("regional_setting", "China")
        self.declare_parameter("text_chat_mode", False)
        # Read parameters from the parameter server
        self.language = (
            self.get_parameter("language").get_parameter_value().string_value
        )
        self.regional_setting = (
            self.get_parameter("regional_setting").get_parameter_value().string_value
        )
        self.text_chat_mode = (
            self.get_parameter("text_chat_mode").get_parameter_value().bool_value
        )
        self.conversation_id = None  # Dify conversation id (international mode)
        self.map_mapping = ""  # map-mapping text (filled from environment data)
        self.config_data = {}  # configuration (filled from /ai/config)
        self.map_points = []  # map navigation points (filled from environment data)
        self.environment_data = {}  # latest environment data (filled from subscription)

    def init_language(self):
        """Validate the language parameter and build per-language prompt templates.

        An unsupported language is treated as fatal: the node logs the problem
        once per second forever.
        """
        self.language_dict = {
            "zh": "中文",
            "en": "English",
        }
        language_list = ["zh", "en"]
        if self.language not in language_list:
            while True:
                self.get_logger().info(
                    "The language setting is incorrect. Please check the action_service'' language setting in the yahboom.yaml file"
                )
                self.get_logger().info(self.language)
                time.sleep(1)
        # prompt_1: user request + decision-layer plan (start of a cycle)
        # prompt_2: feedback sent with an image after seewhat() completes
        # prompt_3: decision-layer plan only (shown/published to the user)
        self.prompt_dict = {
            "zh": {  # Chinese
                "prompt_1": "用户:{prompt},决策层AI规划:{execute_instructions}",
                "prompt_2": "机器人反馈:执行seewhat()完成",
                "prompt_3": "决策层AI规划:{execute_instructions}",
            },
            "en": {  # English
                "prompt_1": "user:{prompt},Decision making AI planning:{execute_instructions}",
                "prompt_2": "Robot feedback: Execute seewhat() completed",
                "prompt_3": "Decision making AI planning:{execute_instructions}",
            },
        }

    def init_ros_comunication(self):
        """Create the node's subscriptions, publishers and action client."""
        # Action status subscriber (robot feedback / cycle completion)
        self.actionstatus_sub = self.create_subscription(
            String, "actionstatus", self.actionstatus_callback, 1
        )
        # Action client connected to 'action_service'
        self._action_client = ActionClient(self, Rot, "action_service")
        # ASR topic subscriber
        self.asrsub = self.create_subscription(String, "asr", self.asr_callback, 1)
        # seewhat subscriber
        self.seewhat_sub = self.create_subscription(
            String, "seewhat_handle", self.seewhat_callback, 1
        )
        # Action status publisher
        self.actionstatus_pub = self.create_publisher(String, "actionstatus", 1)
        # Text interaction publisher
        self.text_pub = self.create_publisher(String, "text_response", 1)
        # Configuration data published by the config node
        self.config_sub = self.create_subscription(
            String, "/ai/config", self.config_callback, 10
        )
        self.environment_sub = None  # environment subscription (re-created when the topic changes)
        self.environment_topic = "/ai/env"  # default; matches environment_node publish_topic
        # Subscribe to environment data right away using the default topic.
        self.update_environment_subscription()

    def _run_inference(self, type, prompt=""):
        """Route a request to the inference pipeline for the configured region."""
        if self.regional_setting == "China":
            # Online inference: decision-layer reasoning + execution-layer supervision.
            self.dual_large_model_mode(type=type, prompt=prompt)
        else:
            self.dual_large_model_international_model(type=type, prompt=prompt)

    def seewhat_callback(self, msg):
        """Run image inference when a "seewhat" request arrives."""
        if msg.data == "seewhat":
            self._run_inference(type="image")

    def asr_callback(self, msg):
        """Run text inference on recognized speech."""
        self._run_inference(type="text", prompt=msg.data)

    def actionstatus_callback(self, msg):
        """Handle action status reports from the executor.

        "finish" closes the current instruction cycle; "ask_user_timeout"
        triggers inference with that literal prompt; any other message is fed
        back to the models as the action execution result.
        """
        if msg.data == "finish":
            # The current instruction has been executed; start a new cycle.
            self.new_order_cycle = True
            self.get_logger().info(
                "The current instruction cycle has ended"
            )
        elif msg.data == "ask_user_timeout":
            # ask_user timed out: run inference with the timeout marker as prompt.
            self.get_logger().warn("[多轮对话] ask_user 超时,触发空推理")
            self._run_inference(type="text", prompt="ask_user_timeout")
        else:
            # Feed the action execution result back to the models.
            self._run_inference(type="text", prompt=msg.data)

    def config_callback(self, msg):
        """Apply configuration received on the /ai/config topic.

        Expects a JSON object with a ``config`` key holding optional
        ``large_model``, ``model_paths``, ``system`` and ``topics`` sections.
        Parse/apply failures are logged as warnings.
        """
        try:
            config_json = json.loads(msg.data)
            self.config_data = config_json.get('config', {})
            # Large model configuration
            large_model_config = self.config_data.get('large_model', {})
            if large_model_config:
                set_large_model_config(large_model_config)
                # Push the update to the model interface if it supports it.
                if hasattr(self.model_client, 'update_config'):
                    self.model_client.update_config(large_model_config)
            # Model paths
            model_paths = self.config_data.get('model_paths', {})
            if model_paths:
                set_model_paths(model_paths)
            # System configuration
            system_config = self.config_data.get('system', {})
            if system_config:
                set_system_config(system_config)
            # Topic configuration: re-subscribe if the environment topic changed
            # or the previous subscription could not be created.
            topics_config = self.config_data.get('topics', {})
            if topics_config:
                environment_node_config = topics_config.get('environment_node', {})
                if environment_node_config:
                    new_topic = environment_node_config.get('environment_topic', '/ai/env')
                    if new_topic != self.environment_topic or self.environment_sub is None:
                        self.environment_topic = new_topic
                        self.update_environment_subscription()
                        self.get_logger().info(f'[配置] 环境数据订阅 Topic 已更新: {self.environment_topic}')
        except Exception as e:
            self.get_logger().warn(f'解析配置数据失败: {e}')

    def update_environment_subscription(self):
        """(Re)create the environment-data subscription on ``self.environment_topic``."""
        try:
            if self.environment_sub is not None:
                self.destroy_subscription(self.environment_sub)
                # Clear the stale handle so that, if re-creation fails below,
                # config_callback's "environment_sub is None" check retries.
                self.environment_sub = None
            self.environment_sub = self.create_subscription(
                String, self.environment_topic, self.environment_callback, 10
            )
            self.get_logger().info(f'[配置] 已订阅环境数据 Topic: {self.environment_topic}')
        except Exception as e:
            self.get_logger().warn(f'更新环境订阅失败: {e}')

    def environment_callback(self, msg):
        """Handle environment data from ``self.environment_topic`` (default /ai/env).

        Caches the JSON payload for prompt building. When it contains
        ``map.points`` (items with ``id`` and ``name``), rebuilds the
        map-mapping text given to the models. Failures are logged as warnings.
        """
        try:
            env_json = json.loads(msg.data)
            self.environment_data = env_json
            # Update the environment cache used when building prompts.
            set_environment_data(env_json)
            map_data = env_json.get('map', {})
            if not map_data:
                self.get_logger().warn("[环境回调] map_data 为空")
                return
            points = map_data.get('points', [])
            if not points:
                self.get_logger().warn("[环境回调] points 为空")
                return
            # One "<id> -> <name>" line per point, so the model refers to points by id.
            lines = []
            for point in points:
                point_id = point.get('id', '')
                name = point.get('name', '')
                if point_id and name:
                    lines.append(f"{point_id} -> {name}\n")
            map_str = "#地图映射\n\n" + "".join(lines)
            self.map_mapping = map_str
            set_map_mapping(map_str)
            self.map_points = points
        except Exception as e:
            self.get_logger().warn(f'解析环境数据失败: {e}')

    # @measure_execution_time
    def dual_large_model_mode(self, type, prompt=""):
        """Dual-model inference for the "China" region.

        At the start of an instruction cycle the decision-layer model plans
        the task for ``prompt``. The plan is passed as text to the
        execution-layer model, which produces the action list. Within an
        ongoing cycle the input (text or image) goes straight to the
        execution layer.
        """
        if self.new_order_cycle:
            # Log the full system prompt sent to the decision layer (debugging aid).
            full_prompt = get_prompt()
            self.get_logger().info("=" * 80)
            self.get_logger().info("[调试] 发送给决策层大模型的完整 Prompt:")
            self.get_logger().info("=" * 80)
            self.get_logger().info(full_prompt)
            self.get_logger().info("=" * 80)
            self.get_logger().info(f"[调试] 用户输入: {prompt}")
            self.get_logger().info("=" * 80)

            # New cycle: reset the execution-layer context history.
            self.model_client.init_Multimodal_history(full_prompt)
            # Decision-layer task planning. NOTE(review): the code treats the
            # result as "error" on failure and otherwise indexes [1] for the
            # plan text — confirm against large_model_interface.
            execute_instructions = self.model_client.TaskDecision(prompt)
            if execute_instructions != "error":
                prompt_desidon = (
                    self.prompt_dict[self.language]
                    .get("prompt_3")
                    .format(execute_instructions=execute_instructions[1])
                )
                if self.text_chat_mode:
                    self.text_pub.publish(String(data=prompt_desidon))
                else:
                    self.get_logger().info(prompt_desidon)  # task about to be executed
                prompt_desidon = (
                    self.prompt_dict[self.language]
                    .get("prompt_1")
                    .format(prompt=prompt, execute_instructions=execute_instructions[1])
                )
                # Hand the planned steps to the execution-layer model.
                self.instruction_process(
                    type="text",
                    prompt=prompt_desidon,
                )
                self.new_order_cycle = False  # the cycle is now in progress
            else:
                self.get_logger().info(
                    "The model service is abnormal. Check the large model account or configuration options"
                )
        else:
            # Mid-cycle: let the execution layer produce and execute an action list.
            self.instruction_process(prompt, type)

    def instruction_process(self, prompt, type, conversation_id=None):
        """Run the execution-layer model and dispatch its action plan.

        ``type == "text"`` sends ``prompt``; ``type == "image"`` sends the
        seewhat feedback prompt together with the image at
        ``self.image_save_path``. The reply's JSON ``action`` list and
        ``response`` text are published/logged and sent to the action server.
        If inference fails or the reply is not a JSON object, the problem is
        logged and nothing is sent.

        Args:
            prompt: Text for the execution-layer model (used when type is "text").
            type: "text" or "image".
            conversation_id: Dify conversation id (international mode only).
        """
        prompt_seewhat = self.prompt_dict[self.language].get("prompt_2")
        # Both stay None when inference fails or `type` is unsupported; the
        # original left json_str unbound on those paths (UnboundLocalError).
        json_str = None
        raw_reply = None
        if self.regional_setting == "China":
            if type == "text":
                raw_reply = self.model_client.multimodalinfer(prompt)
            elif type == "image":
                raw_reply = self.model_client.multimodalinfer(
                    prompt_seewhat, image_path=self.image_save_path
                )
            json_str = self.extract_json_content(raw_reply)
        elif self.regional_setting == "international":
            result = None
            if type == "text":
                result = self.model_client.TaskExecution(
                    input=prompt,
                    map_mapping=self.map_mapping,
                    language=self.language_dict[self.language],
                    conversation_id=conversation_id,
                )
            elif type == "image":
                result = self.model_client.TaskExecution(
                    input=prompt_seewhat,
                    map_mapping=self.map_mapping,
                    language=self.language_dict[self.language],
                    image_path=self.image_save_path,
                    conversation_id=conversation_id,
                )
            if result is not None:
                # result is used as (success_flag, payload, conversation_id).
                if not result[0]:
                    self.get_logger().info(f"ERROR:{result[1]}")
                    return
                raw_reply = result[1]
                json_str = self.extract_json_content(raw_reply)
                self.conversation_id = result[2]

        # Parse the JSON reply and split it into "action" and "response".
        # A malformed reply must not raise out of the ROS callback.
        action_plan_json = None
        if json_str is not None:
            try:
                action_plan_json = json.loads(json_str)
            except ValueError:
                action_plan_json = None
        if not isinstance(action_plan_json, dict):
            self.get_logger().info(
                f"LargeScaleModel return: {raw_reply},The format was unexpected. The output format of the AI model at the execution layer did not meet the requirements"
            )
            return
        action_list = action_plan_json.get("action", [])
        llm_response = action_plan_json.get("response", "")

        if self.text_chat_mode:
            self.text_pub.publish(
                String(data=f'"action": {action_list}, "response": {llm_response}')
            )
        else:
            self.get_logger().info(
                f'"action": {action_list}, "response": {llm_response}'
            )
        # Asynchronously send the action list and reply text to the action server.
        self.send_action_service(action_list, llm_response)

    def dual_large_model_international_model(self, type, prompt=""):
        """Dual-model inference for the "international" region (Dify middleware).

        A new instruction cycle clears the Dify conversation id, asks the
        decision layer for a plan and hands it to the execution layer.
        Mid-cycle input goes straight to the execution layer within the
        current conversation.
        """
        if self.new_order_cycle:
            self.conversation_id = None
            # result is used as (success_flag, plan_text).
            result = self.model_client.TaskDecision(prompt)
            if result[0]:
                prompt_desidon = (
                    self.prompt_dict[self.language]
                    .get("prompt_3")
                    .format(execute_instructions=result[1])
                )
                if self.text_chat_mode:  # text interaction mode
                    self.text_pub.publish(String(data=prompt_desidon))
                else:  # voice interaction mode
                    self.get_logger().info(prompt_desidon)
                prompt_desion = (
                    self.prompt_dict[self.language]
                    .get("prompt_1")
                    .format(prompt=prompt, execute_instructions=result[1])
                )
                self.instruction_process(type="text", prompt=prompt_desion)
                self.new_order_cycle = False  # the cycle is now in progress
            else:
                self.get_logger().info(
                    "The model service is abnormal. Check the large model account or configuration options"
                )
        else:
            # Mid-cycle: execution layer only, continuing the Dify conversation.
            self.instruction_process(
                prompt, type, conversation_id=self.conversation_id
            )

    def send_action_service(self, actions, text):
        """Send an action list and the model's reply text as a Rot goal (async)."""
        goal_msg = Rot.Goal()
        goal_msg.actions = actions
        goal_msg.llm_response = text
        self._send_goal_future = self._action_client.send_goal_async(goal_msg)
        # Report rejection once the server responds to the goal request.
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        """Log when the action server does not accept the goal."""
        goal_handle = future.result()
        # A cancelled future yields no handle; treat it like a rejection.
        if goal_handle is None or not goal_handle.accepted:
            self.get_logger().info(
                "action_client message: action service rejected action list"
            )

    @staticmethod
    def extract_json_content(raw_content):
        """Extract the JSON text from a model reply.

        Candidate order: the body of a ```json fenced block, else the body of
        the first ``` fenced block, else the whole reply. If that candidate is
        empty or does not parse as JSON, fall back to the span from the first
        "{" to the last "}" in the reply.

        Returns:
            The extracted string (which may still fail to parse when no better
            candidate exists), or None if ``raw_content`` is not a string.
        """
        if not isinstance(raw_content, str):
            return None
        if "```json" in raw_content:
            candidate = raw_content.split("```json")[1].split("```")[0].strip()
        elif "```" in raw_content:
            # Untagged code block.
            candidate = raw_content.split("```")[1].strip()
        else:
            candidate = raw_content
        if candidate:
            try:
                json.loads(candidate)
                return candidate
            except ValueError:
                pass  # not valid JSON; try the brace-span fallback
        match = re.search(r"\{.*\}", raw_content, re.DOTALL)
        if match:
            return match.group()
        return candidate
def main(args=None):
    """Entry point: initialize rclpy and spin a LargeModelService node.

    The node is destroyed and the rclpy context shut down even when spinning
    stops with an exception; Ctrl+C (KeyboardInterrupt) exits quietly.
    """
    rclpy.init(args=args)
    model_service = None
    try:
        model_service = LargeModelService()
        rclpy.spin(model_service)
    except KeyboardInterrupt:
        pass  # normal way to stop the node
    finally:
        if model_service is not None:
            model_service.destroy_node()
        # The SIGINT handler may already have shut the context down.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()
|