Перейти к содержанию

JWT Authentication

JWT аутентификация с поддержкой гостевого доступа.

Возможности

  • JWT токены с настраиваемым TTL
  • Refresh токены
  • Гостевой доступ (guest tokens)
  • FastAPI middleware
  • Role-based access control

Использование

Базовое использование

from kit.saas.auth import JWTManager

jwt = JWTManager(
    secret_key="your-secret-key",
    algorithm="HS256",
    access_token_expire=3600,    # 1 час
    refresh_token_expire=604800  # 7 дней
)

# Создание токена
token = jwt.create_access_token(
    user_id="123",
    role="user",
    extra={"name": "John"}
)

# Верификация
payload = jwt.verify_token(token)
print(payload["user_id"])  # "123"
print(payload["role"])     # "user"

Refresh токены

# Создание пары токенов
tokens = jwt.create_token_pair(user_id="123", role="user")
print(tokens.access_token)
print(tokens.refresh_token)

# Обновление access токена
new_tokens = jwt.refresh_tokens(tokens.refresh_token)

Гостевой доступ

# Создание гостевого токена
guest_token = jwt.create_guest_token(
    device_id="device_123",
    expire=86400  # 24 часа
)

# Верификация
payload = jwt.verify_token(guest_token)
print(payload["is_guest"])  # True
print(payload["device_id"]) # "device_123"

FastAPI Middleware

from fastapi import FastAPI, Request, Depends
from kit.saas.auth import JWTManager, AuthMiddleware, get_current_user

app = FastAPI()
jwt = JWTManager(secret_key="your-secret")

# Добавляем middleware
app.add_middleware(
    AuthMiddleware,
    jwt_manager=jwt,
    exclude_paths=["/health", "/docs", "/openapi.json"]
)

@app.get("/api/profile")
async def get_profile(request: Request):
    user = request.state.user
    return {
        "user_id": user.id,
        "role": user.role,
        "is_guest": user.is_guest
    }

# Или через Depends
@app.get("/api/profile2")
async def get_profile2(user = Depends(get_current_user)):
    return {"user_id": user.id}

Role-based Access

from kit.saas.auth import require_role, require_auth

@app.get("/api/admin")
@require_role("admin")
async def admin_only(request: Request):
    return {"message": "Admin access granted"}

@app.get("/api/users")
@require_role(["admin", "moderator"])
async def admin_or_mod(request: Request):
    return {"message": "Access granted"}

@app.get("/api/data")
@require_auth  # Любой авторизованный (не гость)
async def auth_only(request: Request):
    return {"message": "Authenticated"}

Custom Claims

token = jwt.create_access_token(
    user_id="123",
    role="premium",
    extra={
        "subscription": "pro",
        "features": ["export", "analytics"],
        "company_id": "company_456"
    }
)

payload = jwt.verify_token(token)
print(payload["subscription"])  # "pro"
print(payload["features"])      # ["export", "analytics"]

API Reference

JWTManager

class JWTManager:
    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        access_token_expire: int = 3600,
        refresh_token_expire: int = 604800
    )

    def create_access_token(
        self,
        user_id: str,
        role: str = "user",
        extra: dict = None
    ) -> str

    def create_refresh_token(self, user_id: str) -> str

    def create_token_pair(
        self,
        user_id: str,
        role: str = "user",
        extra: dict = None
    ) -> TokenPair

    def create_guest_token(
        self,
        device_id: str = None,
        expire: int = 86400
    ) -> str

    def verify_token(self, token: str) -> dict
    def refresh_tokens(self, refresh_token: str) -> TokenPair

    def revoke_token(self, token: str) -> None
    def is_revoked(self, token: str) -> bool

TokenPair

@dataclass
class TokenPair:
    access_token: str
    refresh_token: str
    token_type: str = "Bearer"
    expires_in: int = 3600

User

@dataclass
class User:
    id: str
    role: str
    is_guest: bool = False
    extra: dict = None

Примеры из production

Autoshorts — Auth система

class AuthService:
    def __init__(self):
        self.jwt = JWTManager(
            secret_key=settings.jwt_secret,
            access_token_expire=3600,
            refresh_token_expire=604800 * 4  # 4 недели
        )
        self.db = Database()

    async def register(self, email: str, password: str) -> TokenPair:
        # Создание пользователя
        user_id = await self.db.create_user(email, hash_password(password))

        # Создание токенов
        return self.jwt.create_token_pair(
            user_id=user_id,
            role="user",
            extra={"email": email}
        )

    async def login(self, email: str, password: str) -> TokenPair:
        user = await self.db.get_user_by_email(email)

        if not user or not verify_password(password, user.password_hash):
            raise InvalidCredentials()

        return self.jwt.create_token_pair(
            user_id=user.id,
            role=user.role,
            extra={"email": user.email, "plan": user.plan}
        )

    async def guest_access(self, device_id: str) -> str:
        """Гостевой доступ с ограничениями."""
        return self.jwt.create_guest_token(
            device_id=device_id,
            expire=86400 * 7  # 7 дней
        )

DedMoroz.ai — Telegram + JWT

class TelegramAuth:
    def __init__(self):
        self.jwt = JWTManager(secret_key=settings.jwt_secret)

    def create_token_for_telegram_user(self, telegram_id: int, username: str) -> str:
        """Создание JWT для Telegram пользователя."""
        return self.jwt.create_access_token(
            user_id=f"tg_{telegram_id}",
            role="telegram_user",
            extra={
                "telegram_id": telegram_id,
                "username": username,
                "source": "telegram"
            }
        )

    async def verify_telegram_request(self, token: str) -> dict:
        """Верификация запроса от Telegram бота."""
        payload = self.jwt.verify_token(token)

        if payload.get("source") != "telegram":
            raise InvalidToken("Not a Telegram token")

        return payload

API с лимитами по плану

class PlanBasedAuth:
    def __init__(self):
        self.jwt = JWTManager(secret_key=settings.jwt_secret)

    def create_token(self, user: User) -> str:
        plan_limits = {
            "free": {"daily_limit": 10, "features": []},
            "pro": {"daily_limit": 100, "features": ["export"]},
            "enterprise": {"daily_limit": 1000, "features": ["export", "api"]}
        }

        return self.jwt.create_access_token(
            user_id=user.id,
            role=user.plan,
            extra={
                "limits": plan_limits.get(user.plan, plan_limits["free"]),
                "plan": user.plan
            }
        )

# В endpoint
@app.post("/api/generate")
async def generate(request: Request):
    user = request.state.user
    limits = user.extra.get("limits", {})

    if user.daily_usage >= limits.get("daily_limit", 0):
        raise HTTPException(429, "Daily limit exceeded")

    # ...