Realtime¶
WebSocket и Server-Sent Events для realtime коммуникации.
Возможности¶
- WebSocket менеджер с комнатами
- SSE для односторонней коммуникации
- Автоматическое управление подключениями
- Broadcast и targeted сообщения
WebSocket Manager¶
Базовое использование¶
from kit.core.realtime import WebSocketManager
from fastapi import FastAPI, WebSocket
app = FastAPI()
ws_manager = WebSocketManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await ws_manager.connect(websocket, client_id)
try:
while True:
data = await websocket.receive_json()
# Обработка сообщения
await ws_manager.send_personal(client_id, {
"type": "response",
"data": data
})
except WebSocketDisconnect:
ws_manager.disconnect(client_id)
Комнаты¶
# Подключение к комнате
await ws_manager.join_room(client_id, "room_123")
# Broadcast в комнату
await ws_manager.broadcast_to_room("room_123", {
"event": "new_message",
"content": "Привет всем!"
})
# Выход из комнаты
await ws_manager.leave_room(client_id, "room_123")
Broadcast всем¶
# Всем подключенным клиентам
await ws_manager.broadcast({
"event": "system",
"message": "Сервер перезагружается через 5 минут"
})
# Исключая определённых клиентов
await ws_manager.broadcast(
{"event": "update"},
exclude=[client_id]
)
Server-Sent Events¶
Базовое использование¶
from kit.core.realtime import SSEManager
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
sse_manager = SSEManager()
@app.get("/events/{client_id}")
async def sse_endpoint(client_id: str):
return StreamingResponse(
sse_manager.subscribe(client_id),
media_type="text/event-stream"
)
# Отправка события
await sse_manager.send(client_id, {
"event": "progress",
"data": {"percent": 45}
})
События с типами¶
# Именованное событие
await sse_manager.send(client_id,
data={"status": "completed"},
event="task_done"
)
# На клиенте:
# eventSource.addEventListener('task_done', (e) => {...})
API Reference¶
WebSocketManager¶
class WebSocketManager:
async def connect(websocket: WebSocket, client_id: str) -> None
def disconnect(client_id: str) -> None
async def send_personal(client_id: str, message: dict) -> None
async def broadcast(message: dict, exclude: list = None) -> None
async def join_room(client_id: str, room: str) -> None
async def leave_room(client_id: str, room: str) -> None
async def broadcast_to_room(room: str, message: dict) -> None
SSEManager¶
class SSEManager:
async def subscribe(client_id: str) -> AsyncGenerator
async def send(client_id: str, data: dict, event: str = None) -> None
async def broadcast(data: dict, event: str = None) -> None
def disconnect(client_id: str) -> None
Примеры из production¶
Progress tracking (Autoshorts)¶
async def generate_video(task_id: str, ws_manager: WebSocketManager):
stages = ["script", "audio", "images", "video", "compose"]
for i, stage in enumerate(stages):
await ws_manager.send_personal(task_id, {
"type": "progress",
"stage": stage,
"percent": (i + 1) / len(stages) * 100
})
await process_stage(stage)
await ws_manager.send_personal(task_id, {
"type": "complete",
"video_url": "https://..."
})
Live chat (DedMoroz.ai)¶
@app.websocket("/chat/{session_id}")
async def chat(websocket: WebSocket, session_id: str):
await ws_manager.connect(websocket, session_id)
try:
while True:
message = await websocket.receive_text()
# Streaming ответ от LLM
async for chunk in llm.stream(message):
await ws_manager.send_personal(session_id, {
"type": "chunk",
"content": chunk
})
await ws_manager.send_personal(session_id, {
"type": "done"
})
except WebSocketDisconnect:
ws_manager.disconnect(session_id)