
Backend Development Guide

This guide covers patterns and conventions for developing in the Sapari backend.

Project Structure

The backend follows a layered architecture: business logic in modules, API routes in interfaces, shared infrastructure, and background workers.

flowchart TB
    subgraph src["backend/src/"]
        M[modules/] --> |Business Logic| MC[user/, clip/, project/, ...]
        I[interfaces/] --> |API Layer| IA[api/v1/]
        IN[infrastructure/] --> |Cross-Cutting| IC[database/, auth/, storage/, events/]
        W[workers/] --> |Background Jobs| WC[analysis/, render/, download/]
    end
  • modules/ - Domain business logic. Each module has its own models, schemas, CRUD, and services.
  • interfaces/ - External interfaces (REST API, could add GraphQL/gRPC).
  • infrastructure/ - Database, auth, storage, events. Shared plumbing.
  • workers/ - Background tasks: transcription, rendering, downloads.
backend/src/
├── modules/              # Domain business logic
│   ├── user/            # Each module has: model, schemas, crud, service
│   ├── project/
│   ├── clip/
│   ├── edit/
│   ├── asset/
│   ├── caption_line/
│   ├── credits/         # Credit balance, transactions, metering
│   ├── entitlement/     # Flexible access control, trial grants
│   ├── payment/         # Stripe checkout, subscription lifecycle
│   ├── subscription/    # Subscription records
│   ├── product/         # Products with entitlement configs
│   ├── price/           # Stripe prices
│   ├── tier/            # Subscription tiers (hobby, creator, viral)
│   ├── discount/        # Coupons, promo codes
│   ├── email/           # Email service, templates, tasks
│   ├── export/
│   ├── feature_flag/    # Feature flags with rollout + allowlist
│   ├── draft/
│   ├── preset/
│   ├── notification/    # In-app notifications (user + system-wide), SSE push
│   ├── analysis_run/
│   ├── auth/            # Auth service, schemas, domain exceptions
│   ├── admin_audit/     # Admin event log + audit trail (who/what/when + before/after diffs)
│   ├── api_keys/        # API key management
│   ├── preview_preset/  # Preview preset configurations
│   ├── rate_limit/      # Rate limit management (per-endpoint, tier-based)
│   ├── support/         # Support conversations (bidirectional, priority auto-assignment)
│   └── common/          # Shared exceptions, utilities, constants
├── interfaces/          # External interfaces
│   ├── api/v1/         # REST API routes
│   └── main.py         # FastAPI app factory
├── infrastructure/      # Cross-cutting concerns
│   ├── database/       # Session, models, migrations
│   ├── auth/           # Authentication, OAuth, sessions, JWT
│   ├── stripe/         # Stripe client, webhooks
│   ├── storage/        # R2/S3 client
│   ├── events/         # SSE pub/sub (project-scoped + user-scoped notification channels)
│   ├── config/         # Settings
│   ├── ai/             # LLM providers (OpenAI, Anthropic)
│   ├── cache/          # Redis/Memcached
│   ├── email/          # Email settings
│   ├── taskiq/         # Background task broker, scheduler
│   ├── rate_limit/     # Redis-backed rate limiting
│   └── logging/        # Structured logging, Logfire
└── workers/            # Background task processing
    ├── analysis/       # Transcription, silence/false-start detection
    ├── render/         # FFmpeg processing
    ├── download/       # yt-dlp, audio extraction
    ├── asset_edit/     # Asset processing
    └── shared/         # Shared worker utilities

Adding a New Module

Follow this order - each step builds on the previous:

flowchart LR
    A[1. Model] --> B[2. Schemas]
    B --> C[3. CRUD]
    C --> D[4. Service]
    D --> E[5. Routes]
    E --> F[6. Tests]

Step 1: Create the Model

Models live in modules/{name}/models.py. Use mixins for common fields:

# modules/widget/models.py
from sqlalchemy import String, Integer, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from ...infrastructure.database.models import UUIDMixin, TimestampMixin, SoftDeleteMixin
from ...infrastructure.database.session import Base


class Widget(Base, UUIDMixin, TimestampMixin, SoftDeleteMixin):
    """Widget model with UUID primary key and soft deletion."""

    __tablename__ = "widget"

    # Required fields
    name: Mapped[str] = mapped_column(String(100))

    # Optional fields with defaults
    count: Mapped[int] = mapped_column(Integer, default=0)

    # Foreign keys (always indexed)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("user.id", ondelete="CASCADE"),
        index=True,
    )

Mixin Reference:

Mixin             Fields Added             When to Use
UUIDMixin         uuid (PK)                Public-facing entities
TimestampMixin    created_at, updated_at   Almost always
SoftDeleteMixin   deleted_at, is_deleted   When you need audit trails

Step 2: Create Schemas

Schemas validate input, serialize output, and generate OpenAPI docs. Create variants for different operations:

modules/{name}/schemas.py:

# modules/widget/schemas.py
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from typing import Annotated

from ..common.schemas import TimestampSchema


# Base with shared fields
class WidgetBase(BaseModel):
    name: Annotated[str, Field(min_length=1, max_length=100)]


# For API responses (excludes sensitive data)
class WidgetRead(TimestampSchema, WidgetBase):
    uuid: UUID
    count: int
    user_id: int


# For creating (only user-provided fields)
class WidgetCreate(WidgetBase):
    model_config = ConfigDict(extra="forbid")  # Reject unknown fields


# For updating (all fields optional)
class WidgetUpdate(BaseModel):
    model_config = ConfigDict(extra="forbid")

    name: Annotated[str | None, Field(min_length=1, max_length=100)] = None
    count: int | None = None

Schema Naming Convention:

Schema                 Purpose
WidgetBase             Shared fields for inheritance
WidgetRead             API responses
WidgetCreate           POST request bodies
WidgetCreateInternal   Internal creation (e.g., with hashed passwords)
WidgetUpdate           PATCH request bodies (all fields optional)
WidgetDelete           Soft deletion data

Step 3: Create CRUD

We use FastCRUD to implement the repository pattern. It gives us a data access layer that abstracts away raw SQL, providing a clean interface for database operations:

# modules/widget/crud.py
from fastcrud import FastCRUD
from .models import Widget

crud_widgets: FastCRUD = FastCRUD(Widget)

Common CRUD Operations:

# Create
widget = await crud_widgets.create(db=db, object=widget_data, schema_to_select=WidgetRead)

# Read one
widget = await crud_widgets.get(db=db, uuid=uuid, is_deleted=False)

# Read many with pagination
widgets = await crud_widgets.get_multi(
    db=db,
    offset=skip,
    limit=limit,
    schema_to_select=WidgetRead,
    is_deleted=False,
)

# Check existence
exists = await crud_widgets.exists(db=db, name="foo")

# Update
updated = await crud_widgets.update(db=db, object=update_data, uuid=uuid)

# Soft delete
await crud_widgets.delete(db=db, uuid=uuid)

# Hard delete (rarely used)
await crud_widgets.db_delete(db=db, uuid=uuid)

# Joins
result = await crud_widgets.get_joined(
    db=db,
    join_model=User,
    join_prefix="user_",
    schema_to_select=WidgetRead,
    join_schema_to_select=UserRead,
    uuid=uuid,
    nest_joins=True,
)

Query Patterns

FastCRUD's get_multi() defaults to limit=100. This is correct for API endpoints (human pagination) but dangerous for internal queries that need all records. Three cases:

Case 1: A human will scroll this. Paginate. Caller passes limit/offset, capped by MAX_PAGE_LIMIT:

# API endpoint -- caller controls pagination
widgets = await crud_widgets.get_multi(
    db=db, offset=skip, limit=limit,
    schema_to_select=WidgetRead, is_deleted=False,
)

Case 2: Code needs all records. Use limit=None. The WHERE clause is the bound:

# Internal -- domain-bounded query, no pagination
clips = await crud_clips.get_multi_joined(
    db=db, project_id=project_uuid, limit=None,
    schema_to_select=ClipReadBase,
    joins_config=[clip_file_join_config()],
)

limit=None is safe when: (1) the WHERE clause bounds the count, (2) per-record work is cheap, and (3) the bound is stable. When (3) doesn't hold (e.g., the number of edits per project depends on the analysis pipeline's design, which can change), add a sanity cap that raises loudly.
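A sanity cap can be a small guard applied to the unpaginated result. The sketch below assumes a hypothetical `SanityCapExceededError` and `enforce_sanity_cap` helper (neither is named in this guide); the cap value would be chosen per entity:

```python
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


class SanityCapExceededError(RuntimeError):
    """Hypothetical: an unpaginated query returned more rows than the domain allows."""


def enforce_sanity_cap(rows: Sequence[T], cap: int, label: str) -> Sequence[T]:
    """Return rows unchanged, or raise loudly if the domain bound no longer holds.

    Intended for limit=None reads whose WHERE clause is the only bound. Raising
    surfaces a broken assumption instead of silently doing unbounded work.
    """
    if len(rows) > cap:
        raise SanityCapExceededError(f"{label}: got {len(rows)} rows, sanity cap is {cap}")
    return rows
```

One design option is to request limit=cap + 1 instead of limit=None, so the query itself stays bounded while the guard still detects the overflow.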

Case 3: Expensive work per record. Job queue. Batch IDs, enqueue tasks:

# Batch processing -- expensive work per record
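cursor = None  # None starts from the first page
while True:
    users = await crud_users.get_multi_by_cursor(
        db=db, cursor=cursor, limit=DEFAULT_BATCH_SIZE,
    )
    if not users["data"]:
        break
    await process_batch(users["data"])  # e.g., enqueue one task per record
    cursor = users["next_cursor"]
    if cursor is None:  # last page; passing None again would restart from the top
        break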

Write-side enforcement: Entity count caps belong on creation, not on reads. Example: MAX_CLIPS_PER_PROJECT = 100 is checked in presign_upload() and import_youtube(), then reads use limit=None with confidence. For concurrent safety, wrap count → check → create with advisory_xact_lock(db, f'clips:{project_uuid}') — see §Count-Race Prevention below.

SQL aggregates over get_multi: When computing a scalar (sum, count, max), use SQLAlchemy directly instead of loading rows into Python:

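from sqlalchemy import func, select

# Correct: SQL aggregate (SUM over zero rows is NULL, hence the `or 0`)
stmt = select(func.sum(EntitlementTransaction.cost_microcents)).where(...)
result = await db.execute(stmt)
total = result.scalar_one() or 0

# Wrong: loads rows into Python, and get_multi's default limit=100 silently truncates the sum
result = await crud.get_multi(db=db, ...)
total = sum(row["cost_microcents"] for row in result["data"])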
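The difference is easy to see with the standard-library sqlite3 module standing in for Postgres. The table and column names below are illustrative; only the shape of the query matters. Because SUM over zero rows returns NULL, the SQL needs COALESCE (in SQLAlchemy, `func.coalesce(func.sum(...), 0)`) or a Python-side `or 0`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entitlement_transaction (user_id INTEGER, cost_microcents INTEGER)")
conn.executemany(
    "INSERT INTO entitlement_transaction VALUES (?, ?)",
    [(1, 1500), (1, 2500), (2, 999)],
)


def total_cost(user_id: int) -> int:
    # The database does the summing and returns one row holding one value.
    # COALESCE turns SUM's NULL (no matching rows) into 0.
    row = conn.execute(
        "SELECT COALESCE(SUM(cost_microcents), 0) FROM entitlement_transaction WHERE user_id = ?",
        (user_id,),
    ).fetchone()
    return int(row[0])


print(total_cost(1))   # 4000
print(total_cost(42))  # 0
```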

Step 4: Create Service

Services contain business logic - validation, permissions, orchestrating multiple CRUD operations:

# modules/widget/service.py
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession

from ..common.exceptions import WidgetNotFoundError, PermissionDeniedError
from .crud import crud_widgets
from .schemas import WidgetCreate, WidgetRead, WidgetUpdate


class WidgetService:
    """Business logic for widget operations."""

    async def create(
        self,
        widget: WidgetCreate,
        user_id: int,
        db: AsyncSession,
    ) -> dict[str, Any]:
        """Create a widget for a user."""
        widget_data = widget.model_dump()
        widget_data["user_id"] = user_id

        return await crud_widgets.create(
            db=db,
            object=widget_data,
            schema_to_select=WidgetRead,
        )

    async def get_by_uuid(
        self,
        uuid: str,
        db: AsyncSession,
    ) -> dict[str, Any]:
        """Get a widget by UUID."""
        widget = await crud_widgets.get(
            db=db,
            schema_to_select=WidgetRead,
            uuid=uuid,
            is_deleted=False,
        )
        if not widget:
            raise WidgetNotFoundError(f"Widget {uuid} not found")
        return widget

    async def verify_ownership(
        self,
        widget: dict[str, Any],
        user_id: int,
    ) -> None:
        """Verify user owns this widget."""
        if widget["user_id"] != user_id:
            raise PermissionDeniedError("You don't own this widget")

Service Patterns:

  • Services are stateless classes: no per-request state, though they may hold injected singleton collaborators
  • Each method takes db: AsyncSession as parameter
  • Raise domain exceptions (not HTTP exceptions)
  • Use CRUD for database operations
  • Handle business validation and permissions

Step 5: Create Routes

Routes go in interfaces/api/v1/{name}.py. Keep them thin - delegate to services, don't put logic here. Global exception handlers catch DomainError and unhandled exceptions automatically, so routes don't need try/except unless a specific exception requires a custom status code:

# interfaces/api/v1/widgets.py
from typing import Annotated, Any
from uuid import UUID

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from ....infrastructure.auth.session.dependencies import get_current_user
from ....infrastructure.database.session import async_session
from ....modules.widget.schemas import WidgetCreate, WidgetRead, WidgetUpdate
from ....modules.widget.service import WidgetService
from ..dependencies import get_widget_service

router = APIRouter(tags=["Widgets"])


@router.post(
    "/",
    status_code=201,
    response_model=WidgetRead,
    summary="Create Widget",
    responses={
        201: {"description": "Widget created"},
        401: {"description": "Not authenticated"},
    },
)
async def create_widget(
    widget: WidgetCreate,
    db: Annotated[AsyncSession, Depends(async_session)],
    current_user: Annotated[dict[str, Any], Depends(get_current_user)],
    widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
    """Create a new widget."""
    return await widget_service.create(widget, current_user["id"], db)


@router.get(
    "/{widget_uuid}",
    response_model=WidgetRead,
    summary="Get Widget",
)
async def get_widget(
    widget_uuid: UUID,
    db: Annotated[AsyncSession, Depends(async_session)],
    current_user: Annotated[dict[str, Any], Depends(get_current_user)],
    widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
    """Get a widget owned by the current user."""
    widget = await widget_service.get_by_uuid(str(widget_uuid), db)
    await widget_service.verify_ownership(widget, current_user["id"])
    return widget

Register the router in interfaces/api/v1/__init__.py:

from .widgets import router as widgets_router

router.include_router(widgets_router, prefix="/widgets")

Dependency Injection

FastAPI's Depends() handles dependency injection. Database sessions are per-request. Everything else (services, infrastructure clients) is a singleton created in the lifespan and accessed via request.state (ASGI lifespan state).

flowchart TB
    subgraph Lifespan["Created Once (Lifespan)"]
        SVC[All Services]
        STORAGE[StorageClient]
        EMAIL[PostmarkClient]
        STRIPE[StripeService]
    end

    subgraph Request["Per Request"]
        DB[async_session]
        AUTH[get_current_user]
    end

    Route --> DB
    Route --> SVC
    Route --> AUTH

All singletons are created in app_factory.py's lifespan and yielded as a dict. Dependencies in interfaces/api/dependencies.py read from request.state:

from typing import cast

from fastapi import Request

def get_widget_service(request: Request) -> WidgetService:
    return cast(WidgetService, request.state.widget_service)

def get_storage_client(request: Request) -> StorageClient:
    return cast(StorageClient, request.state.storage_client)

Services with dependencies share singleton instances via constructor injection:

# In lifespan (app_factory.py):
entitlement_service = EntitlementService()
credit_service = CreditService(entitlement_service=entitlement_service)
beta_service = BetaService(entitlement_service=entitlement_service, credit_service=credit_service)
yield {"entitlement_service": entitlement_service, "credit_service": credit_service, ...}
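The wiring above relies on one property: every consumer receives the same instance. The stdlib-only sketch below reproduces that lifespan shape. The service classes are empty stand-ins, and a SimpleNamespace plays the role of request.state. Starlette shallow-copies the yielded dict into each request's state; this sketch does that copy by hand:

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import Any


class EntitlementService:
    """Leaf service with no collaborators (stand-in)."""


class CreditService:
    def __init__(self, entitlement_service: EntitlementService) -> None:
        self.entitlement_service = entitlement_service


class BetaService:
    def __init__(
        self, entitlement_service: EntitlementService, credit_service: CreditService
    ) -> None:
        self.entitlement_service = entitlement_service
        self.credit_service = credit_service


@asynccontextmanager
async def lifespan(app: Any) -> AsyncIterator[dict[str, Any]]:
    # Build the graph once, leaves first, passing the shared instances down.
    entitlement_service = EntitlementService()
    credit_service = CreditService(entitlement_service=entitlement_service)
    beta_service = BetaService(
        entitlement_service=entitlement_service, credit_service=credit_service
    )
    yield {
        "entitlement_service": entitlement_service,
        "credit_service": credit_service,
        "beta_service": beta_service,
    }
    # Shutdown work (closing clients, etc.) would go after the yield.


def get_credit_service(request: Any) -> CreditService:
    return request.state.credit_service


async def main() -> bool:
    async with lifespan(app=None) as state:
        # Per request: a shallow copy of the lifespan state, exposed as request.state.
        request = SimpleNamespace(state=SimpleNamespace(**state))
        credit = get_credit_service(request)
        beta = state["beta_service"]
        return credit is beta.credit_service and credit.entitlement_service is beta.entitlement_service


print(asyncio.run(main()))  # True
```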

Authentication dependencies:

# Require any authenticated user
current_user: Annotated[dict[str, Any], Depends(get_current_user)]

# Optional user (returns None for anonymous callers instead of 401)
current_user: Annotated[dict[str, Any] | None, Depends(get_optional_user)]

# Require superuser/admin (narrow meta endpoints only: health, queues, sudo, tier catalog)
_: Annotated[dict[str, Any], Depends(get_current_superuser)]

# Require superuser + audit trail + IP allowlist + session timeout (default for admin endpoints)
audit: Annotated[AdminAuditContext, Depends(get_admin_audit_context)]
# ... then inside handler, after the service call:
await audit.log(AdminEventType.SEARCH, "user", details={"action": "list"})

When to use which: any admin endpoint that returns PII or mutates admin-touchable state should use get_admin_audit_context. The bare get_current_superuser is reserved for narrow meta endpoints where audit noise exceeds value (health checks, queue polling, sudo primitive, tier catalog lookup).

Request-scoped locale: there are two ways for request-stack code to get the user-facing locale, both backed by the same modules/common/i18n package.

(1) The get_request_locale FastAPI dep resolves the locale in this order: authenticated User.locale → Accept-Language header → DEFAULT_LOCALE. It's built on get_optional_user, so unauthenticated paths (signup, the OAuth callback for first-time users, public routes) still get a locale without raising 401. It is used today at POST /users/ (password signup) and GET /auth/oauth/callback/google to persist User.locale on first-time account creation. It is also used in the error-mapping path via handle_exception(error, locale), so the route's localized detail is rendered in the user's UI language. Pass the resolved value into the service only when the service legitimately needs the explicit parameter. In the signup case, the locale must be persisted onto a User row that doesn't exist yet.

# Resolve the locale once per request and pass to the service
signup_locale: Annotated[str, Depends(get_request_locale)]

(2) t_current(key, **interp) from modules/common/i18n/catalogs.py reads the locale from a per-request ContextVar set by LocaleMiddleware (modules/common/i18n/middleware.py, registered outermost in create_application so it runs before any route, dep, or service body). This is the path service-layer code should reach for at message-localization raise sites — it avoids threading a locale parameter through every method down the call stack. The middleware reads Accept-Language only (no User.locale lookup), which matches the catalog-resolution model on the error path; the authed-user case is still covered because the SPA stamps Accept-Language from User.locale after the server-wins reconcile. Pure-ASGI shape by design — BaseHTTPMiddleware runs the wrapped app in a separate asyncio task and the ContextVar set inside its dispatch is invisible to the route handler.

# Service-layer raise site (no locale param needed)
raise UserNotFoundError(t_current("error.user_not_found"))
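The mechanism behind (2) can be sketched with the standard library alone. A pure ASGI middleware sets a ContextVar and then awaits the wrapped app in the same task, so anything the app awaits sees the value. This is a hypothetical simplification that reuses the guide's names for readability. It is not the code in modules/common/i18n: the two-entry catalogs and the naive header parsing are assumptions:

```python
import asyncio
from collections.abc import Awaitable, Callable
from contextvars import ContextVar
from typing import Any

DEFAULT_LOCALE = "en"
CATALOGS = {  # toy catalogs; the real ones live in modules/common/i18n/{en,pt,es}.py
    "en": {"error.user_not_found": "User not found"},
    "pt": {"error.user_not_found": "Usuário não encontrado"},
}
_current_locale: ContextVar[str] = ContextVar("current_locale", default=DEFAULT_LOCALE)

ASGIApp = Callable[[dict[str, Any], Any, Any], Awaitable[None]]


def t_current(key: str) -> str:
    """Translate `key` using whatever locale the middleware set for this request."""
    return CATALOGS[_current_locale.get()][key]


class LocaleMiddleware:
    """Pure ASGI middleware: set the ContextVar, then await the app in the same task."""

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: dict[str, Any], receive: Any, send: Any) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        raw = dict(scope.get("headers", [])).get(b"accept-language", b"").decode("latin-1")
        # Naive parse for the sketch: first language tag, region dropped, q-values ignored.
        primary = raw.split(",")[0].split(";")[0].strip().split("-")[0].lower()
        token = _current_locale.set(primary if primary in CATALOGS else DEFAULT_LOCALE)
        try:
            await self.app(scope, receive, send)
        finally:
            _current_locale.reset(token)  # don't leak the locale past this request


seen: list[str] = []


async def endpoint(scope: dict[str, Any], receive: Any, send: Any) -> None:
    # Stands in for route -> service -> raise site, with no locale parameter threaded.
    seen.append(t_current("error.user_not_found"))


async def handle_request(accept_language: bytes) -> None:
    scope = {"type": "http", "headers": [(b"accept-language", accept_language)]}
    await LocaleMiddleware(endpoint)(scope, None, None)


asyncio.run(handle_request(b"pt-BR,pt;q=0.9"))
asyncio.run(handle_request(b"fr"))
print(seen)  # ['Usuário não encontrado', 'User not found']
```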

The catalogs themselves live in modules/common/i18n/{en,pt,es}.py, aggregated by catalogs.py:t(locale, key). t_current(key) is t(get_current_locale(), key) with the ContextVar read folded in. Workers MUST NOT use t_current() — task processes have no request scope, so the read returns DEFAULT_LOCALE for every user. Workers take a snapshotted locale task arg (Convention #12) and call t(locale, key) explicitly. The transition of existing service raise sites from raw English strings to t_current() is a per-slice migration (see §Exception Handling — raise-site localization).

The global DomainError exception handler and CatchAllErrorMiddleware intentionally DO NOT call get_request_locale — they sit outside the FastAPI dependency graph and read Accept-Language directly via resolve_locale_from_accept_language. The SPA stamps the header on every request, so the authed-user case is covered without a DB read on the error path. Routes that catch a non-domain exception themselves and call handle_exception(error, locale) SHOULD pass the dep-resolved locale (which prefers User.locale); the canned-default path through the global handler uses the header-only fallback.

Exception Handling

Services raise domain-specific exceptions (like WidgetNotFoundError), not HTTP exceptions. Global exception handlers catch these automatically, so routes don't need try/except for standard errors.

flowchart LR
    A[Service raises DomainError] --> B[Global DomainError handler]
    B --> C[EXCEPTION_MAPPING → HTTPException]
    C --> D[Sanitized JSON response]
    E[Unhandled Exception] --> F[Global catch-all handler]
    F --> G[Log details server-side]
    G --> H[Generic 500 response]

Global handlers are registered in modules/common/utils/error_handler.py via register_exception_handlers(), called in app_factory.py. Three layers:

  1. RequestValidationError handler: FastAPI validation errors return a generic 422 message (no field names leaked), resolved through t(locale, INVALID_REQUEST_MESSAGE_KEY).
  2. DomainError handler: each 4xx exception's HTTPMapping decides whether to render a canned default (via t(locale, mapping.default_key)) or pass the raw str(exc) message through (when use_message_when_present=True and the raise site supplied a message). Messages that pass through (ValidationError, PermissionDeniedError, InsufficientCreditsError, UserExistsError) MUST be pre-sanitized by the raising service — the exception class docstrings spell this out. Everything else (ResourceNotFoundError, ResourceExistsError, TierNotFoundError, etc.) returns the canned localized string. 5xx domain errors always return t(locale, GENERIC_ERROR_MESSAGE_KEY).
  3. CatchAllErrorMiddleware: Unhandled exceptions return a generic 500, also localized via t(locale, GENERIC_ERROR_MESSAGE_KEY).

All error responses include a support_id (8-char UUID) for customer support lookup. Full details logged server-side only. Routes use handle_exception(error, locale) as a fallback for non-domain errors — pass Depends(get_request_locale) from the route to render the detail in the user's UI language; the default falls back to English.

Locale resolution in handlers + middleware bypasses get_request_locale. The handlers and CatchAllErrorMiddleware sit outside the FastAPI dependency graph, so they cannot call the dep. Instead, _resolve_request_locale(request) reads Accept-Language directly via resolve_locale_from_accept_language. The SPA stamps Accept-Language on every request (ApiClient.setLocale), so the authed-user case is covered without a DB read; unauthed requests get either the header value or DEFAULT_LOCALE. This is the intentional asymmetry with the route-side get_request_locale (which prefers User.locale) — handlers run on every request including the error path that the dep itself might have failed, so they must stay decoupled from the user lookup.

Define domain exceptions in modules/common/exceptions.py:

class DomainError(Exception):
    """Base for all domain errors."""
    pass

class ResourceNotFoundError(DomainError):
    """Resource not found."""
    pass

class WidgetNotFoundError(ResourceNotFoundError):
    """Widget not found."""
    pass

class PermissionDeniedError(DomainError):
    """User lacks permission."""
    pass

Mapping to HTTP in modules/common/constants.py. EXCEPTION_MAPPING: dict[type[DomainError], HTTPMapping] — each entry is an HTTPMapping(status, default_key, use_message_when_present) frozen dataclass. default_key is an i18n catalog key resolved at handler time via t(locale, key); use_message_when_present=True means a non-empty raise-site str(exc) overrides the default (so a service can say ValidationError("Draft 'X' already exists") and have that exact string surface to the client):

from dataclasses import dataclass

@dataclass(frozen=True)
class HTTPMapping:
    status: int
    default_key: str
    use_message_when_present: bool


EXCEPTION_MAPPING: dict[type[DomainError], HTTPMapping] = {
    ResourceNotFoundError: HTTPMapping(404, "error.resource_not_found", False),
    PermissionDeniedError: HTTPMapping(403, "error.permission_denied", True),
    ValidationError: HTTPMapping(422, "error.invalid_request", True),
    ResourceExistsError: HTTPMapping(409, "error.resource_exists", False),
    InsufficientCreditsError: HTTPMapping(402, "error.insufficient_credits", True),
    # ... see constants.py for the full list
}
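The DomainError handler's decision can be sketched in plain Python. This sketch makes several assumptions, and the real handler in error_handler.py may differ in any of them. The lookup walks the exception's MRO, so subclasses like WidgetNotFoundError inherit their parent's mapping. Unmapped errors are treated as 500. The support_id is the first 8 hex characters of a random UUID. `render_domain_error` and the single-locale catalog are hypothetical:

```python
import uuid
from dataclasses import dataclass


class DomainError(Exception): ...
class ResourceNotFoundError(DomainError): ...
class WidgetNotFoundError(ResourceNotFoundError): ...
class ValidationError(DomainError): ...


@dataclass(frozen=True)
class HTTPMapping:
    status: int
    default_key: str
    use_message_when_present: bool


EXCEPTION_MAPPING: dict[type[DomainError], HTTPMapping] = {
    ResourceNotFoundError: HTTPMapping(404, "error.resource_not_found", False),
    ValidationError: HTTPMapping(422, "error.invalid_request", True),
}
CATALOG = {  # toy single-locale catalog
    "error.resource_not_found": "Resource not found",
    "error.invalid_request": "Invalid request",
    "error.generic": "Something went wrong",
}


def render_domain_error(exc: DomainError) -> tuple[int, dict[str, str]]:
    """Map a domain exception to (status, JSON body) following the rules above."""
    support_id = uuid.uuid4().hex[:8]  # assumed format: 8 hex chars from a random UUID
    # Walk the MRO so WidgetNotFoundError falls back to the ResourceNotFoundError mapping.
    mapping = next(
        (EXCEPTION_MAPPING[cls] for cls in type(exc).__mro__ if cls in EXCEPTION_MAPPING),
        None,
    )
    if mapping is None or mapping.status >= 500:
        return 500, {"detail": CATALOG["error.generic"], "support_id": support_id}
    message = str(exc)
    if mapping.use_message_when_present and message:
        detail = message  # must already be sanitized by the raise site
    else:
        detail = CATALOG[mapping.default_key]  # canned default hides raise-site details
    return mapping.status, {"detail": detail, "support_id": support_id}
```

WidgetNotFoundError("widget 123, row 9 in shard B") therefore renders as 404 "Resource not found", while ValidationError("Draft 'X' already exists") passes its message through.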

Adding a new domain exception: define the class in modules/common/exceptions.py, add the corresponding canonical English string to modules/common/i18n/en.py under a new error.<snake_case_name> key, mirror the translation into pt.py + es.py (the TestCatalogParity test in tests/unit/modules/common/i18n/test_catalogs.py fails the PR if you skip the mirrors — per-PR key-parity gate, no CI step required), then add an HTTPMapping entry to EXCEPTION_MAPPING. Pick use_message_when_present=True only when raise sites legitimately need to surface a distinct, pre-sanitized message per call; otherwise leave it False and the canned localized default wins.
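A parity gate amounts to comparing key sets across catalogs. The minimal sketch below illustrates the idea; it is not the actual TestCatalogParity code. It assumes each catalog is a flat dict from key to string. The error.widget_locked key is hypothetical:

```python
def catalog_parity_gaps(
    reference: dict[str, str], others: dict[str, dict[str, str]]
) -> dict[str, dict[str, set[str]]]:
    """Per locale, report keys missing from it and extra keys absent from the reference."""
    gaps: dict[str, dict[str, set[str]]] = {}
    ref_keys = set(reference)
    for locale, catalog in others.items():
        missing = ref_keys - set(catalog)
        extra = set(catalog) - ref_keys
        if missing or extra:
            gaps[locale] = {"missing": missing, "extra": extra}
    return gaps


EN = {"error.resource_not_found": "Resource not found", "error.widget_locked": "Widget is locked"}
PT = {"error.resource_not_found": "Recurso não encontrado", "error.widget_locked": "O widget está bloqueado"}
ES = {"error.resource_not_found": "Recurso no encontrado"}  # forgot the new key

print(catalog_parity_gaps(EN, {"pt": PT, "es": ES}))
# {'es': {'missing': {'error.widget_locked'}, 'extra': set()}}
```

A test built on this would simply assert that the result is an empty dict.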

Raise-site str(exc) messages (the use_message_when_present branch) are migrating from raw English to localized strings incrementally. The helper is t_current(key, **interp) from modules/common/i18n/catalogs.py — it reads the request locale from the LocaleMiddleware-set ContextVar, so raise sites in service code don't need a threaded locale parameter. Pattern: raise ValidationError(t_current("error.draft_already_exists", name=draft_name)). Services adopt it slice-by-slice as their messages are added to the catalog; the canned defaults + the RequestValidationError 422 + the 5xx generic all flow through t() already, so an unmigrated service still produces a localized response — just with the canned default instead of the raise-site detail. The CSRF dependency (infrastructure/auth/session/dependencies.py:verify_csrf_token) is the first non-service raise-site adopter — it raises CSRFException(t_current("auth.csrf_missing")) / CSRFException(t_current("auth.csrf_invalid")) from the request scope, same pattern as services. Workers MUST NOT use t_current() — see §Dependency Injection · Request-scoped locale (2) for the worker rule + the t(locale, key) snapshot pattern; workers/analysis/tasks.py:_record_success is the first in-tree instance, threading user_locale from interfaces/api/v1/projects.py:trigger_analysis (Depends(get_request_locale)) through .kiq(user_locale=...) to a t(user_locale, "notification.analysis.title.*") call inside the worker.

Database Patterns

Async Sessions

All database operations are async. The session is injected via Depends(async_session) and auto-commits on success:

from sqlalchemy.ext.asyncio import AsyncSession

async def my_operation(db: AsyncSession):
    # Operations auto-commit on success
    result = await crud_widgets.create(db=db, object=data)
    return result

Count-Race Prevention

count → check → insert is not atomic under concurrent requests. Two requests can both read the same count, both pass the limit check, both insert — the cap is silently exceeded. Two tools close the race, matching different constraint shapes.

Partial unique index — works when the constraint is "exactly one row per X" (e.g., one trial per user). Postgres rejects the second insert at commit time with IntegrityError. The service catches and converts to the domain return value. See entitlement/models.py (uq_user_entitlement_trial) and entitlement/trial.py:grant_trial_if_eligible for the shipped pattern.
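SQLite also supports partial unique indexes, so the shape of this pattern can be tried with the standard library. The table, columns, and index name below are hypothetical; the real index is uq_user_entitlement_trial in entitlement/models.py, on Postgres. `grant_trial_sketch` only mirrors the catch-and-convert idea of grant_trial_if_eligible:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_entitlement (id INTEGER PRIMARY KEY, user_id INTEGER, is_trial INTEGER)"
)
# At most one trial row per user; non-trial rows are unconstrained.
conn.execute(
    "CREATE UNIQUE INDEX uq_trial_sketch ON user_entitlement (user_id) WHERE is_trial = 1"
)


def grant_trial_sketch(user_id: int) -> bool:
    """Insert a trial row; return False instead of raising if one already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO user_entitlement (user_id, is_trial) VALUES (?, 1)", (user_id,)
            )
        return True
    except sqlite3.IntegrityError:
        return False  # the index, not a prior SELECT, decided the race


print(grant_trial_sketch(7), grant_trial_sketch(7), grant_trial_sketch(8))  # True False True
```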

Advisory lock — works for any N-bounded cap. Helper lives at infrastructure/database/locks.py:advisory_xact_lock. Example:

from ...infrastructure.database.locks import advisory_xact_lock

async def create_clip(self, project_uuid, ..., db):
    await advisory_xact_lock(db, f"clips:{project_uuid}")
    count = await crud_clips.count(db=db, project_id=project_uuid)
    if count >= MAX_CLIPS_PER_PROJECT:
        raise ValidationError(...)
    await crud_clips.create(db=db, object=..., commit=True)  # commit releases the lock

The lock's transaction is the one that autobegins at db.execute inside the helper. The crud.create(commit=True) that ends the block commits that transaction and releases the lock. Do NOT call await db.commit() between the lock acquire and the create — it releases the lock early.

Why pg_advisory_xact_lock (blocking, transaction-scoped) over pg_try_advisory_xact_lock (non-blocking) or pg_advisory_lock (session-scoped): we want waiters to serialize not error; we want auto-release on commit/rollback so a caller can't leak a lock by forgetting to release.

Key convention: "<entity>:<identifier>" (e.g., "clips:<project_uuid>", "user_projects:<user_id>", "presets:<user_id>", "preview_presets:<user_id>"). The entity namespace prevents cross-entity key collisions: without it, the same identifier used by two entities would map to the same lock. Per-identifier scoping avoids serializing operations on unrelated entities. Distinct-but-related entities (e.g., the two preset tables, which have independent per-user caps) MUST use distinct keys; colliding them would serialize unrelated operations for no safety benefit. The residual risk is a hashtext collision between two different keys, with probability 2^-32 per key pair, which we accept.

Indexes

Add indexes for:

  • Foreign keys (always)
  • Fields used in WHERE clauses
  • Fields used for sorting

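from sqlalchemy import Index, String
from sqlalchemy.orm import Mapped, mapped_column

# Single column (inside the model class)
email: Mapped[str] = mapped_column(String(50), unique=True, index=True)

# Composite index (class attribute on the model)
__table_args__ = (
    Index("idx_widget_user_status", "user_id", "status"),
)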

Relationships

from sqlalchemy.orm import relationship

# One-to-many (parent side)
widgets: Mapped[list["Widget"]] = relationship(
    "Widget",
    back_populates="user",
    lazy="selectin",  # Eager load
    default_factory=list,
    init=False,
)

# Many-to-one (child side)
user: Mapped["User"] = relationship(
    "User",
    back_populates="widgets",
    lazy="selectin",
    init=False,
)

Database connection — direct endpoint, matched region

Two things matter for query latency:

  1. Use the direct Postgres endpoint, NOT the pooler. PgBouncer in transaction mode breaks asyncpg's prepared-statement cache, forcing 3-4× the round trips per query. Our app already has SQLAlchemy connection pooling (POSTGRES_POOL_SIZE=50 + POSTGRES_MAX_OVERFLOW=20 = 70-connection ceiling per API instance; MAX_OVERFLOW provides spike-absorption headroom so pressure shows up as alarmable pool-checkout latency rather than a cliff-edge 500), so PgBouncer in front of asyncpg is redundant and harmful. The pooler is designed for serverless apps that open one connection per request; we run persistent uvicorn workers.

Caveat: Neon compute auto-suspend on the Hobby tier. Hobby-tier Neon compute scales to zero after ~5 min idle (the default). When the next request arrives, the pooled asyncpg connection is stale. pool_pre_ping catches this at checkout, and the request gets a fresh connection. However, if the connection dies during a request, the cleanup-time rollback raises asyncpg.InterfaceError: cannot call Transaction.rollback(): the underlying connection is closed, which surfaces as a 500. A defense-in-depth fix in infrastructure/database/session.py swallows close-time errors at debug level (added 2026-04-28 for #56). For production, upgrade Neon to the Pro tier (always-on compute, $19/mo) to remove the auto-suspend window entirely. The convention above still holds because we stay on the direct endpoint.

  2. DB region must match server region. Each query is one network round trip. Cross-region (server in US-East, DB in US-West or vice versa) adds ~50-65ms per query. A single request that runs 4-6 sequential queries pays roughly 200-400ms in DB latency (4 × 50ms up to 6 × 65ms) before any other work happens. Match regions (Hetzner Ashburn ↔ Neon us-east-1, Hetzner Hillsboro ↔ Neon us-west-2) for sub-10ms query latency.

Both decisions are made at provision time. Changing the DB region requires creating a new Neon project and migrating data.

Migrations

Generate migrations with Alembic:

cd backend
uv run alembic revision --autogenerate -m "add widget table"
uv run alembic upgrade head

Key Conventions

  1. UUIDs for public APIs - Never expose internal integer IDs
  2. Soft deletes - Use is_deleted=False in queries
  3. Async everything - All I/O operations are async
  4. Type hints - Full typing with Mapped[] and Annotated[]
  5. Domain exceptions - Raise domain errors in services; the global exception handlers map them to HTTP via EXCEPTION_MAPPING
  6. Validation at boundaries - Pydantic validates API input
  7. Use FastCRUD for simple CRUD - For complex queries (joins, aggregations, batch updates), use SQLAlchemy directly
  8. No magic numbers - Extract to constants in module-level constants.py files
  9. No imports inside methods - All imports at module top level
  10. Client IP extraction - Always use modules.common.utils.request_ip.get_client_ip(request) instead of request.client.host directly. The raw transport peer is Caddy's internal Docker IP; the helper reads the X-Forwarded-For header that Caddy populates from Cloudflare's CF-Connecting-IP. Misuse causes per-IP rate limits to track Caddy as the "client" and lock out all users globally.
  11. Client-supplied offsets validated against computed ceilings - Any API field that references a position in a computed timeline (e.g., edit.end_ms into SUM(clip_file.duration_ms), asset_edit.end_ms into user_asset.duration_ms, edit_override.{start_ms, end_ms} into the same project SUM, edit.asset_offset_ms into asset_file.duration_ms) must be bounds-checked at the service layer against the computed ceiling before persisting. The schema Field(ge=0) + end_ms > start_ms validator is necessary but not sufficient — a tampered client can send end_ms=999999999 which schemas accept but renders silently clamp or crash. Four canonical instances ship today: (a) EditService._validate_edit_bounds in edit/service.py uses clip.utils.get_project_total_duration_ms (returns tuple[ProjectDurationStatus, int]; EMPTY rejects, NOT_READY skips, READY enforces) — needed because the ceiling is a SUM across rows not already fetched at the call site. The check applies to non-asset edits (silence/false_start/clean/manual_cut/keep) AND to asset edits with non-INSERT visual modes (OVERLAY, REPLACE, NONE, null) that are NOT positioned inside an insert. Two exceptions bypass the timeline ceiling, both driven by the insert mechanism extending output_duration_ms at render (workers/render/steps.py:567). (1) INSERT-mode asset edits — the ceiling is replaced by a duration cap end_ms - start_ms ≤ MAX_VIDEO_DURATION_MS (same deferred-exploit defense, scoped to a duration sanity cap rather than a position ceiling). (2) Non-INSERT asset edits positioned inside an insert (inside_insert_edit_id set — anchor OR merge mode) — anchor mode positions via insert_offset_ms and the renderer intersects against the host's output range; merge mode encodes start_ms = splice_point + offset_within_insert so the visual cursor lands at the user's drop point, which legitimately puts end_ms past total because the host insert extends the rendered timeline and shift_edits_for_inserts accounts for the expansion. 
The validator branches on inside_insert_edit_id (not on the overlap mode), so future modes added to insert_overlap_modes inherit the exception automatically; (b) AssetService.validate_edit_bounds in asset/service.py is a pure function taking duration_ms: int | None directly from the caller's require_asset_uploaded return, a simpler shape because the ceiling is a single scalar already fetched upstream; (c) validate_overrides_against_project in draft/utils.py reuses the get_project_total_duration_ms helper to bounds-check every EditOverride.{start_ms, end_ms} in DraftCreate / DraftUpdate / ExportCreate payloads, and EditService.{create, update} inline-checks Edit.asset_offset_ms against asset_file["duration_ms"] threaded from _verify_asset_uploaded (changed to return the joined asset_file dict so callers don't re-query); (d) EditService._verify_inside_insert_anchor in edit/service.py bounds Edit.insert_offset_ms against the anchored insert's duration (anchor["end_ms"] - anchor["start_ms"]) at create and update time. It has the same shape as (b): a scalar ceiling fetched via a single CRUD read and an inline comparison; the FK target check (anchor exists, same project, visual_mode='insert') lives in the same helper because all of these are preconditions for a valid render-time anchor. The pattern to copy is "service-layer bounds validation against a computed ceiling," not a specific helper module: reach for a helper when the check is non-trivial or reused across services (a, c), and inline it when the value is already in hand and the check is a single comparison (b-subset, c-inline). A zero-ceiling state (EMPTY in the multi-row case) rejects rather than skipping; otherwise an attacker creates the offset first and materializes the ceiling later. start_ms is bounds-checked independently from end_ms when the cross-field validator cannot fire (e.g., the EditOverride.end_ms=None case, where transitivity doesn't close start_ms ≤ total).
Audit verified complete 2026-04-21: every client-facing _ms offset field on a public schema has a service-layer bounds check; worker-internal _ms schemas (workers/render/schemas.py, workers/analysis/**/schemas.py, workers/download/schemas.py, workers/asset_edit/schemas.py) are constructed from server-owned data, not network payloads, and are outside this convention's scope.
  12. Freeze configuration at the queue boundary - Any configuration value that feeds a background worker and depends on user tier (watermark-free, export retention, feature gates) MUST be snapshotted onto the queued record at service-layer kickoff. Workers read the snapshot; resolve_tier_context() must NEVER be called from inside a worker task. Rationale: workers run minutes to hours after the request. If tier state changes in that window (subscription expired, upgrade processed), live re-resolution silently produces the wrong answer for whichever end of the transition the user is on. Enforcement: rg "resolve_tier_context" backend/src/workers/ MUST return zero unannotated hits. Exception mechanism: a surviving call site that is genuinely observational (logging, metrics, diagnostics, but not a policy decision) must carry an inline # noqa: tier-in-worker — justification: <reason> comment. This lets future audits distinguish deliberate-but-rare exceptions from overlooked regressions. Any unannotated grep hit is a finding. Shipped instances: Fix 9 (expires_at at create, not completion), Fix 11 (watermark_required snapshot, not re-resolved), Fix 12 (analysis mode/settings frozen at kickoff). The same boundary applies to the request-scoped locale ContextVar. The _current_locale ContextVar set by LocaleMiddleware exists only inside the request's task; worker processes start with DEFAULT_LOCALE and never see the originating user's pick. Workers MUST NOT call t_current(key) from modules/common/i18n/catalogs.py, because inside a worker it silently produces English for every user. Snapshot the locale onto the task arg at service-layer kickoff (next to the watermark_required / tier_context snapshots) and call t(locale, key) explicitly inside the task. Enforcement: rg "t_current\(" backend/src/workers/ MUST return zero unannotated hits. The same exception annotation as above applies if a genuinely observational call site needs to bypass the rule.
  13. Validate against resolved metadata, not just identifier shape - When a client-supplied identifier resolves into a new entity (YouTube URL → ClipFile after fetching video metadata, asset UUID → UserAsset after DB lookup, etc.), downstream resource limits and cap checks MUST run against the resolved entity's properties, not against the identifier alone. Identifier-only validation (URL is well-formed, UUID parses, string length in range) is necessary but not sufficient: a tampered client can send a valid-looking identifier whose resolved properties violate policy. Shipped instances: Fix 4 (Edit bounds against project duration — applies to all edit types except INSERT-mode asset edits, which extend the timeline at render and are bounded by a duration cap instead; see Convention #11 instance (a)), Fix 16 (AssetEditRequest bounds against asset duration), Fix 5 (YouTube duration cap at import time, after fetch_video_info). Related to Convention #11 but distinct: #11 is about client-supplied _ms offsets already in the request; this convention is about metadata the server fetches to resolve an identifier.
  14. Worker processes require explicit instrumentation setup - TaskIQ worker processes do NOT inherit Logfire auto-instrumentation from the web process. Each worker must call instrument_taskiq(service_name=...) at startup, and configure_broker_lifecycle(broker, service_name, ...) must pass a distinct service.name per queue (sapari-{email,analysis,render,download,proxy,asset-edit}). Enforcement: service_name is a non-default positional-or-keyword argument on configure_broker_lifecycle — adding a broker without it fails at call time (TypeError) rather than silently inheriting a generic name and losing worker-scoped observability. Rationale: worker processes run in separate Python interpreters (different TaskIQ worker pools), so the web process's configure_logfire() call has no effect on them. Without the per-broker call, workers emit zero spans, which silently breaks Logfire dashboards and makes worker-side incidents invisible. Per-instrumentor on/off policy (both web and workers) lives in docs/operations/monitoring.md §Auto-instrumentor defaults: pydantic-ai + SQLAlchemy + FastAPI ON, Redis OFF. The env vars (LOGFIRE_INSTRUMENT_{SQLALCHEMY,REDIS} for web, ..._WORKERS for workers) encode this — don't flip defaults without updating the matrix in that doc.
  15. Manual span naming follows the six-category taxonomy - Every manually emitted logfire.span(...) must fit one of six categories:
  (a) <worker>.pipeline for DAG invocation parents.
  (b) step.<step_id> for pipeline steps. These are emitted automatically by fastroai.LogfireTracer; don't hand-write them.
  (c) taskiq.<task_name> for task-level parents. These are emitted automatically by LogfireSpanMiddleware; don't hand-write them.
  (d) <module>.<method> for service-layer operations (credits.reserve, payment.handle_webhook).
  (e) ext.<service>.<op> for outbound calls (ext.whisper.transcribe, ext.ffmpeg.render, ext.stripe.modify_subscription).
  (f) sse.<event> for SSE publish paths.
  Every ext.* span carries a domain correlation key at entry (clip_file_uuid, subscription_id, etc.). Every <module>.<method> span carries the primary identifier at entry and sets an outcome attribute via span.set_attribute() at exit. Cron tasks add labels={"task_type": "cron"} to their decorator, which the middleware copies onto the span as task_type="cron". Rationale: the taxonomy keeps Logfire filters, saved queries, and ad-hoc grouping legible across the codebase. New spans slot into existing dashboards instead of forcing per-query regex munging. Full table with examples in docs/operations/monitoring.md §Span taxonomy.
  16. Database sessions are for short-lived database work — A session does not span slow IO of any kind, regardless of how it was acquired (Depends(async_session) parameter on a route, TaskiqDepends(get_db_session) parameter on a worker task, async with local_session() opened manually). The rule is about session scope, not acquisition path. Slow IO is anything that runs longer than a single-digit number of seconds in the worst case: subprocess calls (FFmpeg, yt-dlp, ffprobe at scale), large R2 transfers, third-party LLM / transcription / payment APIs, anything wrapped in asyncio.to_thread() or loop.run_in_executor(). The Postgres / Neon idle-connection timeout is the failure mode that surfaces this most reliably (a session held across an 8-min FFmpeg run dies; the post-IO write fails with asyncpg.InterfaceError: connection is closed — observed in staging 2026-04-30 on generate_clip_proxy), but the rule applies universally — a session pinned across slow IO also blocks pool checkout for other requests. Two implementation shapes that satisfy the rule, pick whichever reads cleaner: (1) Three-phase task body — open a session, do reads, close; run slow IO with no session held; open a fresh session, do writes, close. Suitable when the structure is "read, do IO, write." Shipped: import_youtube (route). (2) Per-operation helpers with internal sessions — each DB operation lives in a small named helper (get_clip_context, mark_clip_failed, _finalize_proxy, _finalize_clip_artifacts, _finalize_youtube_download, update_asset_records_replace, _load_export, _finalize_export, _mark_run_failed, …) that opens local_session() internally and closes before returning. Task bodies, step bodies, and pipeline Deps classes do NOT take session parameters or open session blocks. Suitable for tasks more complex than three phases (multiple read-IO-read cycles, pipeline architectures with many steps), and the default shape we converge on going forward. 
Shipped (workers): generate_clip_proxy, process_clip_artifacts, download_youtube_video, download_youtube_asset, edit_asset (linear); render_export, analyze_project (pipeline-architecture). Event-driven waits also follow this shape: wait_for_audio_extraction (workers/analysis/tasks.py) opens no session across its async for event in subscribe_to_project(...) loop — every ClipReadyEvent re-invokes the load_pending_audio_clip_file_ids helper, which owns its own local_session(). The wait blocks on Redis pubsub (no session held) and re-reads on each ping; see Convention #18 for the "events are wake-ups, DB is truth" pattern this codifies. Helper modules: workers/download/context.py, workers/render/data.py, workers/analysis/data.py. Aspirational enforcement now achievable: rg "TaskiqDepends\(get_db_session\)" backend/src/workers/ returns zero hits, and rg "deps\.db" backend/src/workers/ likewise — the dependency-injected-session anti-pattern is structurally absent from the worker tree. Streaming endpoints (StreamingResponse for SSE, large file streams, long polls) MUST acquire their session manually via async with local_session() and release it before returning the response — taking Depends(async_session) or Depends(get_current_user) as a route parameter holds the session for the entire EventSource lifetime under FastAPI 0.118+'s Depends(... yield ...) semantics, exhausting POSTGRES_POOL_SIZE=50 (+ 20 overflow = 70 ceiling) at 70 concurrent SSE clients. Authentication for streaming endpoints prefers Redis-only checks (Depends(get_session_from_cookie) + SessionData.is_active); fall back to a manual session lookup only when an ownership/permission check needs DB state. Shipped streaming instances: stream_project_events (manual session for the per-project ownership check), stream_notification_events and stream_asset_events (zero DB hit — SessionData alone). 
SLOW_IO discovery aid: any helper that performs slow IO carries a # SLOW_IO comment marker on its definition line. When reviewing a worker task body or any code that holds a session (rare under shape 2; common under shape 1), grep its callees for SLOW_IO to see whether the session scope is wrong. Candidate signals for a new marker:
  • The helper spawns a subprocess (FFmpeg, yt-dlp, ffprobe).
  • It wraps a synchronous call in asyncio.to_thread() or loop.run_in_executor(). The wrap itself is the signal, since you only offload to a thread when the sync call is slow enough to block the event loop.
  • It transfers a file larger than ~10 MB to or from R2/S3.
  • It calls a third-party LLM / transcription / payment API.
Marker placement is reviewer-driven for now; CI promotion follows once all worker tasks are migrated. The following enforcement greps MUST return zero hits, and all four are now structurally enforceable:
  • rg "async def stream_.*Depends\(async_session\)" backend/src/interfaces/api/v1/
  • rg "async def stream_.*Depends\(get_current_user\)" backend/src/interfaces/api/v1/
  • rg "TaskiqDepends\(get_db_session\)" backend/src/workers/
  • rg "deps\.db" backend/src/workers/

  17. Package __init__.py files MUST NOT eagerly import their tasks submodule - modules/X/__init__.py does NOT contain from . import tasks or from .tasks import … at module top level. Task functions live in modules/X/tasks.py, and the broker entry that needs them imports the submodule explicitly. The email broker's entry at infrastructure/taskiq/worker.py does from ...modules.email import tasks # noqa: F401, patterned on its existing imports of common, email, export, payment, and user. Per-broker workers (workers/<name>/tasks.py) register their tasks directly when python -m taskiq worker loads their package.

Why: infrastructure/taskiq/__init__.py runs from . import app as a side-effect at line 10, and app.py imports workers/shared/context.py (the per-process WorkerContext singleton). context.py imports several modules/X/service.py to construct singletons (today: modules.email.service.EmailService). Resolving modules.X.service runs modules/X/__init__.py first. Suppose that file eagerly imports tasks, and tasks.py imports from infrastructure.taskiq (every broker-registering tasks file does). Python is then still partway through loading infrastructure.taskiq from line 10. email_broker and friends are not yet bound, so the inner import raises ImportError: cannot import name 'email_broker' from partially initialized module.

This surfaced on 2026-05-09 in staging, the moment the WorkerContext landed. sapari-taskiq-scheduler crash-looped 72× because the container died on every startup. The email / asset-edit / download / proxy worker containers reported Up with RestartCount=0. Their inner taskiq worker-N subprocesses, however, crash-looped under taskiq's process manager. This was visible only by reading the worker logs (Process worker-0 restarted with pid … over and over). Analysis and render survived only because their package __init__.py happened to load modules.email via a different transitive path before first touching infrastructure.taskiq. The fix removed the eager from . import tasks from modules/email/__init__.py, the only module with the pattern at the time. This convention exists so the next module addition doesn't re-introduce the trap.

How to apply: keep new modules/X/__init__.py files free of any .tasks import, and limit them to schemas, models, service, crud, and enums. Tasks register via the broker's worker entry, not via package init. The rule generalizes: a package __init__.py that can be reached during infrastructure.taskiq.__init__'s own initialization should not eagerly import any submodule whose top-level imports include infrastructure.taskiq, directly or transitively. Enforcement (MUST return zero hits): grep -lE "^from \. import tasks$|^from \.tasks " backend/src/modules/*/__init__.py. The check is wired into the Linting CI job as a final step, so a regression fails the PR before merge.

  18. Events are wake-ups; the database is truth — Any in-process wait that subscribes to Redis pubsub to detect external progress (async for event in subscribe_to_project(...) or equivalent) MUST treat each received event as a re-check trigger, NOT as the carrier of authoritative state. On every relevant event, re-query the DB via a small helper that owns its own session (Convention #16 shape 2). Rationale: Redis pubsub has no buffering (events published before pubsub.subscribe() lands are gone), no replay, and no guaranteed delivery; payload schemas drift; and hasattr / optional-field guards on the consumer side silently swallow mismatches. Treating the event as a payload-bearing message makes every one of those failure modes a silent hang. Surfaced 2026-05-13 in wait_for_audio_extraction: the wait dereferenced event.clip_file_uuid behind a hasattr guard, but ClipReadyEvent only carries clip_uuid (the per-project Clip UUID, not the ClipFile UUID) — the hasattr returned False on every event, the pending set never drained, and analysis runs sat in pending for 8+ minutes before the task-level timeout fired. The fix re-reads load_pending_audio_clip_file_ids(project_uuid) from DB on every ClipReadyEvent and proceeds when it returns empty, regardless of event payload shape. How to apply: when adding a new event-driven wait, the event handler does ONE thing — call a DB-truth helper. The helper owns the session, the wait holds none. Never branch on event payload fields. Enforcement greps: any new async for event in subscribe_to_project(...) should be reviewed against this convention; rg "hasattr\(event," backend/src/workers/ returns zero hits (the antipattern guard). Snapshot-on-connect is the read-side complement: SSE consumers that connect mid-pipeline can miss events published before pubsub.subscribe() lands. 
EventPublisher.publish caches the latest AnalysisProgressEvent to a TTL'd Redis string key (progress_snapshot:<project_uuid>, TTL 3600s); subscribe_with_keepalive yields the cached frame on connect before entering the live loop. Same principle: don't trust pubsub to be load-bearing; back it with a DB or cache read.

  19. User-facing strings flow through the i18n catalog - Any service-layer raise whose message is surfaced to the client MUST take that message from a translation key (t_current("namespace.key", **interp) from modules/common/i18n/catalogs.py), not from an inline string literal.

EXCEPTION_MAPPING[ExcClass].use_message_when_present in modules/common/constants.py defines what counts as "surfaced to the client":
  • True for ValidationError, PermissionDeniedError, InsufficientCreditsError, UserExistsError, MultipartCompleteError, and MultipartAbortError. Their raise-site messages reach the user verbatim.
  • False for ResourceNotFoundError, ResourceExistsError, UserNotFoundError, TierNotFoundError, RateLimitNotFoundError, and MultipartUploadNotFoundError. The error handler masks their messages with the catalog default, so raise sites can keep English debug strings without leaking them.

Every new translation key MUST be added to all three locale catalogs (en.py, pt.py, es.py). A key missing from a sibling catalog falls back to English at lookup, which causes silent translation drift. Workers MUST NOT call t_current() (see Convention #12). They take a snapshotted locale task arg at kickoff and call t(locale, key) explicitly.

How to apply: when adding a raise <SurfacedExc>(...) to a service:
  1. Add service.error_name: "message" to en.py, pt.py, and es.py. Translate it; do not paste English into pt/es.
  2. Replace the raise with raise <SurfacedExc>(t_current("service.error_name", **interp)).

Reuse cross-service keys when the message is identical and the namespace fits (error.superuser_required, project.access_denied). The following enforcement grep MUST return zero hits in backend/src/modules/:

rg 'raise (ValidationError|PermissionDeniedError|InsufficientCreditsError|UserExistsError|MultipartCompleteError|MultipartAbortError)\("' backend/src/modules/

Run it a second time with \(f" in place of \(" to catch f-string messages.
Frontend-facing copy (SPA components, landing site) goes through the i18next catalog (frontend/shared/lib/i18n.ts, landing/src/lib/i18n.ts) — same principle, separate enforcement. Phase 2 has substantively landed (the SPA's 19 namespace catalogs are populated and active; landing's en/pt/es page tree is shipped); ongoing gaps are slice-by-slice (a handful of hardcoded English literals remain in features/admin/components/Payments.tsx email templates and similar admin-internal corners). When the backend writes a frozen English string to a DB column that's surfaced to the user (e.g. EntitlementTransaction.description, Edit.reason_tag), the canonical pattern is a small backend-raw → i18n-key map maintained on the frontend rendering surface — see docs/development/frontend.md §Localizing system-generated DB strings.

  20. Cookie Max-Age must match its server-side storage TTL - Some tokens are stored both as a client cookie and as a server-side Redis entry (CSRF tokens, session IDs, session-derived caches). For these, the cookie Max-Age and the storage expiration MUST come from the same TTL value, OR the read path MUST self-heal on a storage miss. If Max-Age > storage TTL, the cookie outlives the server entry. Every subsequent request then 403s on validation until the user clears the cookie, typically by logging out.

This surfaced on 2026-05-13 as drift between SESSION_COOKIE_MAX_AGE = 86400 (24h) and the CSRF storage TTL of SESSION_TIMEOUT_MINUTES * 60 = 1800s (30m). Idle users came back from lunch and every mutation 403'd silently. Before the fix, /auth/check-auth skipped CSRF regeneration whenever the cookie was present, without verifying that the stored entry still existed. The shipped fix validates the cookie against storage on check_auth and regenerates + overwrites on a miss (auth.py:608-625).

How to apply: when introducing a new dual-store token, either pin both TTLs to the same constant or make the cookie's read path self-heal (if cookie_value and not storage.has(cookie_value): regenerate()). Both are acceptable; pick the one that matches the desired UX:
  • Single TTL gives a strict lockout after idle.
  • Self-heal gives transparent recovery.
Document the choice inline at the cookie-set call site.

Second instance, remember-me sessions: SessionManager.timeout_seconds_for(metadata) is the single TTL source for a session whose login set remember_me. It returns a TTL derived from SESSION_REMEMBER_ME_DAYS instead of SESSION_TIMEOUT_MINUTES. That value is read:
  • at validation;
  • on every activity-driven storage.update(expiration=…) renewal;
  • by the cleanup sweep;
  • for the CSRF token's storage TTL + cookie Max-Age.
Previously, only the initial session cookie + Redis TTL were extended. The session still died at the default idle window, and the CSRF token's storage TTL decayed to the default on first activity, so the 30-day persistence wasn't real.

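Convention #18's "events are wake-ups" rule reduces to a loop that never inspects the event it receives. The sketch below is hypothetical:
  • wait_until_drained stands in for waits such as wait_for_audio_extraction.
  • Any async iterator stands in for subscribe_to_project(...).
  • load_pending stands in for a DB-truth helper, such as load_pending_audio_clip_file_ids, that opens its own session.

The sketch checks once before consuming any events, because pubsub drops everything published before the subscription lands.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable


async def wait_until_drained(
    events: AsyncIterator[object],
    load_pending: Callable[[], Awaitable[set[str]]],
    timeout_s: float,
) -> None:
    """Block until load_pending() reports nothing left, or time out.

    Events are treated purely as wake-ups: no field on them is ever read.
    """

    async def _run() -> None:
        # Check before the first event: anything published before the
        # subscription existed is gone, so the work may already be done.
        if not await load_pending():
            return
        async for _event in events:
            # Re-read the source of truth; never branch on the payload.
            if not await load_pending():
                return
        raise RuntimeError("event stream ended before the work drained")

    # The task-level timeout still bounds the wait if events stop arriving.
    await asyncio.wait_for(_run(), timeout=timeout_s)
```

Because the loop ignores event contents, a payload schema change (such as the clip_uuid / clip_file_uuid mismatch described in Convention #18) cannot stall it. The worst case is an extra DB read per event.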
Rate Limiting

Rate limits use a RateLimit dataclass defined in infrastructure/auth/constants.py:

from infrastructure.auth.constants import RATE_LIMIT_SIGNUP, RateLimit

# Usage in route handlers:
await _check_rate_limit(request, "signup", RATE_LIMIT_SIGNUP)

Token expiry durations are also in infrastructure/auth/constants.py:

VERIFICATION_TOKEN_EXPIRY_HOURS = 24
RESET_PASSWORD_TOKEN_EXPIRY_HOURS = 1

Credit Architecture

Credits are built on the entitlement system. EntitlementTransaction is the authoritative ledger; quantity_used on UserEntitlement is a rebuildable cache. Every credit event (grant, usage, reset, refund) writes to both atomically.

Ledger Pattern

EntitlementTransaction (authoritative, append-only)
  ├── GRANT   — credits added (purchase, subscription, beta)
  ├── USAGE   — credits consumed (one per entitlement drained)
  ├── RESET   — period boundary marker (RENEWABLE credits, amount=0)
  └── REFUND  — credits returned

UserEntitlement.quantity_used (cache, rebuildable)
  └── Updated in the same db.commit() as the transaction record

rebuild_user_balance() recomputes quantity_used from the ledger at any time. Admin endpoint: POST /credits/admin/rebuild/{user_id}.
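Because quantity_used is only a cache, rebuilding it is a fold over the ledger. The sketch below handles a single entitlement and rests on several assumptions:
  • Rows arrive in commit order.
  • USAGE and REFUND carry positive amounts.
  • GRANT rows do not affect usage.
TxType, LedgerEntry and rebuild_quantity_used are hypothetical names. The real rebuild_user_balance() works per user, across all of that user's entitlements.

```python
from dataclasses import dataclass
from enum import Enum


class TxType(Enum):
    GRANT = "grant"
    USAGE = "usage"
    RESET = "reset"
    REFUND = "refund"


@dataclass(frozen=True)
class LedgerEntry:
    tx_type: TxType
    amount: int  # hypothetical: credit minutes, always non-negative


def rebuild_quantity_used(entries: list[LedgerEntry]) -> int:
    """Recompute one entitlement's cached quantity_used from its ledger rows."""
    used = 0
    for entry in entries:
        if entry.tx_type is TxType.RESET:
            used = 0  # period boundary: RENEWABLE usage starts over
        elif entry.tx_type is TxType.USAGE:
            used += entry.amount
        elif entry.tx_type is TxType.REFUND:
            used = max(0, used - entry.amount)
        # GRANT changes what is available, not what is used, so it is skipped.
    return used
```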

Lazy Evaluation

Two things happen lazily inside get_user_balances(), before the balance is computed:

  1. _materialize_ungranted_payments() — Finds payments with status=SUCCEEDED, entitlement_granted=False. Creates credit entitlements via atomic CAS (UPDATE ... WHERE entitlement_granted = false). The webhook is a thin fact-recorder; credits materialize on first balance read.

  2. _reset_expired_renewable_periods() — For RENEWABLE entitlements (beta credits), checks if period_start_at + renewal_period_days < now. If expired, resets quantity_used=0, writes a RESET transaction, clears stale reservations.

Both are self-healing: if the system is down when a webhook fires, the next balance read catches up.
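The compare-and-set in _materialize_ungranted_payments() keeps concurrent balance reads from granting the same payment twice. Only the caller whose UPDATE actually flips entitlement_granted proceeds. The sketch below has the following limitations:
  • SQLite stands in for Postgres, so the flag uses 0/1 instead of false/true.
  • The payment table is hypothetical.
  • The function names are illustrative.

```python
import sqlite3


def claim_payment(conn: sqlite3.Connection, payment_id: int) -> bool:
    """Atomically flip entitlement_granted; True only for the caller that wins."""
    cur = conn.execute(
        "UPDATE payment SET entitlement_granted = 1 "
        "WHERE id = ? AND status = 'SUCCEEDED' AND entitlement_granted = 0",
        (payment_id,),
    )
    return cur.rowcount == 1


def materialize_ungranted(conn: sqlite3.Connection) -> list[int]:
    """Grant credits for every succeeded payment not yet granted."""
    rows = conn.execute(
        "SELECT id FROM payment WHERE status = 'SUCCEEDED' AND entitlement_granted = 0"
    ).fetchall()
    granted = []
    for (payment_id,) in rows:
        if claim_payment(conn, payment_id):
            # The credit entitlement insert would go here, inside the same
            # transaction as the flag flip, so both commit or neither does.
            granted.append(payment_id)
    conn.commit()
    return granted
```

Suppose two balance reads race on the same payment. Both may see entitlement_granted = 0 in the SELECT, but only one UPDATE still matches its WHERE clause, so rowcount decides the winner.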

Payment Flow

Stripe webhook fires
  → handle_checkout_session_completed()
    → Creates Payment record (status=SUCCEEDED, entitlement_granted=False)
    → Grants TIER_ACCESS eagerly (user needs dashboard access immediately)
    → Credits are NOT granted here (deferred)
  → Returns 200

User opens dashboard
  → GET /users/me/billing
    → get_user_balances()
      → _materialize_ungranted_payments()  ← credits created here
      → _reset_expired_renewable_periods() ← RENEWABLE reset here
      → Returns balance with credits

Credit Metering

Analysis endpoints check credits before processing:

  1. Estimate: ProjectService.estimate_analysis_credits() sums clip durations (ceiling to minutes) and applies the mode multiplier.
  2. Reserve: CreditService.reserve_credits() stamps reservation metadata (including reserved_period for RENEWABLE entitlements).
  3. Consume: the worker calls CreditService.use_credits() → consume_entitlement(). The drain loop writes one EntitlementTransaction(USAGE) per entitlement drained, all with commit=False, then issues a single db.commit().
  4. Release: on failure, CreditService.release_credits() clears the reservation metadata.
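The estimate step is plain arithmetic. The sketch below uses the multipliers from the Analysis Modes table in this guide. It assumes two things not spelled out above: the minute ceiling applies to the summed duration rather than per clip, and the product is not rounded again. The module-level function is a hypothetical stand-in for the service method.

```python
# Multipliers from the Analysis Modes table; CREDIT_MULTIPLIERS in
# modules/analysis_run/enums.py is the real source of these values.
CREDIT_MULTIPLIERS = {"ai_edit": 1.0, "captions_only": 0.5, "manual": 0.0}


def estimate_analysis_credits(clip_durations_ms: list[int], mode: str) -> float:
    """Sum clip durations, round up to whole minutes, then scale by mode."""
    total_ms = sum(clip_durations_ms)
    # Integer ceiling division: -(-a // b) rounds up without float error.
    minutes = -(-total_ms // 60_000)
    return minutes * CREDIT_MULTIPLIERS[mode]
```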

Consumption Ordering

When credits are consumed, entitlements are drained in priority order:

  1. RENEWABLE first (use-it-or-lose-it — beta credits)
  2. DECREMENTAL by expiry (expiring soonest first, then non-expiring)
  3. ACCUMULATIVE last

This means beta credits are always consumed before subscription credits, and expiring promos are consumed before permanent purchases.

Analysis Modes

Mode is derived from settings (not explicitly selected). AnalysisMode enum in modules/analysis_run/enums.py, multipliers in CREDIT_MULTIPLIERS:

Mode           Multiplier  Trigger
ai_edit        1.0         Any cut/censorship/director feature enabled
captions_only  0.5         Only language set (transcription + captions)
manual         0.0         Nothing toggled (frontend doesn't call analyze)

The analyze endpoint rejects a captions_only request that carries any ai_edit setting (non-zero pacing_level / false_start_sensitivity, non-empty director_notes, non-none censorship) with a 422 via ProjectService.validate_analysis_mode_consistency. This keeps analysis_mode meaningful for metrics and prevents the "0.5× credits for full pipeline" bypass a tampered client would otherwise get. The worker itself is not mode-gated beyond AI Director asset fetching, so the API-layer validation is the authoritative gate. Mode stored on AnalysisRun for audit.

Beta Entitlement

Beta users get creator-tier access + 240 renewable AI minutes/month. Managed via BetaService in entitlement/beta.py:

from modules.entitlement.beta import BetaService

beta = BetaService()
await beta.grant(user_id, db)       # TIER_ACCESS + RENEWABLE CREDIT_GRANT
await beta.revoke(user_id, db)      # Soft-deletes both by reference_id
await beta.has_access(user_id, db)  # Checks active TIER_ACCESS with BETA reason
await beta.bulk_grant(user_ids, db) # One tier lookup, bulk duplicate check, single commit
await beta.list_users(db)           # All beta users with credit usage (SQL join)

Admin invite flow: POST /admin/beta/invite takes an email. If the user exists, grants beta immediately and sends a welcome email. If not, creates a signed JWT token (24h expiry, purpose=beta_invite) and sends an invite email with a signup link.

The signup link delivers the token via ?beta_invite=TOKEN on the landing URL. Both signup paths consume it:

  • Email/password signup (POST /users/?beta_invite=TOKEN) verifies the token and, on an email match, grants beta after account creation.
  • Google OAuth signup: the frontend forwards ?beta_invite=TOKEN to GET /auth/oauth/google?beta_invite=TOKEN, and the backend persists it on OAuthState. The callback (/auth/oauth/callback/google) then verifies the token and grants on a match. Grant failures are non-fatal to OAuth login (logged, not raised).

Email matching is canonical, not byte-exact. The comparison goes through modules.common.utils.email.canonical_email() on both sides: lowercase entire address, strip +tag aliasing for any domain, strip dots in the local part for @gmail.com and @googlemail.com (other providers treat dots as distinct mailboxes — do not generalize). Without this, an invite to User.Name@gmail.com silently fails to grant beta to username@gmail.com even though Google routes both to the same mailbox.
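The following is a minimal sketch of the three canonicalization rules, for illustration only. The shipped modules.common.utils.email.canonical_email() may treat malformed input or other edge cases differently. The sketch does not rewrite googlemail.com to gmail.com, since the text above does not say the helper does that.

```python
_DOTLESS_DOMAINS = {"gmail.com", "googlemail.com"}


def canonical_email(address: str) -> str:
    """Normalize an address for invite matching (sketch of the rules above)."""
    local, sep, domain = address.lower().rpartition("@")
    if not sep:
        return address.lower()  # no "@": nothing further to normalize
    local = local.split("+", 1)[0]  # drop +tag aliasing on every domain
    if domain in _DOTLESS_DOMAINS:
        local = local.replace(".", "")  # Gmail ignores dots in the local part
    return f"{local}@{domain}"
```

Both the invite address and the signup address go through the same function before comparison, so User.Name@gmail.com and username+beta@gmail.com match.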

Beta entitlements stack with subscriptions. resolve_tier_context() picks the highest-ranked tier. Credits from both pools sum. Beta credits are consumed first (RENEWABLE before DECREMENTAL).

Thumbnails

Asset and project thumbnails are generated via FFmpeg (modules/asset/thumbnail.py) and stored in R2 as thumbnail_key. The thumbnail_url is served inline in list responses (not separate endpoints) — minting is a local operation (~1ms, no network call).

  • Assets: Generated in the process_asset_artifacts worker (workers/download/tasks.py) for video/image types — confirm_upload only flips status to processing; the worker handles ffprobe + waveform + thumbnail off the request path (Convention #16). Center-cropped to 240x240 JPEG. URL is a JWT-fronted Cloudflare Worker URL (MediaTokenService.mint), same path as asset video playback. Unifies asset-playback and asset-thumbnail under a single auth model.
  • Projects: Generated from first clip in process_clip_artifacts() worker. Only if project has no thumbnail yet. URL is a presigned R2 URL (storage_client.generate_download_url) — project thumbnails were not migrated to the Worker path.
  • Enrichment: _enrich_with_thumbnail_urls() in AssetService (instance method, mints via MediaTokenService), _enrich_projects_with_thumbnail_urls() module-level in ProjectService (presigned). Called in all list/get/confirm return paths.

Vertical Crop

Optional crop for non-native aspect ratio exports. CropRegion schema (draft/schemas.py) stores normalized 0-1 coordinates. build_crop_filter() (workers/render/ffmpeg/video_filters.py) converts to FFmpeg crop=w:h:x:y filter with even-dimension rounding for H.264 chroma alignment.
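The sketch below shows the normalized-to-pixel conversion under several assumptions:
  • CropRegion has fields x, y, w, h, the names used in the per-asset crop paragraph below.
  • Values are rounded down to even integers.
  • Offsets are clamped so the window stays inside the frame.

The real build_crop_filter() may round or clamp differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CropRegion:
    # Normalized 0-1 coordinates relative to the source frame.
    x: float
    y: float
    w: float
    h: float


def _even(value: float) -> int:
    """Round a non-negative value down to an even integer."""
    return int(value) // 2 * 2


def build_crop_filter(region: CropRegion, src_w: int, src_h: int) -> str:
    """Convert a normalized region into an FFmpeg crop=w:h:x:y filter string."""
    # H.264 with 4:2:0 chroma needs even dimensions; keep at least 2x2.
    w = max(2, _even(region.w * src_w))
    h = max(2, _even(region.h * src_h))
    # Clamp offsets so the window stays inside the frame, then make them even.
    x = _even(max(0.0, min(region.x * src_w, src_w - w)))
    y = _even(max(0.0, min(region.y * src_h, src_h - h)))
    return f"crop={w}:{h}:{x}:{y}"
```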

Crop is injected in all 3 render paths (trim-only, asset composite, B-roll composite) after setpts and before scale+pad. For B-roll two-pass rendering, crop is applied in Pass 1 (trim) to avoid double-cropping. Source dimensions probed via get_video_dimensions() at render time.

Per-asset crop reframe (issue #235). REPLACE / INSERT video and image asset edits get their own opt-in cover-crop, independent of the main-video crop above. Schema fields on Edit: asset_crop_enabled (bool), asset_crop_zoom (∈ [1.0, 10.0]), asset_crop_pan_x / asset_crop_pan_y (∈ [-1.0, 1.0]) — all NOT NULL with defaults (migration f8a2d4c7b193). Source dims travel on AssetFile.width / AssetFile.height, populated by _probe_asset_media (migration a4c9e2b6f085); they remain NULL for audio and pre-migration legacy assets. The renderer's build_video_replace_segments calls _maybe_crop_chain per asset segment — when crop is disabled, target dims unknown, or source dims missing, it returns None and the segment takes today's letterbox scale_part. Otherwise it writes a crop_chain of scale=<cover>,crop=<region>,scale=<exact target>,setsar=1 onto the VideoReplaceSegment (workers/render/ffmpeg/concat/segments.py). _compute_crop_region (workers/render/ffmpeg/video_filters.py) is the Python port of frontend/shared/lib/cropUtils.ts:computeCropRegion; the two implementations must stay in lockstep so preview matches export. Encoding is UI-state (zoom + pan) rather than a frozen CropRegion {x, y, w, h} because the crop window must auto-adapt when the user changes project aspect — frozen render-state would silently re-letterbox on the new target.
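Storing zoom + pan lets the crop window be recomputed against whatever target aspect is current. The function below is a hypothetical cover-crop model and works in three steps:
  1. Take the largest target-aspect window that fits inside the source.
  2. Shrink it by zoom.
  3. Slide it by pan.
It is not a port of _compute_crop_region or computeCropRegion, whose exact math is not given here.

```python
def compute_crop_region(
    src_w: int,
    src_h: int,
    target_w: int,
    target_h: int,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = 0.0,
) -> tuple[float, float, float, float]:
    """Return a hypothetical cover-crop window (x, y, w, h) in source pixels.

    zoom >= 1 shrinks the window; pan in [-1, 1] slides it from one edge
    (-1) through centered (0) to the opposite edge (1).
    """
    target_aspect = target_w / target_h
    # Largest window with the target aspect that fits inside the source.
    if src_w / src_h > target_aspect:
        crop_h = float(src_h)
        crop_w = src_h * target_aspect
    else:
        crop_w = float(src_w)
        crop_h = src_w / target_aspect
    crop_w /= zoom
    crop_h /= zoom
    # Map pan from [-1, 1] onto the slack left over on each axis.
    x = (src_w - crop_w) / 2 * (1 + pan_x)
    y = (src_h - crop_h) / 2 * (1 + pan_y)
    return x, y, crop_w, crop_h
```

Calling it again with a different target size and the same zoom and pan yields a different, still-valid window. A frozen CropRegion could not adapt this way.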

Tier Enforcement

Tier-specific limits are centralized in UserTierContext — a frozen dataclass resolved once per request via resolve_tier_context(). Properties: max_projects, can_use_ai_director, can_watermark_free, can_access_support, storage_quota_mb, export_retention_days.

from modules.entitlement.tier_context import resolve_tier_context

# In API routes (via DI dependency) — the ONLY place resolve_tier_context
# is called. Snapshot the decision onto the queued record if a worker needs it.
tier_ctx = await resolve_tier_context(user_id, db)
if tier_ctx.max_projects <= project_count:
    raise ValidationError("Project limit reached")

# For worker-bound work: snapshot at request time (Convention #12).
export = await crud_exports.create(
    db=db,
    object=ExportCreateInternal(
        ...,
        watermark_required=not tier_ctx.can_watermark_free,  # snapshot
    ),
)

# In workers: read from the snapshot on the queued record, NEVER
# re-resolve tier_ctx. The user's tier may have changed between
# kickoff and execution; the snapshot preserves intent.
if export["watermark_required"]:
    apply_watermark(...)

See Convention #12 for the full rationale, and for the inline # noqa: tier-in-worker exception mechanism reserved for the rare observational case.

Resolution Logic

resolve_tier_context() picks the highest-ranked tier across all active tier entitlements (not the first one). Key behaviors:

  • tier_name: From the highest-ranked tier (free=0, hobby=1, creator=2, viral=3)
  • is_paid: True if ANY entitlement has a paid grant_reason (subscription, purchase, enterprise_contract) — across all entitlements, not just the winner
  • grant_reasons: Frozenset of every active entitlement's grant reason. Callers check set membership (e.g. {"beta"} & tier_ctx.grant_reasons) rather than a scalar winner — the previous scalar field was unreliable on TIER_RANK ties (e.g. TRIAL+BETA both on creator), where max() could pick TRIAL and hide the beta entitlement
  • can_watermark_free: True if is_paid and tier in WATERMARK_FREE_TIERS, OR if grant_reasons intersects WATERMARK_FREE_GRANT_REASONS (includes beta)

This means a beta+hobby user gets creator features (the beta entitlement's creator tier outranks hobby), is_paid=True (from the paid hobby entitlement), and watermark-free exports (beta is in grant_reasons).
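The resolution rules can be sketched as a pure function over a list of active entitlements. TIER_RANK and PAID_GRANT_REASONS below follow the values listed above; the WATERMARK_FREE_TIERS contents, WATERMARK_FREE_GRANT_REASONS being only beta, the lowercase grant-reason strings, and the Entitlement / TierSnapshot / resolve_snapshot names are assumptions for illustration. The real resolve_tier_context() is async and reads entitlements from the database.

```python
from dataclasses import dataclass
from typing import FrozenSet, List

TIER_RANK = {"free": 0, "hobby": 1, "creator": 2, "viral": 3}
PAID_GRANT_REASONS = frozenset({"subscription", "purchase", "enterprise_contract"})
# Assumed values, for illustration only:
WATERMARK_FREE_TIERS = frozenset({"creator", "viral"})
WATERMARK_FREE_GRANT_REASONS = frozenset({"beta"})


@dataclass(frozen=True)
class Entitlement:
    tier: str
    grant_reason: str


@dataclass(frozen=True)
class TierSnapshot:
    tier_name: str
    is_paid: bool
    grant_reasons: FrozenSet[str]
    can_watermark_free: bool


def resolve_snapshot(entitlements: List[Entitlement]) -> TierSnapshot:
    if not entitlements:
        return TierSnapshot("free", False, frozenset(), False)
    # Highest-ranked tier wins, not the first entitlement found.
    winner = max(entitlements, key=lambda e: TIER_RANK[e.tier])
    # Every grant reason is kept, so ties on rank cannot hide one (e.g. TRIAL+BETA).
    grant_reasons = frozenset(e.grant_reason for e in entitlements)
    # Paid status comes from ANY entitlement, not just the winner.
    is_paid = bool(grant_reasons & PAID_GRANT_REASONS)
    can_watermark_free = (
        (is_paid and winner.tier in WATERMARK_FREE_TIERS)
        or bool(grant_reasons & WATERMARK_FREE_GRANT_REASONS)
    )
    return TierSnapshot(winner.tier, is_paid, grant_reasons, can_watermark_free)
```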

Constants in modules/entitlement/constants.py: TIER_MAX_PROJECTS, TIER_STORAGE_MB, TIER_EXPORT_RETENTION_DAYS, AI_DIRECTOR_TIERS, TIER_RANK, PAID_GRANT_REASONS, WATERMARK_FREE_GRANT_REASONS.

Unit conversions use BYTES_PER_MB from modules/common/constants.py (e.g., quota_bytes = tier_ctx.storage_quota_mb * BYTES_PER_MB).

Priority Queue

Task brokers use RabbitMQ with native priority queues. User-facing tasks (analysis, render, asset edit) are enqueued with a priority derived from the user's subscription tier. Non-user-facing tasks (email, download) use plain FIFO.

Priority Mapping

| Tier | is_paid | Priority | Constant |
|---|---|---|---|
| viral | true | 3 | TaskPriority.HIGH |
| creator | true | 2 | TaskPriority.NORMAL |
| hobby | true | 1 | TaskPriority.LOW |
| trial / free / none | false | 0 | TaskPriority.BACKGROUND |
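The table reduces to a small lookup. This sketch reproduces TaskPriority with the values shown; priority_for is a hypothetical stand-in for resolve_task_priority(), whose actual handling of unknown tier names may differ.

```python
from enum import IntEnum
from typing import Optional


class TaskPriority(IntEnum):
    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3


# Only paid tiers earn elevated priority (values from the table above).
_PAID_TIER_PRIORITY = {
    "viral": TaskPriority.HIGH,
    "creator": TaskPriority.NORMAL,
    "hobby": TaskPriority.LOW,
}


def priority_for(tier_name: Optional[str], is_paid: bool) -> TaskPriority:
    """Map (tier_name, is_paid) to a RabbitMQ message priority."""
    if not is_paid or tier_name is None:
        return TaskPriority.BACKGROUND
    return _PAID_TIER_PRIORITY.get(tier_name, TaskPriority.BACKGROUND)
```

Checking is_paid first is what sends a trial user on the creator tier to BACKGROUND rather than NORMAL, and the top value (3) lines up with the queue's max_priority=3.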

Enqueue Pattern

from infrastructure.taskiq.priority import resolve_task_priority
from modules.entitlement.tier_context import resolve_tier_context

tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
    project_uuid=str(project_uuid),
)

Priority is resolved at enqueue time — if a user upgrades, their next task gets the new priority. Tasks already in the queue keep their original priority.

Queue Configuration

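Priority queues use QueueType.CLASSIC with max_priority=3 and qos=2. The low prefetch count matters for ordering: RabbitMQ can only prioritize messages still waiting in the queue, so a worker that has already prefetched many low-priority messages would process them before a newly arrived high-priority task. Brokers and constants live in infrastructure/taskiq/brokers.py and infrastructure/taskiq/priority.py.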

Feature Flags

Postgres-backed feature flags for kill switches, gradual rollouts, beta testing, and time-limited features. No external flag service is required.

Models

FeatureFlag — the flag itself (key, enabled, rollout_percentage, description, expires_at). FeatureFlagUser — explicit allowlist (user always gets the flag).

Evaluation Order

  1. enabled=False → off (kill switch; overrides everything)
  2. User in allowlist → on (bypasses expiry and percentage)
  3. expires_at in the past → off
  4. rollout_percentage set → deterministic hash check
  5. rollout_percentage null → on (everyone)
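The evaluation order maps directly onto a chain of early returns. In this sketch the "deterministic hash check" is assumed to be SHA-256 over "key:user_id" reduced modulo 100; the actual service may hash differently. FlagState, rollout_bucket, and is_flag_on are hypothetical names, not the module's API.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class FlagState:
    key: str
    enabled: bool
    rollout_percentage: Optional[int] = None  # 0-100, or None for "everyone"
    expires_at: Optional[datetime] = None
    allowlist: FrozenSet[str] = frozenset()


def rollout_bucket(key: str, user_id: str) -> int:
    """Stable bucket in [0, 100) for this (flag, user) pair."""
    digest = hashlib.sha256(f"{key}:{user_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % 100


def is_flag_on(flag: FlagState, user_id: str, now: Optional[datetime] = None) -> bool:
    now = now or datetime.now(timezone.utc)
    if not flag.enabled:                      # 1. kill switch beats everything
        return False
    if user_id in flag.allowlist:             # 2. allowlist bypasses expiry and rollout
        return True
    if flag.expires_at is not None and flag.expires_at < now:  # 3. expired
        return False
    if flag.rollout_percentage is not None:   # 4. deterministic percentage rollout
        return rollout_bucket(flag.key, user_id) < flag.rollout_percentage
    return True                               # 5. no percentage: on for everyone
```

Mixing the flag key into the hash gives each flag independent bucketing, so the same users are not always first in line for every rollout. A given user's bucket never moves, so raising the percentage only ever adds users.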

Route Gating

Use the require_feature() dependency to gate endpoints behind flags. It returns 404 (not 403) so that a gated endpoint's existence is not revealed:

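from typing import Annotated

from fastapi import APIRouter, Depends
from modules.feature_flag.dependencies import require_feature

router = APIRouter()

@router.post("/new-endpoint")
async def new_endpoint(
    # ...other route parameters...
    _: Annotated[None, Depends(require_feature("my_flag"))],
):
    # Only reachable when my_flag is on for this user; otherwise the dependency returns 404
    ...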

Frontend

useFeatureFlag("key") returns a boolean backed by GET /api/v1/flags (cached for 5 minutes). The backend remains the authority: even if someone inspects or tampers with that response, the route dependency still blocks access.

Admin Endpoints

GET/POST/PATCH/DELETE /api/v1/admin/flags, plus allowlist management via /admin/flags/{id}/users. Superuser only.

Scripts

| Script | Purpose | How to run |
|---|---|---|
| scripts/setup_initial_data.py | Create tables + seed tiers | Auto-runs on docker compose up |
| scripts/seed_all.py | Tables + tiers + Stripe products (full setup) | Auto-runs on docker compose up |
| scripts/seed_stripe_products.py | Stripe products + prices only | Called by seed_all.py if STRIPE_SECRET_KEY is set |
| scripts/seed_trial_credits.py | Grant trial credits to a user | uv run python scripts/seed_trial_credits.py --email user@example.com |

Key Files

| Purpose | Location |
|---|---|
| Database session | infrastructure/database/session.py |
| Model mixins | infrastructure/database/models.py |
| Auth dependencies | infrastructure/auth/session/dependencies.py |
| Auth constants | infrastructure/auth/constants.py |
| Common schemas | modules/common/schemas.py |
| Domain exceptions | modules/common/exceptions.py |
| Exception → HTTP mapping (HTTPMapping + EXCEPTION_MAPPING) | modules/common/constants.py |
| Error handler | modules/common/utils/error_handler.py |
| i18n catalogs + t() lookup + t_current() raise-site helper | modules/common/i18n/{catalogs.py,en.py,pt.py,es.py} |
| i18n request-locale ContextVar (get_current_locale / set_current_locale) | modules/common/i18n/context.py |
| i18n LocaleMiddleware (pure-ASGI; reads Accept-Language → ContextVar) | modules/common/i18n/middleware.py |
| API dependencies | interfaces/api/dependencies.py |
| Route registration | interfaces/api/v1/__init__.py |
| Credit service + ledger | modules/credits/service.py |
| Entitlement transaction model | modules/credits/models.py |
| Trial granting | modules/entitlement/trial.py |
| Beta service | modules/entitlement/beta.py |
| Beta admin API | interfaces/api/v1/beta.py |
| Tier context resolution | modules/entitlement/tier_context.py |
| Entitlement constants | modules/entitlement/constants.py |
| Broker setup | infrastructure/taskiq/brokers.py |
| Priority routing | infrastructure/taskiq/priority.py |
| Feature flags | modules/feature_flag/service.py |
| Feature flag dependency | modules/feature_flag/dependencies.py |
