Backend Development Guide
This guide covers patterns and conventions for developing in the Sapari backend.
Project Structure
The backend follows a layered architecture: business logic in modules, API routes in interfaces, shared infrastructure, and background workers.
flowchart TB
subgraph src["backend/src/"]
M[modules/] --> |Business Logic| MC[user/, clip/, project/, ...]
I[interfaces/] --> |API Layer| IA[api/v1/]
IN[infrastructure/] --> |Cross-Cutting| IC[database/, auth/, storage/, events/]
W[workers/] --> |Background Jobs| WC[analysis/, render/, download/]
end
- modules/ - Domain business logic. Each module has its own models, schemas, CRUD, and services.
- interfaces/ - External interfaces (currently the REST API; GraphQL or gRPC could be added alongside it).
- infrastructure/ - Database, auth, storage, events. Shared plumbing.
- workers/ - Background tasks: transcription, rendering, downloads.
backend/src/
├── modules/ # Domain business logic
│ ├── user/ # Each module has: model, schemas, crud, service
│ ├── project/
│ ├── clip/
│ ├── edit/
│ ├── asset/
│ ├── caption_line/
│ ├── credits/ # Credit balance, transactions, metering
│ ├── entitlement/ # Flexible access control, trial grants
│ ├── payment/ # Stripe checkout, subscription lifecycle
│ ├── subscription/ # Subscription records
│ ├── product/ # Products with entitlement configs
│ ├── price/ # Stripe prices
│ ├── tier/ # Subscription tiers (hobby, creator, viral)
│ ├── discount/ # Coupons, promo codes
│ ├── email/ # Email service, templates, tasks
│ ├── export/
│ ├── feature_flag/ # Feature flags with rollout + allowlist
│ ├── draft/
│ ├── preset/
│ ├── notification/ # In-app notifications (user + system-wide), SSE push
│ ├── analysis_run/
│ ├── auth/ # Auth service, schemas, domain exceptions
│ ├── admin_audit/ # Admin event log + audit trail (who/what/when + before/after diffs)
│ ├── api_keys/ # API key management
│ ├── preview_preset/ # Preview preset configurations
│ ├── rate_limit/ # Rate limit management (per-endpoint, tier-based)
│ ├── support/ # Support conversations (bidirectional, priority auto-assignment)
│ └── common/ # Shared exceptions, utilities, constants
├── interfaces/ # External interfaces
│ ├── api/v1/ # REST API routes
│ └── main.py # FastAPI app factory
├── infrastructure/ # Cross-cutting concerns
│ ├── database/ # Session, models, migrations
│ ├── auth/ # Authentication, OAuth, sessions, JWT
│ ├── stripe/ # Stripe client, webhooks
│ ├── storage/ # R2/S3 client
│ ├── events/ # SSE pub/sub (project-scoped + user-scoped notification channels)
│ ├── config/ # Settings
│ ├── ai/ # LLM providers (OpenAI, Anthropic)
│ ├── cache/ # Redis/Memcached
│ ├── email/ # Email settings
│ ├── taskiq/ # Background task broker, scheduler
│ ├── rate_limit/ # Redis-backed rate limiting
│ └── logging/ # Structured logging, Logfire
└── workers/ # Background task processing
    ├── analysis/ # Transcription, silence/false-start detection
    ├── render/ # FFmpeg processing
    ├── download/ # yt-dlp, audio extraction
    ├── asset_edit/ # Asset processing
    └── shared/ # Shared worker utilities
Adding a New Module
Follow this order; each step builds on the previous one:
flowchart LR
A[1. Model] --> B[2. Schemas]
B --> C[3. CRUD]
C --> D[4. Service]
D --> E[5. Routes]
E --> F[6. Tests]
Step 1: Create the Model
Models live in modules/{name}/models.py. Use mixins for common fields:
# modules/widget/models.py
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column

from ...infrastructure.database.models import SoftDeleteMixin, TimestampMixin, UUIDMixin
from ...infrastructure.database.session import Base


class Widget(Base, UUIDMixin, TimestampMixin, SoftDeleteMixin):
    """Widget model with UUID primary key and soft deletion."""

    __tablename__ = "widget"

    # Required fields (declared before defaulted ones: if Base is a
    # MappedAsDataclass, a field without a default cannot follow one with a default)
    name: Mapped[str] = mapped_column(String(100))

    # Foreign keys (always indexed)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("user.id", ondelete="CASCADE"),
        index=True,
    )

    # Optional fields with defaults
    count: Mapped[int] = mapped_column(Integer, default=0)
Mixin Reference:
| Mixin | Fields Added | When to Use |
|---|---|---|
| UUIDMixin | uuid (PK) | Public-facing entities |
| TimestampMixin | created_at, updated_at | Almost always |
| SoftDeleteMixin | deleted_at, is_deleted | When you need audit trails |
Step 2: Create Schemas
Schemas validate input, serialize output, and generate OpenAPI docs. Create variants for different operations:
modules/{name}/schemas.py:
# modules/widget/schemas.py
from typing import Annotated
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field

from ..common.schemas import TimestampSchema


# Base with shared fields
class WidgetBase(BaseModel):
    name: Annotated[str, Field(min_length=1, max_length=100)]


# For API responses (excludes sensitive data)
class WidgetRead(TimestampSchema, WidgetBase):
    uuid: UUID
    count: int
    user_id: int


# For creating (only user-provided fields)
class WidgetCreate(WidgetBase):
    model_config = ConfigDict(extra="forbid")  # Reject unknown fields


# For updating (all fields optional)
class WidgetUpdate(BaseModel):
    model_config = ConfigDict(extra="forbid")

    name: Annotated[str | None, Field(min_length=1, max_length=100, default=None)]
    count: int | None = None
Schema Naming Convention:
| Schema | Purpose |
|---|---|
| WidgetBase | Shared fields for inheritance |
| WidgetRead | API responses |
| WidgetCreate | POST request bodies |
| WidgetCreateInternal | Internal creation (e.g., with hashed passwords) |
| WidgetUpdate | PATCH request bodies (all fields optional) |
| WidgetDelete | Soft-deletion data |
Step 3: Create CRUD
We use FastCRUD to implement the repository pattern. It gives us a data access layer that abstracts away raw SQL, providing a clean interface for database operations:
# modules/widget/crud.py
from fastcrud import FastCRUD
from .models import Widget
crud_widgets: FastCRUD = FastCRUD(Widget)
Common CRUD Operations:
# Create
widget = await crud_widgets.create(db=db, object=widget_data, schema_to_select=WidgetRead)
# Read one
widget = await crud_widgets.get(db=db, uuid=uuid, is_deleted=False)
# Read many with pagination
widgets = await crud_widgets.get_multi(
db=db,
offset=skip,
limit=limit,
schema_to_select=WidgetRead,
is_deleted=False,
)
# Check existence
exists = await crud_widgets.exists(db=db, name="foo")
# Update
updated = await crud_widgets.update(db=db, object=update_data, uuid=uuid)
# Soft delete
await crud_widgets.delete(db=db, uuid=uuid)
# Hard delete (rarely used)
await crud_widgets.db_delete(db=db, uuid=uuid)
# Joins
result = await crud_widgets.get_joined(
db=db,
join_model=User,
join_prefix="user_",
schema_to_select=WidgetRead,
join_schema_to_select=UserRead,
uuid=uuid,
nest_joins=True,
)
Query Patterns
FastCRUD's get_multi() defaults to limit=100. This is correct for API endpoints (human pagination) but dangerous for internal queries that need all records. Three cases:
Case 1: A human will scroll this. Paginate. Caller passes limit/offset, capped by MAX_PAGE_LIMIT:
# API endpoint -- caller controls pagination
widgets = await crud_widgets.get_multi(
db=db, offset=skip, limit=limit,
schema_to_select=WidgetRead, is_deleted=False,
)
Case 2: Code needs all records. Use limit=None. The WHERE clause is the bound:
# Internal -- domain-bounded query, no pagination
clips = await crud_clips.get_multi_joined(
db=db, project_id=project_uuid, limit=None,
schema_to_select=ClipReadBase,
joins_config=[clip_file_join_config()],
)
limit=None is safe when: (1) the WHERE clause bounds the count, (2) per-record work is cheap, and (3) the bound is stable. When (3) doesn't hold (e.g., the number of edits per project depends on how the analysis pipeline is designed), add a sanity cap that raises loudly.
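A sanity cap for an unbounded read can be as small as a helper that checks the row count after the query. The following is a minimal sketch: the names assert_within_sanity_cap and SanityCapExceeded are hypothetical and belong to neither the codebase nor FastCRUD.

```python
from typing import Any, Sequence


class SanityCapExceeded(RuntimeError):
    """Raised when a supposedly domain-bounded query returns more rows than expected (hypothetical)."""


def assert_within_sanity_cap(rows: Sequence[Any], cap: int, label: str) -> Sequence[Any]:
    """Return rows unchanged, or raise loudly if the assumed bound no longer holds."""
    if len(rows) > cap:
        raise SanityCapExceeded(
            f"{label}: got {len(rows)} rows, sanity cap is {cap}; "
            "the WHERE-clause bound has changed, so revisit this query"
        )
    return rows


# Usage: wrap the "data" list of a get_multi(..., limit=None) result.
edits = assert_within_sanity_cap([{"id": i} for i in range(3)], cap=5_000, label="edits for project")
```

Raising, rather than truncating, is the point: a silent partial read is exactly what the default limit=100 would produce. A variant passes limit=cap + 1 to the query itself, so an oversized result is detected without loading all of it.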
Case 3: Expensive work per record. Job queue. Batch IDs, enqueue tasks:
# Batch processing -- expensive work per record
cursor = None
while True:
    users = await crud_users.get_multi_by_cursor(
        db=db, cursor=cursor, limit=DEFAULT_BATCH_SIZE,
    )
    if not users["data"]:
        break
    await process_batch(users["data"])  # e.g., enqueue one task per record
    cursor = users["next_cursor"]
Write-side enforcement: Entity count caps belong on creation, not on reads. Example: MAX_CLIPS_PER_PROJECT = 100 is checked in presign_upload() and import_youtube(), then reads use limit=None with confidence. For concurrent safety, wrap count → check → create with advisory_xact_lock(db, f'clips:{project_uuid}') — see §Count-Race Prevention below.
SQL aggregates over get_multi: When computing a scalar (sum, count, max), use SQLAlchemy directly instead of loading rows into Python:
from sqlalchemy import func, select

# Correct: SQL aggregate
stmt = select(func.sum(EntitlementTransaction.cost_microcents)).where(...)
result = await db.execute(stmt)
total = result.scalar_one()  # None when no rows match (SUM over zero rows is NULL)
# Wrong: loading rows to sum in Python
result = await crud.get_multi(db=db, ...)
total = sum(row["cost_microcents"] for row in result["data"])
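The difference is easiest to see against a real database. This sketch uses the standard-library sqlite3 module as a stand-in for Postgres, and the table and column names are illustrative. The aggregate returns one scalar however many rows exist. The Python version ships rows to the application first, and because get_multi's default limit is 100, it can also be silently wrong.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entitlement_transaction (id INTEGER PRIMARY KEY, cost_microcents INTEGER)")
conn.executemany(
    "INSERT INTO entitlement_transaction (cost_microcents) VALUES (?)",
    [(c,) for c in range(1, 251)],  # 250 rows: 1 + 2 + ... + 250
)

# Correct: the database computes one scalar.
(sql_total,) = conn.execute("SELECT SUM(cost_microcents) FROM entitlement_transaction").fetchone()

# Wrong: fetch rows, then sum in Python. LIMIT 100 mimics get_multi's default page size.
rows = conn.execute(
    "SELECT cost_microcents FROM entitlement_transaction ORDER BY id LIMIT 100"
).fetchall()
python_total = sum(cost for (cost,) in rows)
```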
Step 4: Create Service
Services contain business logic - validation, permissions, orchestrating multiple CRUD operations:
# modules/widget/service.py
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from ..common.exceptions import WidgetNotFoundError, PermissionDeniedError
from .crud import crud_widgets
from .schemas import WidgetCreate, WidgetRead, WidgetUpdate


class WidgetService:
    """Business logic for widget operations."""

    async def create(
        self,
        widget: WidgetCreate,
        user_id: int,
        db: AsyncSession,
    ) -> dict[str, Any]:
        """Create a widget for a user."""
        widget_data = widget.model_dump()
        widget_data["user_id"] = user_id
        return await crud_widgets.create(
            db=db,
            object=widget_data,
            schema_to_select=WidgetRead,
        )

    async def get_by_uuid(
        self,
        uuid: str,
        db: AsyncSession,
    ) -> dict[str, Any]:
        """Get a widget by UUID."""
        widget = await crud_widgets.get(
            db=db,
            schema_to_select=WidgetRead,
            uuid=uuid,
            is_deleted=False,
        )
        if not widget:
            raise WidgetNotFoundError(f"Widget {uuid} not found")
        return widget

    async def verify_ownership(
        self,
        widget: dict[str, Any],
        user_id: int,
    ) -> None:
        """Verify user owns this widget."""
        if widget["user_id"] != user_id:
            raise PermissionDeniedError("You don't own this widget")
Service Patterns:
- Services are stateless classes
- Each method takes db: AsyncSession as a parameter
- Raise domain exceptions (not HTTP exceptions)
- Use CRUD for database operations
- Handle business validation and permissions
Step 5: Create Routes
Routes go in interfaces/api/v1/{name}.py. Keep them thin - delegate to services, don't put logic here. Global exception handlers catch DomainError and unhandled exceptions automatically, so routes don't need try/except unless a specific exception requires a custom status code:
# interfaces/api/v1/widgets.py
from typing import Annotated, Any
from uuid import UUID
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from ....infrastructure.auth.session.dependencies import get_current_user
from ....infrastructure.database.session import async_session
from ....modules.widget.schemas import WidgetCreate, WidgetRead, WidgetUpdate
from ....modules.widget.service import WidgetService
from ..dependencies import get_widget_service
router = APIRouter(tags=["Widgets"])
@router.post(
"/",
status_code=201,
response_model=WidgetRead,
summary="Create Widget",
responses={
201: {"description": "Widget created"},
401: {"description": "Not authenticated"},
},
)
async def create_widget(
widget: WidgetCreate,
db: Annotated[AsyncSession, Depends(async_session)],
current_user: Annotated[dict[str, Any], Depends(get_current_user)],
widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
    """Create a new widget."""
    return await widget_service.create(widget, current_user["id"], db)
@router.get(
"/{widget_uuid}",
response_model=WidgetRead,
summary="Get Widget",
)
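async def get_widget(
    widget_uuid: UUID,
    db: Annotated[AsyncSession, Depends(async_session)],
    current_user: Annotated[dict[str, Any], Depends(get_current_user)],
    widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
    """Get a widget by UUID (owner only)."""
    widget = await widget_service.get_by_uuid(str(widget_uuid), db)
    await widget_service.verify_ownership(widget, current_user["id"])
    return widget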
Register the router in interfaces/api/v1/__init__.py:
from .widgets import router as widgets_router
router.include_router(widgets_router, prefix="/widgets")
Dependency Injection
FastAPI's Depends() handles dependency injection. Database sessions are per-request. Everything else (services, infrastructure clients) is a singleton created once in the lifespan and accessed via request.state (ASGI lifespan state).
flowchart TB
subgraph Lifespan["Created Once (Lifespan)"]
SVC[All Services]
STORAGE[StorageClient]
EMAIL[PostmarkClient]
STRIPE[StripeService]
end
subgraph Request["Per Request"]
DB[async_session]
AUTH[get_current_user]
end
Route --> DB
Route --> SVC
Route --> AUTH
All singletons are created in app_factory.py's lifespan and yielded as a dict. Dependencies in interfaces/api/dependencies.py read from request.state:
from typing import cast

from fastapi import Request

# WidgetService and StorageClient are imported from their own modules (paths omitted here)


def get_widget_service(request: Request) -> WidgetService:
    return cast(WidgetService, request.state.widget_service)


def get_storage_client(request: Request) -> StorageClient:
    return cast(StorageClient, request.state.storage_client)
Services with dependencies share singleton instances via constructor injection:
# In lifespan (app_factory.py):
entitlement_service = EntitlementService()
credit_service = CreditService(entitlement_service=entitlement_service)
beta_service = BetaService(entitlement_service=entitlement_service, credit_service=credit_service)
yield {"entitlement_service": entitlement_service, "credit_service": credit_service, ...}
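Below is a minimal, framework-free sketch of this wiring. Each long-lived object is built once, instances are shared through constructor arguments, and the whole set is handed over as a dict. The class bodies are placeholders, since the real services carry logic. Starlette documents that each request receives a shallow copy of the lifespan state, and SimpleNamespace imitates that here: every request gets a new mapping that points at the same objects.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import Any, AsyncIterator


class EntitlementService: ...  # placeholder body


class CreditService:
    def __init__(self, entitlement_service: EntitlementService) -> None:
        self.entitlement_service = entitlement_service


class BetaService:
    def __init__(self, entitlement_service: EntitlementService, credit_service: CreditService) -> None:
        self.entitlement_service = entitlement_service
        self.credit_service = credit_service


@asynccontextmanager
async def lifespan(app: Any) -> AsyncIterator[dict[str, Any]]:
    entitlement_service = EntitlementService()
    credit_service = CreditService(entitlement_service=entitlement_service)
    beta_service = BetaService(entitlement_service=entitlement_service, credit_service=credit_service)
    yield {
        "entitlement_service": entitlement_service,
        "credit_service": credit_service,
        "beta_service": beta_service,
    }
    # Shutdown cleanup (closing clients, etc.) would go here.


async def simulate_two_requests() -> tuple[SimpleNamespace, SimpleNamespace]:
    async with lifespan(app=None) as state:
        # Each request sees a shallow copy: a new namespace, the same service objects.
        return SimpleNamespace(**state), SimpleNamespace(**state)


req1_state, req2_state = asyncio.run(simulate_two_requests())
```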
Authentication dependencies:
# Require any authenticated user
current_user: Annotated[dict[str, Any], Depends(get_current_user)]
# Optional user (returns None for anonymous callers instead of 401)
current_user: Annotated[dict[str, Any] | None, Depends(get_optional_user)]
# Require superuser/admin (narrow meta endpoints only: health, queues, sudo, tier catalog)
_: Annotated[dict[str, Any], Depends(get_current_superuser)]
# Require superuser + audit trail + IP allowlist + session timeout (default for admin endpoints)
audit: Annotated[AdminAuditContext, Depends(get_admin_audit_context)]
# ... then inside handler, after the service call:
await audit.log(AdminEventType.SEARCH, "user", details={"action": "list"})
When to use which: any admin endpoint that returns PII or mutates admin-touchable state should use get_admin_audit_context. The bare get_current_superuser is reserved for narrow meta endpoints where audit noise exceeds value (health checks, queue polling, sudo primitive, tier catalog lookup).
Request-scoped locale: there are two ways for request-stack code to get the user-facing locale, both backed by the same modules/common/i18n package.
(1) The get_request_locale FastAPI dependency resolves the locale in priority order: the authenticated user's User.locale, then the Accept-Language header, then DEFAULT_LOCALE. It is built on get_optional_user, so unauthenticated paths (signup, the OAuth callback for first-time users, public routes) still get a locale instead of a 401. It is used today by POST /users/ (password signup) and GET /auth/oauth/callback/google to persist User.locale when an account is first created, and in the error-mapping path via handle_exception(error, locale), so the route's localized detail is rendered in the user's UI language. Pass the resolved value into the service only when the service legitimately needs it as an explicit parameter (the signup case: it must be persisted onto a User row that doesn't exist yet).
# Resolve the locale once per request and pass to the service
signup_locale: Annotated[str, Depends(get_request_locale)]
(2) t_current(key, **interp) from modules/common/i18n/catalogs.py reads the locale from a per-request ContextVar set by LocaleMiddleware (modules/common/i18n/middleware.py). The middleware is registered outermost in create_application, so it runs before any route, dependency, or service body. This is the path service-layer code should use at raise sites that localize messages, because it avoids threading a locale parameter through every method down the call stack. The middleware reads Accept-Language only (no User.locale lookup), which matches how catalogs are resolved on the error path. Authenticated users are still covered because the SPA stamps Accept-Language from User.locale after the server-wins reconcile. The middleware is pure ASGI by design: BaseHTTPMiddleware runs the wrapped app in a separate asyncio task, and a ContextVar set inside its dispatch is invisible to the route handler.
# Service-layer raise site (no locale param needed)
raise UserNotFoundError(t_current("error.user_not_found"))
The catalogs themselves live in modules/common/i18n/{en,pt,es}.py, aggregated by catalogs.py:t(locale, key). t_current(key) is t(get_current_locale(), key) with the ContextVar read folded in. Workers MUST NOT use t_current() — task processes have no request scope, so the read returns DEFAULT_LOCALE for every user. Workers take a snapshotted locale task arg (Convention #12) and call t(locale, key) explicitly. The transition of existing service raise sites from raw English strings to t_current() is a per-slice migration (see §Exception Handling — raise-site localization).
The global DomainError exception handler and CatchAllErrorMiddleware intentionally DO NOT call get_request_locale — they sit outside the FastAPI dependency graph and read Accept-Language directly via resolve_locale_from_accept_language. The SPA stamps the header on every request, so the authed-user case is covered without a DB read on the error path. Routes that catch a non-domain exception themselves and call handle_exception(error, locale) SHOULD pass the dep-resolved locale (which prefers User.locale); the canned-default path through the global handler uses the header-only fallback.
Exception Handling
Services raise domain-specific exceptions (like WidgetNotFoundError), not HTTP exceptions. Global handlers catch these automatically, so routes don't need try/except for standard errors.
flowchart LR
A[Service raises DomainError] --> B[Global DomainError handler]
B --> C[EXCEPTION_MAPPING → HTTPException]
C --> D[Sanitized JSON response]
E[Unhandled Exception] --> F[Global catch-all handler]
F --> G[Log details server-side]
G --> H[Generic 500 response]
Global handlers are registered in modules/common/utils/error_handler.py via register_exception_handlers(), called in app_factory.py. Three layers:
- RequestValidationError handler: FastAPI validation errors return a generic 422 message (no field names leaked), resolved through t(locale, INVALID_REQUEST_MESSAGE_KEY).
- DomainError handler: each 4xx exception's HTTPMapping decides whether to render a canned default (via t(locale, mapping.default_key)) or pass the raw str(exc) message through (when use_message_when_present=True and the raise site supplied a message). Messages that pass through (ValidationError, PermissionDeniedError, InsufficientCreditsError, UserExistsError) MUST be pre-sanitized by the raising service; the exception class docstrings spell this out. Everything else (ResourceNotFoundError, ResourceExistsError, TierNotFoundError, etc.) returns the canned localized string. 5xx domain errors always return t(locale, GENERIC_ERROR_MESSAGE_KEY).
- CatchAllErrorMiddleware: unhandled exceptions return a generic 500, also localized via t(locale, GENERIC_ERROR_MESSAGE_KEY).
All error responses include a support_id (an 8-character, UUID-derived ID) for customer-support lookup; full details are logged server-side only. For non-domain errors, routes use handle_exception(error, locale) as a fallback. Pass the locale from Depends(get_request_locale) to render the detail in the user's UI language; if it is omitted, the detail falls back to English.
Locale resolution in the handlers and middleware bypasses get_request_locale. The handlers and CatchAllErrorMiddleware sit outside the FastAPI dependency graph, so they cannot call the dependency. Instead, _resolve_request_locale(request) reads Accept-Language directly via resolve_locale_from_accept_language. The SPA stamps Accept-Language on every request (ApiClient.setLocale), so authenticated users are covered without a DB read, and unauthenticated requests get either the header value or DEFAULT_LOCALE. This asymmetry with the route-side get_request_locale (which prefers User.locale) is intentional. The handlers run on the error path, which may be reached precisely because that dependency or its user lookup failed, so they must stay decoupled from the user lookup.
Define domain exceptions in modules/common/exceptions.py:
class DomainError(Exception):
    """Base for all domain errors."""


class ResourceNotFoundError(DomainError):
    """Resource not found."""


class WidgetNotFoundError(ResourceNotFoundError):
    """Widget not found."""


class PermissionDeniedError(DomainError):
    """User lacks permission."""
Mapping to HTTP in modules/common/constants.py. EXCEPTION_MAPPING: dict[type[DomainError], HTTPMapping] — each entry is an HTTPMapping(status, default_key, use_message_when_present) frozen dataclass. default_key is an i18n catalog key resolved at handler time via t(locale, key); use_message_when_present=True means a non-empty raise-site str(exc) overrides the default (so a service can say ValidationError("Draft 'X' already exists") and have that exact string surface to the client):
from dataclasses import dataclass

# DomainError and the exception classes below come from modules/common/exceptions.py


@dataclass(frozen=True)
class HTTPMapping:
    status: int
    default_key: str
    use_message_when_present: bool


EXCEPTION_MAPPING: dict[type[DomainError], HTTPMapping] = {
    ResourceNotFoundError: HTTPMapping(404, "error.resource_not_found", False),
    PermissionDeniedError: HTTPMapping(403, "error.permission_denied", True),
    ValidationError: HTTPMapping(422, "error.invalid_request", True),
    ResourceExistsError: HTTPMapping(409, "error.resource_exists", False),
    InsufficientCreditsError: HTTPMapping(402, "error.insufficient_credits", True),
    # ... see constants.py for the full list
}
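The following pure-Python sketch shows how a handler might turn a domain exception into a status and a body using a mapping of this shape. Several details are assumptions rather than documented behavior: the lookup walks the exception's MRO so that WidgetNotFoundError resolves through its ResourceNotFoundError entry, the support_id is taken as the first 8 hex characters of a uuid4, and ExternalServiceError is a hypothetical 5xx entry. The catalog lookup is reduced to a dict.

```python
import uuid
from dataclasses import dataclass


class DomainError(Exception): ...
class ResourceNotFoundError(DomainError): ...
class WidgetNotFoundError(ResourceNotFoundError): ...
class ValidationError(DomainError): ...
class ExternalServiceError(DomainError): ...  # hypothetical 5xx example


@dataclass(frozen=True)
class HTTPMapping:
    status: int
    default_key: str
    use_message_when_present: bool


GENERIC_ERROR_MESSAGE_KEY = "error.generic"
CATALOG = {
    "error.resource_not_found": "Resource not found",
    "error.invalid_request": "Invalid request",
    "error.generic": "Something went wrong",
}
EXCEPTION_MAPPING: dict[type[DomainError], HTTPMapping] = {
    ResourceNotFoundError: HTTPMapping(404, "error.resource_not_found", False),
    ValidationError: HTTPMapping(422, "error.invalid_request", True),
    ExternalServiceError: HTTPMapping(502, GENERIC_ERROR_MESSAGE_KEY, True),
}


def render_domain_error(exc: DomainError) -> tuple[int, dict[str, str]]:
    """Return (status, body) for a domain error without leaking unsanitized detail."""
    mapping = next(
        (EXCEPTION_MAPPING[cls] for cls in type(exc).__mro__ if cls in EXCEPTION_MAPPING),
        HTTPMapping(500, GENERIC_ERROR_MESSAGE_KEY, False),
    )
    raw = str(exc)
    if mapping.status >= 500:
        detail = CATALOG[GENERIC_ERROR_MESSAGE_KEY]  # 5xx: always generic, even if a message exists
    elif mapping.use_message_when_present and raw:
        detail = raw  # raise site promised a pre-sanitized message
    else:
        detail = CATALOG[mapping.default_key]
    return mapping.status, {"detail": detail, "support_id": uuid.uuid4().hex[:8]}
```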
Adding a new domain exception:
1. Define the class in modules/common/exceptions.py.
2. Add the canonical English string to modules/common/i18n/en.py under a new error.<snake_case_name> key.
3. Mirror the translation into pt.py and es.py. The TestCatalogParity test in tests/unit/modules/common/i18n/test_catalogs.py fails the PR if you skip the mirrors; it acts as a per-PR key-parity gate, so no dedicated CI step is required.
4. Add an HTTPMapping entry to EXCEPTION_MAPPING. Pick use_message_when_present=True only when raise sites legitimately need to surface a distinct, pre-sanitized message per call; otherwise leave it False and the canned localized default wins.
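A parity gate like TestCatalogParity only has to compare key sets. Here is a sketch of that check. The real test may be structured differently, and the inline dicts stand in for en.py, pt.py and es.py.

```python
EN = {"error.resource_not_found": "Resource not found", "error.widget_not_found": "Widget not found"}
PT = {"error.resource_not_found": "Recurso não encontrado", "error.widget_not_found": "Widget não encontrado"}
ES = {"error.resource_not_found": "Recurso no encontrado"}  # the new key was not mirrored


def catalog_parity_problems(
    reference: dict[str, str], others: dict[str, dict[str, str]]
) -> dict[str, dict[str, list[str]]]:
    """Report keys missing from, or extra in, each catalog relative to the reference catalog."""
    problems: dict[str, dict[str, list[str]]] = {}
    for name, catalog in others.items():
        missing = sorted(reference.keys() - catalog.keys())
        extra = sorted(catalog.keys() - reference.keys())
        if missing or extra:
            problems[name] = {"missing": missing, "extra": extra}
    return problems


problems = catalog_parity_problems(EN, {"pt": PT, "es": ES})
```

In a pytest test, the gate reduces to assert not problems, and the returned dict names exactly which catalog and which keys to fix.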
Raise-site str(exc) messages (the use_message_when_present branch) are migrating from raw English to localized strings incrementally. The helper is t_current(key, **interp) from modules/common/i18n/catalogs.py. It reads the request locale from the ContextVar set by LocaleMiddleware, so raise sites in service code don't need a threaded locale parameter. Pattern: raise ValidationError(t_current("error.draft_already_exists", name=draft_name)). Services adopt it slice by slice as their messages are added to the catalog. The canned defaults, the RequestValidationError 422, and the 5xx generic message all flow through t() already, so an unmigrated service still produces a localized response, just with the canned default instead of the raise-site detail. The CSRF dependency (infrastructure/auth/session/dependencies.py:verify_csrf_token) is the first non-service adopter: it raises CSRFException(t_current("auth.csrf_missing")) or CSRFException(t_current("auth.csrf_invalid")) from the request scope, using the same pattern as services. Workers MUST NOT use t_current(); see §Dependency Injection · Request-scoped locale (2) for the worker rule and the t(locale, key) snapshot pattern. The first in-tree instance is workers/analysis/tasks.py:_record_success. It receives user_locale from interfaces/api/v1/projects.py:trigger_analysis (Depends(get_request_locale)) through .kiq(user_locale=...), and calls t(user_locale, "notification.analysis.title.*") inside the worker.
Database Patterns
Async Sessions
All database operations are async. The session is injected via Depends(async_session) and auto-commits on success:
from sqlalchemy.ext.asyncio import AsyncSession
async def my_operation(db: AsyncSession):
    # Operations auto-commit on success
    result = await crud_widgets.create(db=db, object=data)
    return result
Count-Race Prevention
count → check → insert is not atomic under concurrent requests. Two requests can both read the same count, both pass the limit check, both insert — the cap is silently exceeded. Two tools close the race, matching different constraint shapes.
Partial unique index — works when the constraint is "exactly one row per X" (e.g., one trial per user). Postgres rejects the second insert at commit time with IntegrityError. The service catches and converts to the domain return value. See entitlement/models.py (uq_user_entitlement_trial) and entitlement/trial.py:grant_trial_if_eligible for the shipped pattern.
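The partial-unique-index approach can be reproduced with the standard-library sqlite3 module, which also supports partial indexes. The table, index, and function names below are illustrative and only loosely modeled on uq_user_entitlement_trial. The real grant_trial_if_eligible runs on SQLAlchemy and asyncpg against Postgres.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE user_entitlement (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL,
        source TEXT NOT NULL
    );
    -- At most one 'trial' row per user; other sources are unrestricted.
    CREATE UNIQUE INDEX uq_user_entitlement_trial
        ON user_entitlement (user_id) WHERE source = 'trial';
    """
)


def grant_trial(user_id: int) -> bool:
    """Insert a trial row; translate the unique violation into a domain result."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO user_entitlement (user_id, source) VALUES (?, 'trial')", (user_id,)
            )
        return True
    except sqlite3.IntegrityError:
        return False  # already granted, possibly by a concurrent request


results = [grant_trial(1), grant_trial(1), grant_trial(2)]

with conn:
    for _ in range(2):  # non-trial rows fall outside the partial index
        conn.execute("INSERT INTO user_entitlement (user_id, source) VALUES (1, 'purchase')")
```

The database enforces "exactly one" no matter how requests interleave. The service only decides what a rejected insert means to the caller.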
Advisory lock — works for any N-bounded cap. Helper lives at infrastructure/database/locks.py:advisory_xact_lock. Example:
from ...infrastructure.database.locks import advisory_xact_lock

async def create_clip(self, project_uuid, ..., db):
    await advisory_xact_lock(db, f"clips:{project_uuid}")
    count = await crud_clips.count(db=db, project_id=project_uuid)
    if count >= MAX_CLIPS_PER_PROJECT:
        raise ValidationError(...)
    await crud_clips.create(db=db, object=..., commit=True)  # commit releases the lock
The lock's transaction is the one that autobegins at db.execute inside the helper. The crud.create(commit=True) that ends the block commits that transaction and releases the lock. Do NOT call await db.commit() between the lock acquire and the create — it releases the lock early.
Why pg_advisory_xact_lock (blocking, transaction-scoped) rather than pg_try_advisory_xact_lock (non-blocking) or pg_advisory_lock (session-scoped): we want waiters to serialize, not error, and we want automatic release on commit or rollback, so a caller can't leak a lock by forgetting to release it.
Key convention: "<entity>:<identifier>" (e.g., "clips:<project_uuid>", "user_projects:<user_id>", "presets:<user_id>", "preview_presets:<user_id>"). The entity namespace keeps keys for different entities distinct even when they share an identifier, and per-identifier scoping avoids serializing operations on unrelated entities. Distinct-but-related entities (e.g., the two preset tables, which each have independent per-user caps) MUST use distinct keys; colliding them would serialize unrelated operations for no safety benefit. Because keys are hashed to 32 bits (hashtext), two distinct keys can still collide with probability 2^-32 per key pair; that risk is accepted.
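The accepted collision risk is easy to quantify. Assume each key is hashed to a 32-bit integer, as hashtext does. Two distinct keys then collide with probability about 2^-32, and the birthday approximation gives the chance that any two of n simultaneously held locks share an id. A collision only makes two unrelated transactions wait on each other. It cannot let a cap be exceeded.

```python
import math


def any_collision_probability(n_keys: int, hash_bits: int = 32) -> float:
    """Birthday-bound estimate that at least two of n distinct keys hash to the same lock id."""
    pairs = n_keys * (n_keys - 1) / 2
    return -math.expm1(-pairs / 2**hash_bits)  # 1 - exp(-pairs / 2**bits), numerically stable


p_pair = any_collision_probability(2)  # a single pair: about 2.3e-10, i.e. 2**-32
p_busy = any_collision_probability(1_000)  # 1,000 locks held at the same instant: about 1.2e-4
```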
Indexes
Add indexes for:
- Foreign keys (always)
- Fields used in WHERE clauses
- Fields used for sorting
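from sqlalchemy import Index, String
from sqlalchemy.orm import Mapped, mapped_column

# Single column
email: Mapped[str] = mapped_column(String(50), unique=True, index=True)
# Composite index (declared inside the model class)
__table_args__ = (
    Index("idx_widget_user_status", "user_id", "status"),
)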
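A composite index serves queries best when they filter on its leading column or columns. SQLite's EXPLAIN QUERY PLAN, available through the standard library, is a quick way to build that intuition. The schema is illustrative. On Postgres, the equivalent check is EXPLAIN run against realistic data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE widget (id INTEGER PRIMARY KEY, user_id INTEGER, status TEXT, name TEXT);
    CREATE INDEX idx_widget_user_status ON widget (user_id, status);
    """
)


def plan(sql: str) -> str:
    """Return SQLite's human-readable plan details for a query, joined into one string."""
    rows = conn.execute(f"EXPLAIN QUERY PLAN {sql}").fetchall()
    return " | ".join(str(row[-1]) for row in rows)  # last column is the detail text


both_columns = plan("SELECT name FROM widget WHERE user_id = 1 AND status = 'active'")
leading_only = plan("SELECT name FROM widget WHERE user_id = 1")
```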
Relationships
from sqlalchemy.orm import relationship
# One-to-many (parent side)
widgets: Mapped[list["Widget"]] = relationship(
"Widget",
back_populates="user",
lazy="selectin", # Eager load
default_factory=list,
init=False,
)
# Many-to-one (child side)
user: Mapped["User"] = relationship(
"User",
back_populates="widgets",
lazy="selectin",
init=False,
)
Database connection: direct endpoint, matched region
Two things matter for query latency:
- Use the direct Postgres endpoint, NOT the pooler. PgBouncer in transaction mode breaks asyncpg's prepared-statement cache, forcing 3-4× the round trips per query. Our app already has SQLAlchemy connection pooling (POSTGRES_POOL_SIZE=50 + POSTGRES_MAX_OVERFLOW=20 = a 70-connection ceiling per API instance). MAX_OVERFLOW provides spike-absorption headroom, so pressure shows up as alarmable pool-checkout latency rather than as cliff-edge 500s. That makes PgBouncer in front of asyncpg redundant and harmful. The pooler is designed for serverless apps that open one connection per request; we run persistent uvicorn workers.
Caveat: Neon compute auto-suspend on the Hobby tier. Hobby-tier Neon compute scales to zero after ~5 min idle (default). When the next request arrives, asyncpg's pooled connection is stale. pool_pre_ping detects this and the request acquires a fresh connection. However, if a connection dies during a request, the cleanup-time rollback raises asyncpg.InterfaceError: cannot call Transaction.rollback(): the underlying connection is closed, which bubbles up as a 500. A defense-in-depth fix in infrastructure/database/session.py swallows close-time errors at debug level (added 2026-04-28 for #56). For production, upgrade Neon to the Pro tier (always-on compute, $19/mo) to remove the auto-suspend window entirely. The convention above still holds because we stay on the direct endpoint.
- DB region must match server region. Each query is one network round trip. Cross-region traffic (server in US-East, DB in US-West or vice versa) adds ~50-65ms per query, so a single request that runs 4-6 sequential queries pays 200-400ms in DB latency before any other work happens. Match regions (Hetzner Ashburn ↔ Neon us-east-1, Hetzner Hillsboro ↔ Neon us-west-2) for sub-10ms query latency.
Both decisions are made at provision time. Changing the DB region requires creating a new Neon project and migrating data.
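The figures above are simple sums and products, which makes them easy to re-check when pool settings or regions change. Below is a small worked sketch. The helper names are hypothetical; the inputs are the figures quoted above.

```python
POSTGRES_POOL_SIZE = 50
POSTGRES_MAX_OVERFLOW = 20


def connection_ceiling(pool_size: int, max_overflow: int) -> int:
    """SQLAlchemy's QueuePool holds up to pool_size persistent plus max_overflow extra connections."""
    return pool_size + max_overflow


def sequential_db_latency_ms(queries: int, round_trip_ms: float) -> float:
    """Sequential queries each pay one network round trip before any server-side work."""
    return queries * round_trip_ms


ceiling = connection_ceiling(POSTGRES_POOL_SIZE, POSTGRES_MAX_OVERFLOW)  # 70 per API instance
cross_region_low = sequential_db_latency_ms(4, 50)  # 200 ms
cross_region_high = sequential_db_latency_ms(6, 65)  # 390 ms
same_region_high = sequential_db_latency_ms(6, 10)  # at most 60 ms with sub-10ms round trips
```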
Migrations
Generate migrations with Alembic:
Key Conventions
- UUIDs for public APIs - Never expose internal integer IDs
- Soft deletes - Use is_deleted=False in queries
- Async everything - All I/O operations are async
- Type hints - Full typing with Mapped[] and Annotated[]
- Domain exceptions - Raise domain errors in services; the global exception handlers map them to HTTP via EXCEPTION_MAPPING
- Validation at boundaries - Pydantic validates API input
- Use FastCRUD for simple CRUD - For complex queries (joins, aggregations, batch updates), use SQLAlchemy directly
- No magic numbers - Extract to constants in module-level constants.py files
- No imports inside methods - All imports at module top level
- Client IP extraction - Always use
modules.common.utils.request_ip.get_client_ip(request)instead ofrequest.client.hostdirectly. The raw transport peer is Caddy's internal Docker IP; the helper reads the X-Forwarded-For header that Caddy populates from Cloudflare'sCF-Connecting-IP. Misuse causes per-IP rate limits to track Caddy as the "client" and lock out all users globally. - Client-supplied offsets validated against computed ceilings - Any API field that references a position in a computed timeline (e.g.,
edit.end_msintoSUM(clip_file.duration_ms),asset_edit.end_msintouser_asset.duration_ms,edit_override.{start_ms, end_ms}into the same project SUM,edit.asset_offset_msintoasset_file.duration_ms) must be bounds-checked at the service layer against the computed ceiling before persisting. The schemaField(ge=0)+end_ms > start_msvalidator is necessary but not sufficient — a tampered client can sendend_ms=999999999which schemas accept but renders silently clamp or crash. Four canonical instances ship today: (a)EditService._validate_edit_boundsinedit/service.pyusesclip.utils.get_project_total_duration_ms(returnstuple[ProjectDurationStatus, int];EMPTYrejects,NOT_READYskips,READYenforces) — needed because the ceiling is a SUM across rows not already fetched at the call site. The check applies to non-asset edits (silence/false_start/clean/manual_cut/keep) AND to asset edits with non-INSERT visual modes (OVERLAY, REPLACE, NONE, null) that are NOT positioned inside an insert. Two exceptions bypass the timeline ceiling, both driven by the insert mechanism extendingoutput_duration_msat render (workers/render/steps.py:567). (1) INSERT-mode asset edits — the ceiling is replaced by a duration capend_ms - start_ms ≤ MAX_VIDEO_DURATION_MS(same deferred-exploit defense, scoped to a duration sanity cap rather than a position ceiling). (2) Non-INSERT asset edits positioned inside an insert (inside_insert_edit_idset — anchor OR merge mode) — anchor mode positions viainsert_offset_msand the renderer intersects against the host's output range; merge mode encodesstart_ms = splice_point + offset_within_insertso the visual cursor lands at the user's drop point, which legitimately putsend_mspasttotalbecause the host insert extends the rendered timeline andshift_edits_for_insertsaccounts for the expansion. 
The validator branches oninside_insert_edit_id(not on the overlap mode), so future modes added toinsert_overlap_modesinherit the exception automatically; (b)AssetService.validate_edit_boundsinasset/service.pyis a pure function takingduration_ms: int | Nonedirectly from the caller'srequire_asset_uploadedreturn — simpler shape because the ceiling is a single scalar already fetched upstream; ©validate_overrides_against_projectindraft/utils.pyreuses theget_project_total_duration_mshelper to bounds-check everyEditOverride.{start_ms, end_ms}inDraftCreate/DraftUpdate/ExportCreatepayloads, andEditService.{create, update}inline-checksEdit.asset_offset_msagainstasset_file["duration_ms"]threaded from_verify_asset_uploaded(changed to return the joined asset_file dict so callers don't re-query); (d)EditService._verify_inside_insert_anchorinedit/service.pyboundsEdit.insert_offset_msagainst the anchored insert's duration (anchor["end_ms"] - anchor["start_ms"]) at create + update time. Same shape as (b): scalar ceiling fetched via a single CRUD read, inline comparison; the FK target check (anchor exists, same project,visual_mode='insert') lives in the same helper because they're all preconditions for a valid render-time anchor. The pattern to copy is "service-layer bounds validation against a computed ceiling," not a specific helper module — reach for a helper when the check is non-trivial or reused across services (a, c), inline when the value is already in hand and the check is a single comparison (b-subset, c-inline). Zero-ceiling state (EMPTYin the multi-row case) rejects rather than skipping — otherwise an attacker creates the offset first and materializes the ceiling later.start_msis bounds-checked independently fromend_mswhen the cross-field validator cannot fire (e.g.,EditOverride.end_ms=Nonecase — transitivity doesn't closestart_ms ≤ total). 
Audit verified complete 2026-04-21: every client-facing _ms offset field on a public schema has a service-layer bounds check; worker-internal _ms schemas (workers/render/schemas.py, workers/analysis/**/schemas.py, workers/download/schemas.py, workers/asset_edit/schemas.py) are constructed from server-owned data, not network payloads, and are outside this convention's scope.
- Freeze configuration at the queue boundary - Any configuration value that feeds a background worker and depends on user tier (watermark-free, export retention, feature gates) MUST be snapshotted onto the queued record at service-layer kickoff. Workers read the snapshot; resolve_tier_context() must NEVER be called from inside a worker task. Rationale: workers run minutes to hours after the request; if tier state changes in that window (subscription expired, upgrade processed), live re-resolution silently produces the wrong answer for whichever end of the transition the user is on. Enforcement: rg "resolve_tier_context" backend/src/workers/ MUST return zero hits. Exception mechanism: a surviving call site that is genuinely observational (logging, metrics, diagnostics; not a policy decision) must carry an inline # noqa: tier-in-worker — justification: <reason> comment so future audits can distinguish deliberate-but-rare exceptions from overlooked regressions. Any unannotated grep hit is a finding. Shipped instances: Fix 9 (expires_at at create, not completion), Fix 11 (watermark_required snapshot, not re-resolved), Fix 12 (analysis mode/settings frozen at kickoff). The same boundary applies to the request-scoped locale ContextVar. The _current_locale ContextVar set by LocaleMiddleware exists only inside the request's task; worker processes start with DEFAULT_LOCALE and never see the originating user's pick. Workers MUST NOT call t_current(key) from modules/common/i18n/catalogs.py; doing so silently produces English for every user. Snapshot the locale onto a task argument at service-layer kickoff (next to the watermark_required/tier_context snapshots) and call t(locale, key) explicitly inside the task. Enforcement: rg "t_current\(" backend/src/workers/ MUST return zero hits. Same exception annotation as above if a genuinely observational call site needs to bypass.
- Validate against resolved metadata, not just identifier shape - When a client-supplied identifier resolves into a new entity (YouTube URL → ClipFile after fetching video metadata, asset UUID → UserAsset after DB lookup, etc.), downstream resource limits and cap checks MUST run against the resolved entity's properties, not against the identifier alone. Identifier-only validation (URL is well-formed, UUID parses, string length in range) is necessary but not sufficient: a tampered client can send a valid-looking identifier whose resolved properties violate policy. Shipped instances: Fix 4 (Edit bounds against project duration; applies to all edit types except INSERT-mode asset edits, which extend the timeline at render and are bounded by a duration cap instead; see Convention #11 instance (a)), Fix 16 (AssetEditRequest bounds against asset duration), Fix 5 (YouTube duration cap at import time, after fetch_video_info). Related to Convention #11 but distinct: #11 is about client-supplied _ms offsets already in the request; this convention is about metadata the server fetches to resolve an identifier.
- Worker processes require explicit instrumentation setup - TaskIQ worker processes do NOT inherit Logfire auto-instrumentation from the web process. Each worker must call instrument_taskiq(service_name=...) at startup, and configure_broker_lifecycle(broker, service_name, ...) must pass a distinct service.name per queue (sapari-{email,analysis,render,download,proxy,asset-edit}). Enforcement: service_name is a non-default positional-or-keyword argument on configure_broker_lifecycle, so adding a broker without it fails at call time (TypeError) rather than silently inheriting a generic name and losing worker-scoped observability. Rationale: worker processes run in separate Python interpreters (different TaskIQ worker pools), so the web process's configure_logfire() call has no effect on them. Without the per-broker call, workers emit zero spans, which silently breaks Logfire dashboards and makes worker-side incidents invisible. Per-instrumentor on/off policy (both web and workers) lives in docs/operations/monitoring.md §Auto-instrumentor defaults: pydantic-ai + SQLAlchemy + FastAPI ON, Redis OFF. The env vars (LOGFIRE_INSTRUMENT_{SQLALCHEMY,REDIS} for web, ..._WORKERS for workers) encode this; don't flip defaults without updating the matrix in that doc.
- Manual span naming follows the six-category taxonomy - Every manually emitted logfire.span(...) must fit one of six categories: (a) <worker>.pipeline for DAG invocation parents, (b) step.<step_id> for pipeline steps (emitted automatically by fastroai.LogfireTracer; don't hand-write), (c) taskiq.<task_name> for task-level parents (emitted automatically by LogfireSpanMiddleware; don't hand-write), (d) <module>.<method> for service-layer operations (credits.reserve, payment.handle_webhook), (e) ext.<service>.<op> for outbound calls (ext.whisper.transcribe, ext.ffmpeg.render, ext.stripe.modify_subscription), (f) sse.<event> for SSE publish paths. Every ext.* span carries a domain correlation key at entry (clip_file_uuid, subscription_id, etc.); every <module>.<method> span carries the primary identifier at entry and an outcome attribute set via span.set_attribute() at exit. Cron tasks add labels={"task_type": "cron"} to their decorator, which the middleware copies onto the span as task_type="cron". Rationale: the taxonomy keeps Logfire filters, saved queries, and ad-hoc grouping legible across the codebase, so new spans slot into existing dashboards instead of forcing per-query regex munging. Full table with examples in docs/operations/monitoring.md §Span taxonomy.
- Database sessions are for short-lived database work - A session does not span slow IO of any kind, regardless of how it was acquired (Depends(async_session) parameter on a route, TaskiqDepends(get_db_session) parameter on a worker task, async with local_session() opened manually). The rule is about session scope, not acquisition path. Slow IO is anything that can run longer than a single-digit number of seconds in the worst case: subprocess calls (FFmpeg, yt-dlp, ffprobe at scale), large R2 transfers, third-party LLM / transcription / payment APIs, anything wrapped in asyncio.to_thread() or loop.run_in_executor(). The Postgres / Neon idle-connection timeout is the failure mode that surfaces this most reliably (a session held across an 8-min FFmpeg run dies, and the post-IO write fails with asyncpg.InterfaceError: connection is closed; observed in staging 2026-04-30 on generate_clip_proxy), but the rule applies universally: a session pinned across slow IO also blocks pool checkout for other requests. Two implementation shapes satisfy the rule; pick whichever reads cleaner. (1) Three-phase task body: open a session, do reads, close; run slow IO with no session held; open a fresh session, do writes, close. Suitable when the structure is "read, do IO, write." Shipped: import_youtube (route). (2) Per-operation helpers with internal sessions: each DB operation lives in a small named helper (get_clip_context, mark_clip_failed, _finalize_proxy, _finalize_clip_artifacts, _finalize_youtube_download, update_asset_records_replace, _load_export, _finalize_export, _mark_run_failed, …) that opens local_session() internally and closes it before returning. Task bodies, step bodies, and pipeline Deps classes do NOT take session parameters or open session blocks. Suitable for tasks more complex than three phases (multiple read-IO-read cycles, pipeline architectures with many steps), and the default shape we converge on going forward. Shipped (workers): generate_clip_proxy, process_clip_artifacts, download_youtube_video, download_youtube_asset, edit_asset (linear); render_export, analyze_project (pipeline architecture). Event-driven waits also follow this shape: wait_for_audio_extraction (workers/analysis/tasks.py) holds no session across its async for event in subscribe_to_project(...) loop; every ClipReadyEvent re-invokes the load_pending_audio_clip_file_ids helper, which owns its own local_session(). The wait blocks on Redis pubsub (no session held) and re-reads on each ping; see Convention #18 for the "events are wake-ups, DB is truth" pattern this codifies. Helper modules: workers/download/context.py, workers/render/data.py, workers/analysis/data.py. Enforcement that was previously aspirational is now achievable: rg "TaskiqDepends\(get_db_session\)" backend/src/workers/ returns zero hits, and so does rg "deps\.db" backend/src/workers/; the dependency-injected-session anti-pattern is structurally absent from the worker tree. Streaming endpoints (StreamingResponse for SSE, large file streams, long polls) MUST acquire their session manually via async with local_session() and release it before returning the response. Taking Depends(async_session) or Depends(get_current_user) as a route parameter holds the session for the entire EventSource lifetime under FastAPI 0.118+'s Depends(... yield ...) semantics, exhausting POSTGRES_POOL_SIZE=50 (+ 20 overflow = 70 ceiling) at 70 concurrent SSE clients. Authentication for streaming endpoints prefers Redis-only checks (Depends(get_session_from_cookie) + SessionData.is_active); fall back to a manual session lookup only when an ownership/permission check needs DB state. Shipped streaming instances: stream_project_events (manual session for the per-project ownership check), stream_notification_events and stream_asset_events (zero DB hits; SessionData alone). # SLOW_IO discovery aid: any helper that performs slow IO carries a # SLOW_IO comment marker on its definition line. When reviewing a worker task body or any code that holds a session (rare under shape 2, common under shape 1), grep its callees for SLOW_IO to see whether the session scope is wrong. Candidate signals for a new marker: the helper spawns a subprocess (FFmpeg, yt-dlp, ffprobe); wraps a synchronous call in asyncio.to_thread() or loop.run_in_executor() (the wrap is the signal: you only offload to a thread when the sync call is slow enough to block the event loop); transfers a file > ~10 MB to/from R2/S3; or calls a third-party LLM / transcription / payment API. Reviewer-driven for now; CI promotion follows once all worker tasks are migrated. Enforcement greps (MUST return zero hits): rg "async def stream_.*Depends\(async_session\)" backend/src/interfaces/api/v1/, rg "async def stream_.*Depends\(get_current_user\)" backend/src/interfaces/api/v1/, rg "TaskiqDepends\(get_db_session\)" backend/src/workers/, rg "deps\.db" backend/src/workers/. All four are now structurally enforceable.
- Package __init__.py files MUST NOT eagerly import their tasks submodule - modules/X/__init__.py does NOT contain from . import tasks or from .tasks import … at module top level. Task functions live in modules/X/tasks.py; the broker entry that needs them imports the submodule explicitly. The email broker's entry at infrastructure/taskiq/worker.py does from ...modules.email import tasks # noqa: F401 (patterned on its existing imports of common, email, export, payment, user); per-broker workers (workers/<name>/tasks.py) register their tasks directly when their package is loaded by python -m taskiq worker. Why: infrastructure/taskiq/__init__.py runs from . import app as a side effect at line 10, and app.py imports workers/shared/context.py (the per-process WorkerContext singleton). context.py imports several modules/X/service.py files to construct singletons (today: modules.email.service.EmailService). Resolving modules.X.service runs modules/X/__init__.py first; if that file eagerly imports tasks, and tasks.py imports from infrastructure.taskiq (every broker-registering tasks file does), Python is now mid-loading infrastructure.taskiq from line 10: email_broker and friends are not yet bound, and the inner import raises ImportError: cannot import name 'email_broker' from partially initialized module. Surfaced 2026-05-09 in staging the moment the WorkerContext landed: sapari-taskiq-scheduler crash-looped 72× (the container died on every startup); the email / asset-edit / download / proxy worker containers reported Up with RestartCount=0, but their inner taskiq worker-N subprocesses crash-looped under taskiq's process manager, visible only by reading the worker logs (Process worker-0 restarted with pid … over and over). Analysis and render survived only because their package __init__.py happened to load modules.email via a different transitive path before first touching infrastructure.taskiq. The fix removed the eager from . import tasks from modules/email/__init__.py (the only module with the pattern at the time); this convention exists so the next module addition doesn't reintroduce the trap. How to apply: keep new modules/X/__init__.py files free of any .tasks import (schemas, models, service, crud, and enums only). Tasks register via the broker's worker entry, not via package init. The rule generalizes: any submodule whose top-level imports include infrastructure.taskiq (directly or transitively) should not be eagerly imported from a package __init__.py that can be reached during infrastructure.taskiq.__init__'s own initialization. Enforcement (MUST return zero hits): grep -lE "^from \. import tasks$|^from \.tasks " backend/src/modules/*/__init__.py. Wired into the Linting CI job as a final step so a regression fails the PR before merge.
- Events are wake-ups; the database is truth - Any in-process wait that subscribes to Redis pubsub to detect external progress (async for event in subscribe_to_project(...) or equivalent) MUST treat each received event as a re-check trigger, NOT as the carrier of authoritative state. On every relevant event, re-query the DB via a small helper that owns its own session (Convention #16 shape 2). Rationale: Redis pubsub has no buffering (events published before pubsub.subscribe() lands are gone), no replay, and no guaranteed delivery; payload schemas drift; and hasattr / optional-field guards on the consumer side silently swallow mismatches. Treating the event as a payload-bearing message turns every one of those failure modes into a silent hang. Surfaced 2026-05-13 in wait_for_audio_extraction: the wait dereferenced event.clip_file_uuid behind a hasattr guard, but ClipReadyEvent only carries clip_uuid (the per-project Clip UUID, not the ClipFile UUID). The hasattr check returned False on every event, the pending set never drained, and analysis runs sat in pending for 8+ minutes before the task-level timeout fired. The fix re-reads load_pending_audio_clip_file_ids(project_uuid) from the DB on every ClipReadyEvent and proceeds when it returns empty, regardless of event payload shape. How to apply: when adding a new event-driven wait, the event handler does ONE thing: call a DB-truth helper. The helper owns the session; the wait holds none. Never branch on event payload fields. Enforcement: any new async for event in subscribe_to_project(...) should be reviewed against this convention, and rg "hasattr\(event," backend/src/workers/ returns zero hits (the antipattern guard). Snapshot-on-connect is the read-side complement: SSE consumers that connect mid-pipeline can miss events published before pubsub.subscribe() lands. EventPublisher.publish caches the latest AnalysisProgressEvent to a TTL'd Redis string key (progress_snapshot:<project_uuid>, TTL 3600s); subscribe_with_keepalive yields the cached frame on connect before entering the live loop. Same principle: don't trust pubsub to be load-bearing; back it with a DB or cache read.
- User-facing strings flow through the i18n catalog - Any service-layer raise whose message is surfaced to the client MUST come from a translation key (t_current("namespace.key", **interp) from modules/common/i18n/catalogs.py), not an inline string literal. "Surfaced to the client" is defined by EXCEPTION_MAPPING[ExcClass].use_message_when_present in modules/common/constants.py. It is True for ValidationError, PermissionDeniedError, InsufficientCreditsError, UserExistsError, MultipartCompleteError, MultipartAbortError: those raise-site messages reach the user verbatim. It is False for ResourceNotFoundError, ResourceExistsError, UserNotFoundError, TierNotFoundError, RateLimitNotFoundError, MultipartUploadNotFoundError: those messages are masked by the catalog default at error-handler time, so raise sites can stay as English debug strings without leaking. Every new translation key MUST be added to all three locale catalogs (en.py, pt.py, es.py); missing siblings fall back to English at lookup, which is silent translation drift. Workers MUST NOT call t_current() (see Convention #12); they take a snapshotted locale task argument at kickoff and call t(locale, key) explicitly. How to apply: when adding a raise <SurfacedExc>(...) to a service, (1) add service.error_name: "message" to en.py, pt.py, and es.py (translate; do not paste English into pt/es), then (2) replace the raise with raise <SurfacedExc>(t_current("service.error_name", **interp)). Reuse cross-service keys when the message is identical and the namespace fits (error.superuser_required, project.access_denied). Enforcement greps (MUST return zero hits in backend/src/modules/): rg 'raise (ValidationError|PermissionDeniedError|InsufficientCreditsError|UserExistsError|MultipartCompleteError|MultipartAbortError)\("' backend/src/modules/ and the \(f" variant. Frontend-facing copy (SPA components, landing site) goes through the i18next catalog (frontend/shared/lib/i18n.ts, landing/src/lib/i18n.ts): same principle, separate enforcement. Phase 2 has substantively landed (the SPA's 19 namespace catalogs are populated and active; landing's en/pt/es page tree is shipped); remaining gaps are closed slice by slice (a handful of hardcoded English literals remain in features/admin/components/Payments.tsx, email templates, and similar admin-internal corners). When the backend writes a frozen English string to a DB column that's surfaced to the user (e.g. EntitlementTransaction.description, Edit.reason_tag), the canonical pattern is a small backend-raw → i18n-key map maintained on the frontend rendering surface; see docs/development/frontend.md §Localizing system-generated DB strings.
- Cookie Max-Age must match its server-side storage TTL - When a token is stored both as a client cookie and a server-side Redis entry (CSRF tokens, session IDs, session-derived caches), the cookie Max-Age and the storage expiration MUST come from the same TTL value, OR the read path MUST self-heal on a storage miss. If Max-Age > storage TTL, the cookie outlives the server entry and every subsequent request 403s on validation until the user clears the cookie (typically by logging out). Surfaced 2026-05-13: SESSION_COOKIE_MAX_AGE = 86400 (24h) vs CSRF storage TTL = SESSION_TIMEOUT_MINUTES * 60 = 1800s (30m) drift. Idle users came back from lunch and every mutation 403'd silently; pre-fix, /auth/check-auth skipped CSRF regeneration whenever the cookie was present, without verifying that the stored entry existed. The shipped fix validates the cookie against storage on check_auth and regenerates + overwrites on a miss (auth.py:608-625). How to apply: when introducing a new dual-store token, either pin both TTLs to the same constant, or make the cookie's read path self-heal (if cookie_value and not storage.has(cookie_value): regenerate()). Both are acceptable; pick the one that matches the desired UX (single TTL = strict lockout after idle, self-heal = transparent recovery). Document the choice inline at the cookie-set call site. Second instance, remember-me sessions: SessionManager.timeout_seconds_for(metadata) is the single TTL source for a session whose login set remember_me. It returns a TTL derived from SESSION_REMEMBER_ME_DAYS instead of SESSION_TIMEOUT_MINUTES, and is read at validation, on every activity-driven storage.update(expiration=…) renewal, by the cleanup sweep, and for the CSRF token's storage TTL + cookie Max-Age. Before this, only the initial session cookie + Redis TTL were extended, so the session still died at the default idle window and the CSRF token's storage TTL decayed to the default on first activity; the 30-day persistence wasn't real.
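Instance (a) of the bounds-validation convention above describes a tri-state ceiling check: EMPTY rejects, NOT_READY skips, READY enforces, and start_ms is checked on its own even when end_ms is absent. A minimal sketch of that shape as a pure function follows. The function name, the enum values, and the use of ValueError in place of the project's ValidationError are assumptions; this is not the signature of EditService._validate_edit_bounds.

```python
from enum import Enum
from typing import Optional


class ProjectDurationStatus(Enum):
    # Mirrors the three states named in the convention; values are assumed.
    EMPTY = "empty"          # no clip durations yet: ceiling is zero
    NOT_READY = "not_ready"  # durations still being probed
    READY = "ready"          # ceiling is known


def check_edit_bounds(
    start_ms: int,
    end_ms: Optional[int],
    status: ProjectDurationStatus,
    total_ms: int,
) -> None:
    """Hypothetical service-layer bounds check against a computed ceiling.

    Raises ValueError (standing in for ValidationError) when the edit
    falls outside [0, total_ms].
    """
    if status is ProjectDurationStatus.EMPTY:
        # Reject rather than skip: otherwise a client could persist an
        # offset first and materialize the ceiling later.
        raise ValueError("project has no duration yet")
    if status is ProjectDurationStatus.NOT_READY:
        return  # ceiling unknown; this check is skipped
    # start_ms is checked independently so the end_ms=None case is covered.
    if not 0 <= start_ms <= total_ms:
        raise ValueError(f"start_ms {start_ms} outside [0, {total_ms}]")
    if end_ms is not None and not start_ms < end_ms <= total_ms:
        raise ValueError(f"end_ms {end_ms} outside ({start_ms}, {total_ms}]")
```

A tampered end_ms=999999999 passes a Field(ge=0) schema check but fails here once the project ceiling is READY.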
Rate Limiting¶
Rate limits use a RateLimit dataclass defined in infrastructure/auth/constants.py:
from infrastructure.auth.constants import RATE_LIMIT_SIGNUP, RateLimit
# Usage in route handlers:
await _check_rate_limit(request, "signup", RATE_LIMIT_SIGNUP)
Token expiry durations are also defined in infrastructure/auth/constants.py.
Credit Architecture¶
Credits are built on the entitlement system. EntitlementTransaction is the authoritative ledger; quantity_used on UserEntitlement is a rebuildable cache. Every credit event (grant, usage, reset, refund) writes to both atomically.
Ledger Pattern¶
EntitlementTransaction (authoritative, append-only)
├── GRANT — credits added (purchase, subscription, beta)
├── USAGE — credits consumed (one per entitlement drained)
├── RESET — period boundary marker (RENEWABLE credits, amount=0)
└── REFUND — credits returned
UserEntitlement.quantity_used (cache, rebuildable)
└── Updated in the same db.commit() as the transaction record
rebuild_user_balance() recomputes quantity_used from the ledger at any time. Admin endpoint: POST /credits/admin/rebuild/{user_id}.
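The guide only says that rebuild_user_balance() recomputes quantity_used from the ledger. A minimal sketch of that replay for a single entitlement follows, assuming each transaction carries a type and a non-negative amount, that REFUND amounts reduce usage, and that a RESET marker zeroes the running total (as it does at a RENEWABLE period boundary). The Txn class and rebuild_quantity_used name are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


class TxnType(Enum):
    GRANT = "grant"
    USAGE = "usage"
    RESET = "reset"
    REFUND = "refund"


@dataclass(frozen=True)
class Txn:
    type: TxnType
    amount: int  # assumed: always >= 0; RESET rows carry 0


def rebuild_quantity_used(ledger: list[Txn]) -> int:
    """Replay an append-only ledger (oldest first) into a usage counter."""
    used = 0
    for txn in ledger:
        if txn.type is TxnType.USAGE:
            used += txn.amount
        elif txn.type is TxnType.REFUND:
            used = max(0, used - txn.amount)  # assumed: refunds give usage back
        elif txn.type is TxnType.RESET:
            used = 0  # period boundary: usage starts over
        # GRANT changes how much is available, not quantity_used.
    return used
```

Because the ledger is append-only, the cache can always be regenerated this way, which is what makes quantity_used safe to treat as disposable.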
Lazy Evaluation¶
Two things happen lazily inside get_user_balances(), before the balance is computed:
- _materialize_ungranted_payments() - Finds payments with status=SUCCEEDED, entitlement_granted=False. Creates credit entitlements via atomic CAS (UPDATE ... WHERE entitlement_granted = false). The webhook is a thin fact-recorder; credits materialize on first balance read.
- _reset_expired_renewable_periods() - For RENEWABLE entitlements (beta credits), checks if period_start_at + renewal_period_days < now. If expired, resets quantity_used=0, writes a RESET transaction, and clears stale reservations.
Both are self-healing: if the system is down when a webhook fires, the next balance read catches up.
Payment Flow¶
Stripe webhook fires
→ handle_checkout_session_completed()
→ Creates Payment record (status=SUCCEEDED, entitlement_granted=False)
→ Grants TIER_ACCESS eagerly (user needs dashboard access immediately)
→ Credits are NOT granted here (deferred)
→ Returns 200
User opens dashboard
→ GET /users/me/billing
→ get_user_balances()
→ _materialize_ungranted_payments() ← credits created here
→ _reset_expired_renewable_periods() ← RENEWABLE reset here
→ Returns balance with credits
Credit Metering¶
Analysis endpoints check credits before processing:
- Estimate - ProjectService.estimate_analysis_credits() sums clip durations (ceiling to minutes) and applies the mode multiplier.
- Reserve - CreditService.reserve_credits() stamps reservation metadata (including reserved_period for RENEWABLE entitlements).
- Consume - The worker calls CreditService.use_credits() → consume_entitlement(), whose drain loop writes one EntitlementTransaction(USAGE) per entitlement drained, all with commit=False, followed by a single db.commit().
- Release - On failure, CreditService.release_credits() clears reservation metadata.
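The estimate step can be sketched as follows, using the multipliers from the Analysis Modes table in this guide. Two details are assumptions because the text does not pin them down: the sketch ceils the summed duration (not each clip) to whole minutes, and it rounds fractional results from the 0.5x mode up. The function name and signature are hypothetical, not those of ProjectService.estimate_analysis_credits().

```python
import math

# Multipliers from the Analysis Modes table (CREDIT_MULTIPLIERS in the codebase).
MULTIPLIERS = {"ai_edit": 1.0, "captions_only": 0.5, "manual": 0.0}
MS_PER_MINUTE = 60_000


def estimate_credits(clip_durations_ms: list[int], mode: str) -> int:
    """Hypothetical estimate: total duration ceiled to minutes, then scaled."""
    total_ms = sum(clip_durations_ms)
    # Integer ceiling division avoids float rounding on the minute count.
    minutes = -(-total_ms // MS_PER_MINUTE)
    # Assumption: fractional credits from the 0.5x mode round up.
    return math.ceil(minutes * MULTIPLIERS[mode])
```

For two clips of 61 s and 30 s, the total of 91 s rounds up to 2 minutes, so ai_edit costs 2 credits and captions_only costs 1.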
Consumption Ordering¶
When credits are consumed, entitlements are drained in priority order:
- RENEWABLE first (use-it-or-lose-it — beta credits)
- DECREMENTAL by expiry (expiring soonest first, then non-expiring)
- ACCUMULATIVE last
This means beta credits are always consumed before subscription credits, and expiring promos are consumed before permanent purchases.
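The drain order above reduces to a sort key plus a loop that takes from each entitlement until the request is covered, emitting one USAGE record per entitlement touched. A minimal sketch follows, assuming a simplified Entitlement shape (remaining, expires_at) and an up-front sufficiency check. Applying the expiry tie-break inside every kind, not just DECREMENTAL, is also an assumption. None of these names are the real consume_entitlement() internals.

```python
from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
from typing import Optional


class Kind(IntEnum):
    # Lower value drains first.
    RENEWABLE = 0
    DECREMENTAL = 1
    ACCUMULATIVE = 2


@dataclass
class Entitlement:
    id: str
    kind: Kind
    remaining: int
    expires_at: Optional[datetime] = None


def drain_order_key(e: Entitlement) -> tuple:
    # Within a kind: expiring soonest first, non-expiring (None) last.
    return (e.kind, e.expires_at is None, e.expires_at or datetime.max)


def consume(entitlements: list[Entitlement], amount: int) -> list[tuple[str, int]]:
    """Drain in priority order; return one (entitlement_id, used) per USAGE row."""
    if sum(e.remaining for e in entitlements) < amount:
        raise ValueError("insufficient credits")
    usages = []
    for e in sorted(entitlements, key=drain_order_key):
        if amount == 0:
            break
        take = min(e.remaining, amount)
        if take:
            e.remaining -= take
            amount -= take
            usages.append((e.id, take))
    return usages
```

A 25-credit job against 10 beta credits, two 5-credit promos, and a non-expiring pool uses the beta credits first, then the promo expiring soonest.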
Analysis Modes¶
Mode is derived from settings (not explicitly selected). The AnalysisMode enum lives in modules/analysis_run/enums.py, and the multipliers in CREDIT_MULTIPLIERS:
| Mode | Multiplier | Trigger |
|---|---|---|
| ai_edit | 1.0 | Any cut/censorship/director feature enabled |
| captions_only | 0.5 | Only language set (transcription + captions) |
| manual | 0.0 | Nothing toggled (frontend doesn't call analyze) |
The analyze endpoint rejects a captions_only request that carries any ai_edit setting (non-zero pacing_level / false_start_sensitivity, non-empty director_notes, non-none censorship) with a 422 via ProjectService.validate_analysis_mode_consistency. This keeps analysis_mode meaningful for metrics and prevents the "0.5× credits for full pipeline" bypass a tampered client would otherwise get. The worker itself is not mode-gated beyond AI Director asset fetching, so the API-layer validation is the authoritative gate. The mode is stored on AnalysisRun for audit.
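Mode derivation and the consistency check can be sketched together. The setting names (language, pacing_level, false_start_sensitivity, director_notes, censorship) come from the paragraph above; the AnalysisSettings dataclass, the "none" censorship sentinel, and the requested_mode parameter (modelling a client that also declares a mode) are assumptions. The real check lives in ProjectService.validate_analysis_mode_consistency.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AnalysisSettings:
    # Hypothetical shape; field names follow the text above.
    language: Optional[str] = None
    pacing_level: int = 0
    false_start_sensitivity: int = 0
    director_notes: str = ""
    censorship: str = "none"


def uses_ai_edit(s: AnalysisSettings) -> bool:
    return (
        s.pacing_level != 0
        or s.false_start_sensitivity != 0
        or bool(s.director_notes)
        or s.censorship != "none"
    )


def derive_mode(s: AnalysisSettings) -> str:
    if uses_ai_edit(s):
        return "ai_edit"
    if s.language:
        return "captions_only"
    return "manual"


def validate_mode_consistency(requested_mode: str, s: AnalysisSettings) -> None:
    # A captions_only request (0.5x credits) must not smuggle in ai_edit settings.
    if requested_mode == "captions_only" and uses_ai_edit(s):
        raise ValueError("captions_only cannot carry ai_edit settings")  # API maps to 422
```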
Beta Entitlement¶
Beta users get creator-tier access + 240 renewable AI minutes/month. Managed via BetaService in entitlement/beta.py:
from modules.entitlement.beta import BetaService
beta = BetaService()
await beta.grant(user_id, db) # TIER_ACCESS + RENEWABLE CREDIT_GRANT
await beta.revoke(user_id, db) # Soft-deletes both by reference_id
await beta.has_access(user_id, db) # Checks active TIER_ACCESS with BETA reason
await beta.bulk_grant(user_ids, db) # One tier lookup, bulk duplicate check, single commit
await beta.list_users(db) # All beta users with credit usage (SQL join)
Admin invite flow: POST /admin/beta/invite takes an email. If the user exists, grants beta immediately and sends a welcome email. If not, creates a signed JWT token (24h expiry, purpose=beta_invite) and sends an invite email with a signup link.
The signup link delivers the token via ?beta_invite=TOKEN on the landing URL. Both signup paths consume it:
- Email/password signup (POST /users/?beta_invite=TOKEN) verifies the token and grants beta after account creation on email match.
- Google OAuth signup — the frontend forwards ?beta_invite=TOKEN to GET /auth/oauth/google?beta_invite=TOKEN; the backend persists it on OAuthState and the callback (/auth/oauth/callback/google) verifies + grants on match. Grant failures are non-fatal to OAuth login (logged, not raised).
Email matching is canonical, not byte-exact. The comparison goes through modules.common.utils.email.canonical_email() on both sides: lowercase entire address, strip +tag aliasing for any domain, strip dots in the local part for @gmail.com and @googlemail.com (other providers treat dots as distinct mailboxes — do not generalize). Without this, an invite to User.Name@gmail.com silently fails to grant beta to username@gmail.com even though Google routes both to the same mailbox.
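The three canonicalization rules can be illustrated with a small re-implementation. This sketches the documented rules only and is not the code of modules.common.utils.email.canonical_email(); how the real helper treats malformed input (no @) is unknown, so this version simply returns the lowercased string.

```python
GMAIL_DOMAINS = {"gmail.com", "googlemail.com"}


def canonical_email(address: str) -> str:
    """Sketch of the documented rules: lowercase, drop +tag, Gmail dots."""
    address = address.lower()
    local, sep, domain = address.rpartition("@")
    if not sep:
        return address  # assumption: leave malformed input alone
    local = local.split("+", 1)[0]  # +tag aliasing, any domain
    if domain in GMAIL_DOMAINS:
        local = local.replace(".", "")  # Gmail ignores dots; other providers do not
    return f"{local}@{domain}"
```

Both sides of the invite match go through the same function, so canonical_email(invite_email) == canonical_email(user_email) is the comparison.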
Beta entitlements stack with subscriptions. resolve_tier_context() picks the highest-ranked tier. Credits from both pools sum. Beta credits are consumed first (RENEWABLE before DECREMENTAL).
Thumbnails¶
Asset and project thumbnails are generated via FFmpeg (modules/asset/thumbnail.py) and stored in R2 as thumbnail_key. The thumbnail_url is served inline in list responses (not separate endpoints) — minting is a local operation (~1ms, no network call).
- Assets: Generated in the process_asset_artifacts worker (workers/download/tasks.py) for video/image types. confirm_upload only flips status to processing; the worker handles ffprobe + waveform + thumbnail off the request path (Convention #16). Center-cropped to a 240x240 JPEG. The URL is a JWT-fronted Cloudflare Worker URL (MediaTokenService.mint), the same path as asset video playback, which unifies asset playback and asset thumbnails under a single auth model.
- Projects: Generated from the first clip in the process_clip_artifacts() worker, only if the project has no thumbnail yet. The URL is a presigned R2 URL (storage_client.generate_download_url); project thumbnails were not migrated to the Worker path.
- Enrichment: _enrich_with_thumbnail_urls() in AssetService (instance method, mints via MediaTokenService) and _enrich_projects_with_thumbnail_urls() at module level in ProjectService (presigned). Called in all list/get/confirm return paths.
Vertical Crop¶
Optional crop for non-native aspect ratio exports. CropRegion schema (draft/schemas.py) stores normalized 0-1 coordinates. build_crop_filter() (workers/render/ffmpeg/video_filters.py) converts to FFmpeg crop=w:h:x:y filter with even-dimension rounding for H.264 chroma alignment.
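The conversion from a normalized CropRegion to an FFmpeg crop=w:h:x:y filter can be sketched as below. The rounding strategy (floor to an even number, keep dimensions at least 2 px, clamp the offset so the window stays inside the frame) is an assumption; the real build_crop_filter() in workers/render/ffmpeg/video_filters.py may round differently, and this CropRegion is a stand-in for the draft/schemas.py model.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CropRegion:
    # Normalized 0-1 coordinates relative to the source frame.
    x: float
    y: float
    w: float
    h: float


def _floor_even(n: float) -> int:
    # H.264 with 4:2:0 chroma subsampling needs even dimensions.
    return int(n) // 2 * 2


def build_crop_filter_sketch(region: CropRegion, src_w: int, src_h: int) -> str:
    w = max(2, _floor_even(region.w * src_w))
    h = max(2, _floor_even(region.h * src_h))
    # Clamp offsets so the crop window never runs past the frame edge.
    x = min(_floor_even(region.x * src_w), src_w - w)
    y = min(_floor_even(region.y * src_h), src_h - h)
    return f"crop={w}:{h}:{x}:{y}"
```

A centered half-width crop of a 1920x1080 source yields crop=960:1080:480:0.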
Crop is injected in all 3 render paths (trim-only, asset composite, B-roll composite) after setpts and before scale+pad. For B-roll two-pass rendering, crop is applied in Pass 1 (trim) to avoid double-cropping. Source dimensions probed via get_video_dimensions() at render time.
Per-asset crop reframe (issue #235). REPLACE / INSERT video and image asset edits get their own opt-in cover-crop, independent of the main-video crop above. Schema fields on Edit: asset_crop_enabled (bool), asset_crop_zoom (∈ [1.0, 10.0]), asset_crop_pan_x / asset_crop_pan_y (∈ [-1.0, 1.0]) — all NOT NULL with defaults (migration f8a2d4c7b193). Source dims travel on AssetFile.width / AssetFile.height, populated by _probe_asset_media (migration a4c9e2b6f085); they remain NULL for audio and pre-migration legacy assets. The renderer's build_video_replace_segments calls _maybe_crop_chain per asset segment — when crop is disabled, target dims unknown, or source dims missing, it returns None and the segment takes today's letterbox scale_part. Otherwise it writes a crop_chain of scale=<cover>,crop=<region>,scale=<exact target>,setsar=1 onto the VideoReplaceSegment (workers/render/ffmpeg/concat/segments.py). _compute_crop_region (workers/render/ffmpeg/video_filters.py) is the Python port of frontend/shared/lib/cropUtils.ts:computeCropRegion; the two implementations must stay in lockstep so preview matches export. Encoding is UI-state (zoom + pan) rather than a frozen CropRegion {x, y, w, h} because the crop window must auto-adapt when the user changes project aspect — frozen render-state would silently re-letterbox on the new target.
Tier Enforcement¶
Tier-specific limits are centralized in UserTierContext — a frozen dataclass resolved once per request via resolve_tier_context(). Properties: max_projects, can_use_ai_director, can_watermark_free, can_access_support, storage_quota_mb, export_retention_days.
from modules.common.i18n.catalogs import t_current
from modules.entitlement.tier_context import resolve_tier_context
# In API routes (via DI dependency): the ONLY place resolve_tier_context
# is called. Snapshot the decision onto the queued record if a worker needs it.
tier_ctx = await resolve_tier_context(user_id, db)
if tier_ctx.max_projects <= project_count:
    # ValidationError messages reach the user, so they come from the i18n
    # catalog (Convention #19). "project.limit_reached" is an illustrative key.
    raise ValidationError(t_current("project.limit_reached"))
# For worker-bound work: snapshot at request time (Convention #12).
export = await crud_exports.create(
    db=db,
    object=ExportCreateInternal(
        ...,
        watermark_required=not tier_ctx.can_watermark_free,  # snapshot
    ),
)
# In workers: read from the snapshot on the queued record, NEVER
# re-resolve tier_ctx. The user's tier may have changed between
# kickoff and execution; the snapshot preserves intent.
if export["watermark_required"]:
    apply_watermark(...)
See Convention #12 for the full rationale + inline # noqa: tier-in-worker exception mechanism for the rare observational case.
Resolution Logic¶
resolve_tier_context() picks the highest-ranked tier across all active tier entitlements (not the first one). Key behaviors:
- tier_name: From the highest-ranked tier (free=0, hobby=1, creator=2, viral=3)
- is_paid: True if ANY entitlement has a paid grant_reason (subscription, purchase, enterprise_contract), evaluated across all entitlements, not just the winner
- grant_reasons: Frozenset of every active entitlement's grant reason. Callers check set membership (e.g. {"beta"} & tier_ctx.grant_reasons) rather than a scalar winner; the previous scalar field was unreliable on TIER_RANK ties (e.g. TRIAL+BETA both on creator), where max() could pick TRIAL and hide the beta entitlement
- can_watermark_free: True if is_paid and the tier is in WATERMARK_FREE_TIERS, OR if grant_reasons intersects WATERMARK_FREE_GRANT_REASONS (which includes beta)
This means a beta+hobby user gets creator features (beta wins on tier rank), is_paid=True (from hobby), and watermark-free (beta in grant_reasons).
Constants in modules/entitlement/constants.py: TIER_MAX_PROJECTS, TIER_STORAGE_MB, TIER_EXPORT_RETENTION_DAYS, AI_DIRECTOR_TIERS, TIER_RANK, PAID_GRANT_REASONS, WATERMARK_FREE_GRANT_REASONS.
Unit conversions use BYTES_PER_MB from modules/common/constants.py (e.g., quota_bytes = tier_ctx.storage_quota_mb * BYTES_PER_MB).
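The resolution rules can be sketched as a pure function over a user's active tier entitlements. TIER_RANK and the paid grant reasons are taken from the text above; the contents of WATERMARK_FREE_TIERS (assumed here to be creator and viral) and WATERMARK_FREE_GRANT_REASONS (assumed to contain only beta) are guesses, the free fallback for a user with no entitlements is an assumption, and ActiveTier / TierSummary are illustrative stand-ins for the entitlement rows and UserTierContext.

```python
from dataclasses import dataclass

TIER_RANK = {"free": 0, "hobby": 1, "creator": 2, "viral": 3}
PAID_GRANT_REASONS = frozenset({"subscription", "purchase", "enterprise_contract"})
WATERMARK_FREE_TIERS = frozenset({"creator", "viral"})  # assumption
WATERMARK_FREE_GRANT_REASONS = frozenset({"beta"})  # assumption: may hold more


@dataclass(frozen=True)
class ActiveTier:
    tier_name: str
    grant_reason: str


@dataclass(frozen=True)
class TierSummary:
    tier_name: str
    is_paid: bool
    grant_reasons: frozenset
    can_watermark_free: bool


def summarize(active: list[ActiveTier]) -> TierSummary:
    if not active:
        return TierSummary("free", False, frozenset(), False)  # assumed fallback
    # Highest-ranked tier wins, not the first row returned.
    winner = max(active, key=lambda t: TIER_RANK[t.tier_name])
    reasons = frozenset(t.grant_reason for t in active)
    # Paid-ness is evaluated across every entitlement, not just the winner.
    is_paid = bool(reasons & PAID_GRANT_REASONS)
    watermark_free = (
        is_paid and winner.tier_name in WATERMARK_FREE_TIERS
    ) or bool(reasons & WATERMARK_FREE_GRANT_REASONS)
    return TierSummary(winner.tier_name, is_paid, reasons, watermark_free)
```

The beta+hobby case from the text comes out as creator tier, paid, and watermark-free, with "beta" visible in grant_reasons even though hobby supplied is_paid.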
Priority Queue¶
Task brokers use RabbitMQ with native priority queues. User-facing tasks (analysis, render, asset edit) are enqueued with a priority derived from the user's subscription tier. Non-user-facing tasks (email, download) use plain FIFO.
Priority Mapping¶
| Tier | is_paid | Priority | Constant |
|---|---|---|---|
| viral | true | 3 | TaskPriority.HIGH |
| creator | true | 2 | TaskPriority.NORMAL |
| hobby | true | 1 | TaskPriority.LOW |
| trial / free / none | false | 0 | TaskPriority.BACKGROUND |
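The table amounts to a small lookup keyed on the paid flag and tier name. This sketch mirrors the table but is not necessarily how infrastructure/taskiq/priority.py implements resolve_task_priority; the IntEnum definition and the BACKGROUND fallback for an unrecognized paid tier are assumptions.

```python
from enum import IntEnum
from typing import Optional


class TaskPriority(IntEnum):
    # Numeric values match the table; RabbitMQ delivers higher numbers first.
    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3


# Only paid tiers earn elevated priority.
_PAID_TIER_PRIORITY = {
    "viral": TaskPriority.HIGH,
    "creator": TaskPriority.NORMAL,
    "hobby": TaskPriority.LOW,
}


def resolve_task_priority(tier_name: Optional[str], is_paid: bool) -> TaskPriority:
    if not is_paid:
        # Trial, free, and users without a tier share the lowest priority,
        # even if a trial grants a high tier such as creator.
        return TaskPriority.BACKGROUND
    # Assumption: an unrecognized paid tier falls back to BACKGROUND.
    return _PAID_TIER_PRIORITY.get(tier_name or "", TaskPriority.BACKGROUND)
```

Because TaskPriority is an IntEnum, int(priority) gives the plain integer label used when kicking a task.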
Enqueue Pattern¶
from infrastructure.taskiq.priority import resolve_task_priority
from modules.entitlement.tier_context import resolve_tier_context
tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
project_uuid=str(project_uuid),
)
Priority is resolved at enqueue time — if a user upgrades, their next task gets the new priority. Tasks already in the queue keep their original priority.
Queue Configuration¶
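Priority queues use QueueType.CLASSIC with max_priority=3 and qos=2. The low prefetch keeps ordering strict: RabbitMQ only reorders messages still waiting in the queue, so anything a worker has already prefetched runs ahead of higher-priority tasks published later. Brokers and constants are in infrastructure/taskiq/brokers.py and infrastructure/taskiq/priority.py.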
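To see why prefetch matters, here is a toy single-consumer simulation. It is a simplified model, not broker code, and it ignores acks, multiple workers, and concurrency. A viral-tier task is published just after the consumer starts on a backlog of free-tier tasks, and the simulation shows how many tasks still run ahead of it at different prefetch counts.

```python
import heapq
import itertools
from collections import deque


def processing_order(backlog, late, prefetch):
    """Return task names in the order one consumer finishes them.

    backlog: (name, priority) pairs queued before the consumer starts.
    late: one (name, priority) pair published while the first task runs.
    prefetch: max unacknowledged messages the consumer holds (like qos).
    """
    queue = []  # broker side: highest priority first, FIFO within a priority
    seq = itertools.count()

    def publish(name, priority):
        heapq.heappush(queue, (-priority, next(seq), name))

    for name, priority in backlog:
        publish(name, priority)

    buffer = deque()  # consumer side: already delivered, processed in order
    order = []
    late_published = False
    while queue or buffer:
        # Broker tops the consumer back up to the prefetch limit.
        while queue and len(buffer) < prefetch:
            buffer.append(heapq.heappop(queue)[2])
        order.append(buffer.popleft())
        if not late_published:
            publish(*late)
            late_published = True
    return order
```

With backlog = [("free-1", 0), ("free-2", 0), ("free-3", 0), ("free-4", 0)] and late = ("viral", 3), a prefetch of 2 lets the viral task run third, while a prefetch of 10 pushes it to last because the whole backlog was already delivered.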
Feature Flags¶
Postgres-backed feature flags for kill switches, gradual rollouts, beta testing, and time-limited features. No external dependencies.
Models¶
FeatureFlag: the flag itself (key, enabled, rollout_percentage, description, expires_at). FeatureFlagUser: explicit allowlist (an allowlisted user gets the flag whenever it is enabled).
Evaluation Order¶
- enabled=False → off (kill switch; overrides everything)
- User in allowlist → on (bypasses expiry and percentage)
- expires_at in the past → off
- rollout_percentage set → deterministic hash check
- rollout_percentage null → on (everyone)
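The evaluation order above can be sketched as a single function. This is a minimal illustration, not modules/feature_flag/service.py: the in-memory allowlist field and the hash scheme (SHA-256 of "key:user_id", reduced to a bucket from 0 to 99) are assumptions, since the guide only says the check is a deterministic hash.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import FrozenSet, Optional


@dataclass
class FeatureFlag:
    key: str
    enabled: bool
    rollout_percentage: Optional[int] = None
    expires_at: Optional[datetime] = None
    # Hypothetical: user ids that would come from FeatureFlagUser rows.
    allowlist: FrozenSet[str] = frozenset()


def rollout_bucket(key: str, user_id: str) -> int:
    # Assumed scheme: stable per (flag, user) pair, spread over 0..99.
    digest = hashlib.sha256(f"{key}:{user_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % 100


def is_enabled(flag: FeatureFlag, user_id: str, now: Optional[datetime] = None) -> bool:
    now = now or datetime.now(timezone.utc)
    if not flag.enabled:
        return False  # kill switch beats everything, including the allowlist
    if user_id in flag.allowlist:
        return True  # allowlisted users skip expiry and percentage checks
    if flag.expires_at is not None and flag.expires_at <= now:
        return False  # flag has expired
    if flag.rollout_percentage is not None:
        return rollout_bucket(flag.key, user_id) < flag.rollout_percentage
    return True  # no percentage set: on for everyone
```

Mixing the flag key into the hash means a 10% rollout of one flag selects a different 10% of users than another flag, while any single user gets the same answer on every request.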
Route Gating¶
Use the require_feature() dependency to gate endpoints behind flags. It returns 404 (not 403) so the endpoint's existence stays hidden:
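from typing import Annotated
from fastapi import APIRouter, Depends
from modules.feature_flag.dependencies import require_feature

router = APIRouter()

@router.post("/new-endpoint")
async def new_endpoint(
    # ...other route parameters...
    _: Annotated[None, Depends(require_feature("my_flag"))],
):
    # Only reachable when the flag is on for this user; otherwise 404
    ...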
Frontend¶
useFeatureFlag("key") returns a boolean backed by GET /api/v1/flags (cached for 5 minutes). The backend is the authority: even if someone tampers with the flags response in the browser, the route dependency still blocks access.
Admin Endpoints¶
GET/POST/PATCH/DELETE /api/v1/admin/flags + allowlist management via /admin/flags/{id}/users. Superuser only.
Scripts¶
| Script | Purpose | How to run |
|---|---|---|
| scripts/setup_initial_data.py | Create tables + seed tiers | Auto-runs on docker compose up |
| scripts/seed_all.py | Tables + tiers + Stripe products (full setup) | Auto-runs on docker compose up |
| scripts/seed_stripe_products.py | Stripe products + prices only | Called by seed_all.py if STRIPE_SECRET_KEY is set |
| scripts/seed_trial_credits.py | Grant trial credits to a user | uv run python scripts/seed_trial_credits.py --email user@example.com |
Key Files¶
| Purpose | Location |
|---|---|
| Database session | infrastructure/database/session.py |
| Model mixins | infrastructure/database/models.py |
| Auth dependencies | infrastructure/auth/session/dependencies.py |
| Auth constants | infrastructure/auth/constants.py |
| Common schemas | modules/common/schemas.py |
| Domain exceptions | modules/common/exceptions.py |
| Exception → HTTP mapping (HTTPMapping + EXCEPTION_MAPPING) | modules/common/constants.py |
| Error handler | modules/common/utils/error_handler.py |
| i18n catalogs + t() lookup + t_current() raise-site helper | modules/common/i18n/{catalogs.py,en.py,pt.py,es.py} |
| i18n request-locale ContextVar (get_current_locale / set_current_locale) | modules/common/i18n/context.py |
| i18n LocaleMiddleware (pure-ASGI; reads Accept-Language → ContextVar) | modules/common/i18n/middleware.py |
| API dependencies | interfaces/api/dependencies.py |
| Route registration | interfaces/api/v1/__init__.py |
| Credit service + ledger | modules/credits/service.py |
| Entitlement transaction model | modules/credits/models.py |
| Trial granting | modules/entitlement/trial.py |
| Beta service | modules/entitlement/beta.py |
| Beta admin API | interfaces/api/v1/beta.py |
| Tier context resolution | modules/entitlement/tier_context.py |
| Entitlement constants | modules/entitlement/constants.py |
| Broker setup | infrastructure/taskiq/brokers.py |
| Priority routing | infrastructure/taskiq/priority.py |
| Feature flags | modules/feature_flag/service.py |
| Feature flag dependency | modules/feature_flag/dependencies.py |