orbiter.a2a.server
A2A server — FastAPI-based agent serving with agent card discovery.
A2A server — FastAPI-based agent serving with agent card discovery.
from orbiter.a2a.server import (
A2AServer,
A2AServerError,
AgentExecutor,
InMemoryTaskStore,
TaskStore,
)A2AServerError
class A2AServerError(OrbiterError)Raised for A2A server-level errors. Inherits from orbiter.types.OrbiterError.
TaskStore
@runtime_checkable
class TaskStore(Protocol)Minimal storage interface for A2A task state.
Methods
async def get(self, task_id: str) -> dict[str, Any] | None: ...
async def save(self, task_id: str, data: dict[str, Any]) -> None: ...
async def delete(self, task_id: str) -> None: ...InMemoryTaskStore
class InMemoryTaskStore()Simple in-memory task store for development and testing.
Methods
get
async def get(self, task_id: str) -> dict[str, Any] | NoneRetrieve task data by ID. Returns None if not found.
save
async def save(self, task_id: str, data: dict[str, Any]) -> NoneSave task data.
delete
async def delete(self, task_id: str) -> NoneDelete task data. No-op if not found.
AgentExecutor
class AgentExecutor(
agent: Any,
*,
streaming: bool = False,
)Wraps an agent for A2A task execution. Accepts any object with a run(input, ...) async method and a name attribute (i.e. an Orbiter Agent).
Constructor parameters
| Name | Type | Default | Description |
|---|---|---|---|
agent | Any | (required) | Agent object with run() async method and name attribute |
streaming | bool | False | Whether to enable streaming |
Properties
| Property | Type | Description |
|---|---|---|
agent_name | str | Name of the wrapped agent |
Methods
execute
async def execute(self, text: str, *, provider: Any = None) -> strRun the wrapped agent and return text output.
| Name | Type | Default | Description |
|---|---|---|---|
text | str | (required) | Input text for the agent |
provider | Any | None | Optional LLM provider |
Returns: str — Agent text output.
A2AServer
class A2AServer(
executor: AgentExecutor,
config: ServingConfig | None = None,
*,
task_store: TaskStore | None = None,
provider: Any = None,
)FastAPI-based A2A server with agent card discovery.
Exposes:
GET /.well-known/agent-card— agent card JSONPOST /— task execution (send text, get response)GET /tasks/{task_id}— retrieve task statePOST /stream— streaming task execution (when streaming is enabled in config)
Constructor parameters
| Name | Type | Default | Description |
|---|---|---|---|
executor | AgentExecutor | (required) | An AgentExecutor wrapping the agent to serve |
config | ServingConfig | None | None | Server configuration (host, port, skills, etc.). Defaults to ServingConfig() |
task_store | TaskStore | None | None | Task state storage. Defaults to InMemoryTaskStore |
provider | Any | None | Optional LLM provider passed through to the agent |
Properties
| Property | Type | Description |
|---|---|---|
agent_card | AgentCard | The constructed agent card |
task_store | TaskStore | The task store instance |
Methods
build_app
def build_app(self) -> AnyCreate and return the FastAPI application. Registers all routes.
Returns: FastAPI application instance.
Raises: A2AServerError — If fastapi is not installed.
API endpoints
GET /.well-known/agent-card
Returns the agent card as JSON.
Response: AgentCard as JSON.
POST /
Execute a task.
Request body:
{
"text": "What is Python?",
"task_id": "optional-id"
}Response (success):
{
"task_id": "...",
"status": {"state": "completed", "reason": ""},
"artifact": {"task_id": "...", "text": "...", "last_chunk": true}
}Response (error, 500):
{
"task_id": "...",
"status": {"state": "failed", "reason": "error message"}
}GET /tasks/{task_id}
Retrieve task state.
POST /stream (when streaming enabled)
Stream task execution as NDJSON. Each line is a TaskStatusUpdateEvent or TaskArtifactUpdateEvent.
Example
from orbiter.a2a import A2AServer, AgentExecutor, ServingConfig, AgentSkill
executor = AgentExecutor(my_agent, streaming=True)
config = ServingConfig(
host="0.0.0.0",
port=8000,
streaming=True,
skills=(
AgentSkill(id="qa", name="Q&A", description="Answer questions"),
),
)
server = A2AServer(executor, config)
app = server.build_app()
# Run with uvicorn:
# uvicorn my_module:app --host 0.0.0.0 --port 8000