# Monitoring
This page covers observability for Sapari in production.
## Logging
We use structured logging via Logfire (Pydantic's observability platform). Logs include:
- Request/response for API calls
- Task execution for workers
- Error tracebacks with context
```python
from src.infrastructure.logging import get_logger

logger = get_logger()
logger.info("Processing clip", clip_uuid=str(clip.uuid), status=clip.status)
```
## Metrics
Key metrics to monitor:
| Metric | What to Watch |
|---|---|
| API latency (p99) | Should be < 500ms for most endpoints |
| RabbitMQ queue depth | Growing queue = workers can't keep up (check per priority level) |
| Task duration | Transcription ~1-2min per minute of video |
| Error rate | Spike = something broken |
| Storage usage | R2 bucket sizes |
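The task-duration heuristic above can be turned into a rough ETA check when triaging a backed-up queue. A minimal sketch (the helper name is hypothetical; the 1-2 min-per-minute ratio comes from the table):

```python
def transcription_eta_minutes(video_minutes: float) -> tuple[float, float]:
    # ~1-2 minutes of transcription per minute of video (heuristic above).
    return (video_minutes * 1.0, video_minutes * 2.0)
```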
## Alerts
Set up alerts for:
- API error rate > 1%
- Task failure rate > 5%
- RabbitMQ queue depth > 100 for any broker (check via management API at :15672)
- RabbitMQ consumer count drops to 0 for any queue
- Database connection pool exhaustion
- Redis memory > 80% (cache + sessions + result backend)
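The RabbitMQ thresholds above can be sketched as a stdlib-only check over one queue entry shaped like the management API's `/api/queues` JSON (`messages` = depth, `consumers` = consumer count); the function name is hypothetical, not part of Sapari's code:

```python
# Hypothetical sketch: evaluate the RabbitMQ alert thresholds above
# against one queue entry from the management API's /api/queues JSON.
def queue_alerts(queue: dict) -> list[str]:
    alerts = []
    if queue.get("messages", 0) > 100:  # queue-depth threshold
        alerts.append(f"{queue['name']}: depth {queue['messages']} > 100")
    if queue.get("consumers", 0) == 0:  # no consumers left on the queue
        alerts.append(f"{queue['name']}: consumer count is 0")
    return alerts
```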
## Tracing
Logfire provides distributed tracing across API requests and background tasks. Each request/task gets a trace ID that links:
- API handler execution
- Database queries
- External API calls (Whisper, LLM)
- Task queue operations
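The linking works by context propagation: an ID bound at the request boundary is visible to all nested work. A stdlib-only illustration of the idea (Logfire/OpenTelemetry do this internally; these function names are hypothetical):

```python
import contextvars
import uuid

# One context variable holds the current trace ID for this request/task.
trace_id = contextvars.ContextVar("trace_id", default=None)

def handle_request():
    trace_id.set(uuid.uuid4().hex)  # bind one trace ID per request
    return run_db_query()

def run_db_query():
    return trace_id.get()  # nested work sees the same trace ID
```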
### Per-worker service names
Each process reports a distinct `service.name` so the Logfire UI can filter spans by worker role. The FastAPI web process is `sapari-api`; each TaskIQ worker gets its own name via `startup_taskiq_worker(state, service_name)`:
| Process | `service.name` | Configured in |
|---|---|---|
| FastAPI web | `sapari-api` | `backend/.env` → `LOGFIRE_SERVICE_NAME` |
| Email worker | `sapari-email` | `infrastructure/taskiq/app.py:80` |
| Analysis worker | `sapari-analysis` | `infrastructure/taskiq/app.py:81` |
| Render worker | `sapari-render` | `infrastructure/taskiq/app.py:82` |
| Download worker | `sapari-download` | `infrastructure/taskiq/app.py:83` |
| Proxy worker | `sapari-proxy` | `infrastructure/taskiq/app.py:84` |
| Asset-edit worker | `sapari-asset-edit` | `infrastructure/taskiq/app.py:85` |
| Scheduler | `sapari-scheduler` | `infrastructure/taskiq/scheduler.py` |
Add `service.name != 'sapari-api'` to any Logfire query to scope to worker traffic. Filter to a specific worker via `service.name = 'sapari-render'` for per-queue latency investigations.
### Span taxonomy (six categories)
Manual spans follow a six-category naming convention so Logfire filters, saved queries, and ad-hoc grouping stay legible. Every new manual span must fit one of these:
| Category | Pattern | Example | When to use |
|---|---|---|---|
| Pipeline parent | `<worker>.pipeline` | `analysis.pipeline`, `render.pipeline` | Wraps the full DAG invocation inside a task body. Carries `project_uuid` / `export_uuid` correlation keys. |
| Pipeline step | `step.<step_id>` (auto) | `step.download_clips`, `step.apply_cuts` | Emitted automatically by `fastroai.LogfireTracer` — don't hand-write. |
| Task | `taskiq.<task_name>` (auto) | `taskiq.analyze_project`, `taskiq.cleanup_expired_exports` | Emitted automatically by `LogfireSpanMiddleware` — don't hand-write. Cron tasks carry a `task_type="cron"` attribute (see Convention #15). |
| Service-layer | `<module>.<method>` | `credits.reserve`, `entitlement.grant`, `payment.handle_webhook` | Wraps business operations that span multiple CRUD calls, so per-operation latency is queryable separately from any single SQL query. |
| External call | `ext.<service>.<op>` | `ext.whisper.transcribe`, `ext.ffmpeg.render`, `ext.ytdlp.download`, `ext.stripe.modify_subscription` | Wraps every outbound call to a non-owned service so its p99 can be isolated from surrounding app code. |
| Cron | — | uses `taskiq.<task_name>` with `task_type="cron"` attr | Saved query: filter `task_type = 'cron'` to isolate scheduled runs from on-demand queue traffic. |
| SSE | `sse.<event>` | `sse.publish`, `sse.clip_ready` | Wraps publish paths so event-emission latency is distinct from the originating task. |
**Attribute conventions.** Every `ext.*` span should carry a domain correlation key at span entry (`clip_file_uuid`, `asset_file_uuid`, `user_asset_uuid`, `subscription_id`, etc.) so a slow call can be traced back to the originating resource without reading surrounding code. `<module>.<method>` spans carry the operation's primary identifier (`user_id`, `event_type`, `project_uuid`) plus a post-attribute via `span.set_attribute()` that captures the outcome (`reserved=true/false`, `entitlements_drained=N`, `payments_materialized=N`).
**Why manual spans over pure auto-instrumentation.** Logfire auto-instruments SQLAlchemy, Redis, Pydantic AI, and FastAPI. Those are always on and produce the "zoom in" view. Manual spans produce the "zoom out" view — the thing a dashboard wants ("p99 of `credits.reserve`") rather than the raw SQL or Redis calls that composed it. Auto-instrumentation is necessary but not sufficient for app-level SLO work.
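To make the service-layer convention concrete, here is a stdlib-only sketch: the `Span` class below merely stands in for `logfire.span`, and `reserve_credits` with its balance check is hypothetical — the point is the naming (`<module>.<method>`), the primary identifier at span entry, and the outcome recorded as a post-attribute.

```python
from dataclasses import dataclass, field

@dataclass
class Span:
    # Stand-in for logfire.span — the real code uses logfire, not this class.
    name: str
    attributes: dict = field(default_factory=dict)

    def set_attribute(self, key: str, value) -> None:
        self.attributes[key] = value

def reserve_credits(user_id: int, amount: int, balance: int) -> Span:
    # Span named <module>.<method>, primary identifier attached at entry.
    span = Span("credits.reserve", {"user_id": user_id})
    reserved = amount <= balance  # stand-in for the real reservation logic
    span.set_attribute("reserved", reserved)  # outcome post-attribute
    return span
```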
### Auto-instrumentor defaults
Not every Logfire auto-instrumentor earns its keep. Current defaults:
| Auto-instrumentor | Web (`sapari-api`) | Workers (`sapari-*`) | Env var | Rationale |
|---|---|---|---|---|
| `instrument_pydantic_ai` | ✅ ON | ✅ ON | `LOGFIRE_INSTRUMENT_PYDANTIC_AI` | Highest-value auto-signal. LLM calls are the biggest unit cost in the app — every invocation needs token counts, cost, model, and prompt hash for debugging and billing reconciliation. Keep on unconditionally. |
| `instrument_sqlalchemy` | ✅ ON | ✅ ON | `LOGFIRE_INSTRUMENT_SQLALCHEMY` / `LOGFIRE_INSTRUMENT_SQLALCHEMY_WORKERS` | Per-query spans let you debug "why is this endpoint's p99 slow" without adding manual spans during an incident. Neon's query insights cover aggregate patterns but don't correlate back to user-visible request latency. Kept on despite the span-volume cost because the debugging value wins during incidents. |
| `instrument_redis` | ❌ OFF | ❌ OFF | `LOGFIRE_INSTRUMENT_REDIS` / `LOGFIRE_INSTRUMENT_REDIS_WORKERS` | Redis ops in Sapari are uniformly sub-millisecond (cache, session, rate-limit, pub/sub). Real-world problems surface at the request-span level before they're visible at the Redis-op level, and span volume dominates (Redis is called per request × multiple ops) without proportional signal. Turn on temporarily via env var for Redis-specific investigations. |
| `instrument_fastapi` | ✅ ON | N/A | (always on when the app is instrumented) | One span per HTTP request is the backbone — this is what every other span hangs off. |
Rationale for keeping SQLAlchemy despite the cost: when staging hit an OOM + 100% CPU incident (2026-04-23), the time-to-diagnose was dominated by not having per-query visibility into what the hot worker was doing. Neon showed nothing unusual (the bug was in a pubsub spin loop, not a query). SQL spans wouldn't have caught that one specifically, but the adjacent question "is a runaway query involved" is the first thing you ask when CPU pins, and having SQL spans present answers it in 30 seconds instead of 30 minutes. That asymmetry justifies the span volume.
Rationale for turning Redis off: the SSE subscriber spin-loop bug (fixed 2026-04-23) was emitting ~50-100k Redis get_message spans per second per open SSE connection. Redis instrumentation was effectively amplifying an already-pathological CPU pattern with per-call span overhead. Redis ops are sufficiently cheap and uniform that production issues surface at the request-level span first; there's no class of bug where "per-Redis-op span" is the load-bearing signal.
**How to flip them temporarily.** Each env var maps directly to its instrumentor's on/off toggle. For a focused investigation:

```shell
# On the staging or prod server, edit .env:
LOGFIRE_INSTRUMENT_REDIS=true
LOGFIRE_INSTRUMENT_REDIS_WORKERS=true

# Restart the backend + affected workers
docker compose up -d sapari-backend sapari-taskiq-<worker>
```
Remember to flip back after the investigation.
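A minimal sketch of how a boolean toggle like these is commonly parsed at startup (the helper name is hypothetical; Sapari's actual config parsing may differ):

```python
import os

def env_flag(name: str, default: bool = False) -> bool:
    # Treat common truthy spellings as on; anything else as off.
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}
```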
## Media proxy observability (R2 Stage 5)
Two instrumented signals feed the R2 Stage 5 dashboards. See `OBSERVABILITY.md` for the full plan.
### Backend mint latency (Logfire)
`MediaTokenService.mint()` is wrapped in `logfire.span("media_token.mint", ...)` with attributes `bucket`, `user_id`, `kid`, and `expires_in`. Span timing captures latency automatically, and exceptions raised inside the span are recorded as the span's error.
Saved queries to create in the Logfire UI (Workspaces → Saved queries):
| Query name | Filter | Purpose |
|---|---|---|
| `media_token.mint` p50/p95/p99 (24h) | `span_name = 'media_token.mint'`, time = last 24h | Stage 5 mint-latency gate |
| `media_token.mint` error rate (24h) | `span_name = 'media_token.mint' AND status = 'error'` | Spikes = mint path broken |
Plan decision rule (OBSERVABILITY.md Phase 6): if p99 mint < 500ms AND p99 Worker verify < 100ms, drop `PROXY_URL.REFRESH_BUFFER_SECONDS` from 30 to 15.
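The decision rule reduces to a simple predicate over the two p99s; a sketch (function name hypothetical, latencies in milliseconds):

```python
def refresh_buffer_seconds(p99_mint_ms: float, p99_verify_ms: float) -> int:
    # Phase 6 rule: only shrink the refresh buffer when both latency
    # gates pass; otherwise keep the conservative 30s default.
    if p99_mint_ms < 500 and p99_verify_ms < 100:
        return 15
    return 30
```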
### Worker reject-reason logs (structured JSON)
`/media/v1/*` rejections emit single-line JSON to the Worker log stream. Shape:

```json
{"type": "media_verify_rejected", "reason": "expired", "url": "https://staging.sapari.io/media/v1/<redacted>"}
```
Reasons are an enum (`worker/src/constants.ts:RejectReason`):

- `malformed` — JWT header couldn't be parsed.
- `unknown_kid` — `kid` missing or not in the active registry.
- `expired` — token expired. Expected on long sessions; the frontend refreshes via 401.
- `invalid_signature` — signature verification failed or issuer/claims mismatch. Real attacks land here — investigate if the rate exceeds a trickle.
- `unknown_bucket` — `bkt` claim doesn't resolve to a known R2 binding. Config drift.
- `r2_miss` — token verified but R2 has no object at the key. Often a deleted-file + dangling-DB-reference bug.
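The per-reason aggregation these logs support can be sketched in stdlib Python (hypothetical helper; on staging the same counting is done with `wrangler tail` and `jq`):

```python
import json
from collections import Counter

def count_reject_reasons(lines):
    # Count media_verify_rejected events by their "reason" field,
    # skipping lines that aren't valid JSON.
    reasons = Counter()
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if event.get("type") == "media_verify_rejected":
            reasons[event.get("reason", "unknown")] += 1
    return reasons
```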
To aggregate by reason on staging:
```shell
wrangler tail --env staging --format json \
  | jq -c 'select(.logs[]?.message[0] | fromjson? | .type == "media_verify_rejected")' \
  | jq -c '.logs[].message[0] | fromjson' \
  | jq -s 'group_by(.reason) | map({reason: .[0].reason, count: length})'
```
Tail for 24h after a deploy, then diff against the `cf-analytics-*.md` baseline — the sum of counts should ≈ the baseline's 4xx. If a large fraction lands in `invalid_signature` or `r2_miss`, root-cause before Stage 6.
## Key Files
| Component | Location |
|---|---|
| Logging setup | `backend/src/infrastructure/logging/__init__.py` |
| Logfire config | `backend/src/infrastructure/observability/__init__.py` |
| Media token span | `backend/src/infrastructure/media_proxy/service.py:mint` |
| Worker reject enum | `worker/src/constants.ts:RejectReason` |
| Worker reject log | `worker/src/media.ts:logRejected` |