Перейти к содержанию

Repository

Абстракция хранилища данных с реализациями Memory и SQLite.

Возможности

  • Единый интерфейс для разных хранилищ
  • Memory — для тестов и временных данных
  • SQLite — для персистентного хранения
  • Async API
  • CRUD операции
  • Поиск и фильтрация

Использование

Memory Repository

from kit.core.repository import MemoryRepository

repo = MemoryRepository()

# Сохранение
await repo.save("user:123", {
    "name": "John",
    "email": "john@example.com"
})

# Получение
user = await repo.get("user:123")
print(user["name"])  # John

# Удаление
await repo.delete("user:123")

# Проверка существования
exists = await repo.exists("user:123")

SQLite Repository

from kit.core.repository import SQLiteRepository

repo = SQLiteRepository(db_path="./data.db")

# Тот же API
await repo.save("config:app", {"theme": "dark", "lang": "ru"})
config = await repo.get("config:app")

Поиск по паттерну

# Все ключи с префиксом
users = await repo.find("user:*")

# С фильтром
active_users = await repo.find(
    "user:*",
    filter_fn=lambda k, v: v.get("active", False)
)

Batch операции

# Множественное сохранение
await repo.save_many({
    "user:1": {"name": "Alice"},
    "user:2": {"name": "Bob"},
    "user:3": {"name": "Charlie"}
})

# Множественное получение
users = await repo.get_many(["user:1", "user:2", "user:3"])

TTL (Time To Live)

# Сохранение с истечением
await repo.save("session:abc", {"user_id": "123"}, ttl=3600)  # 1 час

# Автоматически удалится через час

API Reference

BaseRepository

class BaseRepository(ABC):
    async def get(self, key: str) -> Optional[Dict]
    async def save(self, key: str, value: Dict, ttl: int = None) -> None
    async def delete(self, key: str) -> bool
    async def exists(self, key: str) -> bool
    async def find(self, pattern: str, filter_fn: Callable = None) -> List[Tuple[str, Dict]]
    async def get_many(self, keys: List[str]) -> Dict[str, Dict]
    async def save_many(self, items: Dict[str, Dict]) -> None
    async def clear(self) -> None
    async def count(self, pattern: str = "*") -> int

MemoryRepository

class MemoryRepository(BaseRepository):
    def __init__(self, default_ttl: int = None)

SQLiteRepository

class SQLiteRepository(BaseRepository):
    def __init__(
        self,
        db_path: str = "data.db",
        table_name: str = "repository",
        default_ttl: int = None
    )

Паттерны использования

Кэширование

class CachedService:
    def __init__(self):
        self.cache = MemoryRepository(default_ttl=300)  # 5 минут
        self.db = SQLiteRepository()

    async def get_user(self, user_id: str) -> dict:
        # Проверяем кэш
        cached = await self.cache.get(f"user:{user_id}")
        if cached:
            return cached

        # Загружаем из БД
        user = await self.db.get(f"user:{user_id}")
        if user:
            await self.cache.save(f"user:{user_id}", user)

        return user

Session Storage

class SessionStore:
    def __init__(self):
        self.repo = MemoryRepository(default_ttl=1800)  # 30 минут

    async def create(self, user_id: str) -> str:
        session_id = generate_session_id()
        await self.repo.save(f"session:{session_id}", {
            "user_id": user_id,
            "created_at": datetime.now().isoformat()
        })
        return session_id

    async def get(self, session_id: str) -> Optional[dict]:
        return await self.repo.get(f"session:{session_id}")

    async def destroy(self, session_id: str) -> None:
        await self.repo.delete(f"session:{session_id}")

Project Drafts

class DraftRepository:
    def __init__(self):
        self.repo = SQLiteRepository(db_path="drafts.db")

    async def save_draft(self, user_id: str, project_id: str, data: dict):
        await self.repo.save(
            f"draft:{user_id}:{project_id}",
            {**data, "updated_at": datetime.now().isoformat()}
        )

    async def get_user_drafts(self, user_id: str) -> List[dict]:
        results = await self.repo.find(f"draft:{user_id}:*")
        return [v for k, v in results]

Миграция между хранилищами

async def migrate(source: BaseRepository, target: BaseRepository):
    """Миграция данных между репозиториями."""
    all_items = await source.find("*")

    for key, value in all_items:
        await target.save(key, value)

    print(f"Migrated {len(all_items)} items")

# Memory -> SQLite
memory_repo = MemoryRepository()
sqlite_repo = SQLiteRepository()
await migrate(memory_repo, sqlite_repo)

Примеры из production

Autoshorts — хранение прогресса

class TaskRepository:
    def __init__(self):
        self.repo = SQLiteRepository(db_path="tasks.db")

    async def update_progress(self, task_id: str, step: str, percent: int):
        task = await self.repo.get(f"task:{task_id}") or {}
        task.update({
            "current_step": step,
            "progress": percent,
            "updated_at": datetime.now().isoformat()
        })
        await self.repo.save(f"task:{task_id}", task)

    async def get_pending_tasks(self) -> List[dict]:
        results = await self.repo.find(
            "task:*",
            filter_fn=lambda k, v: v.get("status") == "pending"
        )
        return [v for k, v in results]