
This document describes the architecture of orbiter-distributed, which enables horizontal scaling of agent execution across multiple workers using Redis for task queuing, event streaming, and state management.

System Architecture

```mermaid
graph TB
    subgraph Client ["Client Process"]
        App["Application Code"]
        DFn["distributed()"]
        TH["TaskHandle"]
    end

    subgraph Redis ["Redis"]
        TS["Task Stream<br/><code>orbiter:tasks</code>"]
        THash["Task Hashes<br/><code>orbiter:task:{id}</code>"]
        PS["Pub/Sub Channels<br/><code>orbiter:events:{id}</code>"]
        ES["Event Streams<br/><code>orbiter:stream:{id}</code>"]
        CS["Cancel Channels<br/><code>orbiter:cancel:{id}</code>"]
        HB["Worker Heartbeats<br/><code>orbiter:workers:{id}</code>"]
        IX["Task Index<br/><code>orbiter:task:index</code>"]
    end

    subgraph Workers ["Worker Pool"]
        W1["Worker 1"]
        W2["Worker 2"]
        WN["Worker N"]
    end

    subgraph Temporal ["Temporal (Optional)"]
        TC["Temporal Server"]
        TW["Temporal Worker"]
    end

    App --> DFn
    DFn -->|"XADD payload"| TS
    DFn -->|"HSET status=PENDING"| THash
    DFn --> TH

    TS -->|"XREADGROUP"| W1
    TS -->|"XREADGROUP"| W2
    TS -->|"XREADGROUP"| WN

    W1 -->|"PUBLISH event"| PS
    W1 -->|"XADD event"| ES
    W1 -->|"HSET status"| THash
    W1 -->|"HSET heartbeat"| HB

    TH -->|"SUBSCRIBE"| PS
    TH -->|"XRANGE replay"| ES
    TH -->|"HGETALL status"| THash
    TH -->|"PUBLISH cancel"| CS
    CS -->|"cancel signal"| W1

    W1 -.->|"executor=temporal"| TC
    TC -.-> TW
```

Component Descriptions

TaskBroker (broker.py)

The task broker manages the Redis Streams-based task queue. It provides durable, multi-worker task distribution using consumer groups.

Key operations:

| Method | Redis Command | Description |
|---|---|---|
| `connect()` | `XGROUP CREATE ... MKSTREAM` | Creates the consumer group and stream |
| `submit(task)` | `XADD` | Enqueues a serialized TaskPayload |
| `claim(worker_id)` | `XREADGROUP ... >` | Blocking pop for the next available task |
| `ack(task_id)` | `XACK` | Acknowledges successful processing |
| `nack(task_id)` | `XRANGE + XACK + XADD` | Re-enqueues a task for retry |
| `cancel(task_id)` | `PUBLISH + HSET` | Sends cancel signal and updates status |

Consumer group model: All workers join the same consumer group ({queue_name}:group). Redis distributes messages across consumers so each task is claimed by exactly one worker. The > special ID in XREADGROUP means “only new messages not yet delivered to this consumer.”
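
The sketch below shows the underlying consumer-group pattern using redis-py's asyncio client; queue, group, and worker names mirror the defaults above, but the real broker adds payload serialization, retry bookkeeping, and cancellation handling.

```python
import redis.asyncio as redis
from redis.exceptions import ResponseError

QUEUE = "orbiter:tasks"
GROUP = f"{QUEUE}:group"

async def demo(worker_id: str = "worker-1") -> None:
    r = redis.Redis()

    # connect(): create the stream and consumer group if they don't exist yet.
    try:
        await r.xgroup_create(QUEUE, GROUP, id="0", mkstream=True)
    except ResponseError:
        pass  # BUSYGROUP: the group already exists

    # submit(): enqueue a serialized payload.
    await r.xadd(QUEUE, {"payload": '{"task_id": "t-123", "input": "hello"}'})

    # claim(): block up to 5s for a message not yet delivered to any consumer
    # in the group (the special ">" ID).
    entries = await r.xreadgroup(GROUP, worker_id, {QUEUE: ">"}, count=1, block=5000)
    if entries:
        _stream, messages = entries[0]
        msg_id, fields = messages[0]
        # ... execute the task here ...
        await r.xack(QUEUE, GROUP, msg_id)  # ack(): mark the message as processed

    await r.aclose()
```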

Worker (worker.py)

The worker is the core execution engine. It runs a claim-execute loop, processing tasks and publishing streaming events.

Lifecycle:

```mermaid
stateDiagram-v2
    [*] --> Connecting: start()
    Connecting --> ClaimLoop: broker/store/publisher connected
    ClaimLoop --> Claiming: check queue
    Claiming --> Executing: task claimed
    Claiming --> Claiming: no task (timeout)
    Executing --> Claiming: task completed/failed
    ClaimLoop --> Disconnecting: shutdown signal
    Disconnecting --> [*]: cleanup complete
```

Per-task execution flow:

  1. Claims task from queue via TaskBroker.claim()
  2. Sets task status to RUNNING in TaskStore
  3. Starts a cancel listener (Pub/Sub subscription on orbiter:cancel:{task_id})
  4. Reconstructs Agent or Swarm from agent_config via from_dict()
  5. Calls run.stream(agent, input, detailed=task.detailed)
  6. Publishes each StreamEvent via EventPublisher
  7. On completion: sets status to COMPLETED, acknowledges the task
  8. On failure: sets status to FAILED, nacks if retries remain
  9. On cancellation: sets status to CANCELLED, emits StatusEvent(status='cancelled')
  10. Records metrics (duration, wait time, success/failure)

Concurrency: The concurrency parameter spawns multiple _claim_loop coroutines, allowing a single worker process to execute multiple tasks in parallel.
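
A minimal sketch of that shape, assuming `broker.claim()` / `ack()` / `nack()` behave as described above; `execute` stands in for the per-task flow:

```python
import asyncio

async def _claim_loop(broker, worker_id: str, execute, stop: asyncio.Event) -> None:
    # One claim loop: block on the queue, run the task, then ack or nack.
    while not stop.is_set():
        task = await broker.claim(worker_id)     # blocking XREADGROUP with a timeout
        if task is None:
            continue                             # claim timed out; re-check the stop flag
        try:
            await execute(task)                  # run.stream(...) plus event publishing
            await broker.ack(task.task_id)
        except Exception:
            await broker.nack(task.task_id)      # re-enqueue if retries remain

async def run_worker(broker, worker_id: str, execute, concurrency: int = 4) -> None:
    # `concurrency` independent loops share one broker, so a single process
    # can run several tasks in parallel.
    stop = asyncio.Event()
    await asyncio.gather(
        *(_claim_loop(broker, worker_id, execute, stop) for _ in range(concurrency))
    )
```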

Heartbeat: A separate Redis connection publishes health data to orbiter:workers:{worker_id} as a Redis hash with a configurable TTL (default 30s). The heartbeat fires every heartbeat_ttl / 3 seconds.
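
A rough sketch of that heartbeat loop, assuming a plain redis-py hash write (field names are illustrative):

```python
import asyncio
import time
import redis.asyncio as redis

async def heartbeat_loop(worker_id: str, heartbeat_ttl: int = 30) -> None:
    r = redis.Redis()  # dedicated connection, separate from the claim loops
    key = f"orbiter:workers:{worker_id}"
    try:
        while True:
            await r.hset(key, mapping={
                "worker_id": worker_id,
                "heartbeat": time.time(),
                "status": "running",
            })
            await r.expire(key, heartbeat_ttl)      # the hash expires if the worker dies
            await asyncio.sleep(heartbeat_ttl / 3)  # refresh well before the TTL lapses
    finally:
        await r.aclose()
```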

EventPublisher / EventSubscriber (events.py)

Events are published to two destinations for different consumption patterns:

```mermaid
graph LR
    Worker -->|"publish(task_id, event)"| EP["EventPublisher"]
    EP -->|"PUBLISH (JSON)"| PubSub["Pub/Sub Channel<br/><code>orbiter:events:{id}</code>"]
    EP -->|"XADD (JSON)"| Stream["Redis Stream<br/><code>orbiter:stream:{id}</code>"]

    PubSub -->|"subscribe()"| Live["Live Consumer<br/>(real-time)"]
    Stream -->|"replay()"| Replay["Replay Consumer<br/>(persistent)"]
```

| Channel | Redis Type | Use Case | Delivery |
|---|---|---|---|
| `orbiter:events:{task_id}` | Pub/Sub | Live real-time events | At-most-once, only active subscribers |
| `orbiter:stream:{task_id}` | Stream | Replay / persistence | Durable, TTL-based expiry (default 1h) |

Event deserialization: Events are serialized as JSON with a type discriminator field. _deserialize_event() uses _EVENT_TYPE_MAP to reconstruct the correct Pydantic model from JSON.

Subscriber termination: subscribe() automatically ends the async iterator when a terminal StatusEvent is received (status is completed, error, or cancelled).
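
A simplified sketch of the dual-write and replay paths (event serialization reduced to a plain JSON dict):

```python
import json
import redis.asyncio as redis

STREAM_TTL = 3600  # default 1h retention for the replay stream

async def publish(r: redis.Redis, task_id: str, event: dict) -> None:
    payload = json.dumps(event)
    await r.publish(f"orbiter:events:{task_id}", payload)          # live, at-most-once
    await r.xadd(f"orbiter:stream:{task_id}", {"event": payload})  # durable replay copy
    await r.expire(f"orbiter:stream:{task_id}", STREAM_TTL)

async def replay(r: redis.Redis, task_id: str):
    # Read everything recorded so far (XRANGE - +) and yield deserialized events.
    for _msg_id, fields in await r.xrange(f"orbiter:stream:{task_id}", "-", "+"):
        yield json.loads(fields[b"event"])
```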

TaskStore (store.py)

The task store tracks task status using Redis hashes with TTL-based auto-cleanup.

Data model per task:

| Redis Key | Type | Contents |
|---|---|---|
| `orbiter:task:{task_id}` | Hash | `task_id`, `status`, `worker_id`, `started_at`, `completed_at`, `result` (JSON), `error`, `retries` |
| `orbiter:task:index` | Set | All known `task_id` values (for listing) |

Each hash has a configurable TTL (default 24h) set on every set_status() call. The secondary index set enables list_tasks() without scanning all Redis keys.
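
An approximate shape of set_status(), assuming the fields in the table above; the real store also serializes results and errors:

```python
import redis.asyncio as redis

TASK_TTL = 24 * 3600  # default 24h

async def set_status(r: redis.Redis, task_id: str, status: str, **fields) -> None:
    key = f"orbiter:task:{task_id}"
    await r.hset(key, mapping={"task_id": task_id, "status": status, **fields})
    await r.expire(key, TASK_TTL)                # TTL-based auto-cleanup
    await r.sadd("orbiter:task:index", task_id)  # secondary index for list_tasks()
```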

TaskHandle / distributed() (client.py)

The distributed() function is the main public API. It serializes the agent, submits the task, and returns a TaskHandle.

```mermaid
sequenceDiagram
    participant App as Application
    participant D as distributed()
    participant B as TaskBroker
    participant S as TaskStore
    participant E as EventSubscriber
    participant W as Worker

    App->>D: distributed(agent, input)
    D->>D: agent.to_dict()
    D->>B: submit(TaskPayload)
    D-->>App: TaskHandle

    W->>B: claim(worker_id)
    B-->>W: TaskPayload
    W->>S: set_status(RUNNING)
    W->>W: run.stream(agent, input)
    W->>E: publish(event) [repeated]
    W->>S: set_status(COMPLETED)

    App->>E: handle.stream()
    E-->>App: StreamEvent [live]

    App->>S: handle.status()
    S-->>App: TaskResult

    App->>S: handle.result()
    S-->>App: dict (polls until terminal)
```

TaskHandle methods:

| Method | Description |
|---|---|
| `result()` | Polls TaskStore until terminal status, returns result dict |
| `stream()` | Yields live StreamEvent via Pub/Sub subscription |
| `cancel()` | Publishes cancel signal via TaskBroker.cancel() |
| `status()` | Returns current TaskResult from TaskStore |
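
A hypothetical end-to-end usage, assuming distributed() and TaskHandle expose the methods above; the import path, call signatures, and awaitability are assumptions:

```python
from orbiter.distributed import distributed  # assumed import path

async def main(agent) -> None:
    # `agent` is an existing orbiter Agent; construction is omitted here.
    handle = await distributed(agent, "Summarize the quarterly report")

    async for event in handle.stream():   # live events via Pub/Sub
        print(type(event).__name__)

    print(await handle.status())          # current TaskResult snapshot
    print(await handle.result())          # polls until a terminal status
```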

CancellationToken / Cancel Flow (cancel.py)

Cancellation is cooperative — the worker checks a CancellationToken between event yields during agent execution.

```mermaid
sequenceDiagram
    participant App as Application
    participant B as TaskBroker
    participant R as Redis Pub/Sub
    participant W as Worker
    participant T as CancellationToken

    App->>B: cancel(task_id)
    B->>R: PUBLISH orbiter:cancel:{task_id}
    B->>R: HSET status=CANCELLED

    R-->>W: cancel message
    W->>T: token.cancel()
    W->>W: check token.cancelled between yields
    W->>W: emit StatusEvent(status='cancelled')
    W->>W: break execution loop
```
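
A sketch of that cooperative pattern; the CancellationToken shape and the `event_stream` / `publish` stand-ins are illustrative:

```python
import asyncio

class CancellationToken:
    def __init__(self) -> None:
        self._event = asyncio.Event()

    def cancel(self) -> None:          # called by the Pub/Sub cancel listener
        self._event.set()

    @property
    def cancelled(self) -> bool:
        return self._event.is_set()

async def run_with_cancel(event_stream, token: CancellationToken, publish) -> None:
    async for event in event_stream:   # events from run.stream(...)
        if token.cancelled:
            await publish({"type": "status", "status": "cancelled"})
            break                      # stop the execution loop cooperatively
        await publish(event)
```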

TaskPayload / TaskResult / TaskStatus (models.py)

Frozen Pydantic BaseModel instances used as data carriers:

  • TaskPayload — What gets queued: agent config, input, detailed flag, metadata, timeout
  • TaskResult — Status tracking: status, result, error, timing, worker ID, retries
  • TaskStatus — StrEnum with values: PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, RETRYING
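
A rough sketch of these carriers based on the fields listed above; exact field names, types, defaults, and enum string values are assumptions:

```python
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field

class TaskStatus(StrEnum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    RETRYING = "RETRYING"

class TaskPayload(BaseModel):
    model_config = ConfigDict(frozen=True)
    task_id: str
    agent_config: dict[str, Any]
    input: str
    detailed: bool = False
    metadata: dict[str, Any] = Field(default_factory=dict)
    timeout: float | None = None

class TaskResult(BaseModel):
    model_config = ConfigDict(frozen=True)
    task_id: str
    status: TaskStatus
    result: dict[str, Any] | None = None
    error: str | None = None
    worker_id: str | None = None
    retries: int = 0
```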

Health Monitoring (health.py)

Workers publish health data via the heartbeat loop. The health system provides:

  • WorkerHealth — Frozen dataclass with worker status, task counts, heartbeat timestamp
  • WorkerHealthCheck — Implements orbiter.observability.health.HealthCheck protocol for integration with the observability stack
  • get_worker_fleet_status() — Scans all orbiter:workers:* keys to return fleet-wide health; workers with heartbeat older than 60s are marked dead
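
A hedged sketch of that fleet scan; the key pattern matches the summary table at the end of this document, while the hash field names are illustrative:

```python
import time
import redis.asyncio as redis

DEAD_AFTER = 60  # seconds without a heartbeat before a worker is considered dead

async def fleet_status(r: redis.Redis) -> dict[str, str]:
    fleet: dict[str, str] = {}
    async for key in r.scan_iter(match="orbiter:workers:*"):
        data = await r.hgetall(key)
        heartbeat = float(data.get(b"heartbeat", 0))
        worker_id = key.decode().rsplit(":", 1)[-1]
        fleet[worker_id] = "alive" if time.time() - heartbeat < DEAD_AFTER else "dead"
    return fleet
```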

Metrics (metrics.py)

Records distributed task metrics using the orbiter-observability infrastructure (OTel when available, in-memory fallback otherwise):

| Metric | Type | Description |
|---|---|---|
| `orbiter.distributed.tasks.submitted` | Counter | Tasks submitted to the queue |
| `orbiter.distributed.tasks.completed` | Counter | Successfully completed tasks |
| `orbiter.distributed.tasks.failed` | Counter | Failed tasks |
| `orbiter.distributed.tasks.cancelled` | Counter | Cancelled tasks |
| `orbiter.distributed.queue.depth` | Gauge | Current queue depth |
| `orbiter.distributed.task.duration` | Histogram | Task execution duration |
| `orbiter.distributed.task.wait_time` | Histogram | Time from submission to execution start |
| `orbiter.stream.events.emitted` | Counter | Streaming events emitted (by type) |
| `orbiter.stream.event.publish_duration` | Histogram | Event publish latency |
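
The module records these through the orbiter-observability layer; the sketch below uses the OpenTelemetry metrics API directly just to show the instrument shapes (attribute names are illustrative):

```python
from opentelemetry import metrics

meter = metrics.get_meter("orbiter.distributed")

tasks_submitted = meter.create_counter(
    "orbiter.distributed.tasks.submitted",
    description="Tasks submitted to the queue",
)
task_duration = meter.create_histogram(
    "orbiter.distributed.task.duration",
    unit="s",
    description="Task execution duration",
)

tasks_submitted.add(1, {"queue": "orbiter:tasks"})
task_duration.record(12.4, {"status": "completed"})
```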

Alerts (alerts.py)

Pre-defined alert rules registered with the global AlertManager:

| Rule | Threshold | Severity |
|---|---|---|
| Queue depth high | > 100 | WARNING |
| Queue depth critical | > 500 | CRITICAL |
| Task failure rate | > 10% | WARNING |
| Worker count zero | = 0 | CRITICAL |
| Task wait time high | > 60s | WARNING |

Tracing

Distributed tracing spans the full task lifecycle from submission to execution:

  1. Client side: distributed() creates an orbiter.distributed.submit span and injects W3C Baggage context into TaskPayload.metadata['trace_context']
  2. Worker side: _execute_task() extracts the trace context and creates a child orbiter.distributed.execute span linked to the submission span
  3. Inner execution: Tool calls within the agent use the existing @traced decorator for nested spans
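
A sketch of the context hand-off using the standard OpenTelemetry propagation API; the span names follow the list above, everything else is illustrative:

```python
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer("orbiter.distributed")

def submit_side(metadata: dict) -> None:
    with tracer.start_as_current_span("orbiter.distributed.submit"):
        carrier: dict[str, str] = {}
        inject(carrier)                        # writes traceparent/baggage headers
        metadata["trace_context"] = carrier    # travels inside TaskPayload.metadata

def worker_side(metadata: dict) -> None:
    ctx = extract(metadata.get("trace_context", {}))
    with tracer.start_as_current_span("orbiter.distributed.execute", context=ctx):
        ...  # run the task; nested @traced spans attach to this child span
```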

Data Flow

Redis-Only Mode

```
Client                          Redis                          Worker
  |                               |                              |
  |-- distributed(agent, input) ->|                              |
  |   XADD orbiter:tasks          |                              |
  |                               |<-- XREADGROUP (blocking) --- |
  |                               |--- TaskPayload ------------>|
  |                               |                              |
  |                               |<-- HSET RUNNING ----------- |
  |                               |<-- PUBLISH event ---------- |  (repeated per event)
  |                               |<-- XADD event ------------ |
  |<-- SUBSCRIBE events ---------|                              |
  |   StreamEvent (live)          |                              |
  |                               |<-- HSET COMPLETED --------- |
  |                               |<-- XACK ------------------- |
  |-- handle.result() ---------->|                              |
  |   HGETALL orbiter:task:{id}   |                              |
  |<-- TaskResult ---------------|                              |
```

Redis + Temporal Mode

When executor="temporal" is set on the worker:

```
Client                    Redis              Worker              Temporal
  |                         |                  |                    |
  |-- distributed() ------->|                  |                    |
  |                         |<-- claim --------|                    |
  |                         |                  |-- start_workflow ->|
  |                         |                  |                    |
  |                         |                  |   execute_agent    |
  |                         |                  |   _activity()      |
  |                         |                  |<-- result ---------|
  |                         |<-- HSET done ----|                    |
  |<-- result --------------|                  |                    |
```

In Temporal mode, the worker submits an AgentExecutionWorkflow to Temporal instead of executing the agent directly. The Temporal activity (execute_agent_activity) reconstructs the agent, runs run.stream(), and heartbeats every 10 events. Temporal handles retries and state recovery on worker failure.
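
A hedged sketch of those pieces with the temporalio SDK; the workflow and activity names follow the text above, while the activity body is reduced to a stub (the real one rebuilds the agent, streams events back through Redis, and heartbeats every 10 events):

```python
from datetime import timedelta
from temporalio import activity, workflow

@activity.defn
async def execute_agent_activity(payload: dict) -> dict:
    # Stub: the real activity rebuilds the agent from payload["agent_config"],
    # runs run.stream(), and publishes events while heartbeating periodically.
    activity.heartbeat("progress")
    return {"status": "COMPLETED"}

@workflow.defn
class AgentExecutionWorkflow:
    @workflow.run
    async def run(self, payload: dict) -> dict:
        # Temporal retries/resumes this activity if a worker dies mid-execution.
        return await workflow.execute_activity(
            execute_agent_activity,
            payload,
            start_to_close_timeout=timedelta(minutes=30),
            heartbeat_timeout=timedelta(minutes=1),
        )
```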

Redis Data Structures Summary

| Key Pattern | Type | Purpose | TTL |
|---|---|---|---|
| `orbiter:tasks` | Stream | Task queue (consumer group) | None |
| `orbiter:tasks:group` | Consumer Group | Worker task distribution | N/A |
| `orbiter:task:{task_id}` | Hash | Task status and result | 24h |
| `orbiter:task:index` | Set | Task ID index for listing | None |
| `orbiter:events:{task_id}` | Pub/Sub Channel | Live event streaming | N/A |
| `orbiter:stream:{task_id}` | Stream | Persistent event replay | 1h |
| `orbiter:cancel:{task_id}` | Pub/Sub Channel | Cancel signal delivery | N/A |
| `orbiter:workers:{worker_id}` | Hash | Worker health/heartbeat | 30s |