A2A agent framework in one import.

a2akit

Production-grade A2A protocol framework for Python.

Build Agent-to-Agent agents with streaming, cancellation, multi-turn conversations, push notifications, pluggable backends (Memory, SQLite, PostgreSQL, Redis), OpenTelemetry, and a built-in debug UI — all on top of FastAPI.

Why a2akit?

The official A2A Python SDK gives you protocol primitives — but you have to wire everything yourself. You need to understand AgentExecutor, RequestContext, EventQueue, TaskUpdater, TaskState, Part, TextPart, SendMessageRequest, task creation, event routing, and more. A minimal agent easily becomes 50+ lines of boilerplate:

Official SDK — Currency Agent (simplified)
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import InternalError, InvalidParamsError, Part, TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message, new_task
from a2a.utils.errors import ServerError

class CurrencyAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = CurrencyAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        query = context.get_user_input()
        task = context.current_task
        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)
        try:
            async for item in self.agent.stream(query, task.context_id):
                if not item["is_task_complete"] and not item["require_user_input"]:
                    await updater.update_status(
                        TaskState.working,
                        new_agent_text_message(item["content"], task.context_id, task.id),
                    )
                elif item["require_user_input"]:
                    await updater.update_status(
                        TaskState.input_required,
                        new_agent_text_message(item["content"], task.context_id, task.id),
                        final=True,
                    )
                    break
                else:
                    await updater.add_artifact(
                        [Part(root=TextPart(text=item["content"]))], name="conversion_result"
                    )
                    await updater.complete()
                    break
        except Exception as e:
            raise ServerError(error=InternalError()) from e

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise ServerError(error=UnsupportedOperationError())

With a2akit, the same logic is just this:

from a2akit import Worker, TaskContext

class CurrencyWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        async for item in my_agent.stream(ctx.user_text, ctx.task_id):
            if item["require_user_input"]:
                await ctx.request_input(item["content"])
            elif item["is_task_complete"]:
                await ctx.complete(item["content"])
            else:
                await ctx.send_status(item["content"])

The difference: a2akit handles task creation, event routing, state machines, error wrapping, SSE streaming, and protocol compliance for you. You write your agent logic — the framework handles the protocol.
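To make "state machines" concrete, here is a minimal sketch of the kind of lifecycle check a framework can run on your behalf. The state names follow the A2A protocol's task states; the `ALLOWED` table and the `transition` helper are illustrative assumptions, not a2akit's actual implementation.

```python
from enum import Enum


class State(str, Enum):
    # A subset of the task states defined by the A2A protocol.
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"


# Hypothetical transition table, for illustration only; real rules may differ.
ALLOWED = {
    State.SUBMITTED: {State.WORKING, State.CANCELED, State.FAILED},
    State.WORKING: {
        State.WORKING,
        State.INPUT_REQUIRED,
        State.COMPLETED,
        State.CANCELED,
        State.FAILED,
    },
    State.INPUT_REQUIRED: {State.WORKING, State.CANCELED, State.FAILED},
    # Terminal states allow no further transitions.
    State.COMPLETED: set(),
    State.CANCELED: set(),
    State.FAILED: set(),
}


def transition(current: State, target: State) -> State:
    """Return the new state, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


# A multi-turn task: work, ask the user for input, resume, finish.
state = State.SUBMITTED
for nxt in (State.WORKING, State.INPUT_REQUIRED, State.WORKING, State.COMPLETED):
    state = transition(state, nxt)
print(state.value)
```

Centralising the check in one place means a handler cannot accidentally move a finished task back to a working state.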

                   | Official SDK                                                     | a2akit
Boilerplate        | Manage EventQueue, TaskUpdater, TaskState, Part objects manually | ctx.complete(), ctx.send_status(), ctx.request_input()
Task lifecycle     | Create tasks, track state, wire events yourself                  | Automatic — framework manages the full lifecycle
Streaming          | Manual SSE + event queue plumbing                                | Built-in, one method call
Storage            | Bring your own                                                   | Memory, SQLite, PostgreSQL, Redis out of the box
Cancellation       | Implement yourself                                               | Cooperative + force-cancel with timeout
Push notifications | Implement yourself                                               | Built-in with anti-SSRF and retries
Debug UI           | None                                                             | Built-in browser UI at /chat
Middleware         | Implement yourself                                               | Pluggable pipeline (auth, validation, etc.)
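
The "cooperative + force-cancel with timeout" entry can be pictured with plain asyncio. This is a sketch of the general pattern only: `cancel_with_timeout`, the two worker functions, and the grace periods are hypothetical names and values, and nothing here reflects a2akit's internals.

```python
import asyncio


async def cooperative_worker(cancel_requested: asyncio.Event) -> None:
    # A well-behaved worker checks the flag between steps and exits cleanly.
    for _ in range(100):
        if cancel_requested.is_set():
            return
        await asyncio.sleep(0.01)


async def stubborn_worker(cancel_requested: asyncio.Event) -> None:
    # Never checks the flag, so only a forced cancel can stop it.
    await asyncio.sleep(60)


async def cancel_with_timeout(worker, grace: float) -> str:
    cancel_requested = asyncio.Event()
    task = asyncio.create_task(worker(cancel_requested))
    await asyncio.sleep(0.02)      # let the worker start
    cancel_requested.set()         # step 1: ask the worker to stop
    done, _ = await asyncio.wait({task}, timeout=grace)
    if done:
        return "cooperative"
    task.cancel()                  # step 2: force-cancel after the grace period
    try:
        await task
    except asyncio.CancelledError:
        pass
    return "forced"


async def main() -> tuple:
    return (
        await cancel_with_timeout(cooperative_worker, grace=0.5),
        await cancel_with_timeout(stubborn_worker, grace=0.05),
    )


results = asyncio.run(main())
print(results)
```

The first worker stops on its own once the flag is set; the second ignores it and is cancelled when the grace period runs out.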

Install

pip install a2akit

Quick Start — Echo Agent in 8 Lines

from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker

class EchoWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        await ctx.complete(f"Echo: {ctx.user_text}")

server = A2AServer(
    worker=EchoWorker(),
    agent_card=AgentCardConfig(name="Echo", description="Echoes input back.", version="0.1.0"),
)
app = server.as_fastapi_app(debug=True)

Save the file as my_agent.py and start it from the shell:

uvicorn my_agent:app --reload
# Agent running at http://localhost:8000
# Debug UI at http://localhost:8000/chat

Quick Start — OpenAI Agent in 15 Lines

from openai import AsyncOpenAI
from a2akit import A2AServer, AgentCardConfig, TaskContext, Worker

client = AsyncOpenAI()

class OpenAIWorker(Worker):
    async def handle(self, ctx: TaskContext) -> None:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": ctx.user_text}],
        )
        await ctx.complete(response.choices[0].message.content or "")  # content may be None

server = A2AServer(
    worker=OpenAIWorker(),
    agent_card=AgentCardConfig(name="GPT Agent", description="OpenAI-powered agent.", version="0.1.0"),
)
app = server.as_fastapi_app(debug=True)

Any LLM SDK works — the Worker pattern is framework-agnostic.

Client

import asyncio

from a2akit import A2AClient


async def main() -> None:
    async with A2AClient("http://localhost:8000") as client:
        result = await client.send("Hello, agent!")
        print(result.text)

        # Streaming: print chunks as they arrive
        async for chunk in client.stream_text("Stream me"):
            print(chunk, end="")


asyncio.run(main())

Debug UI

app = server.as_fastapi_app(debug=True)

Open http://localhost:8000/chat — chat with your agent, inspect tasks, and view state transitions in real time.

Architecture

HTTP Request
  |
  v
Middleware chain (auth, validation, content-type)
  |
  v
JSON-RPC / REST endpoint
  |
  v
TaskManager (lifecycle orchestration)
  |
  +---> Storage (persist)     -- Memory | SQLite | PostgreSQL
  +---> Broker (enqueue)      -- Memory | Redis Streams
  +---> EventEmitter (notify) -- Hooks | Tracing | Push
  |
  v
Worker.handle(ctx: TaskContext)   <-- your code here
  |
  +---> ctx.complete(text)        -- finish the task
  +---> ctx.send_status(text)     -- progress update
  +---> ctx.emit_text_artifact()  -- streaming chunks
  +---> ctx.request_input(text)   -- multi-turn
  +---> ctx.fail(text)            -- error
  |
  v
EventBus (fan-out)  -- Memory | Redis Pub/Sub + Streams
  |
  v
SSE stream to client
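
The fan-out step at the bottom of the diagram can be sketched with one asyncio queue per subscriber. `InMemoryEventBus` and its methods are hypothetical names chosen for illustration; they show the idea of fan-out rather than a2akit's actual EventBus interface.

```python
import asyncio


class InMemoryEventBus:
    """Hypothetical fan-out bus: every subscriber gets its own queue."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[asyncio.Queue]] = {}

    def subscribe(self, task_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(task_id, []).append(queue)
        return queue

    async def publish(self, task_id: str, event: str) -> None:
        # Copy the event into each subscriber's queue (fan-out).
        for queue in self._subscribers.get(task_id, []):
            await queue.put(event)


async def main() -> list[list[str]]:
    bus = InMemoryEventBus()
    first = bus.subscribe("task-1")
    second = bus.subscribe("task-1")
    for event in ("working", "artifact: hello", "completed"):
        await bus.publish("task-1", event)
    # Each subscriber drains its own queue and sees the full, ordered stream.
    return [[q.get_nowait() for _ in range(q.qsize())] for q in (first, second)]


received = asyncio.run(main())
print(received)
```

Because each subscriber owns a queue, a slow SSE client does not hold back the others; a Redis-backed bus would replace the in-process queues with a shared channel.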

Extras

pip install a2akit[redis]       # Redis broker, event bus & cancel registry
pip install a2akit[postgres]    # PostgreSQL storage
pip install a2akit[sqlite]      # SQLite storage
pip install a2akit[langgraph]   # LangGraph integration
pip install a2akit[otel]        # OpenTelemetry tracing & metrics

All Features

  • One-liner setup — A2AServer wires storage, broker, event bus, and endpoints
  • A2AClient — auto-discovers agents, supports send/stream/cancel/subscribe with retries and transport fallback
  • Streaming — word-by-word artifact streaming via SSE
  • Cancellation — cooperative and force-cancel with timeout fallback
  • Multi-turn — request_input() / request_auth() for conversational flows
  • Direct reply — reply_directly() for simple request/response without task tracking
  • Multi-transport — JSON-RPC and HTTP+JSON simultaneously
  • Middleware pipeline — auth extraction (Bearer, API key), header injection, payload sanitization
  • Push notifications — webhook delivery with anti-SSRF validation and configurable retries
  • Lifecycle hooks — fire-and-forget callbacks on state transitions
  • Dependency injection — shared infrastructure with automatic lifecycle management
  • OpenTelemetry — distributed tracing and metrics with W3C context propagation
  • Pluggable backends — Memory, SQLite, PostgreSQL, Redis
  • Optimistic concurrency control — version-tracked storage updates
  • SSE replay — Last-Event-ID-based reconnection with gap-fill
  • Debug UI — built-in browser interface for chat + task inspection
  • Type-safe — full type hints, py.typed marker, PEP 561 compliant
  • 20+ examples — echo, streaming, LangGraph, auth, middleware, push, DI, multi-transport, and more
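
As an illustration of the "optimistic concurrency control" item, the sketch below shows what version-tracked updates usually mean: a write succeeds only if the writer saw the latest version. `VersionedStore`, `Record`, and `VersionConflict` are hypothetical names for this example and are not a2akit's storage API.

```python
from dataclasses import dataclass, replace


class VersionConflict(Exception):
    """Raised when a writer's expected version is stale."""


@dataclass(frozen=True)
class Record:
    state: str
    version: int = 0


class VersionedStore:
    """Hypothetical in-memory store used only to show the pattern."""

    def __init__(self) -> None:
        self._records: dict[str, Record] = {}

    def create(self, task_id: str, state: str) -> Record:
        self._records[task_id] = Record(state)
        return self._records[task_id]

    def load(self, task_id: str) -> Record:
        return self._records[task_id]

    def update(self, task_id: str, state: str, expected_version: int) -> Record:
        current = self._records[task_id]
        if current.version != expected_version:
            raise VersionConflict(
                f"expected v{expected_version}, found v{current.version}"
            )
        # Accept the write and bump the version so stale writers are rejected.
        self._records[task_id] = replace(
            current, state=state, version=current.version + 1
        )
        return self._records[task_id]


store = VersionedStore()
store.create("task-1", "submitted")

# Two writers read the same snapshot (version 0).
seen_by_a = store.load("task-1")
seen_by_b = store.load("task-1")

winner = store.update("task-1", "working", expected_version=seen_by_a.version)

try:
    store.update("task-1", "canceled", expected_version=seen_by_b.version)
    conflict = False
except VersionConflict:
    conflict = True  # writer B must reload and retry

print(winner.version, conflict)
```

The losing writer gets an explicit error instead of silently overwriting the newer state, which is the property that matters when several workers touch the same task.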

License

MIT
