Перейти к содержанию

Realtime

WebSocket и Server-Sent Events для realtime коммуникации.

Возможности

  • WebSocket менеджер с комнатами
  • SSE для односторонней коммуникации
  • Автоматическое управление подключениями
  • Broadcast и targeted сообщения

WebSocket Manager

Базовое использование

from kit.core.realtime import WebSocketManager
from fastapi import FastAPI, WebSocket

app = FastAPI()
ws_manager = WebSocketManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await ws_manager.connect(websocket, client_id)
    try:
        while True:
            data = await websocket.receive_json()
            # Обработка сообщения
            await ws_manager.send_personal(client_id, {
                "type": "response",
                "data": data
            })
    except WebSocketDisconnect:
        ws_manager.disconnect(client_id)

Комнаты

# Подключение к комнате
await ws_manager.join_room(client_id, "room_123")

# Broadcast в комнату
await ws_manager.broadcast_to_room("room_123", {
    "event": "new_message",
    "content": "Привет всем!"
})

# Выход из комнаты
await ws_manager.leave_room(client_id, "room_123")

Broadcast всем

# Всем подключенным клиентам
await ws_manager.broadcast({
    "event": "system",
    "message": "Сервер перезагружается через 5 минут"
})

# Исключая определённых клиентов
await ws_manager.broadcast(
    {"event": "update"},
    exclude=[client_id]
)

Server-Sent Events

Базовое использование

from kit.core.realtime import SSEManager
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
sse_manager = SSEManager()

@app.get("/events/{client_id}")
async def sse_endpoint(client_id: str):
    return StreamingResponse(
        sse_manager.subscribe(client_id),
        media_type="text/event-stream"
    )

# Отправка события
await sse_manager.send(client_id, {
    "event": "progress",
    "data": {"percent": 45}
})

События с типами

# Именованное событие
await sse_manager.send(client_id,
    data={"status": "completed"},
    event="task_done"
)

# На клиенте:
# eventSource.addEventListener('task_done', (e) => {...})

API Reference

WebSocketManager

class WebSocketManager:
    async def connect(websocket: WebSocket, client_id: str) -> None
    def disconnect(client_id: str) -> None
    async def send_personal(client_id: str, message: dict) -> None
    async def broadcast(message: dict, exclude: list = None) -> None
    async def join_room(client_id: str, room: str) -> None
    async def leave_room(client_id: str, room: str) -> None
    async def broadcast_to_room(room: str, message: dict) -> None

SSEManager

class SSEManager:
    async def subscribe(client_id: str) -> AsyncGenerator
    async def send(client_id: str, data: dict, event: str = None) -> None
    async def broadcast(data: dict, event: str = None) -> None
    def disconnect(client_id: str) -> None

Примеры из production

Progress tracking (Autoshorts)

async def generate_video(task_id: str, ws_manager: WebSocketManager):
    stages = ["script", "audio", "images", "video", "compose"]

    for i, stage in enumerate(stages):
        await ws_manager.send_personal(task_id, {
            "type": "progress",
            "stage": stage,
            "percent": (i + 1) / len(stages) * 100
        })

        await process_stage(stage)

    await ws_manager.send_personal(task_id, {
        "type": "complete",
        "video_url": "https://..."
    })

Live chat (DedMoroz.ai)

@app.websocket("/chat/{session_id}")
async def chat(websocket: WebSocket, session_id: str):
    await ws_manager.connect(websocket, session_id)

    try:
        while True:
            message = await websocket.receive_text()

            # Streaming ответ от LLM
            async for chunk in llm.stream(message):
                await ws_manager.send_personal(session_id, {
                    "type": "chunk",
                    "content": chunk
                })

            await ws_manager.send_personal(session_id, {
                "type": "done"
            })
    except WebSocketDisconnect:
        ws_manager.disconnect(session_id)