Rate Limiting¶
Двухуровневый rate limiting: по IP и по пользователю.
Возможности¶
- Token bucket алгоритм
- IP-based limiting
- User-based limiting
- Dual-layer (IP + User)
- Redis и in-memory backends
- FastAPI middleware
Использование¶
Базовый лимитер¶
from kit.saas.rate_limit import RateLimiter
limiter = RateLimiter(
max_requests=100,
window_seconds=60 # 100 запросов в минуту
)
# Проверка
if await limiter.is_allowed("client_ip"):
# Обработка запроса
pass
else:
# 429 Too Many Requests
raise HTTPException(429, "Rate limit exceeded")
Получение информации о лимите¶
info = await limiter.get_limit_info("client_ip")
print(f"Remaining: {info.remaining}")
print(f"Reset in: {info.reset_in} seconds")
print(f"Limit: {info.limit}")
Dual-Layer лимитер¶
from kit.saas.rate_limit import DualLayerLimiter
limiter = DualLayerLimiter(
ip_limit=100, # 100 запросов на IP
user_limit=1000, # 1000 запросов на пользователя
window_seconds=3600 # За час
)
# Проверка обоих уровней
result = await limiter.check(
ip="192.168.1.1",
user_id="user_123"
)
if not result.allowed:
print(f"Blocked by: {result.blocked_by}") # "ip" или "user"
FastAPI Middleware¶
from fastapi import FastAPI
from kit.saas.rate_limit import RateLimitMiddleware, RateLimiter
app = FastAPI()
limiter = RateLimiter(max_requests=100, window_seconds=60)
app.add_middleware(
RateLimitMiddleware,
limiter=limiter,
key_func=lambda request: request.client.host # По IP
)
# Или по пользователю
app.add_middleware(
RateLimitMiddleware,
limiter=limiter,
key_func=lambda request: request.state.user.id
)
Redis backend¶
from kit.saas.rate_limit import RateLimiter, RedisBackend
backend = RedisBackend(redis_url="redis://localhost:6379")
limiter = RateLimiter(
max_requests=100,
window_seconds=60,
backend=backend
)
Разные лимиты для разных endpoints¶
from kit.saas.rate_limit import RateLimiter, rate_limit
# Глобальный лимитер
default_limiter = RateLimiter(100, 60)
# Строгий лимитер для дорогих операций
strict_limiter = RateLimiter(10, 60)
@app.get("/api/data")
@rate_limit(default_limiter)
async def get_data():
return {"data": "..."}
@app.post("/api/generate")
@rate_limit(strict_limiter)
async def generate():
return {"result": "..."}
План-based лимиты¶
class PlanBasedLimiter:
def __init__(self):
self.limiters = {
"free": RateLimiter(10, 60), # 10/мин
"pro": RateLimiter(100, 60), # 100/мин
"enterprise": RateLimiter(1000, 60) # 1000/мин
}
async def check(self, user_id: str, plan: str) -> bool:
limiter = self.limiters.get(plan, self.limiters["free"])
return await limiter.is_allowed(user_id)
API Reference¶
RateLimiter¶
class RateLimiter:
def __init__(
self,
max_requests: int,
window_seconds: int,
backend: Backend = None # None = MemoryBackend
)
async def is_allowed(self, key: str) -> bool
async def get_limit_info(self, key: str) -> LimitInfo
async def reset(self, key: str) -> None
DualLayerLimiter¶
class DualLayerLimiter:
def __init__(
self,
ip_limit: int,
user_limit: int,
window_seconds: int,
backend: Backend = None
)
async def check(self, ip: str, user_id: str = None) -> CheckResult
async def get_info(self, ip: str, user_id: str = None) -> DualLimitInfo
LimitInfo¶
@dataclass
class LimitInfo:
limit: int
remaining: int
reset_in: int # секунды до сброса
is_limited: bool
@dataclass
class CheckResult:
allowed: bool
blocked_by: Optional[str] # "ip", "user", None
ip_info: LimitInfo
user_info: Optional[LimitInfo]
Headers¶
Middleware автоматически добавляет заголовки:
При превышении:
Примеры из production¶
Autoshorts — API с лимитами¶
class APIRateLimiter:
def __init__(self):
# Разные лимиты для разных операций
self.limiters = {
"generate": DualLayerLimiter(
ip_limit=5, # 5 генераций с IP в час
user_limit=20, # 20 генераций для пользователя в час
window_seconds=3600
),
"api": DualLayerLimiter(
ip_limit=100,
user_limit=1000,
window_seconds=3600
)
}
async def check_generate(self, request: Request) -> bool:
ip = request.client.host
user_id = request.state.user.id if hasattr(request.state, 'user') else None
result = await self.limiters["generate"].check(ip, user_id)
if not result.allowed:
raise HTTPException(
429,
detail={
"error": "Rate limit exceeded",
"blocked_by": result.blocked_by,
"retry_after": result.ip_info.reset_in
},
headers={"Retry-After": str(result.ip_info.reset_in)}
)
return True
# Использование
rate_limiter = APIRateLimiter()
@app.post("/api/generate")
async def generate(request: Request):
await rate_limiter.check_generate(request)
# ... генерация
DedMoroz.ai — Telegram rate limiting¶
class TelegramRateLimiter:
def __init__(self):
self.limiter = RateLimiter(
max_requests=5, # 5 запросов
window_seconds=60 # в минуту
)
async def check_user(self, telegram_id: int) -> bool:
key = f"tg_{telegram_id}"
if not await self.limiter.is_allowed(key):
info = await self.limiter.get_limit_info(key)
return False, info.reset_in
return True, 0
# В боте
@router.message()
async def handle_message(message: Message):
allowed, wait_time = await rate_limiter.check_user(message.from_user.id)
if not allowed:
await message.answer(
f"Слишком много запросов. Подождите {wait_time} секунд."
)
return
# Обработка
...
Adaptive rate limiting¶
class AdaptiveRateLimiter:
"""Адаптивный лимитер на основе нагрузки."""
def __init__(self):
self.base_limit = 100
self.current_load = 0.0
async def get_limit(self) -> int:
# Уменьшаем лимит при высокой нагрузке
if self.current_load > 0.8:
return int(self.base_limit * 0.5)
elif self.current_load > 0.6:
return int(self.base_limit * 0.75)
return self.base_limit
async def check(self, key: str) -> bool:
limit = await self.get_limit()
limiter = RateLimiter(limit, 60)
return await limiter.is_allowed(key)
def update_load(self, load: float):
self.current_load = load