environment_node.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. #!/usr/bin/env python3
  2. """
  3. 环境节点 (Environment Node)
  4. 统一发布机器人环境相关数据,供其他节点订阅使用
  5. 功能:
  6. 1. 订阅 config_node 获取配置
  7. 2. 发布电池数据、温度数据、天气数据、地图导航点
  8. 3. 统一发布到 /ai/environment topic
  9. Author: sunrise
  10. """
  11. import rclpy
  12. from rclpy.node import Node
  13. from std_msgs.msg import String
  14. import json
  15. import time
  16. from datetime import datetime
  17. from threading import Thread, Event
  18. class EnvironmentNode(Node):
  19. """
  20. 环境节点
  21. 订阅 config_node 配置,统一发布环境数据到 /ai/environment topic
  22. """
  23. def __init__(self):
  24. super().__init__('environment_node')
  25. # ========== 参数声明 ==========
  26. self.declare_parameter('config_topic', '/ai/config')
  27. self.declare_parameter('environment_topic', '/ai/env')
  28. self.declare_parameter('use_mock_data', True)
  29. # 获取参数
  30. self.config_topic = self.get_parameter('config_topic').value
  31. self.environment_topic = self.get_parameter('environment_topic').value
  32. self.use_mock_data = self.get_parameter('use_mock_data').value
  33. # ========== 配置数据 (从 config_node 获取) ==========
  34. self.config_data = None
  35. self.environment_config = {}
  36. # ========== 环境数据状态 (模拟数据,由节点自己生成) ==========
  37. self.battery_data = {}
  38. self.temperature_data = {}
  39. self.weather_data = {}
  40. self.map_data = {}
  41. # ========== 发布频率 (秒) - 等待配置更新 ==========
  42. self.intervals = {
  43. 'battery': 5,
  44. 'temperature': 30,
  45. 'weather': 3600,
  46. 'map': 60
  47. }
  48. # ========== 定时器状态 ==========
  49. self.publish_timers = {}
  50. self.last_publish_time = {
  51. 'battery': 0,
  52. 'temperature': 0,
  53. 'weather': 0,
  54. 'map': 0
  55. }
  56. # ========== 线程和事件 ==========
  57. self.stop_event = Event()
  58. # ========== 初始化 ==========
  59. self.init_subscriber() # 订阅 config_node
  60. self.init_publisher() # 发布环境数据
  61. self.init_default_data() # 初始化默认数据(生成模拟数据)
  62. self.get_logger().info('=' * 60)
  63. self.get_logger().info('Environment Node 启动成功')
  64. self.get_logger().info(f'订阅配置 Topic: {self.config_topic}')
  65. self.get_logger().info(f'发布环境 Topic: {self.environment_topic}')
  66. self.get_logger().info('等待 config_node 配置...')
  67. self.get_logger().info('=' * 60)
  68. def init_subscriber(self):
  69. """初始化订阅者 - 订阅 config_node 配置"""
  70. self.config_subscriber = self.create_subscription(
  71. String,
  72. self.config_topic,
  73. self.config_callback,
  74. 10
  75. )
  76. self.get_logger().info(f'已订阅配置 Topic: {self.config_topic}')
  77. def init_publisher(self):
  78. """初始化发布者 - 发布环境数据"""
  79. self.environment_publisher = self.create_publisher(
  80. String,
  81. self.environment_topic,
  82. 10
  83. )
  84. self.get_logger().info(f'已创建发布者: {self.environment_topic}')
  85. # 添加统一发布定时器:每秒发布一次最新数据
  86. self.unified_publish_timer = self.create_timer(
  87. 1.0,
  88. self.publish_latest_data
  89. )
  90. self.get_logger().info('已创建统一发布定时器 (1秒)')
  91. def publish_latest_data(self):
  92. """每秒发布一次最新环境数据"""
  93. env_data = {
  94. "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
  95. "battery": self.battery_data,
  96. "temperature": self.temperature_data,
  97. "weather": self.weather_data,
  98. "map": self.map_data
  99. }
  100. try:
  101. json_str = json.dumps(env_data, ensure_ascii=False, indent=2)
  102. msg = String()
  103. msg.data = json_str
  104. self.environment_publisher.publish(msg)
  105. except Exception as e:
  106. self.get_logger().error(f'发布环境数据失败: {e}')
  107. def init_default_data(self):
  108. """初始化默认环境模拟数据"""
  109. self.generate_mock_battery()
  110. self.generate_mock_temperature()
  111. self.generate_mock_weather()
  112. self.generate_mock_map()
  113. self.get_logger().info('模拟数据生成完成')
  114. def generate_mock_battery(self):
  115. """生成电池模拟数据"""
  116. self.battery_data = {
  117. "level": 85,
  118. "voltage": 12.6,
  119. "is_charging": False,
  120. "capacity_wh": 5000,
  121. "current_ma": -500
  122. }
  123. def generate_mock_temperature(self):
  124. """生成温度模拟数据"""
  125. self.temperature_data = {
  126. "indoor": 25.5,
  127. "outdoor": 28.0,
  128. "unit": "celsius",
  129. "humidity": 60
  130. }
  131. def generate_mock_weather(self):
  132. """生成天气模拟数据"""
  133. self.weather_data = {
  134. "condition": "sunny",
  135. "description": "晴",
  136. "temperature": 28,
  137. "humidity": 65,
  138. "wind_speed": 3.5
  139. }
  140. def generate_mock_map(self):
  141. """生成地图导航点模拟数据"""
  142. self.map_data = {
  143. "points": [
  144. {"id": "A", "name": "办公室", "position": {"x": 1.633, "y": 3.490, "z": 0.0}},
  145. {"id": "B", "name": "酒店大堂", "position": {"x": 2.436, "y": -0.574, "z": 0.0}},
  146. {"id": "C", "name": "园区", "position": {"x": 0.024, "y": -1.820, "z": 0.0}}
  147. ]
  148. }
  149. def config_callback(self, msg: String):
  150. """配置订阅回调 - 解析 config_node 发布的配置"""
  151. try:
  152. config = json.loads(msg.data)
  153. self.get_logger().debug(f'收到配置: version={config.get("version")}')
  154. # 提取 environment 配置
  155. if 'config' in config and 'environment' in config['config']:
  156. self.environment_config = config['config']['environment']
  157. self.parse_environment_config()
  158. except json.JSONDecodeError as e:
  159. self.get_logger().error(f'配置解析失败: {e}')
  160. except Exception as e:
  161. self.get_logger().error(f'配置处理异常: {e}')
  162. def parse_environment_config(self):
  163. """解析 environment 配置并启动定时器"""
  164. if not self.environment_config:
  165. return
  166. self.get_logger().info('解析环境配置...')
  167. # 更新发布间隔
  168. intervals = self.environment_config.get('intervals', {})
  169. self.intervals['battery'] = intervals.get('battery_seconds', 5)
  170. self.intervals['temperature'] = intervals.get('temperature_seconds', 30)
  171. self.intervals['weather'] = intervals.get('weather_seconds', 3600)
  172. self.intervals['map'] = intervals.get('map_seconds', 60)
  173. # 更新 topic 名称
  174. topic = self.environment_config.get('publish_topic')
  175. if topic and topic != self.environment_topic:
  176. self.environment_topic = topic
  177. # 重新创建发布者
  178. self.environment_publisher = self.create_publisher(
  179. String,
  180. self.environment_topic,
  181. 10
  182. )
  183. self.get_logger().info(f'环境数据发布 Topic 已更新: {self.environment_topic}')
  184. # 重新生成模拟数据
  185. self.generate_mock_battery()
  186. self.generate_mock_temperature()
  187. self.generate_mock_weather()
  188. self.generate_mock_map()
  189. # 启动定时器
  190. self.start_timers()
  191. self.get_logger().info('环境配置解析完成,定时器已启动')
  192. def start_timers(self):
  193. """启动各个数据类型的定时发布"""
  194. # 电池数据定时器
  195. if 'battery' not in self.publish_timers:
  196. self.publish_timers['battery'] = self.create_timer(
  197. self.intervals['battery'],
  198. lambda: self.publish_data('battery')
  199. )
  200. # 温度数据定时器
  201. if 'temperature' not in self.publish_timers:
  202. self.publish_timers['temperature'] = self.create_timer(
  203. self.intervals['temperature'],
  204. lambda: self.publish_data('temperature')
  205. )
  206. # 天气数据定时器
  207. if 'weather' not in self.publish_timers:
  208. self.publish_timers['weather'] = self.create_timer(
  209. self.intervals['weather'],
  210. lambda: self.publish_data('weather')
  211. )
  212. # 地图数据定时器
  213. if 'map' not in self.publish_timers:
  214. self.publish_timers['map'] = self.create_timer(
  215. self.intervals['map'],
  216. lambda: self.publish_data('map')
  217. )
  218. def publish_data(self, data_type: str):
  219. """发布指定类型的数据"""
  220. current_time = time.time()
  221. # 检查是否到发布时间
  222. if current_time - self.last_publish_time.get(data_type, 0) < self.intervals.get(data_type, 5):
  223. return
  224. self.last_publish_time[data_type] = current_time
  225. # 构建环境数据
  226. env_data = {
  227. "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
  228. "battery": self.battery_data if data_type == 'battery' else self.get_latest_data('battery'),
  229. "temperature": self.temperature_data if data_type == 'temperature' else self.get_latest_data('temperature'),
  230. "weather": self.weather_data if data_type == 'weather' else self.get_latest_data('weather'),
  231. "map": self.map_data if data_type == 'map' else self.get_latest_data('map')
  232. }
  233. # 发布数据
  234. try:
  235. json_str = json.dumps(env_data, ensure_ascii=False, indent=2)
  236. msg = String()
  237. msg.data = json_str
  238. self.environment_publisher.publish(msg)
  239. self.get_logger().debug(f'发布 {data_type} 数据成功')
  240. except Exception as e:
  241. self.get_logger().error(f'发布 {data_type} 数据失败: {e}')
  242. def get_latest_data(self, data_type: str):
  243. """获取最新数据"""
  244. if data_type == 'battery':
  245. return self.battery_data
  246. elif data_type == 'temperature':
  247. return self.temperature_data
  248. elif data_type == 'weather':
  249. return self.weather_data
  250. elif data_type == 'map':
  251. return self.map_data
  252. return {}
  253. def publish_all(self):
  254. """立即发布所有环境数据"""
  255. env_data = {
  256. "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
  257. "battery": self.battery_data,
  258. "temperature": self.temperature_data,
  259. "weather": self.weather_data,
  260. "map": self.map_data
  261. }
  262. try:
  263. json_str = json.dumps(env_data, ensure_ascii=False, indent=2)
  264. msg = String()
  265. msg.data = json_str
  266. self.environment_publisher.publish(msg)
  267. self.get_logger().info('全量环境数据发布成功')
  268. except Exception as e:
  269. self.get_logger().error(f'全量环境数据发布失败: {e}')
  270. def get_current_environment(self):
  271. """获取当前完整环境数据 (供其他节点调用)"""
  272. return {
  273. "timestamp": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
  274. "battery": self.battery_data,
  275. "temperature": self.temperature_data,
  276. "weather": self.weather_data,
  277. "map": self.map_data
  278. }
  279. def shutdown(self):
  280. """关闭节点"""
  281. self.get_logger().info('关闭 Environment Node...')
  282. self.stop_event.set()
  283. # 取消统一发布定时器
  284. if hasattr(self, 'unified_publish_timer'):
  285. self.unified_publish_timer.cancel()
  286. # 取消所有定时器
  287. for timer in self.publish_timers.values():
  288. timer.cancel()
  289. self.get_logger().info('Environment Node 已关闭')
  290. def main(args=None):
  291. rclpy.init(args=args)
  292. node = EnvironmentNode()
  293. try:
  294. rclpy.spin(node)
  295. except KeyboardInterrupt:
  296. pass
  297. finally:
  298. node.shutdown()
  299. node.destroy_node()
  300. rclpy.shutdown()
  301. if __name__ == '__main__':
  302. main()