Skip to content

Events (SSE)

Workers publish events to Redis pub/sub. The API streams them to clients via Server-Sent Events. This gives real-time updates without polling.

How It Works

sequenceDiagram
    participant Worker
    participant Redis
    participant SSE as SSE Endpoint
    participant Browser as EventSource

    Worker->>Redis: PUBLISH project:{uuid}:events
    Redis->>SSE: Message received
    SSE->>Browser: event: ANALYSIS_COMPLETE<br/>data: {...}
    Browser->>Browser: Invalidate React Query cache

Each project has its own channel: project:{uuid}:events. When a worker finishes a step, it publishes an event. The SSE endpoint subscribes to that channel and forwards events to connected clients.

Event Flow Architecture

flowchart TB
    subgraph Workers["Background Workers"]
        DW[Download Worker]
        PW[Proxy Worker]
        AW[Analysis Worker]
        RW[Render Worker]
    end

    subgraph Redis["Redis"]
        CH1[project:abc:events]
        CH2[project:xyz:events]
    end

    subgraph API["FastAPI"]
        SSE1[SSE /abc/events]
        SSE2[SSE /xyz/events]
    end

    subgraph Clients["Browser Clients"]
        C1[User A viewing project abc]
        C2[User B viewing project xyz]
    end

    DW -->|Publish| CH1
    PW -->|Publish| CH1
    AW -->|Publish| CH1
    RW -->|Publish| CH2

    CH1 -->|Subscribe| SSE1
    CH2 -->|Subscribe| SSE2

    SSE1 -->|Stream| C1
    SSE2 -->|Stream| C2

The publisher is a singleton that lazily connects to Redis:

from src.infrastructure.events import get_event_publisher

publisher = get_event_publisher()
await publisher.publish(
    project_uuid=project_uuid,
    event=AnalysisCompleteEvent(edit_count=42),
)

Event Types

Events are defined in backend/src/infrastructure/events/schemas.py. Each has a type string and payload:

flowchart LR
    subgraph Clip["Clip Events"]
        CP[CLIP_PROCESSING]
        CR[CLIP_READY]
        CF[CLIP_FAILED]
    end

    subgraph Analysis["Analysis Events"]
        AS[ANALYSIS_STARTED]
        AP[ANALYSIS_PROGRESS]
        AC[ANALYSIS_COMPLETE]
        AF[ANALYSIS_FAILED]
    end

    subgraph Export["Export Events"]
        ES[EXPORT_STARTED]
        EP[EXPORT_PROGRESS]
        EC[EXPORT_COMPLETE]
        EF[EXPORT_FAILED]
    end

Clip Events

Event When Payload
CLIP_PROCESSING Download started clip_uuid
CLIP_READY Processing complete clip_uuid, duration_ms
CLIP_FAILED Processing failed clip_uuid, error

Analysis Events

Event When Payload
ANALYSIS_STARTED Pipeline begins project_uuid
ANALYSIS_PROGRESS Step completed step, progress
ANALYSIS_COMPLETE All edits created edit_count
ANALYSIS_FAILED Pipeline failed error

Export Events

Event When Payload
EXPORT_STARTED Render begins export_uuid
EXPORT_PROGRESS Step boundary reached (5% / 30% / 85%) progress (0-100), message
EXPORT_COMPLETE Ready to download export_uuid, storage_key
EXPORT_FAILED Render failed export_uuid, error

SSE Connection Lifecycle

stateDiagram-v2
    [*] --> Connecting: new EventSource()
    Connecting --> Connected: onopen
    Connecting --> Error: onerror
    Connected --> Receiving: event received
    Receiving --> Connected: process event
    Connected --> Reconnecting: connection lost
    Reconnecting --> Connected: auto-reconnect
    Error --> Reconnecting: retry
    Connected --> [*]: close()

SSE Endpoint

The endpoint is at GET /api/v1/projects/{project_uuid}/events. It returns a StreamingResponse with text/event-stream content type.

@router.get("/{project_uuid}/events")
async def stream_project_events(
    project_uuid: UUID,
    current_user: dict = Depends(get_current_user),
):
    # Verify user owns project
    project = await project_service.get(...)
    if project["user_id"] != current_user["id"]:
        raise HTTPException(403)

    return StreamingResponse(
        event_subscriber.subscribe(str(project_uuid)),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

The subscriber is an async generator that yields SSE-formatted strings:

event: ANALYSIS_COMPLETE
data: {"edit_count": 42, "timestamp": "2024-01-15T..."}

It also sends keepalive comments every 15 seconds to prevent connection timeouts.

Frontend Integration

flowchart TB
    subgraph Hook["useProjectEvents Hook"]
        ES[EventSource] --> H[Event Handlers]
        H --> I[Invalidate Queries]
    end

    subgraph Queries["React Query"]
        CQ[useClips]
        EQ[useEdits]
        XQ[useExports]
    end

    I -->|clip_ready| CQ
    I -->|analysis_complete| EQ
    I -->|export_complete| XQ

    CQ --> UI[UI Re-render]
    EQ --> UI
    XQ --> UI

The frontend uses EventSource to connect:

const eventSource = new EventSource(
  `/api/v1/projects/${projectUuid}/events`,
  { withCredentials: true }
);

eventSource.addEventListener('CLIP_READY', (e) => {
  const data = JSON.parse(e.data);
  queryClient.invalidateQueries(['clips', projectUuid]);
});

eventSource.addEventListener('ANALYSIS_COMPLETE', (e) => {
  const data = JSON.parse(e.data);
  queryClient.invalidateQueries(['edits', projectUuid]);
});

When events arrive, we invalidate React Query caches so the UI refetches fresh data. This keeps the UI in sync without manual polling.

Polling Fallback

Not everything uses SSE. For simpler cases, we use React Query's refetchInterval to poll while a resource is pending:

flowchart LR
    subgraph Polling["Polling Pattern"]
        Q[useQuery] -->|refetchInterval| API[GET /resource]
        API -->|status: pending| Q
        API -->|status: complete| STOP[Stop polling]
    end

When to Use Each

Pattern Use Case Examples
SSE Real-time critical, user is actively watching Clip processing, Analysis pipeline
Polling Background process, simpler implementation Exports, Asset downloads

Polling Implementation

// hooks/useAssets.ts
export function useAssets(groupId?: string) {
  return useQuery({
    queryKey: assetKeys.list(groupId),
    queryFn: () => assetApi.list(groupId),
    // Poll every 3s while assets are downloading
    refetchInterval: (query) => {
      const assets = query.state.data?.data;
      if (!assets) return false;
      const hasPending = assets.some((a) => a.status === 'pending');
      return hasPending ? 3000 : false;
    },
  });
}

Current Usage

Feature Pattern Why
Clips SSE User watches upload progress in real-time
Analysis SSE User watches analysis steps in real-time
Exports Polling Background render, user checks back later
Assets Polling YouTube downloads, user imports and continues working

Key Files

Component Location
Event schemas backend/src/infrastructure/events/schemas.py
Publisher backend/src/infrastructure/events/publisher.py
Subscriber backend/src/infrastructure/events/subscriber.py
SSE endpoint backend/src/interfaces/api/v1/projects.py
Frontend SSE client frontend/shared/lib/events.ts
Frontend hook frontend/features/projects/hooks.ts

← Storage Workers →