Перейти к содержанию

Provider Factory

Паттерн auto-fallback для провайдеров с автоматическим переключением при ошибках.

Возможности

  • Автоматический fallback при ошибках
  • Приоритеты провайдеров
  • Health checks
  • Circuit breaker
  • Метрики и логирование

Использование

Базовый fallback

from kit.providers.factory import ProviderFactory
from kit.providers.llm import OpenAIProvider, AnthropicProvider

factory = ProviderFactory()

# Регистрация провайдеров с приоритетами
factory.register("openai", OpenAIProvider(api_key="..."), priority=1)
factory.register("anthropic", AnthropicProvider(api_key="..."), priority=2)

# Автоматический fallback при ошибке
response = await factory.generate(
    prompt="Hello world",
    model="gpt-4o"  # Попробует OpenAI, при ошибке — Anthropic
)

Health checks

factory = ProviderFactory(
    health_check_interval=60,  # Проверка каждую минуту
    health_check_timeout=10
)

factory.register("primary", PrimaryProvider(...))
factory.register("backup", BackupProvider(...))

# Запуск health checks
await factory.start_health_checks()

# Статус провайдеров
status = factory.get_status()
# {"primary": "healthy", "backup": "healthy"}

Circuit Breaker

factory = ProviderFactory(
    failure_threshold=5,     # После 5 ошибок — открыть circuit
    recovery_timeout=60,     # Через 60 секунд — попробовать снова
    half_open_requests=2     # 2 тестовых запроса в half-open состоянии
)

# При множественных ошибках провайдер временно отключается
try:
    response = await factory.generate(prompt="test")
except AllProvidersUnavailable:
    print("Все провайдеры недоступны")

Кастомная логика fallback

factory = ProviderFactory()

@factory.on_fallback
async def log_fallback(from_provider: str, to_provider: str, error: Exception):
    logger.warning(f"Fallback: {from_provider} -> {to_provider}: {error}")
    await notify_ops_team(from_provider, error)

@factory.on_all_failed
async def handle_all_failed(errors: dict):
    logger.error(f"All providers failed: {errors}")
    await trigger_incident()

Разные провайдеры для разных задач

# LLM Factory
llm_factory = ProviderFactory()
llm_factory.register("openai", OpenAIProvider(...))
llm_factory.register("anthropic", AnthropicProvider(...))

# Video Factory
video_factory = ProviderFactory()
video_factory.register("kling", KlingProvider(...))
video_factory.register("fal", FalProvider(...))

# TTS Factory
tts_factory = ProviderFactory()
tts_factory.register("edge", EdgeTTSProvider(...))
tts_factory.register("elevenlabs", ElevenLabsProvider(...))

Выбор провайдера по условию

factory = ProviderFactory()

factory.register("gpt4", OpenAIProvider(..., model="gpt-4o"))
factory.register("gpt4mini", OpenAIProvider(..., model="gpt-4o-mini"))
factory.register("claude", AnthropicProvider(...))

# Выбор на основе условия
response = await factory.generate(
    prompt="Hello",
    selector=lambda providers: (
        "gpt4" if len(prompt) > 1000 else "gpt4mini"
    )
)

API Reference

ProviderFactory

class ProviderFactory:
    def __init__(
        self,
        health_check_interval: int = 60,
        health_check_timeout: float = 10.0,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_requests: int = 2
    )

    def register(
        self,
        name: str,
        provider: BaseProvider,
        priority: int = 0
    ) -> None

    def unregister(self, name: str) -> None

    async def generate(
        self,
        *args,
        selector: Callable = None,
        **kwargs
    ) -> Any

    def get_status(self) -> Dict[str, str]
    def get_metrics(self) -> Dict[str, ProviderMetrics]

    async def start_health_checks(self) -> None
    async def stop_health_checks(self) -> None

    def on_fallback(self, handler: Callable) -> None
    def on_all_failed(self, handler: Callable) -> None

    async def close(self) -> None

ProviderMetrics

@dataclass
class ProviderMetrics:
    name: str
    total_requests: int
    successful_requests: int
    failed_requests: int
    avg_latency_ms: float
    circuit_state: str  # closed, open, half_open
    last_error: Optional[str]
    last_success: Optional[datetime]

Паттерны использования

Cost-aware fallback

class CostAwareFactory(ProviderFactory):
    """Fallback с учётом стоимости."""

    def __init__(self):
        super().__init__()
        self.costs = {
            "gpt4": 0.03,
            "gpt4mini": 0.00015,
            "claude": 0.003
        }

    async def generate(self, prompt: str, max_cost: float = None, **kwargs):
        # Сортируем по стоимости
        providers = sorted(
            self.providers.items(),
            key=lambda x: self.costs.get(x[0], float('inf'))
        )

        for name, provider in providers:
            if max_cost and self.costs.get(name, 0) > max_cost:
                continue
            try:
                return await provider.generate(prompt, **kwargs)
            except Exception:
                continue

        raise AllProvidersUnavailable()

Latency-based routing

class LatencyAwareFactory(ProviderFactory):
    """Routing на основе latency."""

    async def select_provider(self):
        metrics = self.get_metrics()

        # Выбираем провайдера с минимальной latency
        available = [
            (name, m) for name, m in metrics.items()
            if m.circuit_state == "closed"
        ]

        if not available:
            raise AllProvidersUnavailable()

        return min(available, key=lambda x: x[1].avg_latency_ms)[0]

Примеры из production

Autoshorts — LLM с fallback

class LLMService:
    def __init__(self):
        self.factory = ProviderFactory(
            failure_threshold=3,
            recovery_timeout=120
        )

        # Основной провайдер
        self.factory.register(
            "openai",
            OpenAIProvider(api_key=settings.openai_api_key),
            priority=1
        )

        # Fallback
        self.factory.register(
            "anthropic",
            AnthropicProvider(api_key=settings.anthropic_api_key),
            priority=2
        )

        # Логирование fallback
        self.factory.on_fallback(self.log_fallback)

    async def log_fallback(self, from_p: str, to_p: str, error: Exception):
        logger.warning(f"LLM fallback: {from_p} -> {to_p}")
        await self.metrics.increment("llm_fallback")

    async def generate_script(self, prompt: str) -> str:
        response = await self.factory.generate(
            prompt=prompt,
            system="You are a video script writer...",
            max_tokens=2000
        )
        return response.content

Multi-region Video Generation

class VideoService:
    def __init__(self):
        self.factory = ProviderFactory(health_check_interval=30)

        # Разные регионы/провайдеры
        self.factory.register("kling_us", KlingProvider(region="us"))
        self.factory.register("kling_eu", KlingProvider(region="eu"))
        self.factory.register("fal", FalProvider(...))

        # Health checks
        asyncio.create_task(self.factory.start_health_checks())

    async def generate_video(self, prompt: str) -> str:
        result = await self.factory.generate(
            prompt=prompt,
            duration=5
        )
        return result.video_url