| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- #!/usr/bin/env python3
- """
- world_simulator.py - World 节点数据模拟器
- 向 world2 节点发送模拟数据:
- 1. 天气数据 -> /sensors/weather/data
- 2. 电池状态 -> /robot/status
- 3. 机器人位置 -> /robot/position
- 4. 地图区域 -> /map/zone/status
- 5. 地点实体 -> /map/location/*
- Usage:
- python3 scripts/world_simulator.py
- """
- import json
- import time
- import argparse
- from typing import Optional
- import rclpy
- from rclpy.node import Node
- from std_msgs.msg import String
- class WorldSimulator(Node):
- """World 节点数据模拟器"""
- def __init__(self) -> None:
- super().__init__("world_simulator")
- self.declare_parameter("publish_rate", 1.0)
- self.declare_parameter("simulate_movement", True)
- self.declare_parameter("initial_battery", 85.0)
- self.rate = self.get_parameter("publish_rate").value
- self.simulate_movement = self.get_parameter("simulate_movement").value
- self.battery_level = self.get_parameter("initial_battery").value
- self.current_location = "square"
- self.pois = {
- "square": {"name": "广场", "poi_type": "area", "status": "available", "index": 1},
- "gate": {"name": "大门", "poi_type": "entrance", "status": "available", "index": 2},
- "charging_station": {"name": "充电站", "poi_type": "facility", "status": "available", "index": 3},
- "barn_1": {"name": "牛棚1号", "poi_type": "building", "status": "available", "index": 4},
- "barn_2": {"name": "牛棚2号", "poi_type": "building", "status": "repairing", "index": 5},
- }
- # Publisher
- self.weather_pub = self.create_publisher(String, "/sensors/weather/data", 10)
- self.robot_pub = self.create_publisher(String, "/robot/status", 10)
- self.position_pub = self.create_publisher(String, "/robot/position", 10)
- self.poi_pub = self.create_publisher(String, "/map/poi", 10)
- self.timer = self.create_timer(1.0 / self.rate, self.publish_all)
- self.get_logger().info("World Simulator started")
- self.get_logger().info(f" - Publish rate: {self.rate} Hz")
- self.get_logger().info(f" - Initial battery: {self.battery_level}%")
- self.get_logger().info(f" - Locations: {list(self.pois.keys())}")
- self.publish_all()
- def publish_all(self) -> None:
- self.publish_weather()
- self.publish_robot_status()
- self.publish_robot_position()
- self.publish_pois()
- def publish_weather(self) -> None:
- msg = String()
- data = {
- "condition": "sunny",
- "temperature": 25.5,
- "wind_speed": 3.2,
- "rain_intensity": 0.0,
- "illuminance": 85000,
- "humidity": 45.0,
- "timestamp": time.time(),
- }
- msg.data = json.dumps(data, ensure_ascii=False)
- self.weather_pub.publish(msg)
- def publish_robot_status(self) -> None:
- if self.simulate_movement:
- self.battery_level = max(10.0, self.battery_level - 0.05)
- msg = String()
- data = {
- "battery_level": round(self.battery_level, 1),
- "operating_mode": "patrolling",
- "current_task": f"巡逻中 - {self.pois[self.current_location]['name']}",
- "error_code": 0,
- "timestamp": time.time(),
- }
- msg.data = json.dumps(data, ensure_ascii=False)
- self.robot_pub.publish(msg)
- def publish_robot_position(self) -> None:
- """发布机器人位置"""
- positions = {
- "square": {"x": 0.0, "y": 0.0, "z": 0.0},
- "barn_1": {"x": 10.0, "y": 5.0, "z": 0.0},
- "barn_2": {"x": 15.0, "y": 5.0, "z": 0.0},
- "charging_station": {"x": -5.0, "y": -3.0, "z": 0.0},
- }
- msg = String()
- data = {
- "entity_id": "robot_1",
- "name": "巡逻机器人",
- "type": "robot",
- "position": positions[self.current_location],
- "battery": self.battery_level,
- "status": "patrolling",
- "timestamp": time.time(),
- }
- msg.data = json.dumps(data, ensure_ascii=False)
- self.position_pub.publish(msg)
- def publish_pois(self) -> None:
- """发布所有 POI(地点 + 状态)"""
- positions = {
- "square": {"x": 0.0, "y": 0.0, "z": 0.0},
- "barn_1": {"x": 10.0, "y": 5.0, "z": 0.0},
- "barn_2": {"x": 15.0, "y": 5.0, "z": 0.0},
- "charging_station": {"x": -5.0, "y": -3.0, "z": 0.0},
- "gate": {"x": -8.0, "y": 0.0, "z": 0.0},
- }
- for poi_id, poi_info in self.pois.items():
- msg = String()
- data = {
- "entity_id": poi_id,
- "name": poi_info["name"],
- "poi_type": poi_info["poi_type"],
- "index": poi_info["index"],
- "position": positions[poi_id],
- "status": poi_info["status"],
- "is_current_location": (poi_id == self.current_location),
- "properties": {
- "accessible": poi_info["status"] == "available",
- "has_shelter": poi_info["poi_type"] == "building",
- "has_power": poi_id == "charging_station",
- },
- "timestamp": time.time(),
- }
- msg.data = json.dumps(data, ensure_ascii=False)
- self.poi_pub.publish(msg)
- def move_to(self, location_id: str) -> None:
- """移动到指定位置"""
- if location_id in self.pois:
- self.current_location = location_id
- self.get_logger().info(f"Moved to: {self.pois[location_id]['name']}")
- self.publish_all()
- else:
- self.get_logger().warn(f"Unknown location: {location_id}")
- def main(args=None) -> None:
- rclpy.init(args=args)
- node = WorldSimulator()
- try:
- rclpy.spin(node)
- except KeyboardInterrupt:
- node.get_logger().info("Shutting down World Simulator")
- finally:
- node.destroy_node()
- rclpy.shutdown()
- if __name__ == "__main__":
- main()
|