a2akit
A2A agent framework in one import.
Build Agent-to-Agent (A2A) protocol agents with minimal boilerplate. Streaming, cancellation, multi-turn conversations, and artifact handling — batteries included.
Install

```bash
pip install a2akit
```

With optional LangGraph support (quoted so the extras bracket survives shells like zsh):

```bash
pip install "a2akit[langgraph]"
```
Quick Start
```python
from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker


class EchoWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        await ctx.complete(f"Echo: {ctx.user_text}")


server = A2AServer(
    worker=EchoWorker(),
    agent_card=AgentCardConfig(
        name="Echo Agent",
        description="Echoes your input back.",
        version="0.1.0",
    ),
)

app = server.as_fastapi_app()
```
Run it:

```bash
uvicorn my_agent:app --reload
```

Test it:

```bash
curl -X POST http://localhost:8000/v1/message:send \
  -H "Content-Type: application/json" \
  -d '{"message":{"role":"user","parts":[{"text":"hello"}],"messageId":"1"}}'
```
Features
- One-liner setup — `A2AServer` wires storage, broker, event bus, and endpoints
- Middleware — `A2AMiddleware` pipeline for auth extraction, header injection, payload sanitization, and logging
- Streaming — word-by-word artifact streaming via SSE
- Cancellation — cooperative and force-cancel with timeout fallback
- Multi-turn — `request_input()` / `request_auth()` for conversational flows
- Direct reply — `reply_directly()` for simple request/response without task tracking
- Lifecycle hooks — fire-and-forget callbacks on terminal state transitions
- Pluggable backends — swap in Redis, PostgreSQL, RabbitMQ (coming soon)
- Type-safe — full type hints, `py.typed` marker, PEP 561 compliant
Architecture
a2akit lets you bring your own Storage, Broker, EventBus, CancelRegistry, and
Worker, or use the bundled in-memory implementations for development.
```mermaid
graph TD
    HTTP["HTTP Server\n(FastAPI + SSE)"]
    MW["Middleware\n(before / after dispatch)"]
    ENV["RequestEnvelope\n(params + context)"]
    TM["TaskManager\n(coordinates)"]
    WA["WorkerAdapter\n(lifecycle)"]
    EE["EventEmitter\n(facade)"]
    HE["HookableEmitter\n(lifecycle hooks)"]
    B["Broker\n(queues & schedules)"]
    S["Storage\n(persistence)"]
    EB["EventBus\n(event fan-out)"]
    CR["CancelRegistry\n(cancel signals)"]
    W["Worker\n(your code)"]
    TC["TaskContext\n(execution API)"]

    HTTP -->|raw request| MW
    MW -->|"mutate envelope\n(extract secrets, headers)"| ENV
    ENV -->|"params (persisted)"| TM
    ENV -.->|"context (transient)"| B
    TM -->|schedules tasks| B
    TM -.->|reads / writes| S
    TM -.->|subscribes| EB
    TM -.->|cancel signals| CR
    B -->|delegates execution| WA
    WA -->|"handle(ctx)"| W
    W -.->|uses API| TC
    TC -->|emits via| HE
    HE -->|decorates| EE
    EE -->|writes| S
    EE -->|publishes| EB
    WA -.->|checks / cleanup| CR
```
TaskManager handles submission, validation, streaming, and cancellation. It coordinates between Broker, Storage, EventBus, and CancelRegistry — but never touches the Worker directly.
WorkerAdapter bridges the Broker queue to your Worker. It manages the lifecycle:
`dequeue -> check cancel -> build context -> transition to working -> call handle(ctx) -> cleanup`.
EventEmitter is the facade that TaskContext uses to persist state (Storage) and broadcast events (EventBus) without knowing about either directly. Storage writes are authoritative; EventBus is best-effort.
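The "authoritative write, best-effort broadcast" split can be sketched in plain Python. The classes below are illustrative stand-ins, not a2akit's actual internals:

```python
# Toy sketch of the emitter pattern described above: persist first
# (authoritative), then broadcast (best-effort). Names are hypothetical.

class FlakyBus:
    """An event bus whose publish may fail; failures must not lose state."""

    def __init__(self) -> None:
        self.published: list[tuple[str, str]] = []
        self.fail = False

    def publish(self, task_id: str, event: str) -> None:
        if self.fail:
            raise ConnectionError("bus unavailable")
        self.published.append((task_id, event))


class Emitter:
    def __init__(self, storage: dict, bus: FlakyBus) -> None:
        self.storage = storage
        self.bus = bus

    def emit_state(self, task_id: str, state: str) -> None:
        # 1. Authoritative: if this write raises, the emit fails.
        self.storage[task_id] = state
        # 2. Best-effort: a bus outage is swallowed; state is already safe.
        try:
            self.bus.publish(task_id, state)
        except Exception:
            pass  # real code would log the failure


storage: dict = {}
bus = FlakyBus()
emitter = Emitter(storage, bus)

emitter.emit_state("t1", "working")
bus.fail = True
emitter.emit_state("t1", "completed")  # bus is down, storage still updated

print(storage["t1"])  # completed
```

Subscribers that miss a broadcast can always re-read the authoritative state from Storage, which is why the bus may be best-effort.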
Middleware intercepts requests at the HTTP boundary. `before_dispatch` runs before
TaskManager sees the request — extract secrets from `message.metadata`, read HTTP headers,
sanitize payloads. `after_dispatch` runs after TaskManager returns — log timing, emit
metrics, clean up. Transient data lives in `RequestEnvelope.context` and flows through
the Broker to the Worker via `ctx.request_context`, but is never persisted in Storage.
Pluggable backends: Swap Storage, Broker, EventBus, and CancelRegistry
independently — e.g. PostgreSQL storage + Redis broker + Redis event bus. All backends
implement their respective ABC.
TaskContext API
TaskContext is the only interface your worker needs. It abstracts away all A2A protocol details:
Properties
| Property | Description |
|---|---|
| `ctx.user_text` | The user's input as plain text |
| `ctx.parts` | Raw message parts (text, files, etc.) |
| `ctx.files` | File parts as `list[FileInfo]` (content, url, filename, media_type) |
| `ctx.data_parts` | Structured data parts as `list[dict]` |
| `ctx.task_id` | Current task UUID |
| `ctx.context_id` | Conversation / context identifier |
| `ctx.message_id` | ID of the triggering message |
| `ctx.metadata` | Arbitrary metadata from the request (persisted) |
| `ctx.request_context` | Transient data from middleware (never persisted) |
| `ctx.is_cancelled` | Whether cancellation was requested |
| `ctx.turn_ended` | Whether a terminal method was called |
| `ctx.history` | Previous messages in this task (`list[HistoryMessage]`) |
| `ctx.previous_artifacts` | Artifacts from prior turns (`list[PreviousArtifact]`) |
Lifecycle
| Method | Description |
|---|---|
| `ctx.complete(text?)` | Mark task completed with optional text artifact |
| `ctx.complete_json(data)` | Complete with a JSON data artifact |
| `ctx.respond(text?)` | Complete with a direct message (no artifact) |
| `ctx.reply_directly(text)` | Return a Message directly without task tracking |
| `ctx.fail(reason)` | Mark task failed |
| `ctx.reject(reason?)` | Reject the task |
| `ctx.request_input(question)` | Ask user for more input |
| `ctx.request_auth(details?)` | Request secondary authentication |
Streaming
| Method | Description |
|---|---|
| `ctx.send_status(msg)` | Emit intermediate status update |
| `ctx.emit_text_artifact(...)` | Emit a text artifact chunk |
| `ctx.emit_data_artifact(data)` | Emit a structured data artifact chunk |
| `ctx.emit_artifact(...)` | Emit an artifact with any content (text, data, file_bytes, file_url) |
Context
| Method | Description |
|---|---|
| `ctx.load_context()` | Load stored context for this conversation |
| `ctx.update_context(data)` | Store context for this conversation |
No `Part(root=TextPart(...))`. No `EventQueue`. No `TaskUpdater`. Just call methods.
Streaming Example
```python
import asyncio

from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker


class StreamingWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        words = ctx.user_text.split()
        await ctx.send_status(f"Streaming {len(words)} words...")
        for i, word in enumerate(words):
            is_last = i == len(words) - 1
            await ctx.emit_text_artifact(
                text=word + ("" if is_last else " "),
                artifact_id="stream",
                append=(i > 0),
                last_chunk=is_last,
            )
            await asyncio.sleep(0.1)
        await ctx.complete()


server = A2AServer(
    worker=StreamingWorker(),
    agent_card=AgentCardConfig(
        name="Streamer",
        description="Word-by-word streaming",
        version="0.1.0",
    ),
)

app = server.as_fastapi_app()
```
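On the receiving side, chunks with `append=True` extend the same artifact. A minimal reassembly sketch, where the chunk dicts are a simplified stand-in for the actual SSE event payloads:

```python
def reassemble(chunks: list[dict]) -> dict[str, str]:
    """Rebuild artifact text from streamed chunks.

    Each chunk carries artifact_id, text, and an append flag:
    append=False starts (or replaces) the artifact, append=True extends it.
    """
    artifacts: dict[str, str] = {}
    for chunk in chunks:
        if chunk["append"]:
            artifacts[chunk["artifact_id"]] += chunk["text"]
        else:
            artifacts[chunk["artifact_id"]] = chunk["text"]
    return artifacts


# Two chunks as the worker above would emit them for "hello world".
chunks = [
    {"artifact_id": "stream", "text": "hello ", "append": False},
    {"artifact_id": "stream", "text": "world", "append": True},
]
print(reassemble(chunks)["stream"])  # hello world
```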
Middleware
Middleware operates on a RequestEnvelope at the HTTP boundary, separating
transient request data (tokens, headers) from the persisted A2A protocol payload.
```python
from a2akit import A2AMiddleware, A2AServer, AgentCardConfig, RequestEnvelope, TaskContext, Worker
from fastapi import Request


class SecretExtractor(A2AMiddleware):
    """Move sensitive keys from message.metadata into transient context."""

    SECRET_KEYS = {"user_token", "api_key", "auth_token"}

    async def before_dispatch(self, envelope: RequestEnvelope, request: Request) -> None:
        msg_meta = envelope.params.message.metadata or {}
        for key in self.SECRET_KEYS & msg_meta.keys():
            envelope.context[key] = msg_meta.pop(key)
        if auth := request.headers.get("Authorization"):
            envelope.context["auth_header"] = auth


class MyWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        # Transient — never in Storage
        token = ctx.request_context.get("user_token")
        # Persisted — from message.metadata
        trace_id = ctx.metadata.get("trace_id")
        result = await call_external_api(token)
        await ctx.complete(f"Result: {result}")


server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(name="My Agent", description="...", version="0.1.0"),
    middlewares=[SecretExtractor()],
)

app = server.as_fastapi_app()
```
Execution order: `before_dispatch` runs in registration order;
`after_dispatch` runs in reverse (like Python context managers).
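This onion ordering can be demonstrated with plain Python. The toy pipeline below is illustrative, not a2akit's actual dispatcher:

```python
class LoggingMW:
    """Records when its before/after hooks fire."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name, self.log = name, log

    def before_dispatch(self) -> None:
        self.log.append(f"before:{self.name}")

    def after_dispatch(self) -> None:
        self.log.append(f"after:{self.name}")


def dispatch(middlewares: list[LoggingMW], log: list[str]) -> None:
    for mw in middlewares:            # registration order
        mw.before_dispatch()
    log.append("handler")
    for mw in reversed(middlewares):  # reverse order, like nested `with` blocks
        mw.after_dispatch()


log: list[str] = []
dispatch([LoggingMW("auth", log), LoggingMW("metrics", log)], log)
print(log)
# ['before:auth', 'before:metrics', 'handler', 'after:metrics', 'after:auth']
```

The first middleware registered sees the request first and the response last, so outermost concerns (auth, logging) should be registered first.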
Lifecycle Hooks
Register callbacks that fire after terminal state transitions (completed, failed, canceled, rejected). Hooks are fire-and-forget — errors are logged and swallowed, never affecting task processing.
```python
import logging

from a2a.types import Message, TaskState
from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker
from a2akit.hooks import LifecycleHooks

logger = logging.getLogger(__name__)


async def on_terminal(task_id: str, state: TaskState, message: Message | None) -> None:
    """Called once per task when it reaches a terminal state."""
    if state == TaskState.completed:
        logger.info("Task %s completed successfully", task_id)
    elif state == TaskState.failed:
        logger.warning("Task %s failed: %s", task_id, message)
    elif state == TaskState.canceled:
        logger.info("Task %s was canceled", task_id)


class MyWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        await ctx.complete(f"Done: {ctx.user_text}")


server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(name="Hooked Agent", description="...", version="0.1.0"),
    hooks=LifecycleHooks(on_terminal=on_terminal),
)

app = server.as_fastapi_app()
```
Hooks fire after a successful Storage write. If the write fails (e.g.
`TaskTerminalStateError` from a concurrent cancel), the hook does not fire.
The Storage terminal-state guard provides exactly-once delivery per task.
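The exactly-once guarantee can be sketched as a write guard on terminal state. Names other than `TaskTerminalStateError` (which the text above mentions) are illustrative:

```python
TERMINAL = {"completed", "failed", "canceled", "rejected"}


class TaskTerminalStateError(Exception):
    """Raised when writing a terminal state over an existing terminal state."""


class GuardedStore:
    """Toy storage that rejects a second terminal-state write per task."""

    def __init__(self) -> None:
        self._states: dict[str, str] = {}

    def set_state(self, task_id: str, state: str) -> None:
        if self._states.get(task_id) in TERMINAL:
            raise TaskTerminalStateError(task_id)
        self._states[task_id] = state


fired: list[tuple[str, str]] = []


def finish(store: GuardedStore, task_id: str, state: str) -> None:
    """The hook fires only after the storage write succeeds."""
    try:
        store.set_state(task_id, state)
    except TaskTerminalStateError:
        return  # lost the race; the winning writer's hook already fired
    fired.append((task_id, state))


store = GuardedStore()
finish(store, "t1", "completed")  # wins the race: hook fires
finish(store, "t1", "canceled")   # concurrent cancel loses: no second hook
print(fired)  # [('t1', 'completed')]
```

Whichever terminal write reaches Storage first wins, and only that transition triggers the hook.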
A2AServer Configuration
```python
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(
        name="My Agent",
        description="What your agent does.",
        version="0.1.0",
    ),
    middlewares=[SecretExtractor()],  # optional middleware pipeline
    storage="memory",                 # or pass a Storage instance
    broker="memory",                  # or pass a Broker instance
    event_bus="memory",               # or pass an EventBus instance
    cancel_registry=None,             # or pass a CancelRegistry instance
    blocking_timeout_s=30.0,          # timeout for blocking requests
    max_concurrent_tasks=None,        # limit parallel task execution
    hooks=LifecycleHooks(...),        # optional lifecycle hooks
)

app = server.as_fastapi_app()
```
Endpoints
| Method | Path | Description |
|---|---|---|
| POST | `/v1/message:send` | Submit a message, return task or direct reply |
| POST | `/v1/message:stream` | Submit a message, stream events via SSE |
| GET | `/v1/tasks/{task_id}` | Get a single task by ID |
| GET | `/v1/tasks` | List tasks with filters and pagination |
| POST | `/v1/tasks/{task_id}:cancel` | Cancel a task |
| POST | `/v1/tasks/{task_id}:subscribe` | Subscribe to task updates via SSE |
| GET | `/v1/health` | Health check |
| GET | `/.well-known/agent-card.json` | Agent discovery card |
A2A Protocol Version
a2akit implements A2A v0.3.0.
Roadmap
Planned features for upcoming releases. Priorities may shift based on feedback.
| Feature | Target |
|---|---|
| Dependency injection | v0.1.0 |
| Documentation website | v0.1.0 |
| Redis EventBus | v0.2.0 |
| Redis Broker | v0.2.0 |
| PostgreSQL Storage | v0.2.0 |
| SQLite Storage | v0.2.0 |
| Backend conformance test suite | v0.2.0 |
| OpenTelemetry integration | v0.2.0 |
| RabbitMQ Broker | v0.3.0+ |
| JSON-RPC transport | v0.3.0+ |
| gRPC transport | v0.4.0+ |
This roadmap is subject to change.
License
MIT