| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- #!/usr/bin/env python3
- """
- 环境节点 (Environment Node)
- 统一发布机器人环境相关数据,供其他节点订阅使用
- 功能:
- 1. 订阅 config_node 获取配置
- 2. 发布电池数据、温度数据、天气数据、地图导航点
- 3. 统一发布到 /ai/environment topic
- Author: sunrise
- """
- import rclpy
- from rclpy.node import Node
- from std_msgs.msg import String
- import json
- import time
- from datetime import datetime
- from threading import Thread, Event
- class EnvironmentNode(Node):
- """
- 环境节点
- 订阅 config_node 配置,统一发布环境数据到 /ai/environment topic
- """
- def __init__(self):
- super().__init__('environment_node')
- # ========== 参数声明 ==========
- self.declare_parameter('config_topic', '/ai/config')
- self.declare_parameter('environment_topic', '/ai/env')
- self.declare_parameter('use_mock_data', True)
- # 获取参数
- self.config_topic = self.get_parameter('config_topic').value
- self.environment_topic = self.get_parameter('environment_topic').value
- self.use_mock_data = self.get_parameter('use_mock_data').value
- # ========== 配置数据 (从 config_node 获取) ==========
- self.config_data = None
- self.environment_config = {}
- # ========== 环境数据状态 (模拟数据,由节点自己生成) ==========
- self.battery_data = {}
- self.temperature_data = {}
- self.weather_data = {}
- self.map_data = {}
- # ========== 发布频率 (秒) - 等待配置更新 ==========
- self.intervals = {
- 'battery': 5,
- 'temperature': 30,
- 'weather': 3600,
- 'map': 60
- }
- # ========== 定时器状态 ==========
- self.publish_timers = {}
- self.last_publish_time = {
- 'battery': 0,
- 'temperature': 0,
- 'weather': 0,
- 'map': 0
- }
- # ========== 线程和事件 ==========
- self.stop_event = Event()
- # ========== 初始化 ==========
- self.init_subscriber() # 订阅 config_node
- self.init_publisher() # 发布环境数据
- self.init_default_data() # 初始化默认数据(生成模拟数据)
- self.get_logger().info('=' * 60)
- self.get_logger().info('Environment Node 启动成功')
- self.get_logger().info(f'订阅配置 Topic: {self.config_topic}')
- self.get_logger().info(f'发布环境 Topic: {self.environment_topic}')
- self.get_logger().info('等待 config_node 配置...')
- self.get_logger().info('=' * 60)
- def init_subscriber(self):
- """初始化订阅者 - 订阅 config_node 配置"""
- self.config_subscriber = self.create_subscription(
- String,
- self.config_topic,
- self.config_callback,
- 10
- )
- self.get_logger().info(f'已订阅配置 Topic: {self.config_topic}')
- def init_publisher(self):
- """初始化发布者 - 发布环境数据"""
- self.environment_publisher = self.create_publisher(
- String,
- self.environment_topic,
- 10
- )
- self.get_logger().info(f'已创建发布者: {self.environment_topic}')
- # 添加统一发布定时器:每秒发布一次最新数据
- self.unified_publish_timer = self.create_timer(
- 1.0,
- self.publish_latest_data
- )
- self.get_logger().info('已创建统一发布定时器 (1秒)')
- def publish_latest_data(self):
- """每秒发布一次最新环境数据"""
- env_data = {
- "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
- "battery": self.battery_data,
- "temperature": self.temperature_data,
- "weather": self.weather_data,
- "map": self.map_data
- }
- try:
- json_str = json.dumps(env_data, ensure_ascii=False, indent=2)
- msg = String()
- msg.data = json_str
- self.environment_publisher.publish(msg)
- except Exception as e:
- self.get_logger().error(f'发布环境数据失败: {e}')
- def init_default_data(self):
- """初始化默认环境模拟数据"""
- self.generate_mock_battery()
- self.generate_mock_temperature()
- self.generate_mock_weather()
- self.generate_mock_map()
- self.get_logger().info('模拟数据生成完成')
- def generate_mock_battery(self):
- """生成电池模拟数据"""
- self.battery_data = {
- "level": 85,
- "voltage": 12.6,
- "is_charging": False,
- "capacity_wh": 5000,
- "current_ma": -500
- }
- def generate_mock_temperature(self):
- """生成温度模拟数据"""
- self.temperature_data = {
- "indoor": 25.5,
- "outdoor": 28.0,
- "unit": "celsius",
- "humidity": 60
- }
- def generate_mock_weather(self):
- """生成天气模拟数据"""
- self.weather_data = {
- "condition": "sunny",
- "description": "晴",
- "temperature": 28,
- "humidity": 65,
- "wind_speed": 3.5
- }
- def generate_mock_map(self):
- """生成地图导航点模拟数据"""
- self.map_data = {
- "points": [
- {"id": "A", "name": "办公室", "position": {"x": 1.633, "y": 3.490, "z": 0.0}},
- {"id": "B", "name": "酒店大堂", "position": {"x": 2.436, "y": -0.574, "z": 0.0}},
- {"id": "C", "name": "园区", "position": {"x": 0.024, "y": -1.820, "z": 0.0}}
- ]
- }
- def config_callback(self, msg: String):
- """配置订阅回调 - 解析 config_node 发布的配置"""
- try:
- config = json.loads(msg.data)
- self.get_logger().debug(f'收到配置: version={config.get("version")}')
- # 提取 environment 配置
- if 'config' in config and 'environment' in config['config']:
- self.environment_config = config['config']['environment']
- self.parse_environment_config()
- except json.JSONDecodeError as e:
- self.get_logger().error(f'配置解析失败: {e}')
- except Exception as e:
- self.get_logger().error(f'配置处理异常: {e}')
- def parse_environment_config(self):
- """解析 environment 配置并启动定时器"""
- if not self.environment_config:
- return
- self.get_logger().info('解析环境配置...')
- # 更新发布间隔
- intervals = self.environment_config.get('intervals', {})
- self.intervals['battery'] = intervals.get('battery_seconds', 5)
- self.intervals['temperature'] = intervals.get('temperature_seconds', 30)
- self.intervals['weather'] = intervals.get('weather_seconds', 3600)
- self.intervals['map'] = intervals.get('map_seconds', 60)
- # 更新 topic 名称
- topic = self.environment_config.get('publish_topic')
- if topic and topic != self.environment_topic:
- self.environment_topic = topic
- # 重新创建发布者
- self.environment_publisher = self.create_publisher(
- String,
- self.environment_topic,
- 10
- )
- self.get_logger().info(f'环境数据发布 Topic 已更新: {self.environment_topic}')
- # 重新生成模拟数据
- self.generate_mock_battery()
- self.generate_mock_temperature()
- self.generate_mock_weather()
- self.generate_mock_map()
- # 启动定时器
- self.start_timers()
- self.get_logger().info('环境配置解析完成,定时器已启动')
- def start_timers(self):
- """启动各个数据类型的定时发布"""
- # 电池数据定时器
- if 'battery' not in self.publish_timers:
- self.publish_timers['battery'] = self.create_timer(
- self.intervals['battery'],
- lambda: self.publish_data('battery')
- )
- # 温度数据定时器
- if 'temperature' not in self.publish_timers:
- self.publish_timers['temperature'] = self.create_timer(
- self.intervals['temperature'],
- lambda: self.publish_data('temperature')
- )
- # 天气数据定时器
- if 'weather' not in self.publish_timers:
- self.publish_timers['weather'] = self.create_timer(
- self.intervals['weather'],
- lambda: self.publish_data('weather')
- )
- # 地图数据定时器
- if 'map' not in self.publish_timers:
- self.publish_timers['map'] = self.create_timer(
- self.intervals['map'],
- lambda: self.publish_data('map')
- )
- def publish_data(self, data_type: str):
- """发布指定类型的数据"""
- current_time = time.time()
- # 检查是否到发布时间
- if current_time - self.last_publish_time.get(data_type, 0) < self.intervals.get(data_type, 5):
- return
- self.last_publish_time[data_type] = current_time
- # 构建环境数据
- env_data = {
- "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
- "battery": self.battery_data if data_type == 'battery' else self.get_latest_data('battery'),
- "temperature": self.temperature_data if data_type == 'temperature' else self.get_latest_data('temperature'),
- "weather": self.weather_data if data_type == 'weather' else self.get_latest_data('weather'),
- "map": self.map_data if data_type == 'map' else self.get_latest_data('map')
- }
- # 发布数据
- try:
- json_str = json.dumps(env_data, ensure_ascii=False, indent=2)
- msg = String()
- msg.data = json_str
- self.environment_publisher.publish(msg)
- self.get_logger().debug(f'发布 {data_type} 数据成功')
- except Exception as e:
- self.get_logger().error(f'发布 {data_type} 数据失败: {e}')
- def get_latest_data(self, data_type: str):
- """获取最新数据"""
- if data_type == 'battery':
- return self.battery_data
- elif data_type == 'temperature':
- return self.temperature_data
- elif data_type == 'weather':
- return self.weather_data
- elif data_type == 'map':
- return self.map_data
- return {}
- def publish_all(self):
- """立即发布所有环境数据"""
- env_data = {
- "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
- "battery": self.battery_data,
- "temperature": self.temperature_data,
- "weather": self.weather_data,
- "map": self.map_data
- }
- try:
- json_str = json.dumps(env_data, ensure_ascii=False, indent=2)
- msg = String()
- msg.data = json_str
- self.environment_publisher.publish(msg)
- self.get_logger().info('全量环境数据发布成功')
- except Exception as e:
- self.get_logger().error(f'全量环境数据发布失败: {e}')
- def get_current_environment(self):
- """获取当前完整环境数据 (供其他节点调用)"""
- return {
- "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
- "battery": self.battery_data,
- "temperature": self.temperature_data,
- "weather": self.weather_data,
- "map": self.map_data
- }
- def shutdown(self):
- """关闭节点"""
- self.get_logger().info('关闭 Environment Node...')
- self.stop_event.set()
- # 取消统一发布定时器
- if hasattr(self, 'unified_publish_timer'):
- self.unified_publish_timer.cancel()
- # 取消所有定时器
- for timer in self.publish_timers.values():
- timer.cancel()
- self.get_logger().info('Environment Node 已关闭')
- def main(args=None):
- rclpy.init(args=args)
- node = EnvironmentNode()
- try:
- rclpy.spin(node)
- except KeyboardInterrupt:
- pass
- finally:
- node.shutdown()
- node.destroy_node()
- rclpy.shutdown()
- if __name__ == '__main__':
- main()
|