JWT Authentication¶
JWT аутентификация с поддержкой гостевого доступа.
Возможности¶
- JWT токены с настраиваемым TTL
- Refresh токены
- Гостевой доступ (guest tokens)
- FastAPI middleware
- Role-based access control
Использование¶
Базовое использование¶
from kit.saas.auth import JWTManager
jwt = JWTManager(
secret_key="your-secret-key",
algorithm="HS256",
access_token_expire=3600, # 1 час
refresh_token_expire=604800 # 7 дней
)
# Создание токена
token = jwt.create_access_token(
user_id="123",
role="user",
extra={"name": "John"}
)
# Верификация
payload = jwt.verify_token(token)
print(payload["user_id"]) # "123"
print(payload["role"]) # "user"
Refresh токены¶
# Создание пары токенов
tokens = jwt.create_token_pair(user_id="123", role="user")
print(tokens.access_token)
print(tokens.refresh_token)
# Обновление access токена
new_tokens = jwt.refresh_tokens(tokens.refresh_token)
Гостевой доступ¶
# Создание гостевого токена
guest_token = jwt.create_guest_token(
device_id="device_123",
expire=86400 # 24 часа
)
# Верификация
payload = jwt.verify_token(guest_token)
print(payload["is_guest"]) # True
print(payload["device_id"]) # "device_123"
FastAPI Middleware¶
from fastapi import FastAPI, Request, Depends
from kit.saas.auth import JWTManager, AuthMiddleware, get_current_user
app = FastAPI()
jwt = JWTManager(secret_key="your-secret")
# Добавляем middleware
app.add_middleware(
AuthMiddleware,
jwt_manager=jwt,
exclude_paths=["/health", "/docs", "/openapi.json"]
)
@app.get("/api/profile")
async def get_profile(request: Request):
user = request.state.user
return {
"user_id": user.id,
"role": user.role,
"is_guest": user.is_guest
}
# Или через Depends
@app.get("/api/profile2")
async def get_profile2(user = Depends(get_current_user)):
return {"user_id": user.id}
Role-based Access¶
from kit.saas.auth import require_role, require_auth
@app.get("/api/admin")
@require_role("admin")
async def admin_only(request: Request):
return {"message": "Admin access granted"}
@app.get("/api/users")
@require_role(["admin", "moderator"])
async def admin_or_mod(request: Request):
return {"message": "Access granted"}
@app.get("/api/data")
@require_auth # Любой авторизованный (не гость)
async def auth_only(request: Request):
return {"message": "Authenticated"}
Custom Claims¶
token = jwt.create_access_token(
user_id="123",
role="premium",
extra={
"subscription": "pro",
"features": ["export", "analytics"],
"company_id": "company_456"
}
)
payload = jwt.verify_token(token)
print(payload["subscription"]) # "pro"
print(payload["features"]) # ["export", "analytics"]
API Reference¶
JWTManager¶
class JWTManager:
def __init__(
self,
secret_key: str,
algorithm: str = "HS256",
access_token_expire: int = 3600,
refresh_token_expire: int = 604800
)
def create_access_token(
self,
user_id: str,
role: str = "user",
extra: dict = None
) -> str
def create_refresh_token(self, user_id: str) -> str
def create_token_pair(
self,
user_id: str,
role: str = "user",
extra: dict = None
) -> TokenPair
def create_guest_token(
self,
device_id: str = None,
expire: int = 86400
) -> str
def verify_token(self, token: str) -> dict
def refresh_tokens(self, refresh_token: str) -> TokenPair
def revoke_token(self, token: str) -> None
def is_revoked(self, token: str) -> bool
TokenPair¶
@dataclass
class TokenPair:
access_token: str
refresh_token: str
token_type: str = "Bearer"
expires_in: int = 3600
User¶
Примеры из production¶
Autoshorts — Auth система¶
class AuthService:
def __init__(self):
self.jwt = JWTManager(
secret_key=settings.jwt_secret,
access_token_expire=3600,
refresh_token_expire=604800 * 4 # 4 недели
)
self.db = Database()
async def register(self, email: str, password: str) -> TokenPair:
# Создание пользователя
user_id = await self.db.create_user(email, hash_password(password))
# Создание токенов
return self.jwt.create_token_pair(
user_id=user_id,
role="user",
extra={"email": email}
)
async def login(self, email: str, password: str) -> TokenPair:
user = await self.db.get_user_by_email(email)
if not user or not verify_password(password, user.password_hash):
raise InvalidCredentials()
return self.jwt.create_token_pair(
user_id=user.id,
role=user.role,
extra={"email": user.email, "plan": user.plan}
)
async def guest_access(self, device_id: str) -> str:
"""Гостевой доступ с ограничениями."""
return self.jwt.create_guest_token(
device_id=device_id,
expire=86400 * 7 # 7 дней
)
DedMoroz.ai — Telegram + JWT¶
class TelegramAuth:
def __init__(self):
self.jwt = JWTManager(secret_key=settings.jwt_secret)
def create_token_for_telegram_user(self, telegram_id: int, username: str) -> str:
"""Создание JWT для Telegram пользователя."""
return self.jwt.create_access_token(
user_id=f"tg_{telegram_id}",
role="telegram_user",
extra={
"telegram_id": telegram_id,
"username": username,
"source": "telegram"
}
)
async def verify_telegram_request(self, token: str) -> dict:
"""Верификация запроса от Telegram бота."""
payload = self.jwt.verify_token(token)
if payload.get("source") != "telegram":
raise InvalidToken("Not a Telegram token")
return payload
API с лимитами по плану¶
class PlanBasedAuth:
def __init__(self):
self.jwt = JWTManager(secret_key=settings.jwt_secret)
def create_token(self, user: User) -> str:
plan_limits = {
"free": {"daily_limit": 10, "features": []},
"pro": {"daily_limit": 100, "features": ["export"]},
"enterprise": {"daily_limit": 1000, "features": ["export", "api"]}
}
return self.jwt.create_access_token(
user_id=user.id,
role=user.plan,
extra={
"limits": plan_limits.get(user.plan, plan_limits["free"]),
"plan": user.plan
}
)
# В endpoint
@app.post("/api/generate")
async def generate(request: Request):
user = request.state.user
limits = user.extra.get("limits", {})
if user.daily_usage >= limits.get("daily_limit", 0):
raise HTTPException(429, "Daily limit exceeded")
# ...