""" WorldModel - 主管理器 核心功能: - 管理所有 Entity (实体) - 管理 EnvironmentState (环境状态) - 管理 SystemState (系统状态) - 提供 Snapshot 导出 - 支持 Hook 回调 """ from __future__ import annotations from typing import Callable, Any import time import threading from .entity import Entity from .environment import EnvironmentState from .system_state import SystemState ChangeHook = Callable[[str, dict, Any], None] """ Hook 回调签名: def hook(change_type: str, data: dict, extra: Any) -> None Args: change_type: 变化类型 (entity_update, entity_remove, env_update, sys_update) data: 变化的数据 extra: 额外参数 """ class WorldModel: """ 世界模型主管理器 线程安全,支持并发访问 """ def __init__(self): self._entities: dict[str, Entity] = {} self._environment = EnvironmentState() self._system_state = SystemState() self._hooks: list[ChangeHook] = [] self._lock = threading.RLock() # ========== Entity 管理 ========== def update_entity( self, entity_id: str, entity_type: str, state: dict | None = None, metadata: dict | None = None, ttl: float | None = None, ) -> dict: """ 更新或创建实体 Args: entity_id: 实体 ID entity_type: 实体类型 state: 状态字典 metadata: 元数据 ttl: 生存时间 Returns: 变化字典 """ with self._lock: current_time = time.time() if entity_id in self._entities: entity = self._entities[entity_id] diff = entity.update_state(state or {}, current_time) if metadata: entity.update_metadata(metadata) if ttl is not None: entity.ttl = ttl if diff: self._trigger_hook('entity_update', { 'entity_id': entity_id, 'type': entity_type, 'diff': diff, 'state': entity.state, }) return diff else: entity = Entity( entity_id=entity_id, entity_type=entity_type, state=state or {}, metadata=metadata, ttl=ttl, last_update=current_time, ) self._entities[entity_id] = entity self._trigger_hook('entity_create', { 'entity_id': entity_id, 'type': entity_type, 'state': state, }) return {'_created': True} def remove_entity(self, entity_id: str) -> bool: """移除实体""" with self._lock: if entity_id in self._entities: del self._entities[entity_id] self._trigger_hook('entity_remove', {'entity_id': entity_id}) return True return False def get_entity(self, entity_id: str) -> Entity | None: """获取实体""" with self._lock: return self._entities.get(entity_id) def get_entities_by_type(self, entity_type: str) -> list[Entity]: """获取指定类型的所有实体""" with self._lock: return [e for e in self._entities.values() if e.entity_type == entity_type] def get_all_entities(self) -> dict[str, Entity]: """获取所有实体 (返回副本)""" with self._lock: return dict(self._entities) def cleanup_expired(self) -> int: """清理过期实体,返回清理数量""" with self._lock: current_time = time.time() expired_ids = [ eid for eid, entity in self._entities.items() if entity.is_expired(current_time) ] for eid in expired_ids: del self._entities[eid] self._trigger_hook('entity_expire', {'entity_id': eid}) return len(expired_ids) # ========== Environment 管理 ========== def update_environment(self, key: str, value: Any) -> None: """更新单个环境数据""" with self._lock: old_value = self._environment.get(key) self._environment.set(key, value) self._trigger_hook('env_update', { 'key': key, 'old': old_value, 'new': value, }) def update_environment_batch(self, data: dict) -> dict: """批量更新环境数据""" with self._lock: diff = self._environment.update(data) if diff: self._trigger_hook('env_batch_update', diff) return diff def get_environment(self) -> EnvironmentState: """获取环境状态 (返回副本)""" with self._lock: return self._environment # ========== SystemState 管理 ========== def update_system(self, key: str, value: Any) -> None: """更新单个系统数据""" with self._lock: self._system_state.update({key: value}) self._trigger_hook('sys_update', {'key': key, 'value': value}) def update_system_batch(self, data: dict) -> dict: """批量更新系统数据""" with self._lock: diff = self._system_state.update(data) if diff: self._trigger_hook('sys_batch_update', diff) return diff def get_system_state(self) -> SystemState: """获取系统状态 (返回副本)""" with self._lock: return self._system_state # ========== Hook 管理 ========== def add_hook(self, hook: ChangeHook) -> None: """添加变化回调""" with self._lock: self._hooks.append(hook) def remove_hook(self, hook: ChangeHook) -> None: """移除回调""" with self._lock: if hook in self._hooks: self._hooks.remove(hook) def _trigger_hook(self, change_type: str, data: dict) -> None: """触发所有 Hook""" for hook in self._hooks: try: hook(change_type, data, self) except Exception: pass # ========== Snapshot 导出 ========== def snapshot(self, mode: str = "minimal") -> dict: """ 生成世界状态快照 Args: mode: 模式 "minimal" (仅关键信息) | "full" (完整信息) Returns: 快照字典 """ with self._lock: current_time = time.time() if mode == "minimal": return self._snapshot_minimal(current_time) else: return self._snapshot_full(current_time) def _snapshot_minimal(self, current_time: float) -> dict: """Minimal 快照 - 仅关键信息""" entities = {} entity_types = {} for eid, entity in self._entities.items(): if entity.is_expired(current_time): continue entities[eid] = { 'id': eid, 'type': entity.entity_type, 'state': entity.state, } etype = entity.entity_type entity_types[etype] = entity_types.get(etype, 0) + 1 return { 'timestamp': current_time, 'mode': 'minimal', 'entities': entities, 'environment': {}, 'system': self._system_state.to_dict(), 'stats': { 'total_entities': len(entities), 'entity_types': entity_types, }, } def _snapshot_full(self, current_time: float) -> dict: """Full 快照 - 完整信息""" entities = {} entity_types = {} for eid, entity in self._entities.items(): if entity.is_expired(current_time): continue entities[eid] = entity.to_dict() etype = entity.entity_type entity_types[etype] = entity_types.get(etype, 0) + 1 return { 'timestamp': current_time, 'mode': 'full', 'entities': entities, 'environment': self._environment.to_dict(), 'system': self._system_state.to_dict(), 'stats': { 'total_entities': len(entities), 'entity_types': entity_types, 'total_entities_raw': len(self._entities), 'expired_count': sum(1 for e in self._entities.values() if e.is_expired(current_time)), }, } def reset(self) -> None: """重置所有状态""" with self._lock: self._entities.clear() self._environment = EnvironmentState() self._system_state = SystemState() self._trigger_hook('reset', {})