| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- """
- EnvironmentState - 全局环境状态
- 存储环境相关的状态信息,如气体浓度、天气、区域状态等
- """
- from __future__ import annotations
- from dataclasses import dataclass, field
- from typing import Any
- import time
- @dataclass
- class EnvironmentState:
- """
- 环境状态类
-
- Attributes:
- timestamp: 状态时间戳
- data: 环境数据字典 (如 gas_readings, weather, zones, etc.)
- """
- timestamp: float = field(default_factory=time.time)
- data: dict = field(default_factory=dict)
-
- def update(self, new_data: dict, current_time: float | None = None) -> dict:
- """
- 更新环境数据,返回变化部分
-
- Args:
- new_data: 新数据字典
- current_time: 当前时间戳
-
- Returns:
- 变化的数据字典
- """
- if current_time is None:
- current_time = time.time()
-
- diff = {}
- for key, value in new_data.items():
- old_value = self.data.get(key)
- if old_value != value:
- diff[key] = {'old': old_value, 'new': value}
- self.data[key] = value
-
- self.timestamp = current_time
- return diff
-
- def get(self, key: str, default: Any = None) -> Any:
- """获取环境数据"""
- return self.data.get(key, default)
-
- def set(self, key: str, value: Any) -> None:
- """设置环境数据"""
- self.data[key] = value
- self.timestamp = time.time()
-
- def to_dict(self) -> dict:
- """转换为字典"""
- return {
- 'timestamp': self.timestamp,
- 'data': self.data,
- }
-
- @classmethod
- def from_dict(cls, data: dict) -> EnvironmentState:
- """从字典创建"""
- return cls(
- timestamp=data.get('timestamp', time.time()),
- data=data.get('data', {}),
- )
|