Provider Factory¶
An auto-fallback pattern for providers with automatic switching when a provider fails.
Features¶
- Automatic fallback on errors
- Provider priorities
- Health checks
- Circuit breaker
- Metrics and logging
Usage¶
Basic fallback¶
from kit.providers.factory import ProviderFactory
from kit.providers.llm import OpenAIProvider, AnthropicProvider
factory = ProviderFactory()
# Register providers with priorities
factory.register("openai", OpenAIProvider(api_key="..."), priority=1)
factory.register("anthropic", AnthropicProvider(api_key="..."), priority=2)

# Automatic fallback on error
response = await factory.generate(
    prompt="Hello world",
    model="gpt-4o"  # Tries OpenAI first; on error, falls back to Anthropic
)
Health checks¶
factory = ProviderFactory(
    health_check_interval=60,  # check every minute
    health_check_timeout=10
)

factory.register("primary", PrimaryProvider(...))
factory.register("backup", BackupProvider(...))

# Start health checks
await factory.start_health_checks()

# Provider status
status = factory.get_status()
# {"primary": "healthy", "backup": "healthy"}
Circuit Breaker¶
factory = ProviderFactory(
    failure_threshold=5,   # open the circuit after 5 failures
    recovery_timeout=60,   # try again after 60 seconds
    half_open_requests=2   # 2 probe requests in the half-open state
)

# After repeated failures a provider is temporarily taken out of rotation
try:
    response = await factory.generate(prompt="test")
except AllProvidersUnavailable:
    print("All providers are unavailable")
Custom fallback logic¶
factory = ProviderFactory()
@factory.on_fallback
async def log_fallback(from_provider: str, to_provider: str, error: Exception):
    logger.warning(f"Fallback: {from_provider} -> {to_provider}: {error}")
    await notify_ops_team(from_provider, error)

@factory.on_all_failed
async def handle_all_failed(errors: dict):
    logger.error(f"All providers failed: {errors}")
    await trigger_incident()
Different providers for different tasks¶
# LLM Factory
llm_factory = ProviderFactory()
llm_factory.register("openai", OpenAIProvider(...))
llm_factory.register("anthropic", AnthropicProvider(...))
# Video Factory
video_factory = ProviderFactory()
video_factory.register("kling", KlingProvider(...))
video_factory.register("fal", FalProvider(...))
# TTS Factory
tts_factory = ProviderFactory()
tts_factory.register("edge", EdgeTTSProvider(...))
tts_factory.register("elevenlabs", ElevenLabsProvider(...))
Selecting a provider by condition¶
factory = ProviderFactory()
factory.register("gpt4", OpenAIProvider(..., model="gpt-4o"))
factory.register("gpt4mini", OpenAIProvider(..., model="gpt-4o-mini"))
factory.register("claude", AnthropicProvider(...))
# Selection based on a condition
prompt = "Hello"
response = await factory.generate(
    prompt=prompt,
    selector=lambda providers: (
        "gpt4" if len(prompt) > 1000 else "gpt4mini"
    )
)
API Reference¶
ProviderFactory¶
class ProviderFactory:
    def __init__(
        self,
        health_check_interval: int = 60,
        health_check_timeout: float = 10.0,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_requests: int = 2
    )

    def register(
        self,
        name: str,
        provider: BaseProvider,
        priority: int = 0
    ) -> None
    def unregister(self, name: str) -> None

    async def generate(
        self,
        *args,
        selector: Callable = None,
        **kwargs
    ) -> Any

    def get_status(self) -> Dict[str, str]
    def get_metrics(self) -> Dict[str, ProviderMetrics]

    async def start_health_checks(self) -> None
    async def stop_health_checks(self) -> None

    def on_fallback(self, handler: Callable) -> None
    def on_all_failed(self, handler: Callable) -> None

    async def close(self) -> None
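A typical lifecycle, sketched with only the methods listed above:

factory = ProviderFactory()
factory.register("openai", OpenAIProvider(api_key="..."), priority=1)

await factory.start_health_checks()
try:
    response = await factory.generate(prompt="Hello")
finally:
    await factory.stop_health_checks()
    await factory.close()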
ProviderMetrics¶
@dataclass
class ProviderMetrics:
    name: str
    total_requests: int
    successful_requests: int
    failed_requests: int
    avg_latency_ms: float
    circuit_state: str  # closed, open, half_open
    last_error: Optional[str]
    last_success: Optional[datetime]
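Metrics can be read at any time, for example to log per-provider success rates; a minimal sketch using the fields above:

# Log success rate and average latency per provider
for name, m in factory.get_metrics().items():
    if m.total_requests:
        rate = m.successful_requests / m.total_requests
        logger.info(f"{name}: {rate:.1%} success, {m.avg_latency_ms:.0f} ms avg")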
Usage patterns¶
Cost-aware fallback¶
class CostAwareFactory(ProviderFactory):
"""Fallback с учётом стоимости."""
def __init__(self):
super().__init__()
self.costs = {
"gpt4": 0.03,
"gpt4mini": 0.00015,
"claude": 0.003
}
async def generate(self, prompt: str, max_cost: float = None, **kwargs):
# Сортируем по стоимости
providers = sorted(
self.providers.items(),
key=lambda x: self.costs.get(x[0], float('inf'))
)
for name, provider in providers:
if max_cost and self.costs.get(name, 0) > max_cost:
continue
try:
return await provider.generate(prompt, **kwargs)
except Exception:
continue
raise AllProvidersUnavailable()
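Usage mirrors the base factory; a sketch, assuming the registered names match the keys in self.costs:

factory = CostAwareFactory()
factory.register("gpt4mini", OpenAIProvider(api_key="...", model="gpt-4o-mini"))
factory.register("gpt4", OpenAIProvider(api_key="...", model="gpt-4o"))

# Providers above the cost cap are skipped
response = await factory.generate("Summarize this", max_cost=0.01)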
Latency-based routing¶
class LatencyAwareFactory(ProviderFactory):
"""Routing на основе latency."""
async def select_provider(self):
metrics = self.get_metrics()
# Выбираем провайдера с минимальной latency
available = [
(name, m) for name, m in metrics.items()
if m.circuit_state == "closed"
]
if not available:
raise AllProvidersUnavailable()
return min(available, key=lambda x: x[1].avg_latency_ms)[0]
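select_provider only picks a name; to route requests through it, generate() would need to be overridden as well. A sketch of such a method on LatencyAwareFactory, assuming providers are stored in a self.providers dict as in the cost-aware example:

class LatencyAwareFactory(ProviderFactory):
    ...
    async def generate(self, prompt: str, **kwargs):
        # Route the call to the currently fastest healthy provider
        name = await self.select_provider()
        return await self.providers[name].generate(prompt, **kwargs)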
Production examples¶
Autoshorts: LLM with fallback¶
class LLMService:
    def __init__(self):
        self.factory = ProviderFactory(
            failure_threshold=3,
            recovery_timeout=120
        )

        # Primary provider
        self.factory.register(
            "openai",
            OpenAIProvider(api_key=settings.openai_api_key),
            priority=1
        )

        # Fallback provider
        self.factory.register(
            "anthropic",
            AnthropicProvider(api_key=settings.anthropic_api_key),
            priority=2
        )

        # Log fallbacks
        self.factory.on_fallback(self.log_fallback)

    async def log_fallback(self, from_p: str, to_p: str, error: Exception):
        logger.warning(f"LLM fallback: {from_p} -> {to_p}")
        await self.metrics.increment("llm_fallback")

    async def generate_script(self, prompt: str) -> str:
        response = await self.factory.generate(
            prompt=prompt,
            system="You are a video script writer...",
            max_tokens=2000
        )
        return response.content
Multi-region Video Generation¶
class VideoService:
    def __init__(self):
        self.factory = ProviderFactory(health_check_interval=30)

        # Different regions/providers
        self.factory.register("kling_us", KlingProvider(region="us"))
        self.factory.register("kling_eu", KlingProvider(region="eu"))
        self.factory.register("fal", FalProvider(...))

        # Health checks (requires a running event loop)
        asyncio.create_task(self.factory.start_health_checks())

    async def generate_video(self, prompt: str) -> str:
        result = await self.factory.generate(
            prompt=prompt,
            duration=5
        )
        return result.video_url
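On shutdown, the background health checks should be stopped and provider resources released; a sketch using the factory API above:

service = VideoService()
video_url = await service.generate_video("A city timelapse at night")

# Graceful shutdown: stop background health checks and close providers
await service.factory.stop_health_checks()
await service.factory.close()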