Перейти к содержанию

Base Agent

Базовый класс для создания автономных агентов и EventBus для event-driven архитектуры.

EventBus

Pub/Sub система для коммуникации между компонентами.

Базовое использование

from kit.agent import EventBus

bus = EventBus()

# Подписка на событие
@bus.on("user_created")
async def on_user_created(data):
    print(f"New user: {data['name']}")
    await send_welcome_email(data['email'])

# Emit события
await bus.emit("user_created", {
    "id": "123",
    "name": "John",
    "email": "john@example.com"
})

Множественные подписчики

@bus.on("order_placed")
async def update_inventory(order):
    await inventory.decrease(order['items'])

@bus.on("order_placed")
async def notify_warehouse(order):
    await warehouse.notify(order)

@bus.on("order_placed")
async def send_confirmation(order):
    await email.send_order_confirmation(order)

# Все три обработчика сработают
await bus.emit("order_placed", order_data)

Once — одноразовая подписка

@bus.once("initialization_complete")
async def on_init(data):
    print("System initialized")

await bus.emit("initialization_complete", {})  # Сработает
await bus.emit("initialization_complete", {})  # Не сработает

Отписка

def handler(data):
    print(data)

bus.on("event", handler)
bus.off("event", handler)  # Отписка

BaseAgent

Абстрактный класс для создания агентов.

Создание агента

from kit.agent import BaseAgent

class MonitoringAgent(BaseAgent):
    def __init__(self):
        super().__init__(name="monitoring")
        self.alerts = []

    async def setup(self):
        """Инициализация при старте."""
        self.bus.on("metric", self.handle_metric)

    async def handle_metric(self, metric):
        if metric['value'] > metric['threshold']:
            await self.create_alert(metric)

    async def create_alert(self, metric):
        alert = {
            "metric": metric['name'],
            "value": metric['value'],
            "timestamp": datetime.now()
        }
        self.alerts.append(alert)
        await self.bus.emit("alert", alert)

    async def run(self):
        """Основной цикл агента."""
        while self.running:
            metrics = await self.collect_metrics()
            for metric in metrics:
                await self.bus.emit("metric", metric)
            await asyncio.sleep(60)

# Использование
agent = MonitoringAgent()
await agent.start()

Lifecycle агента

class MyAgent(BaseAgent):
    async def setup(self):
        """Вызывается один раз при старте."""
        self.db = await Database.connect()

    async def run(self):
        """Основной цикл работы."""
        while self.running:
            await self.do_work()
            await asyncio.sleep(1)

    async def cleanup(self):
        """Вызывается при остановке."""
        await self.db.disconnect()

agent = MyAgent()
await agent.start()   # setup() -> run()
await agent.stop()    # cleanup()

API Reference

EventBus

class EventBus:
    def on(self, event: str, handler: Callable) -> None
    def once(self, event: str, handler: Callable) -> None
    def off(self, event: str, handler: Callable = None) -> None
    async def emit(self, event: str, data: Any = None) -> None
    def listeners(self, event: str) -> List[Callable]
    def clear(self) -> None

BaseAgent

class BaseAgent(ABC):
    name: str
    running: bool
    bus: EventBus

    async def start(self) -> None
    async def stop(self) -> None

    @abstractmethod
    async def setup(self) -> None
    @abstractmethod
    async def run(self) -> None
    async def cleanup(self) -> None  # Optional override

Примеры из production

Sentinel — мониторинг агент

class SentinelAgent(BaseAgent):
    def __init__(self, config: dict):
        super().__init__(name="sentinel")
        self.config = config
        self.metrics_buffer = []

    async def setup(self):
        self.http = httpx.AsyncClient()
        self.bus.on("metric", self.buffer_metric)
        self.bus.on("flush", self.send_metrics)

    async def buffer_metric(self, metric):
        self.metrics_buffer.append(metric)
        if len(self.metrics_buffer) >= 100:
            await self.bus.emit("flush")

    async def send_metrics(self):
        if not self.metrics_buffer:
            return
        await self.http.post(
            self.config['metrics_endpoint'],
            json=self.metrics_buffer
        )
        self.metrics_buffer = []

    async def run(self):
        while self.running:
            cpu = psutil.cpu_percent()
            memory = psutil.virtual_memory().percent

            await self.bus.emit("metric", {"name": "cpu", "value": cpu})
            await self.bus.emit("metric", {"name": "memory", "value": memory})

            await asyncio.sleep(10)

Pipeline агент

class PipelineAgent(BaseAgent):
    def __init__(self, steps: list):
        super().__init__(name="pipeline")
        self.steps = steps
        self.current_step = 0

    async def setup(self):
        self.bus.on("step_complete", self.next_step)
        self.bus.on("step_failed", self.handle_failure)

    async def next_step(self, result):
        self.current_step += 1
        if self.current_step < len(self.steps):
            step = self.steps[self.current_step]
            await step.execute(result)
        else:
            await self.bus.emit("pipeline_complete", result)

    async def handle_failure(self, error):
        await self.bus.emit("pipeline_failed", {
            "step": self.current_step,
            "error": str(error)
        })

    async def run(self):
        first_step = self.steps[0]
        await first_step.execute(None)