Streaming & SSE
Thallus uses Server-Sent Events (SSE) to stream real-time progress updates during chat queries. Instead of waiting for a complete response, your client receives a stream of events as the system plans, runs agents, calls tools, and generates the final answer.
How streaming works
The streaming flow has three steps: start a task, subscribe to progress events, then fetch the final result.
1. Start a task
POST /api/v1/chat/stream
Authorization: Bearer <token>
Content-Type: application/json

{
  "message": "What were our top-selling products last quarter?",
  "conversation_id": "optional-uuid",
  "mode": "auto"
}
The server responds immediately with a task ID:
{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "processing"
}
Only one task can be active per user at a time. Starting a new task while one is running returns a 409 Conflict.
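As a minimal sketch of the start step, the helpers below build the request shown above and interpret the response, including the 409 Conflict case. The function names and the returned result shape (`ok`, `taskId`, `reason`) are assumptions made for illustration; only the endpoint, headers, and JSON fields come from this page.

```javascript
// Build the request for POST /api/v1/chat/stream.
// Endpoint, headers, and body fields follow the example above.
function buildStartRequest(accessToken, message, conversationId) {
  const body = { message, mode: "auto" };
  // conversation_id is optional, so only send it when one is supplied.
  if (conversationId) body.conversation_id = conversationId;
  return {
    url: "/api/v1/chat/stream",
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${accessToken}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(body),
    },
  };
}

// Turn an HTTP status and parsed JSON body into a small result object.
// The result shape is hypothetical and not part of the API.
function interpretStartResponse(status, json) {
  // 409 means another task is already running for this user.
  if (status === 409) return { ok: false, reason: "task_already_active" };
  if (status < 200 || status >= 300) return { ok: false, reason: `http_${status}` };
  return { ok: true, taskId: json.task_id, status: json.status };
}
```

With `fetch`, this could be wired up as `const { url, init } = buildStartRequest(token, text)`, then `const res = await fetch(url, init)`, and finally `interpretStartResponse(res.status, await res.json().catch(() => null))`.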
2. Subscribe to progress
Open an SSE connection to receive real-time updates:
GET /api/v1/chat/{task_id}/progress?last_id=0
Authorization: Bearer <token>
The last_id parameter controls where to start reading. Use 0 for the beginning, or the last received stream ID for reconnection.
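The role of last_id can be illustrated with a small URL builder. This is a sketch only: `progressUrl` is a hypothetical helper name, and it assumes the stream ID can be passed through as an ordinary query-string value.

```javascript
// Build the progress URL for a task.
// lastId defaults to "0", which reads the stream from the beginning;
// pass the id of the last received SSE event to resume after a disconnect.
function progressUrl(taskId, lastId = "0") {
  const query = new URLSearchParams({ last_id: lastId });
  return `/api/v1/chat/${encodeURIComponent(taskId)}/progress?${query}`;
}
```

A first connection would call `progressUrl(taskId)`, while a reconnection would call `progressUrl(taskId, lastEventId)`.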
3. Fetch the result
After receiving a complete event, fetch the final response:
GET /api/v1/chat/{task_id}/result
Authorization: Bearer <token>
The response contains the final answer along with its citations:
{
  "task_id": "550e8400-...",
  "status": "completed",
  "response": "Based on your sales data...",
  "citations": [...],
  "conversation_id": "..."
}
SSE wire format
Each event follows the standard SSE format with an id field (the stream position) and a data field containing JSON:
id: 1705331200000-0
data: {"task_id":"550e8400-...","update_type":"agent_started","message":"Running research agent","event_id":"uuid","timestamp":"2026-01-15T10:00:00Z","agent":"research_agent","depth":0}
The server sends periodic heartbeat events during idle periods to keep the connection alive. Your client should ignore them.
Events are stored server-side with a retention window after task completion, enabling reconnection and replay.
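For clients that read the stream without an SSE library, the wire format above can be parsed roughly as follows. This is a simplified sketch that expects complete events separated by blank lines; a production parser also needs to buffer partial chunks. The name `parseSseFrames` is hypothetical, and treating heartbeats as frames with `event: heartbeat` is an assumption based on the client example on this page.

```javascript
// Parse a block of SSE text into frames of the form { id, event, data }.
// Assumes `text` contains only complete events (no partial trailing event).
function parseSseFrames(text) {
  const frames = [];
  // Events are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let id;
    let event = "message"; // default event name when no "event:" field is sent
    const dataLines = [];
    let sawField = false;
    for (const line of block.split(/\r?\n/)) {
      // Skip empty lines and comment lines (those starting with a colon).
      if (line === "" || line.startsWith(":")) continue;
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      // A single leading space after the colon is not part of the value.
      if (value.startsWith(" ")) value = value.slice(1);
      if (field === "id") id = value;
      else if (field === "event") event = value;
      else if (field === "data") dataLines.push(value);
      else continue; // unknown fields are ignored
      sawField = true;
    }
    // Multiple data lines in one event are joined with newlines.
    if (sawField) frames.push({ id, event, data: dataLines.join("\n") });
  }
  return frames;
}
```

Each frame's `data` can then be passed to `JSON.parse`, and its `id` kept as the resume position.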
Event types
Progress events are grouped into six categories. Every event includes the base fields task_id, update_type, message, event_id, timestamp, and schema_version. Additional fields vary by event type.
Planning events
| Event | Description | Additional fields |
|---|---|---|
| plan_created | Execution plan generated | details.agents (list of agent names) |
| plan_executing | Plan execution started | plan_id, plan_version |
| plan_completed | All plan steps finished | plan_id |
| replan_triggered | Plan revised mid-execution | details.reason, plan_version |
Agent events
| Event | Description | Additional fields |
|---|---|---|
| agent_queued | Agent waiting to run | agent |
| agent_started | Agent began execution | agent, agent_id, depth, started_at |
| agent_progress | Intermediate status update | agent, message |
| agent_completed | Agent finished successfully | agent, duration_ms, completed_at |
| agent_failed | Agent encountered an error | agent, details.error |
| agent_spawned | Agent created a child agent | agent, parent_agent_id, depth |
Tool events
| Event | Description | Additional fields |
|---|---|---|
| tool_started | Tool execution began | agent, details.tool |
| tool_completed | Tool finished | agent, details.tool, duration_ms |
| tool_failed | Tool error | agent, details.tool, details.error |
Parallel execution events
| Event | Description | Additional fields |
|---|---|---|
| parallel_group_started | Multiple agents started together | parallel_group_id, parallel_group_size, details.agent_ids |
| parallel_group_completed | All parallel agents finished | parallel_group_id |
Streaming events
| Event | Description | Additional fields |
|---|---|---|
| text_delta | Incremental text chunk of the final response | message (the text to append) |
General events
| Event | Description | Additional fields |
|---|---|---|
| status | General status message | (none) |
| thinking | Agent reasoning (optional) | (none) |
| error | Non-fatal error | details.error |
| complete | Task fully complete (terminal) | completed_at, details.final, details.response_preview |
| hypotheses_updated | Investigation hypotheses changed | details.hypotheses |
The complete event is the terminal event. When your client receives it, close the SSE connection and fetch the final result.
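One way to consume these event types is a small reducer that folds each event into display state. The sketch below handles only text_delta, error, and complete, and treats every other event as a status line. The state shape and the function names are assumptions for illustration, not part of the API.

```javascript
// Initial display state (hypothetical shape).
function initialProgressState() {
  return { responseText: "", lastStatus: "", warnings: [], done: false };
}

// Fold one progress event into the state and return a new state object.
function applyProgressEvent(state, event) {
  switch (event.update_type) {
    case "text_delta":
      // message carries the text chunk to append to the final response.
      return { ...state, responseText: state.responseText + event.message };
    case "error":
      // Non-fatal: record it and keep streaming.
      return { ...state, warnings: [...state.warnings, event.details?.error ?? event.message] };
    case "complete":
      // Terminal event: the caller should close the stream and fetch the result.
      return { ...state, done: true };
    default:
      // Planning, agent, tool, and parallel events are shown as a status line.
      return { ...state, lastStatus: event.message ?? state.lastStatus };
  }
}
```

Because the reducer is pure, replayed events can be folded in the same way after a reconnection, provided duplicates are filtered out first.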
Event payload fields
Every event includes these base fields:
| Field | Type | Description |
|---|---|---|
| task_id | string | The task this event belongs to |
| update_type | string | One of the event types listed above |
| message | string | Human-readable status message |
| event_id | string | Unique UUID for deduplication |
| timestamp | string | ISO 8601 UTC timestamp |
| schema_version | int | Payload schema version (currently 1) |
Context fields (present when applicable):
| Field | Type | Description |
|---|---|---|
| agent | string | Agent name (e.g., research_agent) |
| agent_id | string | Unique agent execution ID |
| parent_agent_id | string | Parent agent ID (for spawned agents) |
| depth | int | Nesting depth (0 = root agent) |
| plan_id | string | Current plan identifier |
| plan_version | int | Increments on each replan |
| parallel_group_id | string | Group ID for parallel agent batches |
| parallel_group_size | int | Number of agents in the parallel group |
| started_at | string | ISO 8601 start timestamp |
| completed_at | string | ISO 8601 completion timestamp |
| duration_ms | int | Execution time in milliseconds |
| details | object | Event-specific additional data |
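A client that wants to guard against malformed payloads could check the base fields before processing an event. The sketch below assumes all six base fields from the table are required on every event and that `int` values arrive as JSON numbers. `baseFieldProblems` is a hypothetical helper name.

```javascript
// Expected JavaScript types for the base fields listed above.
const BASE_FIELDS = {
  task_id: "string",
  update_type: "string",
  message: "string",
  event_id: "string",
  timestamp: "string",
  schema_version: "number",
};

// Return a list of human-readable problems; an empty list means the event looks valid.
function baseFieldProblems(event) {
  const problems = [];
  for (const [name, type] of Object.entries(BASE_FIELDS)) {
    if (!(name in event)) problems.push(`missing ${name}`);
    else if (typeof event[name] !== type) problems.push(`${name} should be ${type}`);
  }
  return problems;
}
```

Logging the returned problems rather than throwing keeps a single unexpected payload from interrupting the stream.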
Client example
This example uses a fetch-based SSE library to connect with a Bearer token (standard EventSource does not support custom headers):
import { fetchEventSource } from "fetch-event-source-library";

const taskId = "550e8400-...";
const seenEvents = new Set();
const controller = new AbortController();
let lastEventId = "0";

await fetchEventSource(`/api/v1/chat/${taskId}/progress?last_id=${lastEventId}`, {
  headers: {
    Authorization: `Bearer ${accessToken}`,
  },
  // Lets the client close the stream once the terminal event arrives
  signal: controller.signal,
  onmessage(event) {
    if (event.event === "heartbeat") return;
    const data = JSON.parse(event.data);
    // Deduplicate on reconnection
    if (seenEvents.has(data.event_id)) return;
    seenEvents.add(data.event_id);
    // Track position for reconnection
    lastEventId = event.id;
    if (data.update_type === "text_delta") {
      appendToResponse(data.message);
    } else if (data.update_type === "complete") {
      // Terminal event: close the SSE connection, then fetch the result
      controller.abort();
      fetchResult(taskId);
    }
  },
  onerror(err) {
    // Return to trigger reconnection with backoff
    // fetch-event-source handles this automatically
  },
});
Reconnection
If the SSE connection drops, reconnect by passing the last received stream ID in the last_id query parameter. The server replays any events you missed.
Recommended strategy
- Maximum attempts: 10
- Backoff: Exponential — 1s, 2s, 4s, 8s, 16s, 30s (capped at 30s)
- Resume from: The id field of the last received SSE event
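The recommended schedule can be expressed as a small delay function. This is a sketch under the assumption that attempts are counted from 1 and that the client gives up after the tenth attempt. The constant and function names are hypothetical.

```javascript
const MAX_ATTEMPTS = 10;
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30000;

// Delay before reconnect attempt `attempt` (1-based),
// or null once the maximum number of attempts has been used.
function reconnectDelayMs(attempt) {
  if (attempt > MAX_ATTEMPTS) return null;
  // Double the delay on each attempt, capped at 30 seconds.
  return Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS);
}
```

A reconnect loop would wait `reconnectDelayMs(n)` milliseconds before attempt `n`, reset the counter after a successful connection, and stop when the function returns `null`.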
Deduplication
On reconnection, you may receive events that your client already processed. Use the event_id field (a UUID unique to each event) to deduplicate. The recommended approach is an LRU cache of recent event IDs with a cap around 500 entries.
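A bounded cache of this kind can be sketched with a `Map`, which iterates keys in insertion order. The class name `RecentEventIds` and its `markSeen` method are assumptions for illustration. The default capacity of 500 follows the recommendation above.

```javascript
// LRU set of recently seen event IDs, built on Map insertion order.
class RecentEventIds {
  constructor(capacity = 500) {
    this.capacity = capacity;
    this.ids = new Map();
  }

  // Returns true the first time an ID is seen and false for a duplicate.
  markSeen(eventId) {
    if (this.ids.has(eventId)) {
      // Refresh recency: deleting and re-inserting moves the key to the end.
      this.ids.delete(eventId);
      this.ids.set(eventId, true);
      return false;
    }
    this.ids.set(eventId, true);
    if (this.ids.size > this.capacity) {
      // The first key in iteration order is the least recently used.
      const oldest = this.ids.keys().next().value;
      this.ids.delete(oldest);
    }
    return true;
  }
}
```

In a message handler, `if (!recent.markSeen(data.event_id)) return;` would skip replayed events while keeping memory use bounded.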
Recovery endpoints
If your client loses context entirely (e.g., page refresh), use these endpoints to recover:
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/chat/active | Check for an active or recently completed task |
| GET | /api/v1/chat/{task_id}/progress/state | Get all events for a task as JSON (not SSE) |
| GET | /api/v1/chat/{task_id}/result | Get the final result after completion |
Page refresh recovery flow
- Call GET /api/v1/chat/active to check for in-flight tasks
- If active_task is present, reconnect SSE to that task ID
- If completed_task is present, fetch the result directly
- If neither, no recovery needed
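The recovery flow above can be sketched as a decision function over the response from the active-task endpoint. The response shape is an assumption: this page names the active_task and completed_task keys but does not show their contents, so the sketch treats each as an object carrying a task_id. `chooseRecoveryAction` and the returned action names are hypothetical.

```javascript
// Decide what to do after a page refresh, given the parsed JSON
// from GET /api/v1/chat/active.
// Assumes active_task and completed_task are objects with a task_id field.
function chooseRecoveryAction(activeResponse) {
  const active = activeResponse?.active_task;
  const completed = activeResponse?.completed_task;
  if (active) {
    // Context was lost, so replay the stream from the beginning (last_id=0).
    return {
      action: "resubscribe",
      taskId: active.task_id,
      url: `/api/v1/chat/${active.task_id}/progress?last_id=0`,
    };
  }
  if (completed) {
    return {
      action: "fetch_result",
      taskId: completed.task_id,
      url: `/api/v1/chat/${completed.task_id}/result`,
    };
  }
  return { action: "none" };
}
```

An in-flight task takes priority over a completed one here, mirroring the order of the steps in the list.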
Limits
| Limit | Value |
|---|---|
| SSE connections per user | 5 concurrent |
| Events per stream | ~1,000 (approximate cap) |
Related pages
- Core API Endpoints — full endpoint reference including the chat streaming flow
- Error Handling — error responses during streaming (401, 404, 429)
- Rate Limiting — SSE connection limits and request throttling
- How Orchestration Works — understanding plan and agent execution stages