Events (SSE)¶
Workers publish events to Redis pub/sub. The API streams them to clients via Server-Sent Events. This gives real-time updates without polling.
How It Works¶
sequenceDiagram
participant Worker
participant Redis
participant SSE as SSE Endpoint
participant Browser as EventSource
Worker->>Redis: PUBLISH project:{uuid}:events
Redis->>SSE: Message received
SSE->>Browser: event: ANALYSIS_COMPLETE<br/>data: {...}
Browser->>Browser: Invalidate React Query cache
Each project has its own channel: project:{uuid}:events. When a worker finishes a step, it publishes an event. The SSE endpoint subscribes to that channel and forwards events to connected clients.
Late-subscribe replay. Redis pub/sub has no buffering — any event published before the subscriber's pubsub.subscribe() call lands is lost forever. For the analysis pipeline this surfaces as a "user clicks Analyze, opens the editor, and only sees the last step" race because early ANALYSIS_PROGRESS events fire before the EventSource connects. The publisher mitigates this by caching the latest AnalysisProgressEvent per project to a TTL'd Redis string key (progress_snapshot:{project_uuid}, 1-hour TTL) on every publish, and clearing it on ANALYSIS_COMPLETE / ANALYSIS_FAILED. subscribe_with_keepalive reads the snapshot via read_progress_snapshot immediately after pubsub.subscribe() and yields it as the first frame, so late subscribers see the current pipeline step without waiting for the next step boundary. The mechanism is intentionally analysis-only — other events (clip / export) either fire at well-defined moments (the user is watching) or the consumer polls as a fallback.
Event Flow Architecture¶
flowchart TB
subgraph Workers["Background Workers"]
DW[Download Worker]
PW[Proxy Worker]
AW[Analysis Worker]
RW[Render Worker]
end
subgraph Redis["Redis"]
CH1[project:abc:events]
CH2[project:xyz:events]
end
subgraph API["FastAPI"]
SSE1[SSE /abc/events]
SSE2[SSE /xyz/events]
end
subgraph Clients["Browser Clients"]
C1[User A viewing project abc]
C2[User B viewing project xyz]
end
DW -->|Publish| CH1
PW -->|Publish| CH1
AW -->|Publish| CH1
RW -->|Publish| CH2
CH1 -->|Subscribe| SSE1
CH2 -->|Subscribe| SSE2
SSE1 -->|Stream| C1
SSE2 -->|Stream| C2
The publisher is a singleton that lazily connects to Redis:
from src.infrastructure.events import get_event_publisher
publisher = get_event_publisher()
await publisher.publish(
project_uuid=project_uuid,
event=AnalysisCompleteEvent(edit_count=42),
)
Event Types¶
Events are defined in backend/src/infrastructure/events/schemas.py. Each has a type string and payload:
flowchart LR
subgraph Clip["Clip Events"]
CP[CLIP_PROCESSING]
CR[CLIP_READY]
CF[CLIP_FAILED]
end
subgraph Analysis["Analysis Events"]
AS[ANALYSIS_STARTED]
AP[ANALYSIS_PROGRESS]
AC[ANALYSIS_COMPLETE]
AF[ANALYSIS_FAILED]
end
subgraph Export["Export Events"]
ES[EXPORT_STARTED]
EP[EXPORT_PROGRESS]
EC[EXPORT_COMPLETE]
EF[EXPORT_FAILED]
end
Clip Events¶
| Event | When | Payload |
|---|---|---|
CLIP_PROCESSING |
Download started | clip_uuid |
CLIP_READY |
Processing complete | clip_uuid, duration_ms |
CLIP_FAILED |
Processing failed | clip_uuid, error |
Analysis Events¶
| Event | When | Payload |
|---|---|---|
ANALYSIS_STARTED |
Pipeline begins | project_uuid |
ANALYSIS_PROGRESS |
Step completed | step, progress |
ANALYSIS_COMPLETE |
All edits created | edit_count |
ANALYSIS_FAILED |
Pipeline failed | error |
Export Events¶
| Event | When | Payload |
|---|---|---|
EXPORT_STARTED |
Render begins | export_uuid |
EXPORT_PROGRESS |
Step boundary reached (5% / 30% / 85%) | progress (0-100), message |
EXPORT_COMPLETE |
Ready to download | export_uuid, storage_key |
EXPORT_FAILED |
Render failed | export_uuid, error |
SSE Connection Lifecycle¶
stateDiagram-v2
[*] --> Connecting: new EventSource()
Connecting --> Connected: onopen
Connecting --> Error: onerror
Connected --> Receiving: event received
Receiving --> Connected: process event
Connected --> Reconnecting: connection lost
Reconnecting --> Connected: auto-reconnect
Error --> Reconnecting: retry
Connected --> [*]: close()
SSE Endpoint¶
The endpoint is at GET /api/v1/projects/{project_uuid}/events. It returns a StreamingResponse with text/event-stream content type.
@router.get("/{project_uuid}/events")
async def stream_project_events(
project_uuid: UUID,
session_data: SessionData | None = Depends(get_session_from_cookie),
):
# Auth from the Redis-backed session — no DB hit.
if session_data is None or not session_data.is_active:
raise UnauthorizedException("Not authenticated")
# Acquire a session manually for the ownership check, then release it
# before returning the StreamingResponse. A `Depends(async_session)`
# parameter would hold the pool slot for the entire EventSource lifetime.
async with local_session() as db:
user = await crud_users.get(db=db, id=session_data.user_id, is_deleted=False)
if user is None:
raise UnauthorizedException("Not authenticated")
project = await project_service.get_by_uuid(project_uuid, db)
if project["user_id"] != user["id"]:
raise ForbiddenException(...)
return StreamingResponse(
event_subscriber.subscribe(str(project_uuid)),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
Connection-pool note. SSE responses live as long as the client EventSource. Taking a DB-bearing dependency (Depends(async_session), Depends(get_current_user)) would hold a pool slot for the entire stream, which exhausts the pool under load. The fix is to acquire a session manually for the auth/ownership window and release it before returning — the streaming generator only talks to Redis. Notification and asset SSE endpoints skip the DB entirely and auth from SessionData.is_active alone (soft-deleted users don't generate events upstream, so the is_deleted=False re-check isn't worth a pool slot for an SSE handshake). The project SSE keeps the DB lookup because the per-project ownership check is load-bearing.
The subscriber is an async generator that yields SSE-formatted strings:
It also sends keepalive comments every 15 seconds to prevent connection timeouts.
Frontend Integration¶
flowchart TB
subgraph Hook["useProjectEvents Hook"]
ES[EventSource] --> H[Event Handlers]
H --> I[Invalidate Queries]
end
subgraph Queries["React Query"]
CQ[useClips]
EQ[useEdits]
XQ[useExports]
end
I -->|clip_ready| CQ
I -->|analysis_complete| EQ
I -->|export_complete| XQ
CQ --> UI[UI Re-render]
EQ --> UI
XQ --> UI
The frontend uses EventSource to connect:
const eventSource = new EventSource(
`/api/v1/projects/${projectUuid}/events`,
{ withCredentials: true }
);
eventSource.addEventListener('CLIP_READY', (e) => {
const data = JSON.parse(e.data);
queryClient.invalidateQueries(['clips', projectUuid]);
});
eventSource.addEventListener('ANALYSIS_COMPLETE', (e) => {
const data = JSON.parse(e.data);
queryClient.invalidateQueries(['edits', projectUuid]);
});
When events arrive, we invalidate React Query caches so the UI refetches fresh data. This keeps the UI in sync without manual polling.
Polling Fallback¶
Not everything uses SSE. For simpler cases, we use React Query's refetchInterval to poll while a resource is pending:
flowchart LR
subgraph Polling["Polling Pattern"]
Q[useQuery] -->|refetchInterval| API[GET /resource]
API -->|status: pending| Q
API -->|status: complete| STOP[Stop polling]
end
When to Use Each¶
| Pattern | Use Case | Examples |
|---|---|---|
| SSE | Real-time critical, user is actively watching | Clip processing, Analysis pipeline |
| Polling | Background process, simpler implementation | Exports, Asset downloads |
Polling Implementation¶
// hooks/useAssets.ts
export function useAssets(groupId?: string) {
return useQuery({
queryKey: assetKeys.list(groupId),
queryFn: () => assetApi.list(groupId),
// Poll every 3s while assets are downloading
refetchInterval: (query) => {
const assets = query.state.data?.data;
if (!assets) return false;
const hasPending = assets.some((a) => a.status === 'pending');
return hasPending ? 3000 : false;
},
});
}
Current Usage¶
| Feature | Pattern | Why |
|---|---|---|
| Clips | SSE | User watches upload progress in real-time |
| Analysis | SSE | User watches analysis steps in real-time |
| Exports | Polling | Background render, user checks back later |
| Assets | Polling | YouTube downloads, user imports and continues working |
Key Files¶
| Component | Location |
|---|---|
| Event schemas | backend/src/infrastructure/events/schemas.py |
| Publisher | backend/src/infrastructure/events/publisher.py |
| Subscriber | backend/src/infrastructure/events/subscriber.py |
| SSE endpoint | backend/src/interfaces/api/v1/projects.py |
| Frontend SSE client | frontend/shared/lib/events.ts |
| Frontend hook | frontend/features/projects/hooks.ts |