Streaming & SSE

Thallus uses Server-Sent Events (SSE) to stream real-time progress updates during chat queries. Instead of waiting for a complete response, your client receives a stream of events as the system plans, runs agents, calls tools, and generates the final answer.


How streaming works

The streaming flow has three steps: start a task, subscribe to progress events, then fetch the final result.

POST /chat/stream → get task_id → open SSE → receive events → GET /result

1. Start a task

POST /api/v1/chat/stream
Authorization: Bearer <token>
Content-Type: application/json

{
  "message": "What were our top-selling products last quarter?",
  "conversation_id": "optional-uuid",
  "mode": "auto"
}

The server responds immediately with a task ID:

{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "processing"
}

Only one task can be active per user at a time. Starting a new task while one is running returns a 409 Conflict.
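Step 1 could be wrapped in a small helper like the one below. This is a minimal sketch, not an official SDK. It assumes a browser or Node 18+ runtime with a global fetch. The names startTask and TaskConflictError are hypothetical, and the fetchImpl parameter exists only so the function can be exercised without a live server.

```javascript
// Hypothetical helper; not part of any official Thallus client library.
class TaskConflictError extends Error {
  constructor(message) {
    super(message);
    this.name = "TaskConflictError";
  }
}

// Start a streaming chat task and return the task_id from the response.
// options.conversationId and options.mode map to the optional request fields.
async function startTask(baseUrl, token, message, options = {}, fetchImpl = fetch) {
  const body = { message };
  if (options.conversationId) body.conversation_id = options.conversationId;
  if (options.mode) body.mode = options.mode;

  const res = await fetchImpl(`${baseUrl}/api/v1/chat/stream`, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(body),
  });

  // Only one active task per user: the server answers 409 Conflict otherwise.
  if (res.status === 409) {
    throw new TaskConflictError("Another task is already running for this user");
  }
  if (!res.ok) {
    throw new Error(`Failed to start task: HTTP ${res.status}`);
  }
  const { task_id } = await res.json();
  return task_id;
}
```

Surfacing the 409 as a distinct error type lets a UI offer to reattach to the running task (see Recovery endpoints) instead of showing a generic failure.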

2. Subscribe to progress

Open an SSE connection to receive real-time updates:

GET /api/v1/chat/{task_id}/progress?last_id=0
Authorization: Bearer <token>

The last_id parameter controls where to start reading. Use 0 for the beginning, or the last received stream ID for reconnection.
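Because last_id is the only thing that changes between a first connection and a reconnection, it can help to build the progress URL in one place. A minimal sketch, assuming baseUrl is the API origin (new URL with an absolute path replaces any path already on the base); progressUrl is a hypothetical name.

```javascript
// Build the SSE progress URL for a task (hypothetical helper).
// lastId is "0" to read from the beginning, or the id field of the last
// SSE event received when reconnecting.
function progressUrl(baseUrl, taskId, lastId = "0") {
  const url = new URL(`/api/v1/chat/${encodeURIComponent(taskId)}/progress`, baseUrl);
  url.searchParams.set("last_id", lastId);
  return url.toString();
}
```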

3. Fetch the result

After receiving an event whose update_type is complete, fetch the final response:

GET /api/v1/chat/{task_id}/result
Authorization: Bearer <token>

The response contains the final answer, its citations, and the conversation ID:

{
  "task_id": "550e8400-...",
  "status": "completed",
  "response": "Based on your sales data...",
  "citations": [...],
  "conversation_id": "..."
}
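Step 3 as a helper might look like the sketch below. It assumes a global fetch (injectable for testing), and getTaskResult is a hypothetical name. Treating any status other than "completed" as "not ready" is also an assumption, since this page only shows the completed and processing statuses.

```javascript
// Fetch the final result once the `complete` event has arrived (hypothetical helper).
async function getTaskResult(baseUrl, token, taskId, fetchImpl = fetch) {
  const res = await fetchImpl(`${baseUrl}/api/v1/chat/${encodeURIComponent(taskId)}/result`, {
    headers: { Authorization: `Bearer ${token}` },
  });
  if (!res.ok) {
    throw new Error(`Failed to fetch result: HTTP ${res.status}`);
  }
  const result = await res.json();
  // Assumption: any status other than "completed" means the result is not ready.
  if (result.status !== "completed") {
    throw new Error(`Task ${taskId} is not completed (status: ${result.status})`);
  }
  return result; // { task_id, status, response, citations, conversation_id }
}
```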

SSE wire format

Each event follows the standard SSE format with an id field (the stream position) and a data field containing JSON:

id: 1705331200000-0
data: {"task_id":"550e8400-...","update_type":"agent_started","message":"Running research agent","event_id":"uuid","timestamp":"2026-01-15T10:00:00Z","agent":"research_agent","depth":0}

The server sends periodic heartbeat events during idle periods to keep the connection alive. Heartbeat events should be ignored by your client.

Events are stored server-side with a retention window after task completion, enabling reconnection and replay.
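For clients that read the stream directly (for example, from a fetch response body) instead of through a library, the sketch below parses the wire format incrementally. It follows the id/event/data rules of the server-sent events section of the HTML Living Standard in simplified form: LF and CRLF line endings only, and the retry field is ignored. SseParser is a hypothetical name, and modeling heartbeats as `event: heartbeat` messages is an assumption borrowed from the client example's check, not a documented format.

```javascript
// Minimal incremental SSE parser (simplified; LF and CRLF line endings only).
class SseParser {
  constructor(onEvent) {
    this.onEvent = onEvent;
    this.buffer = "";
    this.lastEventId = ""; // per the spec, the last id persists across events
    this.resetEvent();
  }

  resetEvent() {
    this.eventType = "";
    this.dataLines = [];
  }

  // Feed a decoded text chunk; chunks may split lines or events anywhere.
  push(chunk) {
    this.buffer += chunk;
    let newline;
    while ((newline = this.buffer.indexOf("\n")) !== -1) {
      let line = this.buffer.slice(0, newline);
      this.buffer = this.buffer.slice(newline + 1);
      if (line.endsWith("\r")) line = line.slice(0, -1);
      this.processLine(line);
    }
  }

  processLine(line) {
    if (line === "") {
      // Blank line ends the event; dispatch only if it carried data.
      if (this.dataLines.length > 0) {
        this.onEvent({
          id: this.lastEventId,
          event: this.eventType || "message",
          data: this.dataLines.join("\n"),
        });
      }
      this.resetEvent();
      return;
    }
    if (line.startsWith(":")) return; // comment line
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1);
    if (field === "id") this.lastEventId = value;
    else if (field === "event") this.eventType = value;
    else if (field === "data") this.dataLines.push(value);
    // Other fields (such as retry) are ignored in this sketch.
  }
}
```

In a browser or Node 18+, you would decode each chunk of `response.body` with a `TextDecoder` (passing `{ stream: true }`) and hand the resulting string to `push`.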


Event types

Progress events are grouped into six categories. Every event includes the base fields listed under Event payload fields (task_id, update_type, message, event_id, timestamp, and schema_version). Additional fields vary by event type.

Planning events

Event | Description | Additional fields
plan_created | Execution plan generated | details.agents (list of agent names)
plan_executing | Plan execution started | plan_id, plan_version
plan_completed | All plan steps finished | plan_id
replan_triggered | Plan revised mid-execution | details.reason, plan_version

Agent events

Event | Description | Additional fields
agent_queued | Agent waiting to run | agent
agent_started | Agent began execution | agent, agent_id, depth, started_at
agent_progress | Intermediate status update | agent, message
agent_completed | Agent finished successfully | agent, duration_ms, completed_at
agent_failed | Agent encountered an error | agent, details.error
agent_spawned | Agent created a child agent | agent, parent_agent_id, depth

Tool events

Event | Description | Additional fields
tool_started | Tool execution began | agent, details.tool
tool_completed | Tool finished | agent, details.tool, duration_ms
tool_failed | Tool error | agent, details.tool, details.error

Parallel execution events

Event | Description | Additional fields
parallel_group_started | Multiple agents started together | parallel_group_id, parallel_group_size, details.agent_ids
parallel_group_completed | All parallel agents finished | parallel_group_id

Streaming events

Event | Description | Additional fields
text_delta | Incremental text chunk of the final response | message (the text to append)

General events

Event | Description | Additional fields
status | General status message | (none)
thinking | Agent reasoning (optional) | (none)
error | Non-fatal error | details.error
complete | Task fully complete (terminal) | completed_at, details.final, details.response_preview
hypotheses_updated | Investigation hypotheses changed | details.hypotheses

Example event flow: plan_created (Planning) → agent_started (Agent) → tool_started (Tool) → text_delta (Streaming) → complete (Terminal)

The complete event is the terminal event. When your client receives it, close the SSE connection and fetch the final result.
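A client can route events through a lookup table that mirrors the tables above, with a fallback for any update_type it does not recognize. In this sketch the category labels, the "unknown" fallback, and the applyEvent reducer are illustrative assumptions; applyEvent shows text_delta messages being appended in arrival order and complete marking the stream as done.

```javascript
// update_type → category, mirroring the event tables (labels are illustrative).
const EVENT_CATEGORIES = {
  plan_created: "planning",
  plan_executing: "planning",
  plan_completed: "planning",
  replan_triggered: "planning",
  agent_queued: "agent",
  agent_started: "agent",
  agent_progress: "agent",
  agent_completed: "agent",
  agent_failed: "agent",
  agent_spawned: "agent",
  tool_started: "tool",
  tool_completed: "tool",
  tool_failed: "tool",
  parallel_group_started: "parallel",
  parallel_group_completed: "parallel",
  text_delta: "streaming",
  status: "general",
  thinking: "general",
  error: "general",
  complete: "general",
  hypotheses_updated: "general",
};

// Unrecognized types map to "unknown" instead of breaking the client.
function categorize(event) {
  return EVENT_CATEGORIES[event.update_type] ?? "unknown";
}

function isTerminal(event) {
  return event.update_type === "complete";
}

// Fold events into a small UI state: accumulated answer text plus a done flag.
function applyEvent(state, event) {
  switch (event.update_type) {
    case "text_delta":
      return { ...state, text: state.text + event.message };
    case "complete":
      return { ...state, done: true };
    default:
      return state;
  }
}
```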


Event payload fields

Every event includes these base fields:

Field | Type | Description
task_id | string | The task this event belongs to
update_type | string | One of the event types listed above
message | string | Human-readable status message
event_id | string | Unique UUID for deduplication
timestamp | string | ISO 8601 UTC timestamp
schema_version | int | Payload schema version (currently 1)

Context fields (present when applicable):

Field | Type | Description
agent | string | Agent name (e.g., research_agent)
agent_id | string | Unique agent execution ID
parent_agent_id | string | Parent agent ID (for spawned agents)
depth | int | Nesting depth (0 = root agent)
plan_id | string | Current plan identifier
plan_version | int | Increments on each replan
parallel_group_id | string | Group ID for parallel agent batches
parallel_group_size | int | Number of agents in the parallel group
started_at | string | ISO 8601 start timestamp
completed_at | string | ISO 8601 completion timestamp
duration_ms | int | Execution time in milliseconds
details | object | Event-specific additional data
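The context fields are enough to render an indented progress log: depth gives the nesting level and duration_ms, when present, gives timing. formatProgressLine is a hypothetical helper, and the output format is only one possible choice.

```javascript
// Render one progress line (hypothetical format): indent by depth, prefix the
// agent name, and append duration_ms when the event carries it.
function formatProgressLine(event) {
  const indent = "  ".repeat(event.depth ?? 0); // depth 0 = root agent
  const who = event.agent ? `[${event.agent}] ` : "";
  const took = event.duration_ms != null ? ` (${event.duration_ms} ms)` : "";
  return `${indent}${who}${event.message}${took}`;
}
```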

Client example

This example uses a fetch-based SSE library to connect with a Bearer token (standard EventSource does not support custom headers):

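// This example uses @microsoft/fetch-event-source. accessToken, appendToResponse,
// and fetchResult are defined by your application.
import { fetchEventSource } from "@microsoft/fetch-event-source";

const taskId = "550e8400-...";
let lastEventId = "0";
let attempts = 0;
const seenEvents = new Set(); // grows without bound; consider a capped cache (see Deduplication)
const controller = new AbortController();

// The library's built-in retries reuse this URL and send a Last-Event-ID header
// instead; to resume via last_id, start a new fetchEventSource call with lastEventId.
await fetchEventSource(`/api/v1/chat/${taskId}/progress?last_id=${lastEventId}`, {
  headers: {
    Authorization: `Bearer ${accessToken}`,
  },
  signal: controller.signal,
  onmessage(event) {
    attempts = 0; // a message arrived, so the connection is healthy again
    if (event.event === "heartbeat") return;

    const data = JSON.parse(event.data);

    // Deduplicate events replayed after a reconnection
    if (seenEvents.has(data.event_id)) return;
    seenEvents.add(data.event_id);

    // Track the stream position (the SSE id field) for reconnection
    lastEventId = event.id;

    if (data.update_type === "text_delta") {
      appendToResponse(data.message);
    } else if (data.update_type === "complete") {
      controller.abort(); // terminal event: close the SSE connection
      fetchResult(taskId);
    }
  },
  onclose() {
    // The server ended the stream before `complete`: throw so onerror schedules a retry
    if (!controller.signal.aborted) throw new Error("Stream closed before complete");
  },
  onerror(err) {
    // Exponential backoff (1s, 2s, 4s, ... capped at 30s); rethrowing stops retries
    attempts += 1;
    if (attempts > 10) throw err;
    return Math.min(1000 * 2 ** (attempts - 1), 30000);
  },
});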

Reconnection

If the SSE connection drops, reconnect by passing the last received stream ID in the last_id query parameter. The server replays any events you missed.

  • Maximum attempts: 10
  • Backoff: Exponential — 1s, 2s, 4s, 8s, 16s, 30s (capped at 30s)
  • Resume from: The id field of the last received SSE event
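The policy above can be sketched as a delay schedule plus a retry loop. The numbers (10 attempts, 1s doubling, 30s cap) come from this page; reconnectDelayMs, runWithReconnect, and the connect(lastId, onEventId) contract are hypothetical. Resetting the attempt counter whenever an event arrives is an assumption, since the page does not say whether the 10 attempts are per outage or per task.

```javascript
// Reconnection policy numbers from this page; names and contracts are hypothetical.
const MAX_ATTEMPTS = 10;
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30000;

// Delay before reconnection attempt `attempt` (1-based): 1s, 2s, 4s, 8s, 16s,
// then 30s for every later attempt. Returns null once all attempts are used.
function reconnectDelayMs(attempt) {
  if (attempt < 1 || attempt > MAX_ATTEMPTS) return null;
  return Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS);
}

const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// connect(lastId, onEventId) opens the progress stream from lastId, calls
// onEventId(id) for each event received, resolves true once `complete` has
// arrived (false if the stream ended early), and rejects if the connection drops.
async function runWithReconnect(connect, sleep = defaultSleep) {
  let lastId = "0";
  let attempt = 0;
  for (;;) {
    try {
      const complete = await connect(lastId, (id) => {
        lastId = id; // resume point: the id field of the last SSE event
        attempt = 0; // assumption: receiving an event resets the attempt counter
      });
      if (complete) return lastId;
    } catch {
      // Connection dropped; fall through to the backoff below.
    }
    attempt += 1;
    const delay = reconnectDelayMs(attempt);
    if (delay === null) throw new Error("Max reconnection attempts reached");
    await sleep(delay);
  }
}
```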

Connection states

  • Connected: receiving events
  • Reconnecting: backoff in progress
  • Error: max attempts reached
  • Closed: task complete
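The four connection states can be modeled as a small transition table, which makes invalid transitions (for example, reconnecting after the task has completed) easy to catch. The signal names drop, reopen, give_up, and complete are hypothetical labels for whatever moves a client between states; they are not part of the API.

```javascript
// Connection states from the list above; signal names are hypothetical.
const TRANSITIONS = {
  connected: { drop: "reconnecting", complete: "closed" },
  reconnecting: { reopen: "connected", drop: "reconnecting", give_up: "error" },
  error: {}, // max attempts reached: terminal
  closed: {}, // task complete: terminal
};

function nextConnectionState(state, signal) {
  const next = TRANSITIONS[state]?.[signal];
  if (next === undefined) {
    throw new Error(`Invalid transition: "${signal}" in state "${state}"`);
  }
  return next;
}
```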

Deduplication

On reconnection, you may receive events that your client already processed. Use the event_id field (a UUID unique to each event) to deduplicate. The recommended approach is an LRU cache of recent event IDs with a cap around 500 entries.
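The recommended bounded cache can be built on a JavaScript Map, which iterates keys in insertion order. Deleting and re-inserting a key on each hit keeps the least recently seen ID first, so evicting the first key gives LRU behavior. RecentIds and checkAndAdd are hypothetical names; the default capacity of 500 follows the guidance above.

```javascript
// Bounded LRU set of event IDs (hypothetical helper). Map preserves insertion
// order, so the first key is always the least recently seen.
class RecentIds {
  constructor(capacity = 500) {
    this.capacity = capacity;
    this.ids = new Map();
  }

  // Returns true if id was already seen; otherwise records it and returns false.
  checkAndAdd(id) {
    if (this.ids.has(id)) {
      this.ids.delete(id);
      this.ids.set(id, true); // refresh recency
      return true;
    }
    this.ids.set(id, true);
    if (this.ids.size > this.capacity) {
      const oldest = this.ids.keys().next().value;
      this.ids.delete(oldest); // evict the least recently seen ID
    }
    return false;
  }
}
```

With this in place, the two seenEvents lines in the client example collapse to one check: `if (recent.checkAndAdd(data.event_id)) return;`.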


Recovery endpoints

If your client loses context entirely (e.g., page refresh), use these endpoints to recover:

Method | Endpoint | Description
GET | /api/v1/chat/active | Check for an active or recently completed task
GET | /api/v1/chat/{task_id}/progress/state | Get all events for a task as JSON (not SSE)
GET | /api/v1/chat/{task_id}/result | Get the final result after completion

Page refresh recovery flow

  1. Call GET /chat/active to check for in-flight tasks
  2. If active_task is present, reconnect SSE to that task ID
  3. If completed_task is present, fetch the result directly
  4. If neither, no recovery needed
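The page refresh flow above reduces to a small decision function. This sketch assumes GET /api/v1/chat/active returns JSON with optional active_task and completed_task fields, as the steps imply. Because their exact shape is not documented here, taskIdOf (a hypothetical helper, like planRecovery) accepts either a bare task ID string or an object with a task_id field.

```javascript
// Assumption: active_task / completed_task are either a task ID string or an
// object with a task_id field; the exact shape is not documented on this page.
function taskIdOf(task) {
  if (typeof task === "string") return task;
  return task?.task_id ?? null;
}

// Decide what to do after a page refresh, given the JSON body of GET /chat/active.
function planRecovery(activeBody) {
  const activeId = taskIdOf(activeBody?.active_task);
  if (activeId) return { action: "reconnect", taskId: activeId };
  const completedId = taskIdOf(activeBody?.completed_task);
  if (completedId) return { action: "fetch_result", taskId: completedId };
  return { action: "none" };
}
```

After a refresh the client no longer knows its last stream ID, so a reconnect would typically start from last_id=0 and rely on event_id deduplication.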

Limits

Limit | Value
SSE connections per user | 5 concurrent
Events per stream | ~1,000 (approximate cap)