
A2A agent framework in one import.


a2akit


Production-grade A2A protocol framework for Python.

Build Agent-to-Agent agents with streaming, cancellation, multi-turn conversations, push notifications, pluggable backends (Memory, SQLite, PostgreSQL, Redis), OpenTelemetry, and a built-in debug UI — all on top of FastAPI.

Why a2akit?

The official A2A Python SDK gives you protocol primitives — but you have to wire everything yourself. You need to understand AgentExecutor, RequestContext, EventQueue, TaskUpdater, TaskState, Part, TextPart, SendMessageRequest, task creation, event routing, and more. A minimal agent easily becomes 50+ lines of boilerplate:

Official SDK — Currency Agent (simplified)
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import InternalError, InvalidParamsError, Part, TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message, new_task
from a2a.utils.errors import ServerError

class CurrencyAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = CurrencyAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        query = context.get_user_input()
        task = context.current_task
        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)
        try:
            async for item in self.agent.stream(query, task.context_id):
                if not item["is_task_complete"] and not item["require_user_input"]:
                    await updater.update_status(
                        TaskState.working,
                        new_agent_text_message(item["content"], task.context_id, task.id),
                    )
                elif item["require_user_input"]:
                    await updater.update_status(
                        TaskState.input_required,
                        new_agent_text_message(item["content"], task.context_id, task.id),
                        final=True,
                    )
                    break
                else:
                    await updater.add_artifact(
                        [Part(root=TextPart(text=item["content"]))], name="conversion_result"
                    )
                    await updater.complete()
                    break
        except Exception as e:
            raise ServerError(error=InternalError()) from e

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise ServerError(error=UnsupportedOperationError())

With a2akit, the same logic is just this:

from a2akit import Worker, TaskContext

class CurrencyWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        async for item in my_agent.stream(ctx.user_text, ctx.task_id):
            if item["require_user_input"]:
                await ctx.request_input(item["content"])
            elif item["is_task_complete"]:
                await ctx.complete(item["content"])
            else:
                await ctx.send_status(item["content"])

The difference: a2akit handles task creation, event routing, state machines, error wrapping, SSE streaming, and protocol compliance for you. You write your agent logic — the framework handles the protocol.
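The task state machine the framework manages can be pictured as a small transition table. This is a simplified sketch of the idea, not a2akit's internal implementation; the state names follow the A2A task lifecycle, and the comments map transitions onto the ctx methods shown above.

```python
from enum import Enum

class TaskState(Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

# Legal transitions, so workers cannot emit events out of order
# (simplified; the real protocol has more states).
TRANSITIONS = {
    TaskState.SUBMITTED: {TaskState.WORKING, TaskState.CANCELED},
    TaskState.WORKING: {TaskState.WORKING, TaskState.INPUT_REQUIRED,
                        TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELED},
    TaskState.INPUT_REQUIRED: {TaskState.WORKING, TaskState.CANCELED},
}

def advance(current: TaskState, target: TaskState) -> TaskState:
    if target not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target

state = TaskState.SUBMITTED
state = advance(state, TaskState.WORKING)         # ctx.send_status(...)
state = advance(state, TaskState.INPUT_REQUIRED)  # ctx.request_input(...)
state = advance(state, TaskState.WORKING)         # user replied
state = advance(state, TaskState.COMPLETED)       # ctx.complete(...)
print(state.value)  # completed
```

With the official SDK, enforcing ordering like this is your job; with a2akit, it is the framework's.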

Official SDK vs. a2akit:

  • Boilerplate — Official SDK: manage EventQueue, TaskUpdater, TaskState, and Part objects manually. a2akit: ctx.complete(), ctx.send_status(), ctx.request_input().
  • Task lifecycle — Official SDK: create tasks, track state, and wire events yourself. a2akit: automatic; the framework manages the full lifecycle.
  • Streaming — Official SDK: manual SSE and event-queue plumbing. a2akit: built-in, one method call.
  • Storage — Official SDK: bring your own. a2akit: Memory, SQLite, PostgreSQL, Redis out of the box.
  • Cancellation — Official SDK: implement yourself. a2akit: cooperative plus force-cancel with timeout.
  • Push notifications — Official SDK: implement yourself. a2akit: built-in with anti-SSRF validation and retries.
  • Debug UI — Official SDK: none. a2akit: built-in browser UI at /chat.
  • Middleware — Official SDK: implement yourself. a2akit: pluggable pipeline (auth, validation, etc.).
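The cancellation pattern above (cooperative first, force-cancel after a timeout) can be sketched in plain asyncio. This is illustrative only, not a2akit's implementation: the framework handles this plumbing for you.

```python
import asyncio

async def cooperative_worker(cancel_event: asyncio.Event) -> str:
    # A well-behaved worker checks the cancel flag between steps.
    for _ in range(100):
        if cancel_event.is_set():
            return "canceled"
        await asyncio.sleep(0.01)
    return "completed"

async def cancel_with_timeout(task: asyncio.Task, cancel_event: asyncio.Event,
                              grace: float) -> str:
    cancel_event.set()  # ask nicely first (cooperative)
    try:
        return await asyncio.wait_for(asyncio.shield(task), timeout=grace)
    except asyncio.TimeoutError:
        task.cancel()   # force-cancel after the grace period
        return "force-canceled"

async def main() -> str:
    cancel_event = asyncio.Event()
    task = asyncio.create_task(cooperative_worker(cancel_event))
    await asyncio.sleep(0.02)
    return await cancel_with_timeout(task, cancel_event, grace=1.0)

print(asyncio.run(main()))  # canceled
```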

Install

pip install a2akit

Quick Start — Echo Agent in 8 Lines

from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker

class EchoWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        await ctx.complete(f"Echo: {ctx.user_text}")

server = A2AServer(
    worker=EchoWorker(),
    agent_card=AgentCardConfig(name="Echo", description="Echoes input back.", version="0.1.0"),
)
app = server.as_fastapi_app(debug=True)

Run it with uvicorn (assuming the file is saved as my_agent.py):

uvicorn my_agent:app --reload
# Agent running at http://localhost:8000
# Debug UI at http://localhost:8000/chat

Quick Start — OpenAI Agent in 15 Lines

from openai import AsyncOpenAI
from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker

client = AsyncOpenAI()

class OpenAIWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": ctx.user_text}],
        )
        await ctx.complete(response.choices[0].message.content)

server = A2AServer(
    worker=OpenAIWorker(),
    agent_card=AgentCardConfig(name="GPT Agent", description="OpenAI-powered agent.", version="0.1.0"),
)
app = server.as_fastapi_app(debug=True)

Any LLM SDK works — the Worker pattern is framework-agnostic.
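Because handle() only touches the ctx object, a Worker's logic can also be unit-tested without a running server by hand-rolling a stub. The harness below is hypothetical, not an a2akit testing API; the stub's attribute and method names simply mirror the TaskContext calls shown in the examples above.

```python
import asyncio

class StubContext:
    """Minimal stand-in for a2akit's TaskContext (illustrative only)."""
    def __init__(self, user_text: str):
        self.user_text = user_text
        self.events: list[tuple[str, str]] = []

    async def send_status(self, text: str) -> None:
        self.events.append(("status", text))

    async def complete(self, text: str) -> None:
        self.events.append(("complete", text))

class ShoutWorker:
    # Mirrors the Worker.handle(ctx) shape without importing a2akit.
    async def handle(self, ctx) -> None:
        await ctx.send_status("thinking...")
        await ctx.complete(ctx.user_text.upper())

ctx = StubContext("hello")
asyncio.run(ShoutWorker().handle(ctx))
print(ctx.events)  # [('status', 'thinking...'), ('complete', 'HELLO')]
```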

Client

from a2akit import A2AClient

async with A2AClient("http://localhost:8000") as client:
    result = await client.send("Hello, agent!")
    print(result.text)

    # Streaming
    async for chunk in client.stream_text("Stream me"):
        print(chunk, end="")

Debug UI

app = server.as_fastapi_app(debug=True)

Open http://localhost:8000/chat — chat with your agent, inspect tasks, and view state transitions in real time.


Architecture

HTTP Request
  |
  v
Middleware chain (auth, validation, content-type)
  |
  v
JSON-RPC / REST endpoint
  |
  v
TaskManager (lifecycle orchestration)
  |
  +---> Storage (persist)     -- Memory | SQLite | PostgreSQL
  +---> Broker (enqueue)      -- Memory | Redis Streams
  +---> EventEmitter (notify) -- Hooks | Tracing | Push
  |
  v
Worker.handle(ctx: TaskContext)   <-- your code here
  |
  +---> ctx.complete(text)        -- finish the task
  +---> ctx.send_status(text)     -- progress update
  +---> ctx.emit_text_artifact()  -- streaming chunks
  +---> ctx.request_input(text)   -- multi-turn
  +---> ctx.fail(text)            -- error
  |
  v
EventBus (fan-out)  -- Memory | Redis Pub/Sub + Streams
  |
  v
SSE stream to client
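The EventBus fan-out step can be pictured with per-subscriber asyncio queues: each subscriber (the SSE stream, a push-notification hook) receives its own copy of every event. This is a toy sketch of the pattern, not a2akit's Redis-backed implementation.

```python
import asyncio

class EventBus:
    """Fan-out: every subscriber gets its own queue and its own copy."""
    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(q)
        return q

    async def publish(self, event: str) -> None:
        for q in self._subscribers:
            await q.put(event)

async def main() -> list[str]:
    bus = EventBus()
    sse_stream = bus.subscribe()   # e.g. the SSE endpoint
    push_hook = bus.subscribe()    # e.g. a webhook notifier
    await bus.publish("task.working")
    await bus.publish("task.completed")
    # Both subscribers observe the full event sequence.
    return [await sse_stream.get(), await sse_stream.get(),
            await push_hook.get(), await push_hook.get()]

print(asyncio.run(main()))
```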

Extras

pip install a2akit[redis]       # Redis broker, event bus & cancel registry
pip install a2akit[postgres]    # PostgreSQL storage
pip install a2akit[sqlite]      # SQLite storage
pip install a2akit[langgraph]   # LangGraph integration
pip install a2akit[otel]        # OpenTelemetry tracing & metrics

All Features

  • One-liner setup — A2AServer wires storage, broker, event bus, and endpoints
  • A2AClient — auto-discovers agents, supports send/stream/cancel/subscribe with retries and transport fallback
  • Streaming — word-by-word artifact streaming via SSE
  • Cancellation — cooperative and force-cancel with timeout fallback
  • Multi-turn — request_input() / request_auth() for conversational flows
  • Direct reply — reply_directly() for simple request/response without task tracking
  • Multi-transport — JSON-RPC and HTTP+JSON simultaneously
  • Middleware pipeline — auth extraction (Bearer, API key), header injection, payload sanitization
  • Push notifications — webhook delivery with anti-SSRF validation and configurable retries
  • Lifecycle hooks — fire-and-forget callbacks on state transitions
  • Dependency injection — shared infrastructure with automatic lifecycle management
  • OpenTelemetry — distributed tracing and metrics with W3C context propagation
  • Pluggable backends — Memory, SQLite, PostgreSQL, Redis
  • Optimistic concurrency control — version-tracked storage updates
  • SSE replay — Last-Event-ID based reconnection with gap-fill
  • Debug UI — built-in browser interface for chat + task inspection
  • Type-safe — full type hints, py.typed marker, PEP 561 compliant
  • 20+ examples — echo, streaming, LangGraph, auth, middleware, push, DI, multi-transport, and more
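The SSE replay feature, where a reconnecting client sends Last-Event-ID and the server fills the gap, can be sketched with a monotonically numbered buffer. This is illustrative only, not a2akit's storage layer.

```python
class ReplayBuffer:
    """Keeps numbered events so a reconnecting client can catch up."""
    def __init__(self) -> None:
        self._events: list[tuple[int, str]] = []
        self._next_id = 1

    def append(self, data: str) -> int:
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, data))
        return event_id

    def replay_after(self, last_event_id: int) -> list[tuple[int, str]]:
        # Gap-fill: everything the client missed since last_event_id.
        return [(i, d) for i, d in self._events if i > last_event_id]

buf = ReplayBuffer()
for chunk in ["task.working", "chunk: 42", "task.completed"]:
    buf.append(chunk)

# Client saw event 1, then disconnected; on reconnect it
# sends Last-Event-ID: 1 and receives everything after it.
print(buf.replay_after(1))  # [(2, 'chunk: 42'), (3, 'task.completed')]
```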


License

MIT
