Skip to main content

fast_a2a_app — Drop-in A2A server and chat UI for any AI agent

Project description

fast_a2a_app

Drop-in A2A server and chat UI for any FastAPI application that runs ai agents — installable from PyPI.

fast_a2a_app packages the battle-tested A2A protocol adapter and self-contained browser chat UI into a standalone pip-installable library. Get a fully spec-compliant A2A server plus a ready-to-use chat interface in under 20 lines.

pip install fast_a2a_app

Why fast_a2a_app?

Pydantic AI ships its own FastA2A integration, which is excellent if you are already inside the Pydantic AI ecosystem. fast_a2a_app exists for a different set of needs:

  • Mount point, not a framework. fast_a2a_app is a plain Starlette app you mount into an existing FastAPI application at any path prefix. Everything outside that prefix — authentication middleware, custom routes, dependency injection, observability — is yours to own and compose however you like.
  • Framework-agnostic. The library has zero dependency on Pydantic AI. Wire in any agent: raw Anthropic/OpenAI API calls, LangChain, LlamaIndex, or plain Python — as long as it exposes an async (str) -> str function or an async generator.
  • Separation of concerns. Your FastAPI application stays in charge of the HTTP layer (auth, rate limiting, CORS, health checks). fast_a2a_app only handles the A2A protocol inside its mounted prefix, keeping agent logic cleanly decoupled from transport concerns.

We hope this contributes to a composable AI agent architecture where protocol adapters, agent frameworks, and application infrastructure are independent choices.


What's inside

Module What it does
fast_a2a_app.server A2A JSON-RPC server (streaming SSE, multi-turn history, cross-instance cancel)
fast_a2a_app.ui Self-contained browser chat UI — no build step, no npm

Protocol features (via a2a-sdk 1.0.x)

  • SendMessage — single-shot request/response (non-streaming)
  • SendStreamingMessage — streaming SSE responses
  • CancelTask — immediate or cross-replica cancellation
  • SubscribeToTask — reconnect to an in-flight stream after a network blip
  • GetTask — snapshot fallback for page-reload recovery
  • .well-known/agent-card.json — agent discovery

Server features

  • Multi-turn history — every turn is stored in Redis and injected as a "Conversation so far:" prefix, giving the agent continuity without client-side replay
  • Cross-instance cancellation — cancel signals flow through Redis so any replica can stop a task running on another replica
  • Live progress updates — call report_progress("step 2/5…") from any tool and the chat UI spinner updates in real time
  • Lifecycle hookson_task_start / on_task_cancel callbacks for metrics, locks, or state resets

UI features

  • Stream toggle — checkbox in the input bar switches between SendStreamingMessage (tokens arrive live) and SendMessage (full response rendered at once); preference persisted in localStorage. Hidden automatically when the agent card reports capabilities.streaming = false.
  • Data part widget — parts with media_type: application/json are rendered as a labeled key-value table with color-coded value types; no raw JSON brackets shown
  • File part widget — raw binary parts show a type icon, filename, and media type with a "Download" button that creates a temporary Blob URL
  • Page-reload recovery — active task is stored in localStorage and resubscribed on the next load
  • Markdown rendering — agent responses rendered as GitHub-Flavored Markdown with DOMPurify sanitisation
  • Collapsible agent card — name, version, capabilities, and skills pulled from .well-known/agent-card.json

Storage

fast_a2a_app currently uses Redis for all server-side state:

What is stored Key pattern TTL
Task JSON (full A2A task object) a2a:task:{id} 24 h
Conversation index (task_id → sequence) a2a:context:{cid}:tasks 24 h
Cross-instance cancel signal a2a:cancel:{id} 5 min

Start a local Redis instance before running any example:

docker run -d -p 6379:6379 redis:7-alpine

Or point REDIS_URL at any managed Redis-compatible service (Redis Cloud, AWS ElastiCache, Azure Cache for Redis, etc.).

Roadmap — pluggable storage backends (MongoDB, PostgreSQL) are planned. The RedisTaskStore already implements the A2ATaskStore Protocol, so a Mongo or Postgres backend can be swapped in by passing a custom a2a_task_store to build_a2a_app() without any library changes.


Framework-agnostic design

fast_a2a_app has no dependency on any AI framework. build_a2a_app accepts two plain callables that you implement however you like:

Callable Signature
invoke async (prompt: str) -> str or async (prompt: str) -> Artifact
stream_invoke async (prompt: str) -> AsyncIterable[str]

Wrap them with the two helpers and pass to build_a2a_app:

invoke=build_invoke(my_async_fn)
stream_invoke=build_stream_invoke(my_async_generator_fn)

build_invoke accepts both plain-text and multi-part agents — return a str for a single text response, or return an Artifact to send text, structured data, and file parts together in one response.

build_stream_invoke automatically sets up the report_progress() ContextVar, so any code called during streaming can push live status updates to the chat UI — regardless of which framework (or none) your agent uses.

You can also implement invoke and stream_invoke directly as bare callables and pass them straight to build_a2a_app — no wrapper needed.


Quickstart

1. Install

pip install fast_a2a_app

2. Implement your agent

Any async (str) -> str function works as the non-streaming invoke. Any async (str) -> AsyncIterable[str] generator works for streaming.

# agent.py — Azure OpenAI chat completions, no framework needed
import os
from collections.abc import AsyncIterable
from openai import AsyncOpenAI

AZURE_AI_ENDPOINT = os.environ.get("AZURE_AI_ENDPOINT", "")
AZURE_AI_DEPLOYMENT = os.environ.get("AZURE_AI_DEPLOYMENT", "gpt-4o")
AZURE_AI_KEY = os.environ.get("AZURE_AI_KEY", "")
AZURE_AI_BASE_URL = os.environ.get(
    "AZURE_AI_BASE_URL",
    f"{AZURE_AI_ENDPOINT.rstrip('/')}/" if AZURE_AI_ENDPOINT else "",
)

if AZURE_AI_KEY:
    api_key = AZURE_AI_KEY
else:
    from azure.identity import AzureCliCredential, get_bearer_token_provider
    _sync_provider = get_bearer_token_provider(
        AzureCliCredential(), "https://ai.azure.com/.default"
    )
    async def api_key():
        return _sync_provider()

client = AsyncOpenAI(base_url=AZURE_AI_BASE_URL, api_key=api_key)

async def invoke(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model=AZURE_AI_DEPLOYMENT, max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return (resp.choices[0].message.content or "").strip()

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model=AZURE_AI_DEPLOYMENT, max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text

3. Wire up the server

# main.py
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentInterface
from fast_a2a_app import a2a_ui, build_a2a_app, build_invoke, build_stream_invoke
from agent import invoke, stream_invoke

app = FastAPI()

agent_card = AgentCard(
    name="My Agent",
    description="Does cool things",
    version="1.0.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(invoke),
    stream_invoke=build_stream_invoke(stream_invoke),
))

app.mount("/", a2a_ui)        # built-in chat UI at http://localhost:8000/

4. Run

# Start Redis (required for conversation history)
docker run -d -p 6379:6379 redis:7-alpine

# Run the app
uvicorn main:app --reload

Open http://localhost:8000/ — you're chatting.


API reference

build_a2a_app(...)

Assembles a Starlette ASGI app. Mount it at any path prefix.

Parameter Type Default Description
agent_card AgentCard required Pre-built A2A agent card (name, description, version, url, skills, capabilities)
invoke Callable required Non-streaming callable — use build_invoke() to wrap
stream_invoke Callable | None None Streaming callable — use build_stream_invoke() to wrap
prompt_builder Callable | None conversation history prefix Custom prompt assembly function
on_task_start Callable[[str], Awaitable] | None None Called before each task
on_task_cancel Callable[[str], Awaitable] | None None Called on cancel
a2a_task_store A2ATaskStore | None auto Custom task store
redis_client aioredis.Redis | None auto Custom Redis client
redis_url str "redis://localhost:6379" Redis connection string
debug bool False Include exception details in failure messages

build_invoke(run)

Wraps any async (prompt: str) -> str | Artifact function as a non-streaming A2A invoke. Works with any AI framework or plain API call. Return a plain str for a text response, or return an Artifact to send multiple parts (text, JSON data, files) in one response:

from a2a.types import Artifact, Part
import json, uuid

async def my_agent(prompt: str) -> Artifact:
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Here is your data for: {prompt}"),
            Part(raw=json.dumps({"count": 42}).encode(), media_type="application/json"),
            Part(raw=b"file content", filename="out.txt", media_type="text/plain"),
        ],
    )

app.mount("/a2a", build_a2a_app(agent_card=card, invoke=build_invoke(my_agent)))

build_stream_invoke(run)

Wraps any async (prompt: str) -> AsyncIterable[str] generator as a streaming A2A invoke. Also sets up the report_progress() ContextVar so live progress updates work out of the box — call report_progress("step 2/5…") anywhere during execution and it will appear as a working-status event in the chat UI.

report_progress(message)

Call from any agent tool to push a status string to the chat UI spinner. Has no effect outside a streaming context (safe to call unconditionally).

@agent.tool_plain
async def long_computation(n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    # …
    report_progress(f"Computing step 2/{n}…")
    return result

MultiPartAgentExecutor

Low-level AgentExecutor that accepts async (str) -> Artifact directly. Useful when you need full control over executor setup and bypass build_a2a_app. For most agents, using build_invoke with an Artifact-returning function is simpler.

from fast_a2a_app import MultiPartAgentExecutor
executor = MultiPartAgentExecutor(my_artifact_fn)

a2a_ui

A Starlette ASGI app serving a self-contained single-page chat interface. No build step, no npm. Mount it at "/" to serve the UI.

app.mount("/", a2a_ui)

The UI reads the agent card from /a2a/.well-known/agent-card.json to populate the header name and the collapsible info panel.


Example: Holiday Planner

examples/holiday_planner/ is a complete example showing how to build a domain-specific agent on fast_a2a_app.

examples/holiday_planner/
├── agent.py          # pydantic-ai agent with 4 tools
├── main.py           # FastAPI app + fast_a2a_app wiring
└── requirements.txt

Running the example

# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/holiday_planner
pip install -e ../../          # install fast_a2a_app from repo root
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine

uvicorn main:app --reload

Open http://localhost:8000/ and ask:

"I want to plan a 10-day trip somewhere in Southeast Asia in September, moderate budget, interested in food, temples, and nature. Can you help?"

The agent will ask follow-up questions, then use its tools to recommend destinations, build a day-by-day itinerary, estimate costs, and provide travel essentials — all with live progress updates in the UI.

Holiday planner tools

Tool Description
recommend_destinations 2-3 tailored destination suggestions with pros/cons
create_itinerary Day-by-day plan with restaurants and local tips
estimate_budget Cost breakdown table per person per day
get_travel_essentials Visa, health, weather, and packing guide

Example: Echo Agent (no LLM, no external dependencies)

examples/echo_agent/ is the minimal fast_a2a_app integration — pure Python, no API key, no AI framework.

examples/echo_agent/
├── agent.py          # Two plain async functions, zero external imports
├── main.py           # FastAPI app
└── requirements.txt  # fast_a2a_app only
# agent.py
async def invoke(prompt: str) -> str:
    return f"Echo: {prompt}"

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    words = f"Echo: {prompt}".split()
    for i, word in enumerate(words):
        yield word if i == len(words) - 1 else word + " "
        await asyncio.sleep(0.05)   # makes streaming visible in the UI

Running the echo agent

cd examples/echo_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

No .env needed — the echo agent requires no API key. A REDIS_URL can be set via examples/.env if you need a non-default Redis address.

No API key needed. Open http://localhost:8000/ and type anything.


Example: Joke Agent (raw chat completions, no agent framework)

examples/joke_agent/ shows fast_a2a_app wired to plain Azure OpenAI chat completions — no agent framework at all.

examples/joke_agent/
├── agent.py          # Two plain async functions: run_joke_agent + stream_joke_agent
├── main.py           # FastAPI app using build_invoke / build_stream_invoke
└── requirements.txt

agent.py defines two callables that satisfy the fast_a2a_app contract:

# Non-streaming: async (str) -> str
async def run_joke_agent(prompt: str) -> str:
    response = await client.chat.completions.create(model=..., messages=[...])
    return (response.choices[0].message.content or "").strip()

# Streaming: async (str) -> AsyncIterable[str]
async def stream_joke_agent(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(model=..., messages=[...], stream=True)
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text

main.py wires them in with the helpers:

from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill
from fast_a2a_app import build_a2a_app, build_invoke, build_stream_invoke, a2a_ui
from agent import run_joke_agent, stream_joke_agent

agent_card = AgentCard(
    name="Joke Agent",
    description="Your AI stand-up comedian.",
    version="0.1.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="tell_joke", name="Tell a joke", description="Tells a joke on any topic.", tags=[])],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(run_joke_agent),
    stream_invoke=build_stream_invoke(stream_joke_agent),
))
app.mount("/", a2a_ui)

Running the joke agent

# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/joke_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

Open http://localhost:8000/ and try:

"Tell me a programming joke" or "Give me your best dad joke"

Tokens stream directly from the Azure OpenAI API to the browser as they arrive.


Example: Echo Multipart (multi-part responses, no LLM)

examples/echo_multipart/ demonstrates returning text, structured data, and a downloadable file in a single A2A response — no LLM, no API key, no protobuf boilerplate.

examples/echo_multipart/
├── agent.py          # Returns Artifact with 3 parts using only json.dumps
├── main.py           # FastAPI app wired with build_invoke
└── requirements.txt
# agent.py
import json, uuid
from a2a.types import Artifact, Part

async def invoke(prompt: str) -> Artifact:
    words = prompt.split()
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Echo: {prompt}"),
            Part(
                raw=json.dumps({"original": prompt, "word_count": len(words)}).encode(),
                media_type="application/json",
            ),
            Part(raw=f"Echo: {prompt}\n".encode(), filename="echo.txt", media_type="text/plain"),
        ],
    )

The UI renders the three parts as: a markdown bubble, a key-value data table, and a file download card.

Running the echo multipart agent

cd examples/echo_multipart
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

Architecture

FastAPI app
├── /a2a    ← Starlette ASGI app (build_a2a_app)
│   ├── POST /            SendMessage, SendStreamingMessage, CancelTask, …
│   └── GET  /.well-known/agent-card.json
└── /       ← a2a_ui (Starlette, single HTML file)

Redis
├── a2a:task:{id}                 Task JSON (24 h TTL)
├── a2a:context:{cid}:tasks       Context index (task_id → sequence)
├── a2a:context:{cid}:sequence    Sequence counter
└── a2a:cancel:{id}               Cancel signal (5 min TTL)

Conversation history injection

Each A2A task has a context_id shared across all turns of a conversation. ContextAwareRequestContextBuilder fetches all prior tasks for the same context_id from Redis, extracts the last 12 lines of dialogue, and prepends them as "Conversation so far:\n…" to each new prompt.

The agent therefore sees recent history without the client needing to replay it — multi-turn conversation just works.

How streaming works

build_stream_invoke wraps your generator in an asyncio.Queue-based relay. Before starting the generator it sets a ContextVar callback that report_progress() reads. Strings from report_progress() are placed in the queue with a sentinel prefix; ConfigurableAgentExecutor routes them to non-final statusUpdate (state TASK_STATE_WORKING) SSE events. All other yielded strings become artifactUpdate events — the streaming text the user sees.


Publishing to PyPI

pip install hatch
hatch build
hatch publish

Or with twine:

pip install build twine
python -m build
twine upload dist/*

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fast_a2a_app-0.3.0.tar.gz (39.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fast_a2a_app-0.3.0-py3-none-any.whl (32.4 kB view details)

Uploaded Python 3

File details

Details for the file fast_a2a_app-0.3.0.tar.gz.

File metadata

  • Download URL: fast_a2a_app-0.3.0.tar.gz
  • Upload date:
  • Size: 39.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.5 cpython/3.12.4 HTTPX/0.28.1

File hashes

Hashes for fast_a2a_app-0.3.0.tar.gz
Algorithm Hash digest
SHA256 c283ea81e9dd9610d4165dad5834cca5829ad43ced8d3a18b286f5a0d72664bf
MD5 635d9617e6087c8165afd7b6122cf5f1
BLAKE2b-256 235777373134fb0a5050c02d31ea09d42e43e254234fafaa87f53ef9fcfa1029

See more details on using hashes here.

File details

Details for the file fast_a2a_app-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: fast_a2a_app-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 32.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.5 cpython/3.12.4 HTTPX/0.28.1

File hashes

Hashes for fast_a2a_app-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 59e887f152c1b3311637b0bc0b0bd4155982f2da47d29df3c7398434701097e5
MD5 08c6c654061c0d76223605dbec715b07
BLAKE2b-256 bcb6a48c788bdd6ab85fa73e1228a94c4e04dba9ef0724291d3b74eaa841d025

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page