Base Agent¶
Базовый класс для создания автономных агентов и EventBus для event-driven архитектуры.
EventBus¶
Pub/Sub система для коммуникации между компонентами.
Базовое использование¶
from kit.agent import EventBus
bus = EventBus()
# Подписка на событие
@bus.on("user_created")
async def on_user_created(data):
print(f"New user: {data['name']}")
await send_welcome_email(data['email'])
# Emit события
await bus.emit("user_created", {
"id": "123",
"name": "John",
"email": "john@example.com"
})
Множественные подписчики¶
@bus.on("order_placed")
async def update_inventory(order):
await inventory.decrease(order['items'])
@bus.on("order_placed")
async def notify_warehouse(order):
await warehouse.notify(order)
@bus.on("order_placed")
async def send_confirmation(order):
await email.send_order_confirmation(order)
# Все три обработчика сработают
await bus.emit("order_placed", order_data)
Once — одноразовая подписка¶
@bus.once("initialization_complete")
async def on_init(data):
print("System initialized")
await bus.emit("initialization_complete", {}) # Сработает
await bus.emit("initialization_complete", {}) # Не сработает
Отписка¶
BaseAgent¶
Абстрактный класс для создания агентов.
Создание агента¶
from kit.agent import BaseAgent
class MonitoringAgent(BaseAgent):
def __init__(self):
super().__init__(name="monitoring")
self.alerts = []
async def setup(self):
"""Инициализация при старте."""
self.bus.on("metric", self.handle_metric)
async def handle_metric(self, metric):
if metric['value'] > metric['threshold']:
await self.create_alert(metric)
async def create_alert(self, metric):
alert = {
"metric": metric['name'],
"value": metric['value'],
"timestamp": datetime.now()
}
self.alerts.append(alert)
await self.bus.emit("alert", alert)
async def run(self):
"""Основной цикл агента."""
while self.running:
metrics = await self.collect_metrics()
for metric in metrics:
await self.bus.emit("metric", metric)
await asyncio.sleep(60)
# Использование
agent = MonitoringAgent()
await agent.start()
Lifecycle агента¶
class MyAgent(BaseAgent):
async def setup(self):
"""Вызывается один раз при старте."""
self.db = await Database.connect()
async def run(self):
"""Основной цикл работы."""
while self.running:
await self.do_work()
await asyncio.sleep(1)
async def cleanup(self):
"""Вызывается при остановке."""
await self.db.disconnect()
agent = MyAgent()
await agent.start() # setup() -> run()
await agent.stop() # cleanup()
API Reference¶
EventBus¶
class EventBus:
def on(self, event: str, handler: Callable) -> None
def once(self, event: str, handler: Callable) -> None
def off(self, event: str, handler: Callable = None) -> None
async def emit(self, event: str, data: Any = None) -> None
def listeners(self, event: str) -> List[Callable]
def clear(self) -> None
BaseAgent¶
class BaseAgent(ABC):
name: str
running: bool
bus: EventBus
async def start(self) -> None
async def stop(self) -> None
@abstractmethod
async def setup(self) -> None
@abstractmethod
async def run(self) -> None
async def cleanup(self) -> None # Optional override
Примеры из production¶
Sentinel — мониторинг агент¶
class SentinelAgent(BaseAgent):
def __init__(self, config: dict):
super().__init__(name="sentinel")
self.config = config
self.metrics_buffer = []
async def setup(self):
self.http = httpx.AsyncClient()
self.bus.on("metric", self.buffer_metric)
self.bus.on("flush", self.send_metrics)
async def buffer_metric(self, metric):
self.metrics_buffer.append(metric)
if len(self.metrics_buffer) >= 100:
await self.bus.emit("flush")
async def send_metrics(self):
if not self.metrics_buffer:
return
await self.http.post(
self.config['metrics_endpoint'],
json=self.metrics_buffer
)
self.metrics_buffer = []
async def run(self):
while self.running:
cpu = psutil.cpu_percent()
memory = psutil.virtual_memory().percent
await self.bus.emit("metric", {"name": "cpu", "value": cpu})
await self.bus.emit("metric", {"name": "memory", "value": memory})
await asyncio.sleep(10)
Pipeline агент¶
class PipelineAgent(BaseAgent):
def __init__(self, steps: list):
super().__init__(name="pipeline")
self.steps = steps
self.current_step = 0
async def setup(self):
self.bus.on("step_complete", self.next_step)
self.bus.on("step_failed", self.handle_failure)
async def next_step(self, result):
self.current_step += 1
if self.current_step < len(self.steps):
step = self.steps[self.current_step]
await step.execute(result)
else:
await self.bus.emit("pipeline_complete", result)
async def handle_failure(self, error):
await self.bus.emit("pipeline_failed", {
"step": self.current_step,
"error": str(error)
})
async def run(self):
first_step = self.steps[0]
await first_step.execute(None)