world_simulator.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. #!/usr/bin/env python3
  2. """
  3. world_simulator.py - World 节点数据模拟器
  4. 向 world2 节点发送模拟数据:
  5. 1. 天气数据 -> /sensors/weather/data
  6. 2. 电池状态 -> /robot/status
  7. 3. 机器人位置 -> /robot/position
  8. 4. 地图区域 -> /map/zone/status
  9. 5. 地点实体 -> /map/location/*
  10. Usage:
  11. python3 scripts/world_simulator.py
  12. """
  13. import json
  14. import time
  15. import argparse
  16. from typing import Optional
  17. import rclpy
  18. from rclpy.node import Node
  19. from std_msgs.msg import String
  20. class WorldSimulator(Node):
  21. """World 节点数据模拟器"""
  22. def __init__(self) -> None:
  23. super().__init__("world_simulator")
  24. self.declare_parameter("publish_rate", 1.0)
  25. self.declare_parameter("simulate_movement", True)
  26. self.declare_parameter("initial_battery", 85.0)
  27. self.rate = self.get_parameter("publish_rate").value
  28. self.simulate_movement = self.get_parameter("simulate_movement").value
  29. self.battery_level = self.get_parameter("initial_battery").value
  30. self.current_location = "square"
  31. self.pois = {
  32. "square": {"name": "广场", "poi_type": "area", "status": "available", "index": 1},
  33. "gate": {"name": "大门", "poi_type": "entrance", "status": "available", "index": 2},
  34. "charging_station": {"name": "充电站", "poi_type": "facility", "status": "available", "index": 3},
  35. "barn_1": {"name": "牛棚1号", "poi_type": "building", "status": "available", "index": 4},
  36. "barn_2": {"name": "牛棚2号", "poi_type": "building", "status": "repairing", "index": 5},
  37. }
  38. # Publisher
  39. self.weather_pub = self.create_publisher(String, "/sensors/weather/data", 10)
  40. self.robot_pub = self.create_publisher(String, "/robot/status", 10)
  41. self.position_pub = self.create_publisher(String, "/robot/position", 10)
  42. self.poi_pub = self.create_publisher(String, "/map/poi", 10)
  43. self.timer = self.create_timer(1.0 / self.rate, self.publish_all)
  44. self.get_logger().info("World Simulator started")
  45. self.get_logger().info(f" - Publish rate: {self.rate} Hz")
  46. self.get_logger().info(f" - Initial battery: {self.battery_level}%")
  47. self.get_logger().info(f" - Locations: {list(self.pois.keys())}")
  48. self.publish_all()
  49. def publish_all(self) -> None:
  50. self.publish_weather()
  51. self.publish_robot_status()
  52. self.publish_robot_position()
  53. self.publish_pois()
  54. def publish_weather(self) -> None:
  55. msg = String()
  56. data = {
  57. "condition": "sunny",
  58. "temperature": 25.5,
  59. "wind_speed": 3.2,
  60. "rain_intensity": 0.0,
  61. "illuminance": 85000,
  62. "humidity": 45.0,
  63. "timestamp": time.time(),
  64. }
  65. msg.data = json.dumps(data, ensure_ascii=False)
  66. self.weather_pub.publish(msg)
  67. def publish_robot_status(self) -> None:
  68. if self.simulate_movement:
  69. self.battery_level = max(10.0, self.battery_level - 0.05)
  70. msg = String()
  71. data = {
  72. "battery_level": round(self.battery_level, 1),
  73. "operating_mode": "patrolling",
  74. "current_task": f"巡逻中 - {self.pois[self.current_location]['name']}",
  75. "error_code": 0,
  76. "timestamp": time.time(),
  77. }
  78. msg.data = json.dumps(data, ensure_ascii=False)
  79. self.robot_pub.publish(msg)
  80. def publish_robot_position(self) -> None:
  81. """发布机器人位置"""
  82. positions = {
  83. "square": {"x": 0.0, "y": 0.0, "z": 0.0},
  84. "barn_1": {"x": 10.0, "y": 5.0, "z": 0.0},
  85. "barn_2": {"x": 15.0, "y": 5.0, "z": 0.0},
  86. "charging_station": {"x": -5.0, "y": -3.0, "z": 0.0},
  87. }
  88. msg = String()
  89. data = {
  90. "entity_id": "robot_1",
  91. "name": "巡逻机器人",
  92. "type": "robot",
  93. "position": positions[self.current_location],
  94. "battery": self.battery_level,
  95. "status": "patrolling",
  96. "timestamp": time.time(),
  97. }
  98. msg.data = json.dumps(data, ensure_ascii=False)
  99. self.position_pub.publish(msg)
  100. def publish_pois(self) -> None:
  101. """发布所有 POI(地点 + 状态)"""
  102. positions = {
  103. "square": {"x": 0.0, "y": 0.0, "z": 0.0},
  104. "barn_1": {"x": 10.0, "y": 5.0, "z": 0.0},
  105. "barn_2": {"x": 15.0, "y": 5.0, "z": 0.0},
  106. "charging_station": {"x": -5.0, "y": -3.0, "z": 0.0},
  107. "gate": {"x": -8.0, "y": 0.0, "z": 0.0},
  108. }
  109. for poi_id, poi_info in self.pois.items():
  110. msg = String()
  111. data = {
  112. "entity_id": poi_id,
  113. "name": poi_info["name"],
  114. "poi_type": poi_info["poi_type"],
  115. "index": poi_info["index"],
  116. "position": positions[poi_id],
  117. "status": poi_info["status"],
  118. "is_current_location": (poi_id == self.current_location),
  119. "properties": {
  120. "accessible": poi_info["status"] == "available",
  121. "has_shelter": poi_info["poi_type"] == "building",
  122. "has_power": poi_id == "charging_station",
  123. },
  124. "timestamp": time.time(),
  125. }
  126. msg.data = json.dumps(data, ensure_ascii=False)
  127. self.poi_pub.publish(msg)
  128. def move_to(self, location_id: str) -> None:
  129. """移动到指定位置"""
  130. if location_id in self.pois:
  131. self.current_location = location_id
  132. self.get_logger().info(f"Moved to: {self.pois[location_id]['name']}")
  133. self.publish_all()
  134. else:
  135. self.get_logger().warn(f"Unknown location: {location_id}")
  136. def main(args=None) -> None:
  137. rclpy.init(args=args)
  138. node = WorldSimulator()
  139. try:
  140. rclpy.spin(node)
  141. except KeyboardInterrupt:
  142. node.get_logger().info("Shutting down World Simulator")
  143. finally:
  144. node.destroy_node()
  145. rclpy.shutdown()
  146. if __name__ == "__main__":
  147. main()