emit.run

WebSockets

Real-time job and space event streams

WebSockets

Subscribe to real-time events for a single job or an entire space. Events are pushed as JSON messages over a persistent WebSocket connection.

Auth: API key via ?token= query param.

  • GET /api/v1/realtime/:jobId: token is optional. If omitted, emit.run opens an anonymous progress-only stream.
  • GET /api/v1/realtime/spaces/:spaceId: token is required.

Required scopes:

EndpointMinimum scope
GET /api/v1/realtime/:jobIdnone (anonymous progress-only), or jobs:read:progress with ?token=
GET /api/v1/realtime/spaces/:spaceIdjobs:read

Scope-Based Data Filtering

Your token's scope controls what data is included in events. This lets you safely hand tokens to end users without exposing job internals.

ScopeEvents delivered
jobs:read:progressinit (last known progress, if any), progress, and completed (without data)
jobs:readFull event stream with all payloads

With jobs:read:progress, clients receive progress updates plus a completion signal. Result/error payloads and all other event types are withheld. Use jobs:read if you need full completion payloads, failure details, kill events, or log append signals.

Space scoping is enforced: a space-scoped token can only connect to its own space's stream and to jobs within that space. Connecting to a job in a different space returns 403.

Client-Facing Apps: Direct Job Stream

For client-facing progress bars, you can connect directly to a job stream without a token.

Anonymous job streams are always progress-scoped:

  • They can only subscribe to individual jobs
  • They receive init, progress, and completed (without data)
  • They cannot access space-wide streams or mutate anything
// Backend: create a job, return the job ID to your client
const res = await fetch(`${API}/spaces/${spaceId}/jobs`, {
  method: "POST",
  headers: { "x-api-key": PRODUCER_KEY, "Content-Type": "application/json" },
  body: JSON.stringify({ name: "generate-report", payload: { reportId: 42 } }),
});
const job = await res.json();

// Frontend: connect directly (anonymous progress scope)
const ws = new WebSocket(`wss://emit.run/api/v1/realtime/${job.id}`);

ws.onmessage = (e) => {
  const event = JSON.parse(e.data);
  if (event.type === "init" && event.progress) {
    renderProgressBar(event.progress.percent);
  } else if (event.type === "progress") {
    renderProgressBar(event.data.percent); // full progress data included
  }
};

For failure or kill updates, use webhooks or have your backend send a final status update.


Job Stream

GET /api/v1/realtime/:jobId

Connect to a single job's event stream. You receive an init event immediately on connect — full status with jobs:read, or last known progress (if any) with jobs:read:progress — then live events as they occur.

ParamRequiredDescription
tokenNoOptional API key with jobs:read:progress or higher. Omit to use anonymous progress scope.
const url = new URL(`wss://emit.run/api/v1/realtime/${jobId}`);
if (apiKey) {
  url.searchParams.set("token", apiKey);
}
const ws = new WebSocket(url.toString());

ws.onmessage = (e) => {
  const event = JSON.parse(e.data);

  switch (event.type) {
    case "init":
      if (event.progress) {
        console.log("Last progress:", event.progress);
      } else {
        console.log("Current status:", event.status);
      }
      break;
    case "progress":
      console.log("Progress:", event.data); // e.g. { percent, message, subProgress? }
      break;
    case "completed":
      console.log("Done:", event.data); // full payload only with jobs:read
      ws.close();
      break;
    case "dead":
      console.error("Failed:", event.data); // full payload only with jobs:read
      ws.close();
      break;
    case "killed":
      console.warn("Stopped:", event.data); // full payload only with jobs:read
      ws.close();
      break;
    case "retried":
      console.log("Retrying, attempt:", event.data?.attemptNumber); // jobs:read only
      break;
  }
};
import asyncio
import json
import websockets

async def watch_job(job_id: str, api_key: str | None = None):
    uri = f"wss://emit.run/api/v1/realtime/{job_id}"
    if api_key:
        uri = f"{uri}?token={api_key}"

    async with websockets.connect(uri) as ws:
        async for message in ws:
            event = json.loads(message)

            match event["type"]:
                case "init":
                    if event.get("progress"):
                        print("Last progress:", event["progress"])
                    else:
                        print("Current status:", event["status"])
                case "progress":
                    print("Progress:", event.get("data"))
                case "completed":
                    print("Done:", event.get("data"))  # jobs:read only
                    break
                case "dead":
                    print("Failed:", event.get("data"))  # jobs:read only
                    break
                case "killed":
                    print("Stopped:", event.get("data"))  # jobs:read only
                    break
                case "retried":
                    print("Retrying, attempt:", event.get("data", {}).get("attemptNumber"))  # jobs:read only

asyncio.run(watch_job("01JLQX..."))
# websocat is a curl-like tool for WebSockets
websocat "wss://emit.run/api/v1/realtime/$JOB_ID"

Event reference

Full-scope events (jobs:read) share this shape:

{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "progress",
  "status": "running",
  "data": {
    "percent": 65,
    "message": "Encoding video",
    "subProgress": {
      "download": { "percent": 100, "message": "Source fetched" },
      "transcode": { "percent": 42, "message": "Pass 1/2" }
    }
  },
  "timestamp": "2025-02-24T10:32:00.000Z"
}

Progress-scope events (jobs:read:progress) include progress data plus completion status:

{
  "jobId": "01JLQX...",
  "type": "progress",
  "data": {
    "percent": 65,
    "message": "Encoding video",
    "subProgress": {
      "download": { "percent": 100, "message": "Source fetched" },
      "transcode": { "percent": 42, "message": "Pass 1/2" }
    }
  },
  "timestamp": "2025-02-24T10:32:00.000Z"
}
TypeWhendata fieldjobs:read:progress
initImmediately on connect— (current status only)Delivered (includes progress if available)
progressWorker sent a progress update{ percent, message, subProgress? }Delivered
deliveredJob polled by a workerNot delivered
ackedWorker acknowledged; job is now runningNot delivered
checkpointWorker stored a checkpointWorker-defined payloadNot delivered
eventWorker stored a custom debug/admin eventWorker-defined payloadNot delivered
logs_appendedWorker appended one or more logs via POST /jobs/:id/logs{ fromSeq, toSeq, count }Not delivered
completedJob completed successfullyResult payload from workerDelivered (without data)
retriedJob failed, being re-queued{ error, attemptNumber }Not delivered
deadJob failed with no retries remaining{ error }Not delivered
killedJob force-stopped before completion{ reason }Not delivered
delivery_timeoutWorker polled but never acked in time; reverted to pendingNot delivered

With jobs:read:progress, only init, progress, and completed are delivered. completed omits data.

logs_appended is intentionally lightweight: it signals that logs were appended, and includes fromSeq/toSeq so clients can fetch exact ranges from GET /jobs/:id/logs.

Message samples

Sent immediately on connect. With jobs:read, it includes current status; with jobs:read:progress, it includes the last known progress (if any).

{
  "jobId": "01JLQX...",
  "type": "init",
  "status": "running",
  "timestamp": "2025-02-24T10:31:05.000Z"
}

Sent each time the worker calls the progress endpoint. data always includes percent and message, plus optional subProgress keyed by stage name.

{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "progress",
  "status": "running",
  "data": {
    "percent": 65,
    "message": "Encoding video",
    "subProgress": {
      "download": { "percent": 100, "message": "Source fetched" },
      "transcode": { "percent": 42, "message": "Pass 1/2" }
    }
  },
  "timestamp": "2025-02-24T10:32:00.000Z"
}

Sent when the worker marks the job complete. data contains the result payload with jobs:read; jobs:read:progress clients receive the same event without data.

{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "completed",
  "status": "completed",
  "data": { "output_url": "s3://bucket/output.mp4", "duration_ms": 12340 },
  "timestamp": "2025-02-24T10:34:20.000Z"
}

Sent when the job fails with no retries remaining. data contains the final error. Delivered only with jobs:read.

{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "dead",
  "status": "dead",
  "data": { "error": "FFmpeg exited with code 1: out of memory" },
  "timestamp": "2025-02-24T10:42:00.000Z"
}

Sent when a job is force-stopped before completion. Delivered only with jobs:read.

{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "killed",
  "status": "killed",
  "data": { "reason": "manual stop" },
  "timestamp": "2025-02-24T10:36:00.000Z"
}

Sent when a job fails but still has retries remaining. data includes the error and attempt number. Delivered only with jobs:read.

{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "retried",
  "status": "pending",
  "data": { "error": "Connection timeout", "attemptNumber": 1 },
  "timestamp": "2025-02-24T10:33:10.000Z"
}

See also: Space Stream for space-wide event streaming, reconnect strategies, and common patterns.

On this page