Перейти к содержанию

Jobs

Управление фоновыми задачами с отслеживанием статуса.

Возможности

  • Асинхронное выполнение задач
  • Пул воркеров с ограничением
  • Отслеживание статуса
  • Отмена задач
  • Retry с backoff

Использование

Базовое использование

from kit.pipeline.jobs import JobManager, JobStatus

manager = JobManager(max_workers=4)

# Функция задачи
async def process_video(video_path: str, output_path: str):
    # Долгая обработка
    await heavy_processing(video_path)
    return {"output": output_path}

# Запуск задачи
job = await manager.submit(
    process_video,
    args={"video_path": "input.mp4", "output_path": "output.mp4"}
)

print(f"Job ID: {job.id}")
print(f"Status: {job.status}")  # JobStatus.PENDING

Ожидание результата

# Синхронное ожидание
result = await manager.wait(job.id)
print(result)  # {"output": "output.mp4"}

# С таймаутом
try:
    result = await manager.wait(job.id, timeout=300)
except TimeoutError:
    print("Job timed out")

Отслеживание прогресса

async def process_with_progress(data: list, job_id: str, manager: JobManager):
    total = len(data)
    for i, item in enumerate(data):
        await process_item(item)
        await manager.update_progress(job_id, (i + 1) / total * 100)

    return {"processed": total}

# Получение прогресса
progress = await manager.get_progress(job.id)
print(f"Progress: {progress}%")

Callback на завершение

@manager.on_complete
async def handle_complete(job_id: str, result: dict):
    print(f"Job {job_id} completed: {result}")

@manager.on_failure
async def handle_failure(job_id: str, error: Exception):
    print(f"Job {job_id} failed: {error}")
    await notify_admin(job_id, error)

Отмена задачи

await manager.cancel(job.id)

job = await manager.get(job.id)
print(job.status)  # JobStatus.CANCELLED

Retry логика

manager = JobManager(
    max_workers=4,
    max_retries=3,
    retry_delay=5.0,  # секунды между попытками
    retry_backoff=2.0  # экспоненциальный backoff
)

# Задача будет повторена до 3 раз при ошибке
job = await manager.submit(unreliable_task)

API Reference

JobStatus

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

Job

@dataclass
class Job:
    id: str
    status: JobStatus
    progress: float = 0.0
    result: Any = None
    error: Optional[str] = None
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

JobManager

class JobManager:
    def __init__(
        self,
        max_workers: int = 4,
        max_retries: int = 0,
        retry_delay: float = 1.0,
        retry_backoff: float = 2.0
    )

    async def submit(self, func: Callable, args: dict = None) -> Job
    async def wait(self, job_id: str, timeout: float = None) -> Any
    async def cancel(self, job_id: str) -> bool
    async def get(self, job_id: str) -> Optional[Job]
    async def get_progress(self, job_id: str) -> float
    async def update_progress(self, job_id: str, progress: float) -> None
    async def list_jobs(self, status: JobStatus = None) -> List[Job]

    def on_complete(self, handler: Callable) -> None
    def on_failure(self, handler: Callable) -> None

Примеры из production

Autoshorts — генерация видео

class VideoJobManager:
    def __init__(self):
        self.manager = JobManager(max_workers=2, max_retries=2)
        self.manager.on_complete(self.notify_user)
        self.manager.on_failure(self.handle_failure)

    async def create_video(self, user_id: str, params: dict) -> str:
        job = await self.manager.submit(
            self.generate_video,
            args={"user_id": user_id, **params}
        )
        return job.id

    async def generate_video(self, user_id: str, prompt: str, style: str):
        # Step 1: Generate script
        script = await self.generate_script(prompt)

        # Step 2: Generate audio
        audio = await self.generate_audio(script)

        # Step 3: Generate images
        images = await self.generate_images(script, style)

        # Step 4: Compose video
        video = await self.compose_video(images, audio)

        return {"video_url": video.url, "user_id": user_id}

    async def notify_user(self, job_id: str, result: dict):
        await telegram.send_message(
            result["user_id"],
            f"Ваше видео готово: {result['video_url']}"
        )

    async def handle_failure(self, job_id: str, error: Exception):
        await telegram.send_message(
            admin_id,
            f"Job {job_id} failed: {error}"
        )

Batch обработка

class BatchProcessor:
    def __init__(self):
        self.manager = JobManager(max_workers=8)

    async def process_batch(self, items: List[dict]) -> List[str]:
        # Запуск всех задач
        jobs = []
        for item in items:
            job = await self.manager.submit(self.process_item, args=item)
            jobs.append(job.id)

        # Ожидание всех
        results = []
        for job_id in jobs:
            try:
                result = await self.manager.wait(job_id, timeout=600)
                results.append(result)
            except Exception as e:
                results.append({"error": str(e)})

        return results