#!/usr/bin/env python3
"""
world_simulator.py - data simulator for the World node.

Publishes simulated data (intended for the world2 node) as JSON payloads
carried in std_msgs/String messages:
  1. Weather data                        -> /sensors/weather/data
  2. Robot status (battery, mode, task)  -> /robot/status
  3. Robot position                      -> /robot/position
  4. Points of interest (place + status) -> /map/poi

Usage:
    python3 scripts/world_simulator.py
"""

import json
import time
import argparse
from typing import Optional

import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class WorldSimulator(Node):
    """ROS 2 node that periodically publishes simulated world data.

    Parameters (ROS):
        publish_rate (float): publish frequency in Hz, default 1.0.
        simulate_movement (bool): when true, the battery level drains a
            little on every status publish. Default True.
        initial_battery (float): starting battery percentage, default 85.0.
    """

    # Rate used when the ``publish_rate`` parameter is zero or negative.
    DEFAULT_PUBLISH_RATE = 1.0

    # Coordinates of every POI declared in ``self.pois``. The robot position
    # and the POI messages both read this single table, so any location
    # accepted by ``move_to`` is guaranteed to have a position.
    POI_POSITIONS = {
        "square": {"x": 0.0, "y": 0.0, "z": 0.0},
        "gate": {"x": -8.0, "y": 0.0, "z": 0.0},
        "charging_station": {"x": -5.0, "y": -3.0, "z": 0.0},
        "barn_1": {"x": 10.0, "y": 5.0, "z": 0.0},
        "barn_2": {"x": 15.0, "y": 5.0, "z": 0.0},
    }

    def __init__(self) -> None:
        """Declare parameters, create publishers and start the publish timer."""
        super().__init__("world_simulator")

        self.declare_parameter("publish_rate", 1.0)
        self.declare_parameter("simulate_movement", True)
        self.declare_parameter("initial_battery", 85.0)

        rate = self.get_parameter("publish_rate").value
        if not rate or rate <= 0:
            # A zero rate would divide by zero below and a negative one would
            # yield an invalid timer period, so fall back to a sane default.
            self.get_logger().warn(
                f"Invalid publish_rate {rate!r}; "
                f"falling back to {self.DEFAULT_PUBLISH_RATE} Hz"
            )
            rate = self.DEFAULT_PUBLISH_RATE
        self.rate = rate
        self.simulate_movement = self.get_parameter("simulate_movement").value
        self.battery_level = self.get_parameter("initial_battery").value

        self.current_location = "square"

        self.pois = {
            "square": {"name": "广场", "poi_type": "area", "status": "available", "index": 1},
            "gate": {"name": "大门", "poi_type": "entrance", "status": "available", "index": 2},
            "charging_station": {"name": "充电站", "poi_type": "facility", "status": "available", "index": 3},
            "barn_1": {"name": "牛棚1号", "poi_type": "building", "status": "available", "index": 4},
            "barn_2": {"name": "牛棚2号", "poi_type": "building", "status": "repairing", "index": 5},
        }

        # Publishers
        self.weather_pub = self.create_publisher(String, "/sensors/weather/data", 10)
        self.robot_pub = self.create_publisher(String, "/robot/status", 10)
        self.position_pub = self.create_publisher(String, "/robot/position", 10)
        self.poi_pub = self.create_publisher(String, "/map/poi", 10)

        self.timer = self.create_timer(1.0 / self.rate, self.publish_all)

        self.get_logger().info("World Simulator started")
        self.get_logger().info(f" - Publish rate: {self.rate} Hz")
        self.get_logger().info(f" - Initial battery: {self.battery_level}%")
        self.get_logger().info(f" - Locations: {list(self.pois.keys())}")

        # Publish once immediately instead of waiting for the first timer tick.
        self.publish_all()

    @staticmethod
    def _publish_json(publisher, payload: dict) -> None:
        """Serialize *payload* to JSON and publish it as a String message."""
        msg = String()
        # ensure_ascii=False keeps the Chinese names readable in the payload.
        msg.data = json.dumps(payload, ensure_ascii=False)
        publisher.publish(msg)

    def publish_all(self) -> None:
        """Publish weather, robot status, robot position and all POIs."""
        self.publish_weather()
        self.publish_robot_status()
        self.publish_robot_position()
        self.publish_pois()

    def publish_weather(self) -> None:
        """Publish a fixed weather sample stamped with the current time."""
        data = {
            "condition": "sunny",
            "temperature": 25.5,
            "wind_speed": 3.2,
            "rain_intensity": 0.0,
            "illuminance": 85000,
            "humidity": 45.0,
            "timestamp": time.time(),
        }
        self._publish_json(self.weather_pub, data)

    def publish_robot_status(self) -> None:
        """Publish the robot status, draining the battery when simulating."""
        if self.simulate_movement:
            # Drain 0.05 % per publish, never dropping below 10 %.
            self.battery_level = max(10.0, self.battery_level - 0.05)

        data = {
            "battery_level": round(self.battery_level, 1),
            "operating_mode": "patrolling",
            "current_task": f"巡逻中 - {self.pois[self.current_location]['name']}",
            "error_code": 0,
            "timestamp": time.time(),
        }
        self._publish_json(self.robot_pub, data)

    def publish_robot_position(self) -> None:
        """Publish the robot entity at the coordinates of its current POI."""
        data = {
            "entity_id": "robot_1",
            "name": "巡逻机器人",
            "type": "robot",
            "position": self.POI_POSITIONS[self.current_location],
            "battery": self.battery_level,
            "status": "patrolling",
            "timestamp": time.time(),
        }
        self._publish_json(self.position_pub, data)

    def publish_pois(self) -> None:
        """Publish one message per POI (place, status and derived properties)."""
        for poi_id, poi_info in self.pois.items():
            data = {
                "entity_id": poi_id,
                "name": poi_info["name"],
                "poi_type": poi_info["poi_type"],
                "index": poi_info["index"],
                "position": self.POI_POSITIONS[poi_id],
                "status": poi_info["status"],
                "is_current_location": (poi_id == self.current_location),
                "properties": {
                    "accessible": poi_info["status"] == "available",
                    "has_shelter": poi_info["poi_type"] == "building",
                    "has_power": poi_id == "charging_station",
                },
                "timestamp": time.time(),
            }
            self._publish_json(self.poi_pub, data)

    def move_to(self, location_id: str) -> None:
        """Move the robot to *location_id* and republish everything.

        Unknown location ids are logged as a warning and otherwise ignored.
        """
        if location_id in self.pois:
            self.current_location = location_id
            self.get_logger().info(f"Moved to: {self.pois[location_id]['name']}")
            self.publish_all()
        else:
            self.get_logger().warn(f"Unknown location: {location_id}")


def main(args: Optional[list] = None) -> None:
    """Initialise rclpy, spin the simulator node and shut down cleanly."""
    rclpy.init(args=args)
    node = WorldSimulator()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info("Shutting down World Simulator")
    finally:
        node.destroy_node()
        # On Ctrl-C the rclpy signal handler may already have shut the
        # context down; a second shutdown() would raise, so check first.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()