Repository¶
Абстракция хранилища данных с реализациями Memory и SQLite.
Возможности¶
- Единый интерфейс для разных хранилищ
- Memory — для тестов и временных данных
- SQLite — для персистентного хранения
- Async API
- CRUD операции
- Поиск и фильтрация
Использование¶
Memory Repository¶
from kit.core.repository import MemoryRepository
repo = MemoryRepository()
# Сохранение
await repo.save("user:123", {
"name": "John",
"email": "john@example.com"
})
# Получение
user = await repo.get("user:123")
print(user["name"]) # John
# Удаление
await repo.delete("user:123")
# Проверка существования
exists = await repo.exists("user:123")
SQLite Repository¶
from kit.core.repository import SQLiteRepository
repo = SQLiteRepository(db_path="./data.db")
# Тот же API
await repo.save("config:app", {"theme": "dark", "lang": "ru"})
config = await repo.get("config:app")
Поиск по паттерну¶
# Все ключи с префиксом
users = await repo.find("user:*")
# С фильтром
active_users = await repo.find(
"user:*",
filter_fn=lambda k, v: v.get("active", False)
)
Batch операции¶
# Множественное сохранение
await repo.save_many({
"user:1": {"name": "Alice"},
"user:2": {"name": "Bob"},
"user:3": {"name": "Charlie"}
})
# Множественное получение
users = await repo.get_many(["user:1", "user:2", "user:3"])
TTL (Time To Live)¶
# Сохранение с истечением
await repo.save("session:abc", {"user_id": "123"}, ttl=3600) # 1 час
# Автоматически удалится через час
API Reference¶
BaseRepository¶
class BaseRepository(ABC):
async def get(self, key: str) -> Optional[Dict]
async def save(self, key: str, value: Dict, ttl: int = None) -> None
async def delete(self, key: str) -> bool
async def exists(self, key: str) -> bool
async def find(self, pattern: str, filter_fn: Callable = None) -> List[Tuple[str, Dict]]
async def get_many(self, keys: List[str]) -> Dict[str, Dict]
async def save_many(self, items: Dict[str, Dict]) -> None
async def clear(self) -> None
async def count(self, pattern: str = "*") -> int
MemoryRepository¶
SQLiteRepository¶
class SQLiteRepository(BaseRepository):
def __init__(
self,
db_path: str = "data.db",
table_name: str = "repository",
default_ttl: int = None
)
Паттерны использования¶
Кэширование¶
class CachedService:
def __init__(self):
self.cache = MemoryRepository(default_ttl=300) # 5 минут
self.db = SQLiteRepository()
async def get_user(self, user_id: str) -> dict:
# Проверяем кэш
cached = await self.cache.get(f"user:{user_id}")
if cached:
return cached
# Загружаем из БД
user = await self.db.get(f"user:{user_id}")
if user:
await self.cache.save(f"user:{user_id}", user)
return user
Session Storage¶
class SessionStore:
def __init__(self):
self.repo = MemoryRepository(default_ttl=1800) # 30 минут
async def create(self, user_id: str) -> str:
session_id = generate_session_id()
await self.repo.save(f"session:{session_id}", {
"user_id": user_id,
"created_at": datetime.now().isoformat()
})
return session_id
async def get(self, session_id: str) -> Optional[dict]:
return await self.repo.get(f"session:{session_id}")
async def destroy(self, session_id: str) -> None:
await self.repo.delete(f"session:{session_id}")
Project Drafts¶
class DraftRepository:
def __init__(self):
self.repo = SQLiteRepository(db_path="drafts.db")
async def save_draft(self, user_id: str, project_id: str, data: dict):
await self.repo.save(
f"draft:{user_id}:{project_id}",
{**data, "updated_at": datetime.now().isoformat()}
)
async def get_user_drafts(self, user_id: str) -> List[dict]:
results = await self.repo.find(f"draft:{user_id}:*")
return [v for k, v in results]
Миграция между хранилищами¶
async def migrate(source: BaseRepository, target: BaseRepository):
"""Миграция данных между репозиториями."""
all_items = await source.find("*")
for key, value in all_items:
await target.save(key, value)
print(f"Migrated {len(all_items)} items")
# Memory -> SQLite
memory_repo = MemoryRepository()
sqlite_repo = SQLiteRepository()
await migrate(memory_repo, sqlite_repo)
Примеры из production¶
Autoshorts — хранение прогресса¶
class TaskRepository:
def __init__(self):
self.repo = SQLiteRepository(db_path="tasks.db")
async def update_progress(self, task_id: str, step: str, percent: int):
task = await self.repo.get(f"task:{task_id}") or {}
task.update({
"current_step": step,
"progress": percent,
"updated_at": datetime.now().isoformat()
})
await self.repo.save(f"task:{task_id}", task)
async def get_pending_tasks(self) -> List[dict]:
results = await self.repo.find(
"task:*",
filter_fn=lambda k, v: v.get("status") == "pending"
)
return [v for k, v in results]