Workers
Background tasks run on separate worker processes using TaskIQ with RabbitMQ as the message broker. This keeps the API responsive while heavy operations like transcription and video rendering happen asynchronously. Redis remains for SSE pub/sub, cache, sessions, and task result storage.
How It Works
```mermaid
sequenceDiagram
    participant API
    participant RMQ as RabbitMQ
    participant Worker
    participant DB
    API->>RMQ: .kicker().with_labels(priority=N).kiq()
    API->>API: Return immediately
    Worker->>RMQ: Consume (AMQP)
    RMQ->>Worker: Deliver task (highest priority first)
    Worker->>Worker: Execute task
    Worker->>DB: Update records
    Worker->>RMQ: Ack message
```
You define tasks as async functions decorated with `@broker.task`. Calling `.kicker().with_labels(priority=N).kiq()` on a task serializes its arguments and publishes a message to the task's RabbitMQ queue with the given priority. On priority-enabled queues, workers consume higher-priority messages first; the other queues are FIFO.
The API and workers are separate processes. You can scale workers independently by running more of them.
Brokers
We have specialized brokers for different workloads. Each uses a dedicated RabbitMQ queue:
```mermaid
flowchart TB
    subgraph API["FastAPI"]
        A[Queue Tasks]
    end
    subgraph RMQ["RabbitMQ Queues"]
        Q1[(download)]
        Q2[(analysis<br/>priority 0-3)]
        Q3[(render<br/>priority 0-3)]
        Q4[(asset<br/>priority 0-3)]
        Q5[(email)]
        Q6[(proxy)]
    end
    subgraph Workers["Worker Processes"]
        W1[Download Workers]
        W2[Analysis Workers]
        W3[Render Workers]
        W4[Asset Workers]
        W5[Email Workers]
        W6[Proxy Workers]
    end
    A --> Q1
    A --> Q2
    A --> Q3
    A --> Q4
    A --> Q5
    A --> Q6
    Q1 --> W1
    Q2 --> W2
    Q3 --> W3
    Q4 --> W4
    Q5 --> W5
    Q6 --> W6
```
| Broker | Queue | Priority | Purpose |
|---|---|---|---|
| `analysis_broker` | `analysis` | 0-3 | Whisper transcription, silence/false-start/profanity detection |
| `render_broker` | `render` | 0-3 | FFmpeg video processing, audio mute/bleep |
| `asset_broker` | `asset` | 0-3 | Asset editing (trim, extract audio) |
| `download_broker` | `download` | FIFO | yt-dlp (clips and assets), audio extraction, waveform, thumbnail |
| `proxy_broker` | `proxy` | FIFO | FFmpeg HEVC/4K → H.264 480p re-encode for non-web-compatible clips (CPU-heavy; split from download so audio extraction isn't blocked) |
| `email_broker` | `email` | FIFO | Postmark email sending, scheduled cron tasks |
The brokers are defined in `backend/src/infrastructure/taskiq/brokers.py`.
Priority Queue
User-facing brokers (analysis, render, asset) use RabbitMQ native priority queues (`max_priority=3`, `QueueType.CLASSIC`). Higher-tier users get their tasks consumed first.
| Tier | `is_paid` | Priority | RabbitMQ Behavior |
|---|---|---|---|
| viral | true | 3 (HIGH) | Consumed first |
| creator | true | 2 (NORMAL) | Consumed second |
| hobby | true | 1 (LOW) | Consumed third |
| trial / free | false | 0 (BACKGROUND) | Consumed last |
Priority is resolved at enqueue time via `resolve_task_priority(tier_name, is_paid)` in `infrastructure/taskiq/priority.py`. The tier context comes from `resolve_tier_context(user_id, db)`.
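The tier table reduces to a small lookup. The sketch below shows one way `resolve_task_priority` could be written; it is not the contents of `priority.py`. The `TaskPriority` enum (member names taken from the table), case-insensitive tier matching, and the fallback of unknown tiers to `BACKGROUND` are assumptions.

```python
from enum import IntEnum
from typing import Dict, Optional


class TaskPriority(IntEnum):
    """RabbitMQ message priorities; higher values are consumed first (max_priority=3)."""

    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3


# Paid tiers and their priorities, mirroring the table above.
_PAID_TIER_PRIORITY: Dict[str, TaskPriority] = {
    "viral": TaskPriority.HIGH,
    "creator": TaskPriority.NORMAL,
    "hobby": TaskPriority.LOW,
}


def resolve_task_priority(tier_name: Optional[str], is_paid: bool) -> TaskPriority:
    """Map a user's tier to a queue priority (hypothetical sketch).

    Assumption: unpaid users (trial/free) and any unrecognised tier fall back
    to BACKGROUND, so they can never outrank a paying user.
    """
    if not is_paid or tier_name is None:
        return TaskPriority.BACKGROUND
    return _PAID_TIER_PRIORITY.get(tier_name.lower(), TaskPriority.BACKGROUND)
```

Because `TaskPriority` is an `IntEnum`, the `int(priority)` in the enqueue call yields the plain 0-3 integer that RabbitMQ expects.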
Analysis and render workers use `--ack-type when_executed`, so messages are only acknowledged after the task completes. If a worker crashes mid-task, RabbitMQ requeues the message automatically.
Prefetch (`qos=2`) is kept low so priority ordering stays close to strict: RabbitMQ hands a worker at most two unacknowledged messages at a time, so it can't buffer a backlog of low-priority messages ahead of high-priority ones that arrive later.
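A toy model makes the prefetch trade-off concrete. It assumes a single worker that executes delivered messages one at a time, in delivery order, and acks each on completion; real taskiq workers can run several tasks concurrently, so treat the numbers as illustrative. `PriorityQueueModel`, `make_backlog`, and `run_worker` are hypothetical names.

```python
import heapq
import itertools
from collections import deque
from typing import Deque, Dict, List, Optional, Sequence, Tuple


class PriorityQueueModel:
    """Toy RabbitMQ priority queue: highest priority first, FIFO within a priority."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._seq = itertools.count()  # tie-breaker that preserves publish order

    def publish(self, body: str, priority: int) -> None:
        # heapq is a min-heap, so store the negated priority.
        heapq.heappush(self._heap, (-priority, next(self._seq), body))

    def get(self) -> Optional[str]:
        return heapq.heappop(self._heap)[2] if self._heap else None


def make_backlog(n: int) -> PriorityQueueModel:
    """A queue pre-filled with n priority-0 (BACKGROUND) messages."""
    queue = PriorityQueueModel()
    for i in range(n):
        queue.publish(f"low-{i}", priority=0)
    return queue


def run_worker(
    queue: PriorityQueueModel,
    prefetch: int,
    arrivals: Dict[int, Sequence[Tuple[str, int]]],
) -> List[str]:
    """Return the execution order for one sequential worker.

    `prefetch` caps delivered-but-unacked messages. `arrivals[k]` lists
    (body, priority) pairs published right after the k-th task is acked.
    """
    in_flight: Deque[str] = deque()  # delivered to the worker, not yet acked
    executed: List[str] = []
    while True:
        # The broker tops the worker up to the prefetch limit, taking the
        # highest-priority messages still waiting in the queue.
        while len(in_flight) < prefetch:
            body = queue.get()
            if body is None:
                break
            in_flight.append(body)
        if not in_flight:
            return executed
        # Delivered messages run in delivery order; a later high-priority
        # message cannot overtake them.
        executed.append(in_flight.popleft())
        for body, priority in arrivals.get(len(executed), ()):
            queue.publish(body, priority)


order = run_worker(make_backlog(10), prefetch=2, arrivals={1: [("urgent", 3)]})
print(order[:4])  # ['low-0', 'low-1', 'urgent', 'low-2']
```

With `prefetch=2` the urgent message is the third task executed; with `prefetch=5` it becomes the sixth, because four already-delivered background messages run ahead of it.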
Task Lifecycle
```mermaid
stateDiagram-v2
    [*] --> Queued: .kiq()
    Queued --> Running: Worker picks up
    Running --> Success: Task completes
    Running --> Failed: Error thrown
    Failed --> Queued: Retry (if enabled)
    Failed --> Dead: Max retries exceeded
    Success --> [*]
    Dead --> [*]
```
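The lifecycle is a small state machine. The sketch below encodes each arrow of the diagram as a transition table and replays a series of attempt outcomes. `TaskState`, `move`, and `run_lifecycle` are hypothetical names, and treating `max_retries` as the number of re-deliveries after the first attempt is an assumption, not documented taskiq behavior.

```python
from enum import Enum
from typing import Dict, Iterable, List, Set


class TaskState(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    DEAD = "dead"


# One entry per arrow in the state diagram.
TRANSITIONS: Dict[TaskState, Set[TaskState]] = {
    TaskState.QUEUED: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.SUCCESS, TaskState.FAILED},
    TaskState.FAILED: {TaskState.QUEUED, TaskState.DEAD},
    TaskState.SUCCESS: set(),  # terminal
    TaskState.DEAD: set(),  # terminal
}


def move(current: TaskState, target: TaskState) -> TaskState:
    """Return `target`, or raise if the diagram has no such arrow."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


def run_lifecycle(outcomes: Iterable[bool], max_retries: int = 3) -> List[TaskState]:
    """Replay attempt outcomes (True = success) and return the state history."""
    history = [TaskState.QUEUED]
    retries = 0
    for ok in outcomes:
        history.append(move(history[-1], TaskState.RUNNING))
        if ok:
            history.append(move(history[-1], TaskState.SUCCESS))
            break
        history.append(move(history[-1], TaskState.FAILED))
        if retries < max_retries:
            retries += 1
            history.append(move(history[-1], TaskState.QUEUED))
        else:
            history.append(move(history[-1], TaskState.DEAD))
            break
    return history
```

For example, `run_lifecycle([False, True])` walks Queued, Running, Failed, Queued, Running, Success.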
Defining Tasks
Tasks live in their module's `tasks.py` file. Per Convention #16 shape 2, task bodies hold no DB session: every DB operation lives in a small named helper that opens `local_session()` internally and closes it before returning. Storage and other non-DB clients still come through `TaskiqDepends`. Here's the pattern:
```python
# backend/src/workers/download/tasks.py
from src.infrastructure.taskiq import download_broker
from src.infrastructure.taskiq.deps import get_storage_client
from taskiq import TaskiqDepends

from .context import get_clip_context, mark_clip_failed, _finalize_youtube_download

# StorageClient and DownloadResult are project types (imports not shown).


@download_broker.task(
    task_name="download_youtube_video",
    retry_on_error=True,
    max_retries=3,
)
async def download_youtube_video(
    clip_file_uuid: str,
    storage: StorageClient = TaskiqDepends(get_storage_client),
) -> DownloadResult:
    """Download video from YouTube and process it."""
    ctx = await get_clip_context(clip_file_uuid)  # opens + closes a session
    # ... slow IO (yt-dlp, FFmpeg, R2 uploads); no session held here
    await _finalize_youtube_download(clip_file_uuid, ...)  # opens + closes a session
```
Key points:
- Task names must be unique across all brokers.
- All arguments must be JSON-serializable (except `TaskiqDepends` injections).
- Task bodies do not take a `db` parameter or hold a session. Per Convention #16, DB ops live in named helpers that own their session lifecycle, and `PipelineDeps` classes (`RenderDeps`, `AnalysisDeps`) likewise carry no `db` field. Helper modules: `workers/download/context.py`, `workers/render/data.py`, `workers/analysis/data.py`. `# SLOW_IO` markers tag any helper that performs slow IO (FFmpeg, yt-dlp, large R2 transfers, third-party LLM/transcription/payment APIs), so reviewers can spot at a glance a session scope that wraps slow IO.
- Return values should be serializable for result storage.
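The JSON rule is why the enqueue example passes `str(project_uuid)` rather than the `UUID` object itself. A pre-enqueue guard like the one below (a hypothetical helper, `check_task_kwargs`, that checks the documented rule with the standard `json` module rather than calling taskiq's own serializer) surfaces the problem at the call site instead of inside the broker.

```python
import json
import uuid
from typing import Any, Dict


def check_task_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Raise TypeError if any task argument would not survive JSON encoding."""
    for name, value in kwargs.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError) as exc:  # unsupported type or circular reference
            raise TypeError(f"task argument {name!r} is not JSON-serializable: {exc}") from exc
    return kwargs


project_uuid = uuid.uuid4()

# A raw UUID object fails the check...
try:
    check_task_kwargs({"project_uuid": project_uuid, "pacing_level": 50})
except TypeError as err:
    print(err)

# ...while its string form passes.
ok = check_task_kwargs({"project_uuid": str(project_uuid), "pacing_level": 50})
```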
Queuing Tasks
For priority-routed tasks (analysis, render, asset), resolve the user's tier and enqueue with priority:
```python
# Inside an async handler that has user_id, project_uuid and an open db session
tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
    project_uuid=str(project_uuid),
    pacing_level=50,
)
```
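For non-priority tasks (download, proxy, email), skip the kicker and call `.kiq()` directly, for example `await download_youtube_video.kiq(clip_file_uuid=str(clip_file_uuid))`.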
Retry Policy
```mermaid
flowchart LR
    A[Task Fails] --> B{Retries left?}
    B -->|Yes| C[Wait with backoff]
    C --> D[Retry task]
    D --> E{Success?}
    E -->|No| B
    E -->|Yes| F[Done]
    B -->|No| G[Mark as dead]
```
The download and asset brokers use `SmartRetryMiddleware`:
- Max retries: 3
- Initial delay: 5 seconds
- Exponential backoff with jitter
- Max delay: 60 seconds
This handles transient failures like network issues. Permanent failures (like invalid YouTube URLs) fail fast.
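The retry settings above translate into a short delay schedule. This sketch assumes the delay doubles on each retry from the 5 s initial value, is capped at 60 s, and gets up to 1 s of random jitter added (then re-capped). `SmartRetryMiddleware`'s exact formula may differ, and `backoff_delays` is a hypothetical helper.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 5.0,
    max_delay: float = 60.0,
    jitter_seconds: float = 1.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Seconds to wait before each retry: doubling, capped, plus bounded jitter."""
    rng = rng or random.Random()
    delays = []
    for retry in range(max_retries):
        base = min(max_delay, initial_delay * 2 ** retry)  # 5, 10, 20, 40, 60, ...
        # Jitter spreads retries from many failed tasks apart; re-cap so the
        # documented maximum is never exceeded.
        delays.append(min(max_delay, base + rng.uniform(0, jitter_seconds)))
    return delays
```

With three retries and no jitter this gives waits of 5 s, 10 s, and 20 s, so under these assumptions the 60 s cap only matters if the retry count is raised.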
The variable per-message backoff requires the `rabbitmq_delayed_message_exchange` plugin on the broker. It is installed via `rabbitmq/Dockerfile` and enabled with `delayed_message_exchange_plugin=True` on `AioPikaBroker`. Without it, every retry kick raises `IncorrectRoutingKeyError` and the task is silently dropped.
Running Workers
Run a worker for each broker you want to scale; `--workers` sets how many worker processes it starts:
```bash
# Scale download workers
uv run taskiq worker src.infrastructure.taskiq:download_broker --workers 4

# Scale analysis workers (CPU-heavy)
uv run taskiq worker src.infrastructure.taskiq:analysis_broker --workers 2
```
Graceful Shutdown
Each TaskIQ worker is launched with `--wait-tasks-timeout 50`, and the container has `stop_grace_period: 60s` (defaults; override per environment via `TASKIQ_WAIT_TASKS_TIMEOUT_SECONDS` and `TASKIQ_STOP_GRACE_PERIOD`). When the container receives SIGTERM, taskiq stops accepting new tasks and waits up to 50 s for in-flight tasks to finish before disconnecting from the broker. Docker's SIGTERM → SIGKILL window is 60 s, which leaves a 10 s buffer for `WorkerContext.shutdown()` (httpx pools, OpenAI client) to drain. Tasks that don't complete within the wait window are redelivered to the next worker, because with `--ack-type when_executed` the broker still owns the message until taskiq acks it.
This is operator-visible: if a deploy restarts a worker while a long-running task is still in flight, the task gets a single retry on the new worker rather than failing with a partial result.
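Because the two timeouts are overridden independently, it's easy to raise one and forget the other. A small startup or CI check could guard the invariant. This is a sketch: `parse_duration_seconds`, `shutdown_buffer`, the 5 s minimum buffer, and the assumption that `TASKIQ_STOP_GRACE_PERIOD` holds a Compose-style duration such as `60s` are all illustrative.

```python
import os
import re

_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}
# "ms" comes first in the alternation so "500ms" matches as milliseconds.
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")


def parse_duration_seconds(value: str) -> float:
    """Parse a bare number of seconds or a simple duration such as "60s" or "1m30s"."""
    value = value.strip()
    if re.fullmatch(r"\d+(?:\.\d+)?", value):
        return float(value)
    parts = _PART.findall(value)
    # Reject anything the pattern did not fully consume, e.g. "60 sec".
    if not parts or "".join(num + unit for num, unit in parts) != value:
        raise ValueError(f"unrecognised duration: {value!r}")
    return sum(float(num) * _UNIT_SECONDS[unit] for num, unit in parts)


def shutdown_buffer(wait_timeout_s: float, grace_period_s: float, min_buffer_s: float = 5.0) -> float:
    """Seconds left for WorkerContext.shutdown() after taskiq stops waiting for tasks."""
    buffer = grace_period_s - wait_timeout_s
    if buffer < min_buffer_s:
        raise ValueError(
            f"grace period {grace_period_s}s leaves only {buffer}s after the "
            f"{wait_timeout_s}s task wait; need at least {min_buffer_s}s"
        )
    return buffer


wait_timeout = float(os.environ.get("TASKIQ_WAIT_TASKS_TIMEOUT_SECONDS", "50"))
grace_period = parse_duration_seconds(os.environ.get("TASKIQ_STOP_GRACE_PERIOD", "60s"))
print(shutdown_buffer(wait_timeout, grace_period))  # 10.0 when neither variable is set
```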
Scaling Strategy
```mermaid
flowchart TB
    subgraph Download["Download Workers (I/O bound)"]
        D1[Worker 1]
        D2[Worker 2]
        D3[Worker 3]
        D4[Worker 4]
    end
    subgraph Analysis["Analysis Workers (CPU bound)"]
        A1[Worker 1]
        A2[Worker 2]
    end
    subgraph Render["Render Workers (CPU bound)"]
        R1[Worker 1]
        R2[Worker 2]
    end
```
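In Docker Compose, a single worker service runs all brokers. In production you'd typically split them into separate deployments so each workload scales on its own: more processes for I/O-bound downloads, fewer for CPU-bound analysis and render.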
Database Access
Tasks run in separate processes, so they need their own database connections. Per Convention #16 shape 2, sessions are scoped to individual DB operations through small named helpers and are never held across slow IO. Each helper opens `local_session()`, runs its query, and closes the session before returning:
```python
# workers/download/context.py
from ...infrastructure.database import local_session

# crud_clip_files and ClipContext imports not shown.


async def get_clip_context(clip_file_uuid: str) -> ClipContext:
    """Load the per-task ClipFile + Project read-set."""
    async with local_session() as db:
        clip_file = await crud_clip_files.get(db=db, uuid=clip_file_uuid)
        # ... build context dataclass
        return ClipContext(...)


async def _finalize_youtube_download(clip_file_uuid: str, ...) -> None:  # write phase
    async with local_session() as db:
        await crud_clip_files.update(db=db, object={...}, uuid=clip_file_uuid)
        await db.commit()
```
The task body composes these helpers around the slow IO. The connection pool uses `NullPool`, so each session opens a fresh connection and closes it when the session ends. Enforcement is aspirational for now: the goal is that both `rg "TaskiqDepends\(get_db_session\)" backend/src/workers/` and `rg "deps\.db" backend/src/workers/` return zero hits.
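The point of shape 2 is that the slow step runs with no session open. The self-contained model below swaps the real `local_session()` for a hypothetical stub that only counts open sessions, and uses made-up helpers (`finalize_download`, `slow_io`) plus an in-memory `ROWS` dict in place of the CRUD layer. The assertion inside `slow_io` fails if any session is still open while the slow step runs.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Dict

OPEN_SESSIONS = 0  # number of stub sessions currently open
ROWS: Dict[str, dict] = {"clip-1": {"status": "queued"}}  # stand-in for the clip_files table


class FakeSession:
    """Stand-in for an AsyncSession backed by the ROWS dict."""

    def __init__(self, rows: Dict[str, dict]) -> None:
        self.rows = rows

    async def commit(self) -> None:
        await asyncio.sleep(0)


@asynccontextmanager
async def local_session() -> AsyncIterator[FakeSession]:
    """Hypothetical stub for local_session(): one 'connection' per block."""
    global OPEN_SESSIONS
    OPEN_SESSIONS += 1
    try:
        yield FakeSession(ROWS)
    finally:
        OPEN_SESSIONS -= 1


@dataclass
class ClipContext:
    clip_file_uuid: str
    status: str


async def get_clip_context(clip_file_uuid: str) -> ClipContext:
    async with local_session() as db:  # read phase: open, query, close
        return ClipContext(clip_file_uuid, db.rows[clip_file_uuid]["status"])


async def finalize_download(clip_file_uuid: str, size_bytes: int) -> None:
    async with local_session() as db:  # write phase: fresh session
        db.rows[clip_file_uuid].update(status="ready", size_bytes=size_bytes)
        await db.commit()


async def slow_io() -> int:
    # Stands in for yt-dlp / FFmpeg / R2 uploads.
    assert OPEN_SESSIONS == 0, "session held across slow IO"
    await asyncio.sleep(0.01)
    return 1234


async def download_task(clip_file_uuid: str) -> str:
    ctx = await get_clip_context(clip_file_uuid)
    size = await slow_io()  # no session held here
    await finalize_download(ctx.clip_file_uuid, size)
    return "ok"


result = asyncio.run(download_task("clip-1"))
```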
Key Files
| Component | Location |
|---|---|
| Broker setup | `backend/src/infrastructure/taskiq/brokers.py` |
| DB dependency | `backend/src/infrastructure/taskiq/deps.py` |
| Worker entry | `backend/src/infrastructure/taskiq/worker.py` |
| Download tasks (yt-dlp, audio extraction, waveform, thumbnail) | `backend/src/workers/download/tasks.py` |
| Proxy task (`generate_clip_proxy`, registered on `proxy_broker`) | `backend/src/workers/download/tasks.py` |
| Analysis tasks | `backend/src/workers/analysis/tasks.py` |
| Render tasks | `backend/src/workers/render/tasks.py` |
| Asset edit tasks | `backend/src/workers/asset_edit/tasks.py` |