Перейти к содержанию

Rate Limiting

Двухуровневый rate limiting: по IP и по пользователю.

Возможности

  • Token bucket алгоритм
  • IP-based limiting
  • User-based limiting
  • Dual-layer (IP + User)
  • Redis и in-memory backends
  • FastAPI middleware

Использование

Базовый лимитер

from kit.saas.rate_limit import RateLimiter

limiter = RateLimiter(
    max_requests=100,
    window_seconds=60  # 100 запросов в минуту
)

# Проверка
if await limiter.is_allowed("client_ip"):
    # Обработка запроса
    pass
else:
    # 429 Too Many Requests
    raise HTTPException(429, "Rate limit exceeded")

Получение информации о лимите

info = await limiter.get_limit_info("client_ip")

print(f"Remaining: {info.remaining}")
print(f"Reset in: {info.reset_in} seconds")
print(f"Limit: {info.limit}")

Dual-Layer лимитер

from kit.saas.rate_limit import DualLayerLimiter

limiter = DualLayerLimiter(
    ip_limit=100,       # 100 запросов на IP
    user_limit=1000,    # 1000 запросов на пользователя
    window_seconds=3600 # За час
)

# Проверка обоих уровней
result = await limiter.check(
    ip="192.168.1.1",
    user_id="user_123"
)

if not result.allowed:
    print(f"Blocked by: {result.blocked_by}")  # "ip" или "user"

FastAPI Middleware

from fastapi import FastAPI
from kit.saas.rate_limit import RateLimitMiddleware, RateLimiter

app = FastAPI()

limiter = RateLimiter(max_requests=100, window_seconds=60)

app.add_middleware(
    RateLimitMiddleware,
    limiter=limiter,
    key_func=lambda request: request.client.host  # По IP
)

# Или по пользователю
app.add_middleware(
    RateLimitMiddleware,
    limiter=limiter,
    key_func=lambda request: request.state.user.id
)

Redis backend

from kit.saas.rate_limit import RateLimiter, RedisBackend

backend = RedisBackend(redis_url="redis://localhost:6379")

limiter = RateLimiter(
    max_requests=100,
    window_seconds=60,
    backend=backend
)

Разные лимиты для разных endpoints

from kit.saas.rate_limit import RateLimiter, rate_limit

# Глобальный лимитер
default_limiter = RateLimiter(100, 60)

# Строгий лимитер для дорогих операций
strict_limiter = RateLimiter(10, 60)

@app.get("/api/data")
@rate_limit(default_limiter)
async def get_data():
    return {"data": "..."}

@app.post("/api/generate")
@rate_limit(strict_limiter)
async def generate():
    return {"result": "..."}

План-based лимиты

class PlanBasedLimiter:
    def __init__(self):
        self.limiters = {
            "free": RateLimiter(10, 60),      # 10/мин
            "pro": RateLimiter(100, 60),      # 100/мин
            "enterprise": RateLimiter(1000, 60)  # 1000/мин
        }

    async def check(self, user_id: str, plan: str) -> bool:
        limiter = self.limiters.get(plan, self.limiters["free"])
        return await limiter.is_allowed(user_id)

API Reference

RateLimiter

class RateLimiter:
    def __init__(
        self,
        max_requests: int,
        window_seconds: int,
        backend: Backend = None  # None = MemoryBackend
    )

    async def is_allowed(self, key: str) -> bool
    async def get_limit_info(self, key: str) -> LimitInfo
    async def reset(self, key: str) -> None

DualLayerLimiter

class DualLayerLimiter:
    def __init__(
        self,
        ip_limit: int,
        user_limit: int,
        window_seconds: int,
        backend: Backend = None
    )

    async def check(self, ip: str, user_id: str = None) -> CheckResult
    async def get_info(self, ip: str, user_id: str = None) -> DualLimitInfo

LimitInfo

@dataclass
class LimitInfo:
    limit: int
    remaining: int
    reset_in: int  # секунды до сброса
    is_limited: bool

@dataclass
class CheckResult:
    allowed: bool
    blocked_by: Optional[str]  # "ip", "user", None
    ip_info: LimitInfo
    user_info: Optional[LimitInfo]

Headers

Middleware автоматически добавляет заголовки:

X-RateLimit-Limit: 100
X-RateLimit-Remaining: 95
X-RateLimit-Reset: 1640000000

При превышении:

Retry-After: 30

Примеры из production

Autoshorts — API с лимитами

class APIRateLimiter:
    def __init__(self):
        # Разные лимиты для разных операций
        self.limiters = {
            "generate": DualLayerLimiter(
                ip_limit=5,        # 5 генераций с IP в час
                user_limit=20,     # 20 генераций для пользователя в час
                window_seconds=3600
            ),
            "api": DualLayerLimiter(
                ip_limit=100,
                user_limit=1000,
                window_seconds=3600
            )
        }

    async def check_generate(self, request: Request) -> bool:
        ip = request.client.host
        user_id = request.state.user.id if hasattr(request.state, 'user') else None

        result = await self.limiters["generate"].check(ip, user_id)

        if not result.allowed:
            raise HTTPException(
                429,
                detail={
                    "error": "Rate limit exceeded",
                    "blocked_by": result.blocked_by,
                    "retry_after": result.ip_info.reset_in
                },
                headers={"Retry-After": str(result.ip_info.reset_in)}
            )

        return True

# Использование
rate_limiter = APIRateLimiter()

@app.post("/api/generate")
async def generate(request: Request):
    await rate_limiter.check_generate(request)
    # ... генерация

DedMoroz.ai — Telegram rate limiting

class TelegramRateLimiter:
    def __init__(self):
        self.limiter = RateLimiter(
            max_requests=5,      # 5 запросов
            window_seconds=60    # в минуту
        )

    async def check_user(self, telegram_id: int) -> bool:
        key = f"tg_{telegram_id}"

        if not await self.limiter.is_allowed(key):
            info = await self.limiter.get_limit_info(key)
            return False, info.reset_in

        return True, 0

# В боте
@router.message()
async def handle_message(message: Message):
    allowed, wait_time = await rate_limiter.check_user(message.from_user.id)

    if not allowed:
        await message.answer(
            f"Слишком много запросов. Подождите {wait_time} секунд."
        )
        return

    # Обработка
    ...

Adaptive rate limiting

class AdaptiveRateLimiter:
    """Адаптивный лимитер на основе нагрузки."""

    def __init__(self):
        self.base_limit = 100
        self.current_load = 0.0

    async def get_limit(self) -> int:
        # Уменьшаем лимит при высокой нагрузке
        if self.current_load > 0.8:
            return int(self.base_limit * 0.5)
        elif self.current_load > 0.6:
            return int(self.base_limit * 0.75)
        return self.base_limit

    async def check(self, key: str) -> bool:
        limit = await self.get_limit()
        limiter = RateLimiter(limit, 60)
        return await limiter.is_allowed(key)

    def update_load(self, load: float):
        self.current_load = load