A2A agent framework in one import.
a2akit
Production-grade A2A protocol framework for Python.
Build Agent-to-Agent agents with streaming, cancellation, multi-turn conversations, push notifications, pluggable backends (Memory, SQLite, PostgreSQL, Redis), OpenTelemetry, and a built-in debug UI — all on top of FastAPI.
Why a2akit?
The official A2A Python SDK gives you protocol primitives — but you have to wire everything yourself. You need to understand AgentExecutor, RequestContext, EventQueue, TaskUpdater, TaskState, Part, TextPart, SendMessageRequest, task creation, event routing, and more. A minimal agent easily becomes 50+ lines of boilerplate:
Official SDK — Currency Agent (simplified)
```python
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import InternalError, InvalidParamsError, Part, TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message, new_task
from a2a.utils.errors import ServerError


class CurrencyAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = CurrencyAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        query = context.get_user_input()
        task = context.current_task
        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)
        try:
            async for item in self.agent.stream(query, task.context_id):
                if not item["is_task_complete"] and not item["require_user_input"]:
                    await updater.update_status(
                        TaskState.working,
                        new_agent_text_message(item["content"], task.context_id, task.id),
                    )
                elif item["require_user_input"]:
                    await updater.update_status(
                        TaskState.input_required,
                        new_agent_text_message(item["content"], task.context_id, task.id),
                        final=True,
                    )
                    break
                else:
                    await updater.add_artifact(
                        [Part(root=TextPart(text=item["content"]))], name="conversion_result"
                    )
                    await updater.complete()
                    break
        except Exception as e:
            raise ServerError(error=InternalError()) from e

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise ServerError(error=UnsupportedOperationError())
```
With a2akit, the same logic is just this:
```python
from a2akit import Worker, TaskContext


class CurrencyWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        async for item in my_agent.stream(ctx.user_text, ctx.task_id):
            if item["require_user_input"]:
                await ctx.request_input(item["content"])
            elif item["is_task_complete"]:
                await ctx.complete(item["content"])
            else:
                await ctx.send_status(item["content"])
```
The difference: a2akit handles task creation, event routing, state machines, error wrapping, SSE streaming, and protocol compliance for you. You write your agent logic — the framework handles the protocol.
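To make "state machines" concrete, here is a minimal sketch of a task-state transition check. The state names follow the A2A protocol's task states; the transition table and the `advance` helper are illustrative assumptions, not a2akit's actual implementation.

```python
from enum import Enum


class TaskState(str, Enum):
    # State names as used by the A2A protocol (subset).
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"


# Hypothetical transition table: terminal states allow no further moves.
ALLOWED = {
    TaskState.SUBMITTED: {TaskState.WORKING, TaskState.CANCELED, TaskState.FAILED},
    TaskState.WORKING: {
        TaskState.WORKING,
        TaskState.INPUT_REQUIRED,
        TaskState.COMPLETED,
        TaskState.CANCELED,
        TaskState.FAILED,
    },
    TaskState.INPUT_REQUIRED: {TaskState.WORKING, TaskState.CANCELED, TaskState.FAILED},
    TaskState.COMPLETED: set(),
    TaskState.CANCELED: set(),
    TaskState.FAILED: set(),
}


def advance(current: TaskState, target: TaskState) -> TaskState:
    """Return `target` if the move is legal, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target
```

With a check like this in one place, a worker that calls `complete()` twice fails loudly instead of silently corrupting the task record.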
| | Official SDK | a2akit |
|---|---|---|
| Boilerplate | Manage EventQueue, TaskUpdater, TaskState, Part objects manually | `ctx.complete()`, `ctx.send_status()`, `ctx.request_input()` |
| Task lifecycle | Create tasks, track state, wire events yourself | Automatic — framework manages the full lifecycle |
| Streaming | Manual SSE + event queue plumbing | Built-in, one method call |
| Storage | Bring your own | Memory, SQLite, PostgreSQL, Redis out of the box |
| Cancellation | Implement yourself | Cooperative + force-cancel with timeout |
| Push notifications | Implement yourself | Built-in with anti-SSRF and retries |
| Debug UI | None | Built-in browser UI at /chat |
| Middleware | Implement yourself | Pluggable pipeline (auth, validation, etc.) |
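The "cooperative + force-cancel with timeout" row can be illustrated with plain `asyncio`. This is a sketch of the general pattern only: `cancel_with_timeout`, the two workers, and the `grace` parameter are hypothetical names, and a2akit's own cancellation mechanism may work differently.

```python
import asyncio


async def cancel_with_timeout(task: asyncio.Task, cancel_event: asyncio.Event, grace: float) -> str:
    """Ask the task to stop cooperatively; force-cancel it after `grace` seconds."""
    cancel_event.set()  # cooperative signal the worker is expected to poll
    done, _ = await asyncio.wait({task}, timeout=grace)
    if done:
        return "cooperative"
    task.cancel()  # force-cancel: raises CancelledError inside the task
    await asyncio.gather(task, return_exceptions=True)
    return "forced"


async def polite_worker(cancel_event: asyncio.Event) -> None:
    # Checks the flag between small units of work.
    while not cancel_event.is_set():
        await asyncio.sleep(0.01)


async def stubborn_worker(cancel_event: asyncio.Event) -> None:
    # Never looks at the flag, so only a forced cancel stops it.
    await asyncio.sleep(3600)


async def demo() -> tuple:
    results = []
    for worker in (polite_worker, stubborn_worker):
        event = asyncio.Event()
        task = asyncio.create_task(worker(event))
        await asyncio.sleep(0)  # let the task start running
        results.append(await cancel_with_timeout(task, event, grace=0.2))
    return tuple(results)
```

Running `asyncio.run(demo())` exercises both paths: the polite worker exits on its own, while the stubborn one is cancelled once the grace period runs out.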
Install
```bash
pip install a2akit
```
Quick Start — Echo Agent in 8 Lines
```python
from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker


class EchoWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        await ctx.complete(f"Echo: {ctx.user_text}")


server = A2AServer(
    worker=EchoWorker(),
    agent_card=AgentCardConfig(name="Echo", description="Echoes input back.", version="0.1.0"),
)
app = server.as_fastapi_app(debug=True)
```

```bash
uvicorn my_agent:app --reload
# Agent running at http://localhost:8000
# Debug UI at http://localhost:8000/chat
```
Quick Start — OpenAI Agent in 15 Lines
```python
from openai import AsyncOpenAI
from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker

client = AsyncOpenAI()


class OpenAIWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": ctx.user_text}],
        )
        await ctx.complete(response.choices[0].message.content)


server = A2AServer(
    worker=OpenAIWorker(),
    agent_card=AgentCardConfig(name="GPT Agent", description="OpenAI-powered agent.", version="0.1.0"),
)
app = server.as_fastapi_app(debug=True)
```
Any LLM SDK works — the Worker pattern is framework-agnostic.
Client
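```python
import asyncio

from a2akit import A2AClient


async def main() -> None:
    async with A2AClient("http://localhost:8000") as client:
        result = await client.send("Hello, agent!")
        print(result.text)

        # Streaming
        async for chunk in client.stream_text("Stream me"):
            print(chunk, end="")


# `async with` needs a running event loop, so wrap the calls in a coroutine.
asyncio.run(main())
```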
Debug UI
```python
app = server.as_fastapi_app(debug=True)
```
Open http://localhost:8000/chat — chat with your agent, inspect tasks, and view state transitions in real time.
Architecture
```
HTTP Request
  |
  v
Middleware chain (auth, validation, content-type)
  |
  v
JSON-RPC / REST endpoint
  |
  v
TaskManager (lifecycle orchestration)
  |
  +---> Storage (persist)      -- Memory | SQLite | PostgreSQL
  +---> Broker (enqueue)       -- Memory | Redis Streams
  +---> EventEmitter (notify)  -- Hooks | Tracing | Push
  |
  v
Worker.handle(ctx: TaskContext)   <-- your code here
  |
  +---> ctx.complete(text)          -- finish the task
  +---> ctx.send_status(text)       -- progress update
  +---> ctx.emit_text_artifact()    -- streaming chunks
  +---> ctx.request_input(text)     -- multi-turn
  +---> ctx.fail(text)              -- error
  |
  v
EventBus (fan-out)  -- Memory | Redis Pub/Sub + Streams
  |
  v
SSE stream to client
```
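The fan-out step at the bottom of the diagram can be pictured as one queue per subscriber. The sketch below uses only `asyncio`; `InMemoryEventBus`, `subscribe`, and `publish` are hypothetical names chosen for illustration and are not taken from a2akit's code.

```python
import asyncio


class InMemoryEventBus:
    """Hypothetical fan-out bus: every subscriber gets its own queue."""

    def __init__(self) -> None:
        self._subscribers: dict = {}

    def subscribe(self, task_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(task_id, []).append(queue)
        return queue

    async def publish(self, task_id: str, event: str) -> None:
        # Copy the event to every subscriber of this task.
        for queue in self._subscribers.get(task_id, []):
            await queue.put(event)


async def demo() -> list:
    bus = InMemoryEventBus()
    first, second = bus.subscribe("task-1"), bus.subscribe("task-1")
    for event in ("working", "completed"):
        await bus.publish("task-1", event)
    # Each subscriber drains its own queue independently.
    return [[q.get_nowait() for _ in range(2)] for q in (first, second)]
```

Because each SSE connection reads from its own queue, a slow client does not hold back the others. A Redis-backed bus would presumably replace the in-process queues with Pub/Sub channels while keeping the same publish/subscribe shape.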
Extras
```bash
pip install a2akit[redis]      # Redis broker, event bus & cancel registry
pip install a2akit[postgres]   # PostgreSQL storage
pip install a2akit[sqlite]     # SQLite storage
pip install a2akit[langgraph]  # LangGraph integration
pip install a2akit[otel]       # OpenTelemetry tracing & metrics
```
All Features
- One-liner setup: `A2AServer` wires storage, broker, event bus, and endpoints
- A2AClient: auto-discovers agents, supports send/stream/cancel/subscribe with retries and transport fallback
- Streaming: word-by-word artifact streaming via SSE
- Cancellation: cooperative and force-cancel with timeout fallback
- Multi-turn: `request_input()` / `request_auth()` for conversational flows
- Direct reply: `reply_directly()` for simple request/response without task tracking
- Multi-transport: JSON-RPC and HTTP+JSON simultaneously
- Middleware pipeline: auth extraction (Bearer, API key), header injection, payload sanitization
- Push notifications: webhook delivery with anti-SSRF validation and configurable retries
- Lifecycle hooks: fire-and-forget callbacks on state transitions
- Dependency injection: shared infrastructure with automatic lifecycle management
- OpenTelemetry: distributed tracing and metrics with W3C context propagation
- Pluggable backends: Memory, SQLite, PostgreSQL, Redis
- Optimistic concurrency control: version-tracked storage updates
- SSE replay: `Last-Event-ID` based reconnection with gap-fill
- Debug UI: built-in browser interface for chat + task inspection
- Type-safe: full type hints, `py.typed` marker, PEP 561 compliant
- 20+ examples: echo, streaming, LangGraph, auth, middleware, push, DI, multi-transport, and more
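As an illustration of the SSE replay item above, the following sketch shows how a server might fill the gap after a reconnect. `ReplayBuffer` and `format_sse` are hypothetical helpers, and integer event IDs are an assumption made for simplicity; a2akit's real ID scheme and storage are not described here.

```python
from collections import deque
from typing import Optional


class ReplayBuffer:
    """Hypothetical per-task event log with monotonically increasing IDs."""

    def __init__(self, max_events: int = 1000) -> None:
        self._events: deque = deque(maxlen=max_events)
        self._next_id = 1

    def append(self, data: str) -> int:
        event_id = self._next_id
        self._events.append((event_id, data))
        self._next_id += 1
        return event_id

    def replay_after(self, last_event_id: Optional[int]) -> list:
        # No Last-Event-ID header means a fresh connection: nothing to replay.
        if last_event_id is None:
            return []
        return [(i, d) for i, d in self._events if i > last_event_id]


def format_sse(event_id: int, data: str) -> str:
    # SSE wire format: an "id:" field, a "data:" field, then a blank line.
    return f"id: {event_id}\ndata: {data}\n\n"
```

A client that reconnects with `Last-Event-ID: 1` would receive events 2 and 3 from the buffer before the live stream resumes, so no chunk is lost in between.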
Links
- Source: https://github.com/Coding-Crashkurse/a2akit
License
MIT