AI & Workers Development Guide¶
This guide covers patterns for developing background workers and AI-powered analysis in Sapari. Workers handle anything too slow for a request/response cycle: video transcription, LLM analysis, and FFmpeg rendering.
Worker Structure¶
Each worker follows a consistent layout:
flowchart TB
subgraph workers["workers/"]
A[analysis/] --> AT[tasks.py]
A --> AP[pipeline.py]
A --> AD[deps.py]
A --> AS[steps/]
R[render/] --> RT[tasks.py]
R --> RP[pipeline.py]
D[download/] --> DT[tasks.py]
end
Each worker module follows this structure:
workers/{name}/
├── tasks.py # TaskIQ task definitions
├── pipeline.py # FastroAI pipeline definition
├── deps.py # Dependency injection classes
├── schemas.py # Input/output schemas
├── constants.py # Configuration constants
└── {subdomain}/ # Step implementations
├── step.py
└── logic.py
Pipeline Pattern¶
Pipelines are DAGs that orchestrate multi-step workflows. Steps with no dependencies between them run in parallel automatically:
flowchart LR
A[load_audio] --> B[transcribe]
B --> C[detect_silences]
B --> D[detect_false_starts]
B --> IT[improve_transcript]
B --> IA[insert_fixed_assets]
B --> AD[insert_ai_directed_assets]
IT --> P[censor_profanity]
C --> E[validate_edits]
D --> E
E --> F[create_edits]
P --> F
IA --> F
AD --> F
F --> G[update_project]
P --> G
Defining a Pipeline¶
# workers/analysis/pipeline.py
from typing import Any

from fastroai import Pipeline, PipelineConfig, StepConfig

from .deps import AnalysisDeps
from .audio import LoadAudioStep
from .transcription import TranscribeStep
from .silence import DetectSilencesStep
from .false_starts import DetectFalseStartsStep, ValidateEditsStep
from .transcript_improvement import ImproveTranscriptStep
from .assets import InsertFixedAssetsStep
from .ai_director import InsertAIDirectedAssetsStep
from .profanity import CensorProfanityStep
from .edits import CreateEditsStep, UpdateProjectStep

analysis_pipeline: Pipeline[AnalysisDeps, dict[str, Any], None] = Pipeline(
    name="video_analysis",
    # Step instances
    steps={
        "load_audio": LoadAudioStep(),
        "transcribe": TranscribeStep(),
        "detect_silences": DetectSilencesStep(),
        "detect_false_starts": DetectFalseStartsStep(),
        "improve_transcript": ImproveTranscriptStep(),
        "insert_fixed_assets": InsertFixedAssetsStep(),
        "insert_ai_directed_assets": InsertAIDirectedAssetsStep(),
        "censor_profanity": CensorProfanityStep(),
        "validate_edits": ValidateEditsStep(),
        "create_edits": CreateEditsStep(),
        "update_project": UpdateProjectStep(),
    },
    # Dependency graph - steps run in parallel when possible
    dependencies={
        "transcribe": ["load_audio"],
        "detect_silences": ["transcribe"],
        "detect_false_starts": ["transcribe", "load_audio"],
        "improve_transcript": ["transcribe"],
        "insert_fixed_assets": ["transcribe"],
        "insert_ai_directed_assets": ["transcribe"],
        "censor_profanity": ["improve_transcript"],
        "validate_edits": ["detect_silences", "detect_false_starts"],
        "create_edits": [
            "validate_edits",
            "censor_profanity",
            "insert_fixed_assets",
            "insert_ai_directed_assets",
        ],
        "update_project": ["create_edits", "censor_profanity"],
    },
    # Which step's output is the final result
    output_step="update_project",
    # Global timeout
    config=PipelineConfig(timeout=600.0),
    # Per-step timeouts
    step_configs={
        "load_audio": StepConfig(timeout=120.0),
        "transcribe": StepConfig(timeout=300.0),
        "detect_false_starts": StepConfig(timeout=600.0),
        "insert_ai_directed_assets": StepConfig(timeout=180.0),
    },
)
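The parallelism this buys can be read straight off the dependencies mapping: repeatedly collect every step whose dependencies have all completed. A standalone sketch of that wave computation (plain Python, not FastroAI's actual scheduler):

```python
def parallel_waves(
    dependencies: dict[str, list[str]], steps: set[str]
) -> list[set[str]]:
    """Group steps into execution waves; every step in a wave can run
    concurrently because all of its dependencies finished earlier."""
    done: set[str] = set()
    remaining = set(steps)
    waves: list[set[str]] = []
    while remaining:
        # A step is ready once every step it depends on has completed
        wave = {s for s in remaining if set(dependencies.get(s, [])) <= done}
        if not wave:
            raise ValueError("dependency cycle detected")
        waves.append(wave)
        done |= wave
        remaining -= wave
    return waves
```

Applied to the graph above, this yields six waves; the five steps that depend only on transcribe all land in a single wave.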
Pipeline Dependencies¶
Dependencies are shared resources injected into all steps:
# workers/analysis/deps.py
from dataclasses import dataclass

from openai import AsyncOpenAI
from sqlalchemy.ext.asyncio import AsyncSession

from ...infrastructure.storage import StorageClient


@dataclass
class AnalysisDeps:
    """Dependencies shared across all analysis steps."""

    db: AsyncSession
    storage: StorageClient
    openai_client: AsyncOpenAI
Step Pattern¶
Steps are the atomic units of work. Each step receives outputs from its dependencies via ctx.get_dependency() and returns a typed result:
flowchart TB
subgraph Step["BaseStep"]
I[__init__] --> |Initialize resources| E[execute]
E --> |Access deps| D[ctx.deps]
E --> |Get input| IN[ctx.get_input]
E --> |Get previous output| P[ctx.get_dependency]
E --> |Return result| R[Output Type]
end
Creating a Step¶
# workers/analysis/transcription/step.py
from pathlib import Path
from uuid import UUID

from fastroai import BaseStep, StepContext

from ....infrastructure.events import AnalysisProgressEvent, publish_event
from ..deps import AnalysisDeps
from ..schemas import AnalysisInput, TranscriptionResult
from .whisper import transcribe


class TranscribeStep(BaseStep[AnalysisDeps, TranscriptionResult]):
    """Transcribe audio using OpenAI Whisper API."""

    async def execute(self, ctx: StepContext[AnalysisDeps]) -> TranscriptionResult:
        # Get output from previous step
        audio_path: Path = ctx.get_dependency("load_audio")

        # Get pipeline input data
        input_data: AnalysisInput = ctx.get_input("input")
        project_uuid = UUID(input_data.project_uuid)

        # Publish progress event for UI
        await publish_event(
            AnalysisProgressEvent(
                project_uuid=project_uuid,
                step="transcribe",
                progress=30,
                message="Transcribing speech...",
            )
        )

        # Access shared dependencies
        result = await transcribe(
            client=ctx.deps.openai_client,
            audio_path=audio_path,
            language=input_data.language,
        )
        return result
Step Context Methods¶
| Method | Purpose |
|---|---|
| ctx.deps | Access shared dependencies (db, storage, etc.) |
| ctx.get_input("input") | Get pipeline input data |
| ctx.get_dependency("step_name") | Get output from a previous step |
| ctx.run(agent, message) | Run an LLM agent with cost tracking |
Task Pattern¶
Tasks are the entry points. API routes queue tasks, TaskIQ executes them asynchronously, and the task sets up dependencies before running the pipeline:
# workers/analysis/tasks.py
import shutil
from typing import Annotated
from uuid import UUID

from openai import AsyncOpenAI
from taskiq import TaskiqDepends

from ...infrastructure.storage import StorageClient
from ...infrastructure.taskiq import DBSession, analysis_broker
from ...infrastructure.events import (
    AnalysisStartedEvent,
    AnalysisCompleteEvent,
    AnalysisFailedEvent,
    publish_event,
)
from ...modules.project.crud import crud_projects
from ...modules.project.enums import ProjectStatus
from .deps import AnalysisDeps
from .pipeline import analysis_pipeline
from .schemas import AnalysisInput, AnalysisOutput


# Dependency factories
def get_storage_client() -> StorageClient:
    return StorageClient()


def get_openai_client() -> AsyncOpenAI:
    return AsyncOpenAI()


# Type aliases for injection
StorageClientDep = Annotated[StorageClient, TaskiqDepends(get_storage_client)]
OpenAIClientDep = Annotated[AsyncOpenAI, TaskiqDepends(get_openai_client)]


@analysis_broker.task(task_name="analyze_project")
async def analyze_project(
    project_uuid: str,
    pacing_level: int = 50,
    false_start_sensitivity: int = 50,
    language: str | None = None,
    # Injected dependencies (use None with type: ignore)
    db: DBSession = None,  # type: ignore[assignment]
    storage: StorageClientDep = None,  # type: ignore[assignment]
    openai_client: OpenAIClientDep = None,  # type: ignore[assignment]
) -> AnalysisOutput:
    """Analyze a project: transcribe and detect edits."""
    project_uuid_obj = UUID(project_uuid)
    temp_dir = None
    try:
        # Update status
        await crud_projects.update(
            db,
            object={"status": ProjectStatus.ANALYZING},
            uuid=project_uuid_obj,
        )

        # Publish start event
        await publish_event(
            AnalysisStartedEvent(
                project_uuid=project_uuid_obj,
                pacing_level=pacing_level,
                language=language,
            )
        )

        # Build dependencies and input
        deps = AnalysisDeps(db=db, storage=storage, openai_client=openai_client)
        input_data = AnalysisInput(
            project_uuid=project_uuid,
            pacing_level=pacing_level,
            false_start_sensitivity=false_start_sensitivity,
            language=language,
        )

        # Execute pipeline
        result = await analysis_pipeline.execute(
            input_data={"input": input_data},
            deps=deps,
        )

        # Extract results
        transcript = result.step_outputs.get("transcribe")
        edits_count = result.step_outputs.get("create_edits", 0)

        # Publish completion
        await publish_event(
            AnalysisCompleteEvent(
                project_uuid=project_uuid_obj,
                edit_count=edits_count,
                word_count=len(transcript.words) if transcript else 0,
            )
        )

        return AnalysisOutput(
            project_uuid=project_uuid,
            transcript_text=transcript.text if transcript else "",
            edits_created=edits_count,
        )
    except Exception as e:
        # Publish failure event
        await publish_event(
            AnalysisFailedEvent(
                project_uuid=project_uuid_obj,
                error=str(e)[:500],
            )
        )
        # Update status
        await crud_projects.update(
            db,
            object={
                "status": ProjectStatus.FAILED,
                "error_message": str(e)[:500],
            },
            uuid=project_uuid_obj,
        )
        raise
    finally:
        # Cleanup temp files
        if temp_dir and temp_dir.exists():
            shutil.rmtree(temp_dir)
Triggering Tasks¶
From API routes:
from ...workers.analysis.tasks import analyze_project

# Queue the task (returns immediately)
await analyze_project.kiq(
    project_uuid=str(project.uuid),
    pacing_level=50,
    false_start_sensitivity=70,
    language="en",
)
LLM Integration¶
Using FastroAgent with PydanticAI¶
Agents use pydantic_ai.Agent for structured LLM output, wrapped in FastroAgent for pipeline integration and cost tracking. Initialize once in __init__ (reused across executions), call via ctx.run():
from fastroai import BaseStep, FastroAgent, StepContext
from pydantic_ai import Agent
from pydantic_ai.models.fallback import FallbackModel
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.deepseek import DeepSeekProvider


class DetectFalseStartsStep(BaseStep[AnalysisDeps, list[FalseStartRegion]]):
    """Detect false starts using LLM analysis."""

    def __init__(self) -> None:
        # DeepSeek Reasoner primary, GPT-5-mini fallback
        fallback_model = FallbackModel(
            OpenAIChatModel("deepseek-reasoner", provider=DeepSeekProvider()),
            OpenAIChatModel("gpt-5-mini"),
        )
        pydantic_agent: Agent[None, DetectionResult] = Agent(
            model=fallback_model,
            system_prompt=SYSTEM_PROMPT,
            output_type=DetectionResult,  # Pydantic model
        )
        self._agent: FastroAgent[DetectionResult] = FastroAgent(
            agent=pydantic_agent,
            output_type=DetectionResult,
            temperature=0.3,
            timeout=300,
            max_tokens=16000,
        )

    async def execute(self, ctx: StepContext[AnalysisDeps]) -> list[FalseStartRegion]:
        transcript: TranscriptionResult = ctx.get_dependency("transcribe")
        input_data: AnalysisInput = ctx.get_input("input")

        # Use ctx.run() for cost tracking
        # (user_message construction from the transcript is elided here)
        response = await ctx.run(self._agent, user_message)
        result: DetectionResult = response.output
        return process_results(result, transcript.words)
Structured Output¶
LLM responses are Pydantic models:
from pydantic import BaseModel


class Cut(BaseModel):
    start_word_idx: int
    end_word_idx: int
    removed_text: str
    keeper_preview: str
    confidence: float  # 0.0-1.0
    pattern_type: str
    reason: str


class DetectionResult(BaseModel):
    cuts: list[Cut]
    explanation: str
Sensitivity Thresholds¶
Map user sensitivity (0-100) to confidence thresholds:
def get_sensitivity_threshold(sensitivity: int) -> float:
    """Higher sensitivity = lower threshold = more detections."""
    thresholds = {
        25: 0.85,  # Conservative - high confidence only
        50: 0.70,  # Balanced
        75: 0.55,  # Aggressive
        100: 0.40,  # Very aggressive
    }
    # Interpolate for values between
    return interpolate(sensitivity, thresholds)
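The interpolate helper is referenced but not shown. A minimal sketch, assuming simple piecewise-linear interpolation between the anchor points with clamping outside the 25-100 range (the real implementation may differ):

```python
def interpolate(value: int, points: dict[int, float]) -> float:
    """Piecewise-linear interpolation over sorted (sensitivity, threshold)
    anchor points, clamped to the outermost anchors."""
    xs = sorted(points)
    if value <= xs[0]:
        return points[xs[0]]
    if value >= xs[-1]:
        return points[xs[-1]]
    for lo, hi in zip(xs, xs[1:]):
        if lo <= value <= hi:
            # Linear blend between the two surrounding anchors
            frac = (value - lo) / (hi - lo)
            return points[lo] + frac * (points[hi] - points[lo])
    raise ValueError(f"unreachable for {value}")
```

A sensitivity of 62, for example, lands between the 50 and 75 anchors and blends their thresholds proportionally.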
Whisper Integration¶
Transcription with Word Timing¶
# workers/analysis/transcription/whisper.py
async def transcribe(
    client: AsyncOpenAI,
    audio_path: Path,
    language: str | None = None,
) -> TranscriptionResult:
    """Transcribe with word-level timestamps."""
    file_size = audio_path.stat().st_size

    # Auto-chunk large files
    if file_size <= WHISPER_MAX_FILE_SIZE:
        return await _transcribe_single(client, audio_path, language)
    else:
        return await _transcribe_chunked(client, audio_path, language)


async def _transcribe_single(client, audio_path, language):
    with open(audio_path, "rb") as audio_file:
        response = await client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            response_format="verbose_json",
            timestamp_granularities=["word"],
            language=language,
        )

    # Extract word-level timing
    words = [
        TranscriptWord(
            word=w.word,
            start_ms=int(w.start * 1000),
            end_ms=int(w.end * 1000),
        )
        for w in response.words
    ]
    return TranscriptionResult(
        language=response.language,
        duration_ms=int(response.duration * 1000),
        words=words,
        text=response.text,
    )
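_transcribe_chunked is elided above. Its key subtlety is that each chunk's word timestamps are relative to the chunk, so they must be shifted by the chunk's start offset before merging. A sketch of that merge step, with dataclasses standing in for the real schemas:

```python
from dataclasses import dataclass


@dataclass
class TranscriptWord:  # simplified stand-in for the real schema
    word: str
    start_ms: int
    end_ms: int


@dataclass
class TranscriptionResult:  # simplified stand-in for the real schema
    language: str
    duration_ms: int
    words: list[TranscriptWord]
    text: str


def merge_chunks(chunks: list[tuple[int, TranscriptionResult]]) -> TranscriptionResult:
    """Merge (offset_ms, chunk_result) pairs into one result whose word
    timings are relative to the start of the full audio file."""
    words: list[TranscriptWord] = []
    texts: list[str] = []
    for offset_ms, chunk in chunks:
        for w in chunk.words:
            words.append(
                TranscriptWord(w.word, w.start_ms + offset_ms, w.end_ms + offset_ms)
            )
        texts.append(chunk.text)
    return TranscriptionResult(
        language=chunks[0][1].language if chunks else "en",
        duration_ms=words[-1].end_ms if words else 0,
        words=words,
        text=" ".join(texts),
    )
```

Splitting on silence boundaries (rather than at arbitrary byte offsets) keeps words intact across chunks; the offsets then come from the split points.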
Event Publishing¶
SSE Events¶
Workers publish progress events via Redis pub/sub. The frontend subscribes and updates the UI in real-time:
from ...infrastructure.events import (
    AnalysisProgressEvent,
    AnalysisCompleteEvent,
    AnalysisFailedEvent,
    publish_event,
)

# Progress update
await publish_event(
    AnalysisProgressEvent(
        project_uuid=project_uuid,
        step="transcribe",
        progress=30,  # 0-100
        message="Transcribing speech...",
    )
)

# Completion
await publish_event(
    AnalysisCompleteEvent(
        project_uuid=project_uuid,
        edit_count=42,
        word_count=1500,
        duration_ms=180000,
    )
)

# Failure
await publish_event(
    AnalysisFailedEvent(
        project_uuid=project_uuid,
        error="Transcription failed: invalid audio format",
    )
)
Event Types¶
| Event | When | Key Fields |
|---|---|---|
| AnalysisStartedEvent | Pipeline begins | pacing_level, language |
| AnalysisProgressEvent | After each step | step, progress, message |
| AnalysisCompleteEvent | Success | edit_count, word_count |
| AnalysisFailedEvent | Error | error |
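On the wire, each event is just a JSON payload published on a per-project Redis channel. A sketch of the envelope, with a dataclass standing in for the real Pydantic schema and the channel naming scheme as an assumption:

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class AnalysisProgressEvent:  # stand-in for the real Pydantic schema
    project_uuid: str
    step: str
    progress: int  # 0-100
    message: str
    type: str = "analysis.progress"


def serialize_event(event: AnalysisProgressEvent) -> tuple[str, str]:
    """Return the (channel, payload) pair a publisher would send to Redis;
    the SSE endpoint forwards the payload verbatim as a `data:` line."""
    channel = f"events:project:{event.project_uuid}"  # naming is an assumption
    return channel, json.dumps(asdict(event))
```

Keeping a `type` discriminator in every payload lets the frontend dispatch on it without inspecting the rest of the event.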
Notification Triggers¶
After publishing SSE events, workers also create persistent in-app notifications via NotificationService().create(). These are best-effort (wrapped in try/except):
# After AnalysisCompleteEvent; best-effort, so a notification
# failure never fails the worker task
try:
    await NotificationService().create(
        notification=NotificationCreate(
            user_id=user_id,
            title="Analysis complete",
            message=f'"{project_name}" -- {edits_count} edits found',
            type=NotificationType.SUCCESS,
        ),
        db=db,
    )
except Exception:
    pass  # best-effort only
Current triggers: analysis complete (analysis worker), export ready (render worker).
Captions Only Mode¶
When a client posts analysis_mode="captions_only", the analyze endpoint requires every ai_edit setting to be at its default: pacing_level=0, false_start_sensitivity=0, audio_censorship="none", caption_censorship="none", and director_notes="". Any mismatch returns 422 before credit reservation, via ProjectService.validate_analysis_mode_consistency. Inside the worker itself, only the AI Director asset fetch is mode-gated (workers/analysis/tasks.py skips the catalog fetch for captions_only).
The combination of the API-layer rejection and the default values means: DetectSilencesStep returns [] (pacing=0), DetectFalseStartsStep returns [] (sensitivity=0), CensorProfanityStep returns the transcript unchanged (both censorship fields "none"), and InsertAIDirectedAssetsStep returns [] (empty catalog). Only Whisper transcription, transcript improvement (if a language is set), and caption generation run.
Credits are charged at a 0.5x rate via CREDIT_MULTIPLIERS in analysis_run/constants.py. A tampered client that bypasses the frontend can't game this: the backend validates mode-vs-settings consistency on every analyze request.
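A sketch of the consistency check described above, assuming a plain-dict settings payload (the real ProjectService method operates on the request schema):

```python
# Defaults that captions_only requests must not deviate from
CAPTIONS_ONLY_DEFAULTS: dict[str, object] = {
    "pacing_level": 0,
    "false_start_sensitivity": 0,
    "audio_censorship": "none",
    "caption_censorship": "none",
    "director_notes": "",
}


def validate_analysis_mode_consistency(mode: str, settings: dict) -> None:
    """Reject captions_only requests whose AI-edit settings were tampered with."""
    if mode != "captions_only":
        return
    mismatched = [
        key
        for key, default in CAPTIONS_ONLY_DEFAULTS.items()
        if settings.get(key, default) != default
    ]
    if mismatched:
        # The API layer maps this to a 422 before reserving credits
        raise ValueError(f"captions_only requires defaults for: {mismatched}")
```

Validating server-side on every request is what makes the 0.5x rate safe against modified clients.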
Adding a New Detection Type¶
Adding a new detection type (e.g., filler words) requires three changes: create the step, wire it into the pipeline, and update validation to include its results.
1. Create the Step¶
# workers/analysis/filler_words/step.py
class DetectFillerWordsStep(BaseStep[AnalysisDeps, list[FillerWordRegion]]):
    """Detect filler words like 'um', 'uh', 'like'."""

    async def execute(self, ctx: StepContext[AnalysisDeps]) -> list[FillerWordRegion]:
        transcript: TranscriptionResult = ctx.get_dependency("transcribe")
        input_data: AnalysisInput = ctx.get_input("input")

        await publish_event(
            AnalysisProgressEvent(
                project_uuid=UUID(input_data.project_uuid),
                step="detect_filler_words",
                progress=50,
                message="Detecting filler words...",
            )
        )

        # Detection logic (single-token fillers only; multi-word fillers
        # like "you know" would need an n-gram window over adjacent words)
        filler_patterns = {"um", "uh", "like"}
        regions = []
        for word in transcript.words:
            if word.word.lower().strip(".,!? ") in filler_patterns:
                regions.append(
                    FillerWordRegion(
                        start_ms=word.start_ms,
                        end_ms=word.end_ms,
                        word=word.word,
                        confidence=0.9,
                    )
                )
        return regions
2. Add to Pipeline¶
# workers/analysis/pipeline.py
analysis_pipeline = Pipeline(
    steps={
        # ... existing steps
        "detect_filler_words": DetectFillerWordsStep(),  # Add
    },
    dependencies={
        # ... existing deps
        "detect_filler_words": ["transcribe"],  # After transcribe
        "validate_edits": [
            "detect_silences",
            "detect_false_starts",
            "detect_filler_words",
        ],  # Update
    },
)
3. Update Validation¶
# workers/analysis/false_starts/validation/step.py
class ValidateEditsStep(BaseStep[AnalysisDeps, list[ValidatedEdit]]):
    async def execute(self, ctx: StepContext[AnalysisDeps]) -> list[ValidatedEdit]:
        silences = ctx.get_dependency("detect_silences")
        false_starts = ctx.get_dependency("detect_false_starts")
        filler_words = ctx.get_dependency("detect_filler_words")  # Add
        all_edits = silences + false_starts + filler_words  # Include
        # ... validation logic
Key Conventions¶
- Async everything - All I/O is async
- Event-driven - Publish progress events for the UI
- Typed outputs - Steps return typed Pydantic models
- Cost tracking - Use ctx.run() for LLM calls
- Cleanup - Always clean up temp files in finally
- Chunking - Handle large files by chunking
Key Files¶
| Purpose | Location |
|---|---|
| Analysis pipeline | workers/analysis/pipeline.py |
| Analysis task | workers/analysis/tasks.py |
| Dependencies | workers/analysis/deps.py |
| Transcription | workers/analysis/transcription/ |
| False start detection | workers/analysis/false_starts/ |
| Silence detection | workers/analysis/silence/ |
| Event schemas | infrastructure/events/schemas.py |
| Event publisher | infrastructure/events/publisher.py |