Skip to content
Navigation

The orbiter-a2a package implements the Agent-to-Agent protocol, enabling agents to discover, communicate with, and delegate tasks to other agents over HTTP. It provides an A2AServer for exposing agents as network services, an A2AClient for consuming them, and a RemoteAgent wrapper that makes remote agents look like local ones.

Basic Usage

Server Side

python
from orbiter.a2a import A2AServer, AgentCard, AgentCapabilities, AgentSkill, ServingConfig

# Define what your agent can do
card = AgentCard(
    name="researcher",
    description="Searches and summarizes information",
    url="http://localhost:8001",
    capabilities=AgentCapabilities(streaming=True, batch=False),
    skills=[
        AgentSkill(name="search", description="Search the web"),
        AgentSkill(name="summarize", description="Summarize text"),
    ],
)

# Create and start server
config = ServingConfig(host="0.0.0.0", port=8001)
server = A2AServer(card=card, config=config)

# Register the agent executor
server.register_executor(my_agent_executor)

# Run (blocks)
await server.start()

Client Side

python
from orbiter.a2a import A2AClient, ClientConfig

config = ClientConfig(timeout=30.0)
client = A2AClient(config=config)

# Discover agent capabilities
card = await client.get_card("http://localhost:8001")
print(f"Agent: {card.name}")
print(f"Skills: {[s.name for s in card.skills]}")

# Send a task
result = await client.send_task(
    url="http://localhost:8001",
    input="What are the latest trends in AI?",
)
print(result)

Agent Cards

An AgentCard describes an agent’s identity, capabilities, and skills:

python
from orbiter.a2a import AgentCard, AgentCapabilities, AgentSkill

card = AgentCard(
    name="data-analyst",
    description="Analyzes datasets and generates reports",
    url="http://localhost:8002",
    version="1.0.0",
    capabilities=AgentCapabilities(
        streaming=True,
        batch=True,
        push_notifications=False,
    ),
    skills=[
        AgentSkill(
            name="analyze",
            description="Analyze a dataset",
            input_schema={"type": "object", "properties": {"data_url": {"type": "string"}}},
        ),
        AgentSkill(
            name="visualize",
            description="Create data visualizations",
        ),
    ],
)

Agent Discovery

Clients discover agents by fetching their card from a well-known endpoint:

python
# Cards are served at /.well-known/agent-card (or /agent-card)
card = await client.get_card("http://agent-host:8001")

Cards can also be loaded from local files:

python
card = await client.get_card("file:///path/to/agent-card.json")

Transport Modes

ModeValueDescription
HTTP"http"Standard HTTP request/response
SSE"sse"Server-Sent Events for streaming
WEBSOCKET"websocket"WebSocket for bidirectional communication

Task Lifecycle

Tasks follow a state machine:

StateDescription
PENDINGTask created, waiting to execute
RUNNINGCurrently executing
COMPLETEDFinished successfully
FAILEDExecution failed
CANCELLEDCancelled by client
python
from orbiter.a2a import TaskState

# Task status updates are emitted during execution
# TaskStatusUpdateEvent: task_id, state, message
# TaskArtifactUpdateEvent: task_id, artifact_name, content

A2AServer

The server uses FastAPI to expose agent capabilities:

python
from orbiter.a2a import A2AServer, AgentCard, ServingConfig

server = A2AServer(
    card=agent_card,
    config=ServingConfig(
        host="0.0.0.0",
        port=8001,
        cors_origins=["*"],
    ),
)

Endpoints

EndpointMethodDescription
/agent-cardGETReturn the agent card
/.well-known/agent-cardGETAgent card discovery endpoint
/tasksPOSTSubmit a new task
/tasks/{id}GETGet task status
/tasks/{id}/streamGETStream task results (SSE)

Agent Executor

Register an executor that handles incoming tasks:

python
from orbiter.a2a.server import AgentExecutor

class MyExecutor(AgentExecutor):
    async def execute(self, task_id: str, input: str) -> str:
        result = await my_agent.run(input)
        return result.output

    async def stream(self, task_id: str, input: str):
        async for chunk in my_agent.stream(input):
            yield chunk

server.register_executor(MyExecutor())

Task Store

The server uses an in-memory task store by default. Implement the TaskStore protocol for persistence:

python
from orbiter.a2a.server import TaskStore

class RedisTaskStore(TaskStore):
    async def create(self, task_id: str, input: str) -> None: ...
    async def get(self, task_id: str) -> dict | None: ...
    async def update(self, task_id: str, **kwargs) -> None: ...
    async def delete(self, task_id: str) -> None: ...

A2AClient

The client handles HTTP communication and card resolution:

python
from orbiter.a2a import A2AClient, ClientConfig

client = A2AClient(
    config=ClientConfig(
        timeout=30.0,
        max_retries=3,
    ),
)

# Get agent card
card = await client.get_card("http://agent:8001")

# Send task
result = await client.send_task("http://agent:8001", input="Hello")

# Stream results
async for event in client.stream_task("http://agent:8001", input="Hello"):
    print(event)

RemoteAgent

RemoteAgent wraps an A2A client connection to make remote agents behave like local Agent instances:

python
from orbiter.a2a import RemoteAgent

remote = RemoteAgent(
    url="http://researcher:8001",
    client=client,
)

# Use like a local agent
result = await remote.run("What is quantum computing?")
print(result)

# Describe returns the agent card info
info = remote.describe()
print(info["name"])  # "researcher"

ClientManager

Thread-safe manager for multiple A2A client connections:

python
from orbiter.a2a import ClientManager

manager = ClientManager()

# Register remote agents
manager.register("researcher", "http://researcher:8001")
manager.register("coder", "http://coder:8002")

# Get client for a specific agent
client = manager.get("researcher")
result = await client.send_task("http://researcher:8001", input="Find papers on LLMs")

Advanced Patterns

Multi-Agent Orchestration

Use A2A to coordinate multiple specialized agents:

python
from orbiter.a2a import A2AClient, RemoteAgent

client = A2AClient()

researcher = RemoteAgent(url="http://researcher:8001", client=client)
writer = RemoteAgent(url="http://writer:8002", client=client)
reviewer = RemoteAgent(url="http://reviewer:8003", client=client)

# Pipeline: research -> write -> review
research = await researcher.run("Latest AI trends")
draft = await writer.run(f"Write an article based on: {research}")
review = await reviewer.run(f"Review this article: {draft}")

Service Discovery

Dynamically discover and connect to agents:

python
agent_urls = ["http://agent1:8001", "http://agent2:8002", "http://agent3:8003"]

available_agents = {}
for url in agent_urls:
    try:
        card = await client.get_card(url)
        available_agents[card.name] = RemoteAgent(url=url, client=client)
    except Exception:
        print(f"Agent at {url} unavailable")

Streaming Integration

Stream results from a remote agent:

python
async for event in client.stream_task("http://agent:8001", input="Explain Python"):
    if hasattr(event, "text"):
        print(event.text, end="", flush=True)
print()  # final newline

API Summary

SymbolModuleDescription
AgentCardorbiter.a2aAgent identity, capabilities, and skills
AgentCapabilitiesorbiter.a2aStreaming, batch, push notification support
AgentSkillorbiter.a2aNamed skill with optional input schema
ServingConfigorbiter.a2aServer configuration: host, port, CORS
ClientConfigorbiter.a2aClient configuration: timeout, retries
A2AServerorbiter.a2aFastAPI-based agent server
A2AClientorbiter.a2aHTTP client for A2A communication
RemoteAgentorbiter.a2aAgent-compatible wrapper for remote agents
ClientManagerorbiter.a2aThread-safe multi-client manager
TaskStateorbiter.a2aEnum: PENDING, RUNNING, COMPLETED, FAILED, CANCELLED
TransportModeorbiter.a2aEnum: HTTP, SSE, WEBSOCKET
TaskStoreorbiter.a2a.serverProtocol for task persistence
AgentExecutororbiter.a2a.serverProtocol for task execution