# Watcher
Система мониторинга с подключаемыми сенсорами.
## Возможности
- Подключаемые (pluggable) сенсоры
- Настраиваемые интервалы проверки
- Callback на каждое чтение сенсора
- Пороги для алертов (отдельный callback при превышении порога)
- История метрик
## Использование
### Базовое использование
from kit.agent.watcher import Watcher, Sensor
# Создание сенсора
class CPUSensor(Sensor):
name = "cpu"
interval = 10 # Проверка каждые 10 секунд
async def read(self) -> float:
import psutil
return psutil.cpu_percent()
# Использование
watcher = Watcher()
watcher.add_sensor(CPUSensor())
# Callback на каждое чтение
@watcher.on_reading
async def handle_reading(sensor_name: str, value: float):
print(f"{sensor_name}: {value}")
await watcher.start()
### Пороги и алерты
class MemorySensor(Sensor):
name = "memory"
interval = 30
threshold = 80.0 # Алерт если > 80%
async def read(self) -> float:
import psutil
return psutil.virtual_memory().percent
watcher = Watcher()
watcher.add_sensor(MemorySensor())
@watcher.on_alert
async def handle_alert(sensor_name: str, value: float, threshold: float):
await send_notification(f"High {sensor_name}: {value}% (threshold: {threshold}%)")
await watcher.start()
### Множественные сенсоры
class DiskSensor(Sensor):
name = "disk"
interval = 60
async def read(self) -> dict:
import psutil
usage = psutil.disk_usage('/')
return {
"used_percent": usage.percent,
"free_gb": usage.free / (1024**3)
}
class NetworkSensor(Sensor):
name = "network"
interval = 5
async def read(self) -> dict:
import psutil
io = psutil.net_io_counters()
return {
"bytes_sent": io.bytes_sent,
"bytes_recv": io.bytes_recv
}
watcher = Watcher()
watcher.add_sensor(CPUSensor())
watcher.add_sensor(MemorySensor())
watcher.add_sensor(DiskSensor())
watcher.add_sensor(NetworkSensor())
await watcher.start()
### История метрик
watcher = Watcher(history_size=100)  # Keep the last 100 values

# Fetch the history
# NOTE: assumes a CPUSensor ("cpu") has been added and has produced readings.
history = watcher.get_history("cpu")
# The history can be empty (presumably the case for a watcher that has not
# been started yet), and sum([]) / len([]) would raise ZeroDivisionError.
if history:
    print(f"CPU avg: {sum(history) / len(history):.1f}%")
else:
    print("CPU avg: n/a (no readings yet)")

# Latest value
current = watcher.get_latest("cpu")
### Пользовательские (custom) сенсоры
import time
from typing import Optional

import httpx


class APIHealthSensor(Sensor):
    """HTTP health check for one URL: status code, latency and a healthy flag."""

    name = "api_health"
    interval = 30

    def __init__(self, url: str, name: Optional[str] = None):
        """
        Args:
            url: Health-check endpoint, polled with GET.
            name: Optional per-instance sensor name. Sensor names must be
                unique, so pass a distinct name when registering several
                APIHealthSensor instances; defaults to "api_health".
        """
        self.url = url
        if name is not None:
            # Instance attribute shadows the class-level default.
            self.name = name
        self.client = httpx.AsyncClient()

    async def read(self) -> dict:
        try:
            # perf_counter() is monotonic, so the measured latency cannot be
            # skewed by wall-clock adjustments (e.g. NTP) like time.time().
            start = time.perf_counter()
            response = await self.client.get(self.url, timeout=10)
            latency = (time.perf_counter() - start) * 1000
            return {
                "status": response.status_code,
                "latency_ms": latency,
                "healthy": response.status_code == 200
            }
        except Exception as e:
            # Best effort: any failure becomes an "unhealthy" reading
            # instead of an exception raised from read().
            return {
                "status": 0,
                "latency_ms": -1,
                "healthy": False,
                "error": str(e)
            }

    async def aclose(self) -> None:
        """Close the underlying HTTP client once the sensor is no longer used."""
        await self.client.aclose()


watcher.add_sensor(APIHealthSensor("https://api.example.com/health"))
## API Reference
### Sensor
# Base class for all sensors (signature listing; bodies omitted).
class Sensor(ABC):
    name: str  # Unique sensor name
    interval: int  # Polling interval, in seconds
    threshold: float  # Optional alert threshold (the CPU example omits it)
    # Return the current reading: a single number or a dict of named values.
    @abstractmethod
    async def read(self) -> Union[float, dict]
### Watcher
# Sensor container and dispatcher (signature listing; bodies omitted).
class Watcher:
    # history_size: how many most recent values are kept (presumably per sensor).
    def __init__(self, history_size: int = 100)
    # Register a sensor instance.
    def add_sensor(self, sensor: Sensor) -> None
    # Unregister the sensor with the given name.
    def remove_sensor(self, name: str) -> None
    # In the examples this is awaited after all sensors and handlers are registered.
    async def start(self) -> None
    async def stop(self) -> None
    # handler(sensor_name, value) is called for every reading.
    # NOTE(review): annotated -> None, yet the examples use it as a decorator;
    # if it really returns None, the decorated name is rebound to None.
    def on_reading(self, handler: Callable) -> None
    # handler(sensor_name, value, threshold) is called when a threshold is exceeded.
    def on_alert(self, handler: Callable) -> None
    # Most recent value of one sensor.
    def get_latest(self, sensor_name: str) -> Any
    # Stored history of one sensor (at most history_size values).
    def get_history(self, sensor_name: str) -> List[Any]
    # Latest values of all sensors, presumably keyed by sensor name.
    def get_all_latest(self) -> Dict[str, Any]
## Примеры из production
### Sentinel — мониторинг инфраструктуры
from datetime import datetime, timezone
from typing import Any


class SentinelWatcher:
    """Infrastructure monitoring: system, API and database sensors feeding a
    metrics store (every reading) and an alert manager (threshold alerts)."""

    def __init__(self, config: dict, metrics_store: Any = None):
        """
        Args:
            config: Needs an 'alerting' section (passed to AlertManager).
                Optional 'apis' list (items with 'name' and 'health_url') and
                'databases' list (items with 'name' and 'connection'); a
                missing or empty list registers no sensors of that kind.
            metrics_store: Object with an async write(name, value) method that
                receives every reading. When None, readings are not persisted.
        """
        self.watcher = Watcher(history_size=1000)
        self.alerting = AlertManager(config['alerting'])
        # log_metric() writes through this attribute, so it must always exist.
        self.metrics_store = metrics_store

        # System sensors
        self.watcher.add_sensor(CPUSensor())
        self.watcher.add_sensor(MemorySensor())
        self.watcher.add_sensor(DiskSensor())

        # API sensors: one per endpoint, named api_<name> so that sensor
        # names stay unique. APIHealthSensor must accept a name= keyword.
        for api in config.get('apis') or []:
            self.watcher.add_sensor(APIHealthSensor(
                name=f"api_{api['name']}",
                url=api['health_url'],
            ))

        # Database sensors. NOTE: DatabaseSensor is not defined in this
        # document — supply your own implementation.
        for db in config.get('databases') or []:
            self.watcher.add_sensor(DatabaseSensor(
                name=f"db_{db['name']}",
                connection_string=db['connection'],
            ))

        # Handlers
        self.watcher.on_reading(self.log_metric)
        self.watcher.on_alert(self.handle_alert)

    async def log_metric(self, name: str, value: Any):
        """Persist one reading; does nothing when no metrics store was given."""
        if self.metrics_store is None:
            return
        await self.metrics_store.write(name, value)

    async def handle_alert(self, name: str, value: Any, threshold: float):
        """Forward a threshold alert to the alert manager."""
        await self.alerting.send({
            "sensor": name,
            "value": value,
            "threshold": threshold,
            # Timezone-aware UTC rather than ambiguous naive local time.
            "timestamp": datetime.now(timezone.utc),
        })

    async def run(self):
        """Start the underlying watcher."""
        await self.watcher.start()
### Autoshorts — мониторинг задач
class TaskWatcher:
    """Task-pipeline monitoring: queue, workers and GPU load."""

    def __init__(self, task_manager):
        self.watcher = Watcher()
        self.task_manager = task_manager
        self.watcher.add_sensor(QueueSensor(task_manager))
        # NOTE: WorkerSensor is not defined in this document — presumably a
        # project-specific sensor over task_manager; supply your own.
        self.watcher.add_sensor(WorkerSensor(task_manager))
        self.watcher.add_sensor(GPUSensor())

    async def run(self):
        """Start the underlying watcher."""
        await self.watcher.start()
class QueueSensor(Sensor):
    """Snapshot of the task manager's queue counters."""

    name = "queue"
    interval = 5

    def __init__(self, task_manager):
        self.tm = task_manager

    async def read(self) -> dict:
        tm = self.tm
        pending, running = len(tm.pending_tasks), len(tm.running_tasks)
        return dict(
            pending=pending,
            running=running,
            completed=tm.completed_count,
            failed=tm.failed_count,
        )
class GPUSensor(Sensor):
    """Utilisation and memory of the first NVIDIA GPU (index 0) via NVML."""

    name = "gpu"
    interval = 10
    # NOTE(review): read() returns a dict — confirm which value Watcher
    # compares against this threshold.
    threshold = 90.0

    @staticmethod
    def _unavailable() -> dict:
        """Fallback reading when NVML or the GPU cannot be queried."""
        return {"gpu_util": 0, "memory_util": 0, "memory_used_gb": 0.0, "available": False}

    async def read(self) -> dict:
        # `except Exception` (not a bare except) so that asyncio.CancelledError
        # and KeyboardInterrupt — BaseException subclasses — still propagate
        # and the watcher can be stopped.
        try:
            import pynvml
            pynvml.nvmlInit()
        except Exception:
            # pynvml missing, no driver or no GPU: report "unavailable".
            return self._unavailable()
        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(0)
            util = pynvml.nvmlDeviceGetUtilizationRates(handle)
            memory = pynvml.nvmlDeviceGetMemoryInfo(handle)
            return {
                "gpu_util": util.gpu,
                "memory_util": (memory.used / memory.total) * 100,
                "memory_used_gb": memory.used / (1024**3),
                "available": True,
            }
        except Exception:
            return self._unavailable()
        finally:
            # nvmlInit() is reference-counted: balance every successful call,
            # otherwise each poll leaks one NVML reference.
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass  # best-effort cleanup; the reading is already decided
### Dashboard — метрики в реальном времени
from datetime import datetime, timezone
from typing import Any


class DashboardWatcher:
    """Broadcasts every sensor reading over WebSocket."""

    def __init__(self, ws_manager):
        """
        Args:
            ws_manager: Object with an async broadcast(message: dict) method
                (presumably fans the message out to all connected clients).
        """
        self.watcher = Watcher()
        self.ws = ws_manager
        # No sensors are registered here; add them via self.watcher.add_sensor(...).
        self.watcher.on_reading(self.broadcast_metric)

    async def broadcast_metric(self, name: str, value: Any):
        """Send one reading as a {"type": "metric", ...} message."""
        await self.ws.broadcast({
            "type": "metric",
            "name": name,
            "value": value,
            # Timezone-aware UTC: the ISO string carries "+00:00", so clients
            # in other time zones parse it unambiguously.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })