
Workers

Background tasks run on separate worker processes using TaskIQ with RabbitMQ as the message broker. This keeps the API responsive while heavy operations like transcription and video rendering happen asynchronously. Redis remains for SSE pub/sub, cache, sessions, and task result storage.

How It Works

sequenceDiagram
    participant API
    participant RMQ as RabbitMQ
    participant Worker
    participant DB

    API->>RMQ: .kicker().with_labels(priority=N).kiq()
    API->>API: Return immediately
    Worker->>RMQ: Consume (AMQP)
    RMQ->>Worker: Deliver task (highest priority first)
    Worker->>Worker: Execute task
    Worker->>DB: Update records
    Worker->>RMQ: Ack message

You define tasks as async functions decorated with @broker.task. When you call .kicker().with_labels(priority=N).kiq() on a task, it serializes the arguments and publishes a message to the RabbitMQ queue with the specified priority. Workers consume messages in priority order.

The API and workers are separate processes. You can scale workers independently by running more of them.

Brokers

We have specialized brokers for different workloads. Each uses a dedicated RabbitMQ queue:

flowchart TB
    subgraph API["FastAPI"]
        A[Queue Tasks]
    end

    subgraph RMQ["RabbitMQ Queues"]
        Q1[(download)]
        Q2[(analysis<br/>priority 0-3)]
        Q3[(render<br/>priority 0-3)]
        Q4[(asset<br/>priority 0-3)]
        Q5[(email)]
        Q6[(proxy)]
    end

    subgraph Workers["Worker Processes"]
        W1[Download Workers]
        W2[Analysis Workers]
        W3[Render Workers]
        W4[Asset Workers]
        W5[Email Workers]
        W6[Proxy Workers]
    end

    A --> Q1
    A --> Q2
    A --> Q3
    A --> Q4
    A --> Q5
    A --> Q6

    Q1 --> W1
    Q2 --> W2
    Q3 --> W3
    Q4 --> W4
    Q5 --> W5
    Q6 --> W6

| Broker | Queue | Priority | Purpose |
| --- | --- | --- | --- |
| analysis_broker | analysis | 0-3 | Whisper transcription, silence/false-start/profanity detection |
| render_broker | render | 0-3 | FFmpeg video processing, audio mute/bleep |
| asset_broker | asset | 0-3 | Asset editing (trim, extract audio) |
| download_broker | download | FIFO | yt-dlp (clips and assets), audio extraction, waveform, thumbnail |
| proxy_broker | proxy | FIFO | FFmpeg HEVC/4K → H.264 480p re-encode for non-web-compatible clips (CPU-heavy, split from download so audio extraction isn't blocked) |
| email_broker | email | FIFO | Postmark email sending, scheduled cron tasks |

The brokers are defined in backend/src/infrastructure/taskiq/brokers.py.

Priority Queue

User-facing brokers (analysis, render, asset) use RabbitMQ native priority queues (max_priority=3, QueueType.CLASSIC). Higher-tier users get their tasks consumed first.

| Tier | is_paid | Priority | RabbitMQ Behavior |
| --- | --- | --- | --- |
| viral | true | 3 (HIGH) | Consumed first |
| creator | true | 2 (NORMAL) | Consumed second |
| hobby | true | 1 (LOW) | Consumed third |
| trial / free | false | 0 (BACKGROUND) | Consumed last |

Priority is resolved at enqueue time via resolve_task_priority(tier_name, is_paid) in infrastructure/taskiq/priority.py. The tier context comes from resolve_tier_context(user_id, db).
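
The tier-to-priority mapping can be sketched as a small pure function. This is a minimal illustration, not the code in infrastructure/taskiq/priority.py: the TaskPriority enum, the lowercase tier names, and the fallback for an unrecognized paid tier are all assumptions.

```python
from enum import IntEnum


class TaskPriority(IntEnum):
    """Hypothetical enum mirroring the Priority column of the tier table."""

    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3


# Tier names as they appear in the tier table (assumed to be lowercase).
_PAID_TIER_PRIORITY = {
    "viral": TaskPriority.HIGH,
    "creator": TaskPriority.NORMAL,
    "hobby": TaskPriority.LOW,
}


def resolve_task_priority(tier_name: str, is_paid: bool) -> TaskPriority:
    """Map a tier to a RabbitMQ message priority in the 0-3 range."""
    if not is_paid:
        # trial / free users always land in the background lane.
        return TaskPriority.BACKGROUND
    # Assumption: an unrecognized paid tier falls back to LOW.
    return _PAID_TIER_PRIORITY.get(tier_name.lower(), TaskPriority.LOW)
```

Because the enum subclasses int, int(priority) yields the plain number that the priority label expects.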

Analysis and render workers use --ack-type when_executed so messages are only acknowledged after task completion. If a worker crashes mid-task, RabbitMQ requeues the message automatically.

Prefetch count (qos=2) is kept low so that priority ordering stays close to strict: a worker holds at most two unacknowledged messages, so it cannot buffer many low-priority messages ahead of high-priority ones that arrive later.
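
The effect of prefetch on ordering can be seen in a toy simulation. It models one worker and one priority queue in plain Python, with no RabbitMQ involved; the function name and the single-worker, one-task-at-a-time model are assumptions made for illustration.

```python
import heapq
from collections import deque
from itertools import count


def lows_run_before_urgent(prefetch: int, backlog: int = 20) -> int:
    """Count low-priority tasks one worker runs before a late urgent task."""
    order = count()
    broker: list[tuple[int, int, str]] = []  # min-heap keyed on (-priority, arrival)
    for _ in range(backlog):
        heapq.heappush(broker, (0, next(order), "low"))

    buffer: deque[str] = deque()  # messages delivered but not yet executed

    def refill() -> None:
        # The broker delivers its highest-priority message until the buffer is full.
        while broker and len(buffer) < prefetch:
            buffer.append(heapq.heappop(broker)[2])

    refill()
    # The urgent message is published only after the buffer has been filled.
    heapq.heappush(broker, (-3, next(order), "urgent"))

    lows_done = 0
    while buffer:
        task = buffer.popleft()
        if task == "urgent":
            return lows_done
        lows_done += 1
        refill()  # acking one message frees one prefetch slot
    raise RuntimeError("urgent task was never delivered")
```

In this model the urgent task waits behind exactly as many low-priority tasks as the prefetch count, which is why a small value keeps high-tier latency low.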

Task Lifecycle

stateDiagram-v2
    [*] --> Queued: .kiq()
    Queued --> Running: Worker picks up
    Running --> Success: Task completes
    Running --> Failed: Error thrown
    Failed --> Queued: Retry (if enabled)
    Failed --> Dead: Max retries exceeded
    Success --> [*]
    Dead --> [*]
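
The diagram reads as a small state machine. The sketch below encodes the same transitions; TaskState and next_state are hypothetical names used for illustration and are not part of TaskIQ.

```python
from enum import Enum


class TaskState(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    DEAD = "dead"


def next_state(state: TaskState, *, ok: bool = True, retries_left: int = 0) -> TaskState:
    """Return the state that follows `state` in the lifecycle diagram."""
    if state is TaskState.QUEUED:
        return TaskState.RUNNING  # a worker picks the message up
    if state is TaskState.RUNNING:
        return TaskState.SUCCESS if ok else TaskState.FAILED
    if state is TaskState.FAILED:
        # A retry re-queues the task; with no retries left it is dead.
        return TaskState.QUEUED if retries_left > 0 else TaskState.DEAD
    raise ValueError(f"{state.name} is a terminal state")
```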

Defining Tasks

Tasks live in their module's tasks.py file. Per Convention #16 shape 2, task bodies hold no DB session — every DB op lives in a small named helper that opens local_session() internally and closes before returning. Storage and other non-DB clients still come through TaskiqDepends. Here's the pattern:

# backend/src/workers/download/tasks.py

from src.infrastructure.taskiq import download_broker
from src.infrastructure.taskiq.deps import get_storage_client
from taskiq import TaskiqDepends

from .context import get_clip_context, mark_clip_failed, _finalize_youtube_download

@download_broker.task(
    task_name="download_youtube_video",
    retry_on_error=True,
    max_retries=3,
)
async def download_youtube_video(
    clip_file_uuid: str,
    storage: StorageClient = TaskiqDepends(get_storage_client),
) -> DownloadResult:
    """Download video from YouTube and process it."""
    ctx = await get_clip_context(clip_file_uuid)  # opens + closes a session
    # ... slow IO (yt-dlp, FFmpeg, R2 uploads) — no session held
    await _finalize_youtube_download(clip_file_uuid, ...)  # opens + closes a session

Key points:

  1. Task names must be unique across all brokers
  2. All arguments must be JSON-serializable (except TaskiqDepends injections)
  3. Task bodies do not take a db parameter or hold a session. Per Convention #16, DB ops live in named helpers that own their session lifecycle. Pipeline Deps classes (RenderDeps, AnalysisDeps) likewise carry no db field. Helper modules: workers/download/context.py, workers/render/data.py, workers/analysis/data.py. # SLOW_IO markers tag any helper that performs slow IO (FFmpeg, yt-dlp, large R2 transfers, third-party LLM/transcription/payment APIs) so reviewers can spot scope mismatches at a glance.
  4. Return values should be serializable for result storage
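
Point 2 is the one most often hit in practice, typically with UUID or datetime arguments. The following sketch shows an early check using the standard json module; check_task_kwargs is a hypothetical helper, and the stdlib encoder may be stricter than the serializer the brokers are actually configured with.

```python
import json
import uuid
from typing import Any


def check_task_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Raise TypeError early if any argument cannot be JSON-encoded."""
    json.dumps(kwargs)  # raises TypeError for UUID, datetime, bytes, ...
    return kwargs


clip_uuid = uuid.uuid4()

# Passing the UUID object directly is rejected ...
try:
    check_task_kwargs({"clip_file_uuid": clip_uuid})
    rejected = False
except TypeError:
    rejected = True

# ... while the string form passes the check unchanged.
safe_kwargs = check_task_kwargs({"clip_file_uuid": str(clip_uuid)})
```

This is why the enqueue examples in this page wrap identifiers in str() before calling .kiq().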

Queuing Tasks

For priority-routed tasks (analysis, render, asset), resolve the user's tier and enqueue with priority:

tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
    project_uuid=str(project_uuid),
    pacing_level=50,
)

For non-priority tasks (download, proxy, email), use plain .kiq():

await download_youtube_video.kiq(clip_file_uuid=str(clip.uuid))
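
If the priority pattern is repeated in many call sites, it could be wrapped in one helper. The sketch below is an assumption rather than existing project code: enqueue_with_priority is a hypothetical name, and clamping to 0-3 is an added safeguard based on the max_priority=3 setting described earlier. It only relies on the .kicker(), .with_labels() and .kiq() calls already used above.

```python
from typing import Any


async def enqueue_with_priority(task: Any, priority: int, **kwargs: Any) -> Any:
    """Enqueue `task` with a priority label clamped to the queue's 0-3 range."""
    clamped = max(0, min(3, int(priority)))
    # Same three-step chain as the inline example: kicker, label, send.
    return await task.kicker().with_labels(priority=clamped).kiq(**kwargs)
```

A call site would then read await enqueue_with_priority(analyze_project, priority, project_uuid=str(project_uuid), pacing_level=50).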

Retry Policy

flowchart LR
    A[Task Fails] --> B{Retries left?}
    B -->|Yes| C[Wait with backoff]
    C --> D[Retry task]
    D --> E{Success?}
    E -->|No| B
    E -->|Yes| F[Done]
    B -->|No| G[Mark as dead]

The download and asset brokers use SmartRetryMiddleware:

  • Max retries: 3
  • Initial delay: 5 seconds
  • Exponential backoff with jitter
  • Max delay: 60 seconds

This handles transient failures like network issues. Permanent failures (like invalid YouTube URLs) fail fast.

The variable per-message backoff requires the rabbitmq_delayed_message_exchange plugin on the broker — installed via rabbitmq/Dockerfile and enabled via delayed_message_exchange_plugin=True on AioPikaBroker. Without it, every retry kick raises IncorrectRoutingKeyError and the task is silently dropped.
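
To make the retry parameters concrete, here is a generic exponential backoff calculation using the values listed above (5 second initial delay, 60 second cap). The doubling factor and the one-second jitter range are assumptions; SmartRetryMiddleware's exact formula may differ.

```python
import random


def backoff_delay(
    attempt: int,
    base: float = 5.0,
    cap: float = 60.0,
    jitter: float = 1.0,
) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    exponential = min(cap, base * (2 ** attempt))
    # Jitter spreads out retries of tasks that failed at the same moment.
    return min(cap, exponential + random.uniform(0.0, jitter))


# Jitter-free schedule for the three retries.
schedule = [backoff_delay(n, jitter=0.0) for n in range(3)]
```

Under these assumptions the three retries are spaced roughly 5, 10 and 20 seconds apart, so the 60 second cap only comes into play if the retry count is raised.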

Running Workers

Start specific brokers for scaling:

# Scale download workers
uv run taskiq worker src.infrastructure.taskiq:download_broker --workers 4

# Scale analysis workers (CPU-heavy)
uv run taskiq worker src.infrastructure.taskiq:analysis_broker --workers 2

Graceful Shutdown

Each TaskIQ worker is launched with --wait-tasks-timeout 50, and the container sets stop_grace_period: 60s. Both are defaults; override them per environment via TASKIQ_WAIT_TASKS_TIMEOUT_SECONDS and TASKIQ_STOP_GRACE_PERIOD. When the container receives SIGTERM, taskiq stops accepting new tasks and waits up to 50s for in-flight tasks to finish before disconnecting from the broker. Docker's SIGTERM → SIGKILL window is 60s, which leaves a 10s buffer for WorkerContext.shutdown() to drain (httpx pools, OpenAI client). Tasks that don't complete within the wait window are redelivered to another worker (ack-type when_executed means the broker still owns the message until taskiq acks it).

This is operator-visible: a long-running task that is in flight when a deploy restarts its worker gets a single retry on the new worker rather than a partial-result failure.
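
Since the two timeouts are overridable independently, it may be worth checking at startup that the wait window still fits inside the grace period. The sketch below is one way to do that; shutdown_buffer_seconds is a hypothetical helper, and it assumes the grace period is written as whole seconds such as 60s.

```python
from typing import Mapping


def shutdown_buffer_seconds(env: Mapping[str, str]) -> int:
    """Seconds left for cleanup between the task wait window and SIGKILL."""
    wait = int(env.get("TASKIQ_WAIT_TASKS_TIMEOUT_SECONDS", "50"))
    # Assumption: the grace period is given in whole seconds, e.g. "60s".
    grace = int(env.get("TASKIQ_STOP_GRACE_PERIOD", "60s").rstrip("s"))
    if wait >= grace:
        raise ValueError(
            f"wait timeout ({wait}s) must be shorter than grace period ({grace}s)"
        )
    return grace - wait
```

Calling it with os.environ at container start would surface a misconfigured pair before the first deploy-time restart does.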

Scaling Strategy

flowchart TB
    subgraph Download["Download Workers (I/O bound)"]
        D1[Worker 1]
        D2[Worker 2]
        D3[Worker 3]
        D4[Worker 4]
    end

    subgraph Analysis["Analysis Workers (CPU bound)"]
        A1[Worker 1]
        A2[Worker 2]
    end

    subgraph Render["Render Workers (CPU bound)"]
        R1[Worker 1]
        R2[Worker 2]
    end

In Docker Compose, the worker service runs all brokers. For production, you'd typically split them into separate deployments.

Database Access

Tasks run in separate processes, so they need their own database connections. Per Convention #16 shape 2, sessions are scoped to individual DB ops via small named helpers — never held across slow IO. Each helper opens local_session(), runs its query, and closes before returning:

# workers/download/context.py
from ...infrastructure.database import local_session

async def get_clip_context(clip_file_uuid: str) -> ClipContext:
    """Load the per-task ClipFile + Project read-set."""
    async with local_session() as db:
        clip_file = await crud_clip_files.get(db=db, uuid=clip_file_uuid)
        # ... build context dataclass
        return ClipContext(...)

async def _finalize_youtube_download(clip_file_uuid: str, ...) -> None:  # write phase
    async with local_session() as db:
        await crud_clip_files.update(db=db, object={...}, uuid=clip_file_uuid)
        await db.commit()

The task body composes these helpers around slow IO. The connection pool uses NullPool, so each session opens a fresh connection. As an enforcement target, rg "TaskiqDepends\(get_db_session\)" backend/src/workers/ and rg "deps\.db" backend/src/workers/ should both return zero hits.

Key Files

| Component | Location |
| --- | --- |
| Broker setup | backend/src/infrastructure/taskiq/brokers.py |
| DB dependency | backend/src/infrastructure/taskiq/deps.py |
| Worker entry | backend/src/infrastructure/taskiq/worker.py |
| Download tasks (yt-dlp, audio extraction, waveform, thumbnail) | backend/src/workers/download/tasks.py |
| Proxy task (generate_clip_proxy, registered on proxy_broker) | backend/src/workers/download/tasks.py |
| Analysis tasks | backend/src/workers/analysis/tasks.py |
| Render tasks | backend/src/workers/render/tasks.py |
| Asset edit tasks | backend/src/workers/asset_edit/tasks.py |
