Перейти к содержанию

Watcher

Система мониторинга с подключаемыми сенсорами.

Возможности

  • Подключаемые (pluggable) сенсоры
  • Настраиваемые интервалы проверки
  • Callback'и на каждое чтение и на превышение порога
  • Пороги для алертов
  • История метрик

Использование

Базовое использование

from kit.agent.watcher import Watcher, Sensor

# Defining a sensor: overall CPU load, read every 10 seconds.
class CPUSensor(Sensor):
    """Report system-wide CPU usage as a percentage (via psutil)."""

    name = "cpu"
    interval = 10  # seconds between reads

    async def read(self) -> float:
        """Return the value of psutil's cpu_percent()."""
        from psutil import cpu_percent
        return cpu_percent()

# Usage
# NOTE: top-level `await` only works in an async REPL/notebook; in a script,
# wrap these statements in `async def main()` and run `asyncio.run(main())`.
watcher = Watcher()
watcher.add_sensor(CPUSensor())

# Callback invoked on every reading with the sensor name and the value read.
@watcher.on_reading
async def handle_reading(sensor_name: str, value: float):
    print(f"{sensor_name}: {value}")

await watcher.start()

Пороги и алерты

class MemorySensor(Sensor):
    """Report RAM usage in percent; an alert is raised above 80 %."""

    name = "memory"
    interval = 30
    threshold = 80.0  # alert when a reading is above 80 (%)

    async def read(self) -> float:
        """Return psutil's virtual_memory().percent."""
        import psutil
        vm = psutil.virtual_memory()
        return vm.percent

watcher = Watcher()
watcher.add_sensor(MemorySensor())

# Callback invoked with (sensor name, value, threshold) when a sensor with a
# `threshold` produces a reading above it.
@watcher.on_alert
async def handle_alert(sensor_name: str, value: float, threshold: float):
    # NOTE: send_notification is a placeholder for your own notifier; it is
    # not defined in this example.
    await send_notification(f"High {sensor_name}: {value}% (threshold: {threshold}%)")

await watcher.start()

Множественные сенсоры

class DiskSensor(Sensor):
    """Report usage of one filesystem.

    Args:
        path: Mount point / directory to inspect. Defaults to "/" (the
            original hard-coded value); pass e.g. "C:\\" on Windows or a
            data-volume path to monitor another disk.
    """

    name = "disk"
    interval = 60

    def __init__(self, path: str = "/"):
        self.path = path

    async def read(self) -> dict:
        """Return the used percentage and free space (GiB) of ``self.path``."""
        import psutil
        usage = psutil.disk_usage(self.path)
        return {
            "used_percent": usage.percent,
            "free_gb": usage.free / (1024**3),
        }

class NetworkSensor(Sensor):
    """Report network byte counters from psutil.net_io_counters()."""

    name = "network"
    interval = 5

    async def read(self) -> dict:
        """Return bytes sent / received as reported by psutil.

        NOTE(review): presumably these are running totals rather than
        per-interval rates; compute deltas between readings for throughput.
        """
        import psutil
        counters = psutil.net_io_counters()
        return dict(bytes_sent=counters.bytes_sent, bytes_recv=counters.bytes_recv)

# One watcher can host several sensors, each read on its own `interval`
# (10 s CPU, 30 s memory, 60 s disk, 5 s network in these examples).
watcher = Watcher()
watcher.add_sensor(CPUSensor())
watcher.add_sensor(MemorySensor())
watcher.add_sensor(DiskSensor())
watcher.add_sensor(NetworkSensor())

await watcher.start()

История метрик

watcher = Watcher(history_size=100)  # keep the last 100 values

# Fetch the stored history for one sensor
history = watcher.get_history("cpu")
# A freshly created watcher may have no readings yet; guard the average
# so an empty history does not raise ZeroDivisionError.
if history:
    print(f"CPU avg: {sum(history) / len(history):.1f}%")
else:
    print("CPU avg: no data yet")

# Most recent value
current = watcher.get_latest("cpu")

Пользовательские (custom) сенсоры

class APIHealthSensor(Sensor):
    """Probe an HTTP health endpoint and report status code and latency.

    Args:
        url: Health-check URL requested (GET) on every read.
        name: Optional per-instance sensor name. Sensor names must be
            unique, so pass one when watching several endpoints (as the
            Sentinel example does); otherwise the class default
            "api_health" is used.
    """

    name = "api_health"
    interval = 30

    def __init__(self, url: str, name: "str | None" = None):
        self.url = url
        if name is not None:
            # Instance attribute shadows the class-level default name.
            self.name = name
        self.client = httpx.AsyncClient()

    async def read(self) -> dict:
        """Perform one request and describe the result.

        Returns a dict with ``status``, ``latency_ms`` and ``healthy``
        (True only for HTTP 200). On any request failure the endpoint is
        reported as unhealthy with ``status`` 0, ``latency_ms`` -1 and the
        error text under ``error`` instead of raising.
        """
        import time

        # perf_counter is monotonic: wall-clock adjustments cannot skew latency.
        start = time.perf_counter()
        try:
            response = await self.client.get(self.url, timeout=10)
        except Exception as e:  # best-effort probe: any failure means "unhealthy"
            return {
                "status": 0,
                "latency_ms": -1,
                "healthy": False,
                "error": str(e),
            }
        latency = (time.perf_counter() - start) * 1000

        return {
            "status": response.status_code,
            "latency_ms": latency,
            "healthy": response.status_code == 200,
        }

    async def aclose(self) -> None:
        """Close the HTTP client; call this when the sensor is no longer used."""
        await self.client.aclose()

# Custom sensors are registered exactly like the built-in examples.
watcher.add_sensor(APIHealthSensor("https://api.example.com/health"))

API Reference

Sensor

class Sensor(ABC):
    # Abstract base for all sensors: subclasses set the class attributes
    # below and implement read().
    name: str           # Unique sensor name
    interval: int       # Read interval in seconds
    threshold: float    # Optional alert threshold (examples without alerts omit it)

    # Return one reading: a number (e.g. CPUSensor) or a dict of named values
    # (e.g. DiskSensor). NOTE(review): how `threshold` is applied to dict
    # readings is not documented here; confirm against the implementation.
    @abstractmethod
    async def read(self) -> Union[float, dict]

Watcher

class Watcher:
    # Runs registered sensors on their intervals, keeps a history of their
    # readings and dispatches reading/alert callbacks.
    def __init__(self, history_size: int = 100)  # history_size: number of recent values kept (see get_history)

    # Register / unregister a sensor; removal is by the sensor's name.
    def add_sensor(self, sensor: Sensor) -> None
    def remove_sensor(self, name: str) -> None

    # Begin / stop monitoring (coroutines: must be awaited).
    async def start(self) -> None
    async def stop(self) -> None

    # Register handler(sensor_name, value) for every reading, and
    # handler(sensor_name, value, threshold) for threshold alerts.
    # NOTE(review): the examples use these as decorators (@watcher.on_reading);
    # with a `-> None` return the decorated function's name would be rebound
    # to None. Confirm whether they actually return the handler.
    def on_reading(self, handler: Callable) -> None
    def on_alert(self, handler: Callable) -> None

    # Latest value / stored history for one sensor; latest values of all sensors.
    def get_latest(self, sensor_name: str) -> Any
    def get_history(self, sensor_name: str) -> List[Any]
    def get_all_latest(self) -> Dict[str, Any]

Примеры из production

Sentinel — мониторинг инфраструктуры

class SentinelWatcher:
    """Infrastructure monitor combining system, API and database sensors.

    Args:
        config: Configuration dict. ``config['alerting']`` is passed to
            ``AlertManager``. The optional ``config['apis']`` entries need
            ``'name'`` and ``'health_url'``; the optional
            ``config['databases']`` entries need ``'name'`` and
            ``'connection'``. Missing lists are treated as empty.
        metrics_store: Optional object with an async ``write(name, value)``
            method that receives every reading. When omitted, readings are
            only kept in the watcher's history (history_size=1000).
    """

    def __init__(self, config: dict, metrics_store=None):
        self.watcher = Watcher(history_size=1000)
        self.alerting = AlertManager(config['alerting'])
        # log_metric() reads this on every reading, so it must always be set.
        self.metrics_store = metrics_store

        # System sensors
        self.watcher.add_sensor(CPUSensor())
        self.watcher.add_sensor(MemorySensor())
        self.watcher.add_sensor(DiskSensor())

        # One health-check sensor per API; the prefixed name keeps names unique.
        for api in config.get('apis', []):
            self.watcher.add_sensor(APIHealthSensor(
                name=f"api_{api['name']}",
                url=api['health_url']
            ))

        # One sensor per database
        for db in config.get('databases', []):
            self.watcher.add_sensor(DatabaseSensor(
                name=f"db_{db['name']}",
                connection_string=db['connection']
            ))

        # Handlers
        self.watcher.on_reading(self.log_metric)
        self.watcher.on_alert(self.handle_alert)

    async def log_metric(self, name: str, value: Any):
        """Persist one reading; does nothing when no metrics store is configured."""
        if self.metrics_store is None:
            return
        await self.metrics_store.write(name, value)

    async def handle_alert(self, name: str, value: Any, threshold: float):
        """Forward a threshold breach to the alert manager."""
        await self.alerting.send({
            "sensor": name,
            "value": value,
            "threshold": threshold,
            "timestamp": datetime.now()
        })

    async def run(self):
        """Start the underlying watcher."""
        await self.watcher.start()

Autoshorts — мониторинг задач

class TaskWatcher:
    """Monitor a task manager's queue and workers plus the local GPU."""

    def __init__(self, task_manager):
        self.watcher = Watcher()
        self.task_manager = task_manager

        self.watcher.add_sensor(QueueSensor(task_manager))
        # NOTE(review): WorkerSensor is not defined in this example; supply
        # your own implementation.
        self.watcher.add_sensor(WorkerSensor(task_manager))
        self.watcher.add_sensor(GPUSensor())

    async def run(self):
        """Start the underlying watcher (without this the sensors never run)."""
        await self.watcher.start()

class QueueSensor(Sensor):
    """Snapshot of a task manager: pending/running sizes and done/failed counters."""

    name = "queue"
    interval = 5

    def __init__(self, task_manager):
        # Stored under a short alias; read() only inspects it.
        self.tm = task_manager

    async def read(self) -> dict:
        """Return the current queue sizes and the completed/failed counters."""
        manager = self.tm
        pending = len(manager.pending_tasks)
        running = len(manager.running_tasks)
        return {
            "pending": pending,
            "running": running,
            "completed": manager.completed_count,
            "failed": manager.failed_count,
        }

class GPUSensor(Sensor):
    """Report utilisation and memory usage of GPU 0 through NVML (pynvml).

    If pynvml cannot be imported or any NVML call fails, a zeroed reading
    with ``"available": False`` is returned instead of raising, so machines
    without a GPU keep the watcher running. Successful readings carry
    ``"available": True`` so consumers can tell the two cases apart.
    """

    name = "gpu"
    interval = 10
    # NOTE(review): read() returns a dict; how the Watcher compares this
    # threshold with a dict reading is not shown in this example. Confirm.
    threshold = 90.0

    # Returned (as a fresh copy) whenever NVML data cannot be obtained.
    _UNAVAILABLE = {"gpu_util": 0, "memory_util": 0, "available": False}

    async def read(self) -> dict:
        # `except Exception` rather than a bare `except:` so that
        # KeyboardInterrupt / SystemExit still propagate.
        try:
            import pynvml
            pynvml.nvmlInit()
        except Exception:
            return dict(self._UNAVAILABLE)

        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(0)
            util = pynvml.nvmlDeviceGetUtilizationRates(handle)
            memory = pynvml.nvmlDeviceGetMemoryInfo(handle)
            return {
                "gpu_util": util.gpu,
                "memory_util": (memory.used / memory.total) * 100,
                "memory_used_gb": memory.used / (1024**3),
                "available": True,
            }
        except Exception:
            return dict(self._UNAVAILABLE)
        finally:
            # Pair every successful nvmlInit() with nvmlShutdown() so repeated
            # reads do not leave NVML initialised; cleanup is best-effort and
            # must not override the reading returned above.
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass

Dashboard — realtime метрики

class DashboardWatcher:
    """Broadcast every sensor reading to dashboard clients.

    Args:
        ws_manager: Object with an async ``broadcast(message: dict)`` method
            (presumably a WebSocket connection manager; confirm).
        sensors: Optional sensors to register up front; more can be added
            later through ``self.watcher.add_sensor``.
    """

    def __init__(self, ws_manager, sensors=()):
        self.watcher = Watcher()
        self.ws = ws_manager

        for sensor in sensors:
            self.watcher.add_sensor(sensor)

        self.watcher.on_reading(self.broadcast_metric)

    async def broadcast_metric(self, name: str, value: Any):
        """Send one reading as a ``{"type": "metric", ...}`` message with an ISO timestamp."""
        await self.ws.broadcast({
            "type": "metric",
            "name": name,
            "value": value,
            "timestamp": datetime.now().isoformat()
        })

    async def run(self):
        """Start the underlying watcher (without this nothing is ever broadcast)."""
        await self.watcher.start()