Jobs¶
Управление фоновыми задачами с отслеживанием статуса.
Возможности¶
- Асинхронное выполнение задач
- Пул воркеров с ограничением
- Отслеживание статуса
- Отмена задач
- Retry с backoff
Использование¶
Базовое использование¶
from kit.pipeline.jobs import JobManager, JobStatus
manager = JobManager(max_workers=4)
# Функция задачи
async def process_video(video_path: str, output_path: str):
# Долгая обработка
await heavy_processing(video_path)
return {"output": output_path}
# Запуск задачи
job = await manager.submit(
process_video,
args={"video_path": "input.mp4", "output_path": "output.mp4"}
)
print(f"Job ID: {job.id}")
print(f"Status: {job.status}") # JobStatus.PENDING
Ожидание результата¶
# Синхронное ожидание
result = await manager.wait(job.id)
print(result) # {"output": "output.mp4"}
# С таймаутом
try:
result = await manager.wait(job.id, timeout=300)
except TimeoutError:
print("Job timed out")
Отслеживание прогресса¶
async def process_with_progress(data: list, job_id: str, manager: JobManager):
total = len(data)
for i, item in enumerate(data):
await process_item(item)
await manager.update_progress(job_id, (i + 1) / total * 100)
return {"processed": total}
# Получение прогресса
progress = await manager.get_progress(job.id)
print(f"Progress: {progress}%")
Callback на завершение¶
@manager.on_complete
async def handle_complete(job_id: str, result: dict):
print(f"Job {job_id} completed: {result}")
@manager.on_failure
async def handle_failure(job_id: str, error: Exception):
print(f"Job {job_id} failed: {error}")
await notify_admin(job_id, error)
Отмена задачи¶
await manager.cancel(job.id)
job = await manager.get(job.id)
print(job.status) # JobStatus.CANCELLED
Retry логика¶
manager = JobManager(
max_workers=4,
max_retries=3,
retry_delay=5.0, # секунды между попытками
retry_backoff=2.0 # экспоненциальный backoff
)
# Задача будет повторена до 3 раз при ошибке
job = await manager.submit(unreliable_task)
API Reference¶
JobStatus¶
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
Job¶
@dataclass
class Job:
id: str
status: JobStatus
progress: float = 0.0
result: Any = None
error: Optional[str] = None
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
JobManager¶
class JobManager:
def __init__(
self,
max_workers: int = 4,
max_retries: int = 0,
retry_delay: float = 1.0,
retry_backoff: float = 2.0
)
async def submit(self, func: Callable, args: dict = None) -> Job
async def wait(self, job_id: str, timeout: float = None) -> Any
async def cancel(self, job_id: str) -> bool
async def get(self, job_id: str) -> Optional[Job]
async def get_progress(self, job_id: str) -> float
async def update_progress(self, job_id: str, progress: float) -> None
async def list_jobs(self, status: JobStatus = None) -> List[Job]
def on_complete(self, handler: Callable) -> None
def on_failure(self, handler: Callable) -> None
Примеры из production¶
Autoshorts — генерация видео¶
class VideoJobManager:
def __init__(self):
self.manager = JobManager(max_workers=2, max_retries=2)
self.manager.on_complete(self.notify_user)
self.manager.on_failure(self.handle_failure)
async def create_video(self, user_id: str, params: dict) -> str:
job = await self.manager.submit(
self.generate_video,
args={"user_id": user_id, **params}
)
return job.id
async def generate_video(self, user_id: str, prompt: str, style: str):
# Step 1: Generate script
script = await self.generate_script(prompt)
# Step 2: Generate audio
audio = await self.generate_audio(script)
# Step 3: Generate images
images = await self.generate_images(script, style)
# Step 4: Compose video
video = await self.compose_video(images, audio)
return {"video_url": video.url, "user_id": user_id}
async def notify_user(self, job_id: str, result: dict):
await telegram.send_message(
result["user_id"],
f"Ваше видео готово: {result['video_url']}"
)
async def handle_failure(self, job_id: str, error: Exception):
await telegram.send_message(
admin_id,
f"Job {job_id} failed: {error}"
)
Batch обработка¶
class BatchProcessor:
def __init__(self):
self.manager = JobManager(max_workers=8)
async def process_batch(self, items: List[dict]) -> List[str]:
# Запуск всех задач
jobs = []
for item in items:
job = await self.manager.submit(self.process_item, args=item)
jobs.append(job.id)
# Ожидание всех
results = []
for job_id in jobs:
try:
result = await self.manager.wait(job_id, timeout=600)
results.append(result)
except Exception as e:
results.append({"error": str(e)})
return results