| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- """
- WorldModel - 主管理器
- 核心功能:
- - 管理所有 Entity (实体)
- - 管理 EnvironmentState (环境状态)
- - 管理 SystemState (系统状态)
- - 提供 Snapshot 导出
- - 支持 Hook 回调
- """
- from __future__ import annotations
- from typing import Callable, Any
- import time
- import threading
- from .entity import Entity
- from .environment import EnvironmentState
- from .system_state import SystemState
#: Signature of a change-notification callback registered on ``WorldModel``::
#:
#:     def hook(change_type: str, data: dict, extra: Any) -> None
#:
#: Args:
#:     change_type: Kind of change. The values emitted in this module are
#:         ``entity_create``, ``entity_update``, ``entity_remove``,
#:         ``entity_expire``, ``env_update``, ``env_batch_update``,
#:         ``sys_update``, ``sys_batch_update`` and ``reset``.
#:     data: Payload describing the change; its keys depend on
#:         ``change_type``.
#:     extra: The ``WorldModel`` instance that fired the hook.
ChangeHook = Callable[[str, dict, Any], None]
class WorldModel:
    """Central manager for the world model.

    Owns every tracked ``Entity`` (keyed by id), one ``EnvironmentState``
    and one ``SystemState``, exports snapshots, and notifies registered
    hooks about changes.

    Every public method holds a re-entrant lock (``threading.RLock``) for
    its whole body. Hooks are invoked synchronously on the calling thread
    while that lock is held; because the lock is re-entrant, a hook may call
    back into the same instance. Objects handed out by the getters are the
    stored instances themselves, so mutating them from outside is not
    covered by the lock.
    """

    def __init__(self):
        """Create an empty model with fresh environment and system state."""
        self._entities: dict[str, Entity] = {}
        self._environment = EnvironmentState()
        self._system_state = SystemState()
        self._hooks: list[ChangeHook] = []
        self._lock = threading.RLock()

    # ========== Entity management ==========

    def update_entity(
        self,
        entity_id: str,
        entity_type: str,
        state: dict | None = None,
        metadata: dict | None = None,
        ttl: float | None = None,
    ) -> dict:
        """Update an existing entity, or create it when the id is unknown.

        Args:
            entity_id: Identifier of the entity.
            entity_type: Type label. Used when the entity is created and
                echoed in the hook payload; the stored type of an existing
                entity is not modified by this call.
            state: State values to apply; ``None`` is treated as ``{}``.
            metadata: Metadata passed to the new entity on create. On update
                it is forwarded to ``Entity.update_metadata`` only when
                truthy.
            ttl: Time-to-live (presumably seconds relative to
                ``time.time()`` -- confirm against ``Entity.is_expired``).
                On update, ``None`` leaves the current value untouched.

        Returns:
            On update: whatever ``Entity.update_state`` returned (the diff).
            On create: the marker dict ``{'_created': True}``.
        """
        with self._lock:
            now = time.time()
            existing = self._entities.get(entity_id)

            if existing is not None:
                diff = existing.update_state(state or {}, now)
                if metadata:
                    existing.update_metadata(metadata)
                if ttl is not None:
                    existing.ttl = ttl

                # Only state changes are announced; a metadata- or ttl-only
                # update produces an empty diff and stays silent.
                if diff:
                    self._trigger_hook('entity_update', {
                        'entity_id': entity_id,
                        'type': entity_type,
                        'diff': diff,
                        'state': existing.state,
                    })
                return diff

            # Normalise once so the entity and the hook payload agree: the
            # payload carries a dict (never None), like 'entity_update' does.
            initial_state = state or {}
            self._entities[entity_id] = Entity(
                entity_id=entity_id,
                entity_type=entity_type,
                state=initial_state,
                metadata=metadata,
                ttl=ttl,
                last_update=now,
            )
            self._trigger_hook('entity_create', {
                'entity_id': entity_id,
                'type': entity_type,
                'state': initial_state,
            })
            return {'_created': True}

    def remove_entity(self, entity_id: str) -> bool:
        """Remove an entity and fire ``entity_remove``.

        Returns:
            ``True`` if the entity existed and was removed, else ``False``
            (no hook is fired in that case).
        """
        with self._lock:
            if entity_id not in self._entities:
                return False
            del self._entities[entity_id]
            self._trigger_hook('entity_remove', {'entity_id': entity_id})
            return True

    def get_entity(self, entity_id: str) -> Entity | None:
        """Return the stored entity for ``entity_id``, or ``None`` if unknown."""
        with self._lock:
            return self._entities.get(entity_id)

    def get_entities_by_type(self, entity_type: str) -> list[Entity]:
        """Return a new list of all entities whose type equals ``entity_type``."""
        with self._lock:
            return [
                entity
                for entity in self._entities.values()
                if entity.entity_type == entity_type
            ]

    def get_all_entities(self) -> dict[str, Entity]:
        """Return a shallow copy of the id -> entity mapping.

        The dict is new, but the ``Entity`` values are the stored instances.
        """
        with self._lock:
            return dict(self._entities)

    def cleanup_expired(self) -> int:
        """Delete every expired entity, firing ``entity_expire`` for each.

        Returns:
            The number of entities removed.
        """
        with self._lock:
            now = time.time()
            # Collect first: the dict must not change size while iterated.
            expired_ids = [
                eid
                for eid, entity in self._entities.items()
                if entity.is_expired(now)
            ]
            for eid in expired_ids:
                del self._entities[eid]
                self._trigger_hook('entity_expire', {'entity_id': eid})
            return len(expired_ids)

    # ========== Environment management ==========

    def update_environment(self, key: str, value: Any) -> None:
        """Set one environment value and fire ``env_update``.

        The hook fires on every call, with the previous and the new value,
        whether or not the value actually changed.
        """
        with self._lock:
            previous = self._environment.get(key)
            self._environment.set(key, value)
            self._trigger_hook('env_update', {
                'key': key,
                'old': previous,
                'new': value,
            })

    def update_environment_batch(self, data: dict) -> dict:
        """Apply several environment values at once.

        Returns:
            The value returned by ``EnvironmentState.update`` (used as the
            diff). When it is truthy, it is also the ``env_batch_update``
            hook payload.
        """
        with self._lock:
            diff = self._environment.update(data)
            if diff:
                self._trigger_hook('env_batch_update', diff)
            return diff

    def get_environment(self) -> EnvironmentState:
        """Return the environment state object.

        NOTE: this is the live instance held by the model, not a copy;
        changes made through it bypass both the lock and the hooks.
        """
        with self._lock:
            return self._environment

    # ========== SystemState management ==========

    def update_system(self, key: str, value: Any) -> None:
        """Set one system value and fire ``sys_update`` (on every call)."""
        with self._lock:
            self._system_state.update({key: value})
            self._trigger_hook('sys_update', {'key': key, 'value': value})

    def update_system_batch(self, data: dict) -> dict:
        """Apply several system values at once.

        Returns:
            The value returned by ``SystemState.update`` (used as the diff).
            When it is truthy, it is also the ``sys_batch_update`` hook
            payload.
        """
        with self._lock:
            diff = self._system_state.update(data)
            if diff:
                self._trigger_hook('sys_batch_update', diff)
            return diff

    def get_system_state(self) -> SystemState:
        """Return the system state object.

        NOTE: this is the live instance held by the model, not a copy;
        changes made through it bypass both the lock and the hooks.
        """
        with self._lock:
            return self._system_state

    # ========== Hook management ==========

    def add_hook(self, hook: ChangeHook) -> None:
        """Register a change callback (duplicates are not filtered)."""
        with self._lock:
            self._hooks.append(hook)

    def remove_hook(self, hook: ChangeHook) -> None:
        """Unregister a callback; unknown callbacks are ignored."""
        with self._lock:
            if hook in self._hooks:
                self._hooks.remove(hook)

    def _trigger_hook(self, change_type: str, data: dict) -> None:
        """Call every registered hook with ``(change_type, data, self)``.

        Callers in this class hold ``self._lock`` while dispatching. A
        failing hook never aborts the dispatch or the operation that
        triggered it; the failure is logged instead of being discarded.
        """
        # Iterate over a snapshot: a hook may call add_hook/remove_hook
        # re-entrantly, and mutating the list mid-iteration would make the
        # loop skip (or repeat) other hooks.
        for hook in tuple(self._hooks):
            try:
                hook(change_type, data, self)
            except Exception:
                # Imported here so the block needs no new file-level import.
                import logging

                logging.getLogger(__name__).exception(
                    "WorldModel hook %r failed for change %r",
                    hook,
                    change_type,
                )

    # ========== Snapshot export ==========

    def snapshot(self, mode: str = "minimal") -> dict:
        """Build a snapshot of the current world state.

        Args:
            mode: ``"minimal"`` for key information only. Any other value
                (conventionally ``"full"``) yields the full snapshot.

        Returns:
            A dict with the keys ``timestamp``, ``mode``, ``entities``,
            ``environment``, ``system`` and ``stats``. Expired entities are
            left out of ``entities``.
        """
        with self._lock:
            now = time.time()
            if mode == "minimal":
                return self._snapshot_minimal(now)
            return self._snapshot_full(now)

    def _live_entities(self, current_time: float) -> list[tuple[str, Entity]]:
        """Return the ``(id, entity)`` pairs not expired at ``current_time``."""
        return [
            (eid, entity)
            for eid, entity in self._entities.items()
            if not entity.is_expired(current_time)
        ]

    @staticmethod
    def _count_types(live: list[tuple[str, Entity]]) -> dict:
        """Count the given ``(id, entity)`` pairs per ``entity_type``."""
        counts: dict = {}
        for _, entity in live:
            counts[entity.entity_type] = counts.get(entity.entity_type, 0) + 1
        return counts

    def _snapshot_minimal(self, current_time: float) -> dict:
        """Minimal snapshot: id, type and state per live entity.

        ``environment`` is always an empty dict in this mode. Each entity's
        ``state`` entry is the entity's own state object, not a copy.
        """
        live = self._live_entities(current_time)
        entities = {
            eid: {
                'id': eid,
                'type': entity.entity_type,
                'state': entity.state,
            }
            for eid, entity in live
        }
        return {
            'timestamp': current_time,
            'mode': 'minimal',
            'entities': entities,
            'environment': {},
            'system': self._system_state.to_dict(),
            'stats': {
                'total_entities': len(entities),
                'entity_types': self._count_types(live),
            },
        }

    def _snapshot_full(self, current_time: float) -> dict:
        """Full snapshot: ``to_dict()`` of every live entity plus environment.

        ``stats`` additionally reports the raw entity count (expired ones
        included) and how many entities are expired.
        """
        live = self._live_entities(current_time)
        entities = {eid: entity.to_dict() for eid, entity in live}
        total_raw = len(self._entities)
        return {
            'timestamp': current_time,
            'mode': 'full',
            'entities': entities,
            'environment': self._environment.to_dict(),
            'system': self._system_state.to_dict(),
            'stats': {
                'total_entities': len(entities),
                'entity_types': self._count_types(live),
                'total_entities_raw': total_raw,
                # Everything that was filtered out above is expired, so no
                # second is_expired() pass over all entities is needed.
                'expired_count': total_raw - len(entities),
            },
        }

    def reset(self) -> None:
        """Drop all entities, recreate both state objects, fire ``reset``.

        Registered hooks are kept.
        """
        with self._lock:
            self._entities.clear()
            self._environment = EnvironmentState()
            self._system_state = SystemState()
            self._trigger_hook('reset', {})
|