WebSockets
Real-time job and space event streams
Subscribe to real-time events for a single job or an entire space. Events are pushed as JSON messages over a persistent WebSocket connection.
Auth: API key via ?token= query param.
- GET /api/v1/realtime/:jobId: token is optional. If omitted, emit.run opens an anonymous progress-only stream.
- GET /api/v1/realtime/spaces/:spaceId: token is required.
Required scopes:
| Endpoint | Minimum scope |
|---|---|
| GET /api/v1/realtime/:jobId | none (anonymous progress-only), or jobs:read:progress with ?token= |
| GET /api/v1/realtime/spaces/:spaceId | jobs:read |
Scope-Based Data Filtering
Your token's scope controls what data is included in events. This lets you safely hand tokens to end users without exposing job internals.
| Scope | Events delivered |
|---|---|
| jobs:read:progress | init (last known progress, if any), progress, and completed (without data) |
| jobs:read | Full event stream with all payloads |
With jobs:read:progress, clients receive progress updates plus a completion signal. Result/error payloads and all other event types are withheld. Use jobs:read if you need full completion payloads, failure details, kill events, or log append signals.
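The filtering rules above can be mirrored by a small pure function. This is an illustrative sketch of the documented behavior, not emit.run's actual server code; `filterEvent` is our own name:

```javascript
// Sketch of scope-based event filtering as documented above.
// Not the emit.run implementation; for illustration only.
const PROGRESS_SCOPE_TYPES = new Set(["init", "progress", "completed"]);

function filterEvent(event, scope) {
  if (scope === "jobs:read") return event; // full stream, all payloads
  // jobs:read:progress (and anonymous): progress-only view
  if (!PROGRESS_SCOPE_TYPES.has(event.type)) return null; // withheld entirely
  if (event.type === "completed") {
    const { data, ...rest } = event; // completion signal without result payload
    return rest;
  }
  return event;
}
```

For example, a dead event is withheld entirely under progress scope, while completed still arrives, just stripped of its data field.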
Space scoping is enforced: a space-scoped token can only connect to its own space's stream and to jobs within that space. Connecting to a job in a different space returns 403.
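Conceptually, the enforcement behaves like the guard below. This is a sketch of the documented rule only; `authorizeStream` and its return shape are our own illustration:

```javascript
// Sketch of space-scope enforcement: a space-scoped token may only reach
// streams (space or job) that belong to its own space. Illustration only.
function authorizeStream(tokenSpaceId, targetSpaceId) {
  return tokenSpaceId === targetSpaceId
    ? { ok: true }
    : { ok: false, status: 403 }; // cross-space connect is rejected
}
```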
Client-Facing Apps: Direct Job Stream
For client-facing progress bars, you can connect directly to a job stream without a token.
Anonymous job streams are always progress-scoped:
- They can only subscribe to individual jobs
- They receive init, progress, and completed (without data)
- They cannot access space-wide streams or mutate anything
// Backend: create a job, return the job ID to your client
const res = await fetch(`${API}/spaces/${spaceId}/jobs`, {
  method: "POST",
  headers: { "x-api-key": PRODUCER_KEY, "Content-Type": "application/json" },
  body: JSON.stringify({ name: "generate-report", payload: { reportId: 42 } }),
});
const job = await res.json();
// Frontend: connect directly (anonymous progress scope)
const ws = new WebSocket(`wss://emit.run/api/v1/realtime/${job.id}`);
ws.onmessage = (e) => {
  const event = JSON.parse(e.data);
  if (event.type === "init" && event.progress) {
    renderProgressBar(event.progress.percent);
  } else if (event.type === "progress") {
    renderProgressBar(event.data.percent); // full progress data included
  }
};
For failure or kill updates, use webhooks or have your backend send a final status update.
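One way to implement that backend fallback: your server reads the job with a jobs:read key and exposes only a client-safe status to the frontend. The summary shape below is our own choice, not an emit.run format:

```javascript
// Map a job record (fetched server-side with a jobs:read key) to a
// client-safe summary. Error details and payloads stay server-side.
// The summary shape is an illustrative assumption.
function clientSafeSummary(job) {
  switch (job.status) {
    case "completed":
      return { state: "done" };
    case "dead":
      return { state: "failed" }; // no internal error message exposed
    case "killed":
      return { state: "stopped" };
    default:
      return { state: "in_progress" };
  }
}
```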
Job Stream
GET /api/v1/realtime/:jobId
Connect to a single job's event stream. You receive an init event immediately on connect — full status with jobs:read, or last known progress (if any) with jobs:read:progress — then live events as they occur.
| Param | Required | Description |
|---|---|---|
| token | No | Optional API key with jobs:read:progress or higher. Omit to use anonymous progress scope. |
const url = new URL(`wss://emit.run/api/v1/realtime/${jobId}`);
if (apiKey) {
  url.searchParams.set("token", apiKey);
}
const ws = new WebSocket(url.toString());
ws.onmessage = (e) => {
  const event = JSON.parse(e.data);
  switch (event.type) {
    case "init":
      if (event.progress) {
        console.log("Last progress:", event.progress);
      } else {
        console.log("Current status:", event.status);
      }
      break;
    case "progress":
      console.log("Progress:", event.data); // e.g. { percent, message, subProgress? }
      break;
    case "completed":
      console.log("Done:", event.data); // full payload only with jobs:read
      ws.close();
      break;
    case "dead":
      console.error("Failed:", event.data); // full payload only with jobs:read
      ws.close();
      break;
    case "killed":
      console.warn("Stopped:", event.data); // full payload only with jobs:read
      ws.close();
      break;
    case "retried":
      console.log("Retrying, attempt:", event.data?.attemptNumber); // jobs:read only
      break;
  }
};
import asyncio
import json

import websockets

async def watch_job(job_id: str, api_key: str | None = None):
    uri = f"wss://emit.run/api/v1/realtime/{job_id}"
    if api_key:
        uri = f"{uri}?token={api_key}"
    async with websockets.connect(uri) as ws:
        async for message in ws:
            event = json.loads(message)
            match event["type"]:
                case "init":
                    if event.get("progress"):
                        print("Last progress:", event["progress"])
                    else:
                        print("Current status:", event["status"])
                case "progress":
                    print("Progress:", event.get("data"))
                case "completed":
                    print("Done:", event.get("data"))  # jobs:read only
                    break
                case "dead":
                    print("Failed:", event.get("data"))  # jobs:read only
                    break
                case "killed":
                    print("Stopped:", event.get("data"))  # jobs:read only
                    break
                case "retried":
                    print("Retrying, attempt:", event.get("data", {}).get("attemptNumber"))  # jobs:read only

asyncio.run(watch_job("01JLQX..."))
# websocat is a curl-like tool for WebSockets
websocat "wss://emit.run/api/v1/realtime/$JOB_ID"
Event reference
Full-scope events (jobs:read) share this shape:
{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "progress",
  "status": "running",
  "data": {
    "percent": 65,
    "message": "Encoding video",
    "subProgress": {
      "download": { "percent": 100, "message": "Source fetched" },
      "transcode": { "percent": 42, "message": "Pass 1/2" }
    }
  },
  "timestamp": "2025-02-24T10:32:00.000Z"
}
Progress-scope events (jobs:read:progress) include progress data plus completion status:
{
  "jobId": "01JLQX...",
  "type": "progress",
  "data": {
    "percent": 65,
    "message": "Encoding video",
    "subProgress": {
      "download": { "percent": 100, "message": "Source fetched" },
      "transcode": { "percent": 42, "message": "Pass 1/2" }
    }
  },
  "timestamp": "2025-02-24T10:32:00.000Z"
}
| Type | When | data field | jobs:read:progress |
|---|---|---|---|
| init | Immediately on connect | — (current status only) | Delivered (includes progress if available) |
| progress | Worker sent a progress update | { percent, message, subProgress? } | Delivered |
| delivered | Job polled by a worker | — | Not delivered |
| acked | Worker acknowledged; job is now running | — | Not delivered |
| checkpoint | Worker stored a checkpoint | Worker-defined payload | Not delivered |
| event | Worker stored a custom debug/admin event | Worker-defined payload | Not delivered |
| logs_appended | Worker appended one or more logs via POST /jobs/:id/logs | { fromSeq, toSeq, count } | Not delivered |
| completed | Job completed successfully | Result payload from worker | Delivered (without data) |
| retried | Job failed, being re-queued | { error, attemptNumber } | Not delivered |
| dead | Job failed with no retries remaining | { error } | Not delivered |
| killed | Job force-stopped before completion | { reason } | Not delivered |
| delivery_timeout | Worker polled but never acked in time; reverted to pending | — | Not delivered |
With jobs:read:progress, only init, progress, and completed are delivered. completed omits data.
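A progress-scope client typically folds init and progress events into one piece of UI state, and treats completed as terminal even though data is absent. A minimal reducer sketch (the state shape is our own, not an emit.run format):

```javascript
// Fold progress-scope events into a single UI state object. Sketch only.
function reduceProgress(state, event) {
  switch (event.type) {
    case "init":
      // init may carry the last known progress, if any
      return event.progress
        ? { ...state, percent: event.progress.percent, message: event.progress.message }
        : state;
    case "progress":
      return { ...state, percent: event.data.percent, message: event.data.message };
    case "completed":
      return { ...state, percent: 100, done: true }; // no data at this scope
    default:
      return state; // other types are never delivered at this scope
  }
}
```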
logs_appended is intentionally lightweight: it signals that logs were appended, and includes fromSeq/toSeq so clients can fetch exact ranges from GET /jobs/:id/logs.
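For example, a client can turn a logs_appended event into a fetch for exactly the new range. The fromSeq/toSeq query parameter names below are an assumption for illustration; check the logs endpoint reference for the actual pagination parameters:

```javascript
// Build a fetch URL for the exact log range signaled by a logs_appended
// event. The fromSeq/toSeq query parameter names are assumed.
function logsRangeUrl(base, jobId, event) {
  const url = new URL(`${base}/jobs/${jobId}/logs`);
  url.searchParams.set("fromSeq", String(event.data.fromSeq));
  url.searchParams.set("toSeq", String(event.data.toSeq));
  return url.toString();
}
```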
Message samples
Sent immediately on connect. With jobs:read, it includes current status; with jobs:read:progress, it includes the last known progress (if any).
{
  "jobId": "01JLQX...",
  "type": "init",
  "status": "running",
  "timestamp": "2025-02-24T10:31:05.000Z"
}
Sent each time the worker calls the progress endpoint. data always includes percent and message, plus optional subProgress keyed by stage name.
{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "progress",
  "status": "running",
  "data": {
    "percent": 65,
    "message": "Encoding video",
    "subProgress": {
      "download": { "percent": 100, "message": "Source fetched" },
      "transcode": { "percent": 42, "message": "Pass 1/2" }
    }
  },
  "timestamp": "2025-02-24T10:32:00.000Z"
}
Sent when the worker marks the job complete. data contains the result payload with jobs:read; jobs:read:progress clients receive the same event without data.
{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "completed",
  "status": "completed",
  "data": { "output_url": "s3://bucket/output.mp4", "duration_ms": 12340 },
  "timestamp": "2025-02-24T10:34:20.000Z"
}
Sent when the job fails with no retries remaining. data contains the final error. Delivered only with jobs:read.
{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "dead",
  "status": "dead",
  "data": { "error": "FFmpeg exited with code 1: out of memory" },
  "timestamp": "2025-02-24T10:42:00.000Z"
}
Sent when a job is force-stopped before completion. Delivered only with jobs:read.
{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "killed",
  "status": "killed",
  "data": { "reason": "manual stop" },
  "timestamp": "2025-02-24T10:36:00.000Z"
}
Sent when a job fails but still has retries remaining. data includes the error and attempt number. Delivered only with jobs:read.
{
  "jobId": "01JLQX...",
  "jobName": "process-video",
  "type": "retried",
  "status": "pending",
  "data": { "error": "Connection timeout", "attemptNumber": 1 },
  "timestamp": "2025-02-24T10:33:10.000Z"
}
See also: Space Stream for space-wide event streaming, reconnect strategies, and common patterns.
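Because every connect replays an init event with the last known progress, a dropped client can simply reconnect and re-render. As a starting point before reading the reconnect strategies linked above, a generic exponential-backoff delay looks like this (a common pattern, not emit.run-specific):

```javascript
// Exponential backoff with a cap for WebSocket reconnect attempts.
// attempt 0 -> 500 ms, attempt 1 -> 1 s, attempt 2 -> 2 s, ... capped at 30 s.
function backoffDelayMs(attempt, baseMs = 500, capMs = 30000) {
  return Math.min(capMs, baseMs * 2 ** attempt);
}
```

In production you would usually add random jitter to the delay so that many clients do not reconnect in lockstep after an outage.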