fast_a2a_app — Drop-in A2A server and chat UI for any AI agent
fast_a2a_app
Drop-in A2A server and chat UI for any FastAPI application that runs AI agents, installable from PyPI.
fast_a2a_app packages the battle-tested A2A protocol adapter and self-contained browser chat UI into a standalone pip-installable library. Get a fully spec-compliant A2A server plus a ready-to-use chat interface in under 20 lines.
pip install fast_a2a_app
Why fast_a2a_app?
Pydantic AI ships its own FastA2A integration, which is excellent if you are already inside the Pydantic AI ecosystem. fast_a2a_app exists for a different set of needs:
- Mount point, not a framework. fast_a2a_app is a plain Starlette app you mount into an existing FastAPI application at any path prefix. Everything outside that prefix — authentication middleware, custom routes, dependency injection, observability — is yours to own and compose however you like.
- Framework-agnostic. The library has zero dependency on Pydantic AI. Wire in any agent: raw Anthropic/OpenAI API calls, LangChain, LlamaIndex, or plain Python, as long as it exposes an async (str) -> str function or an async generator.
- Separation of concerns. Your FastAPI application stays in charge of the HTTP layer (auth, rate limiting, CORS, health checks). fast_a2a_app only handles the A2A protocol inside its mounted prefix, keeping agent logic cleanly decoupled from transport concerns.
We hope this contributes to a composable AI agent architecture where protocol adapters, agent frameworks, and application infrastructure are independent choices.
What's inside
| Module | What it does |
|---|---|
| fast_a2a_app.server | A2A JSON-RPC server (streaming SSE, multi-turn history, cross-instance cancel) |
| fast_a2a_app.ui | Self-contained browser chat UI (no build step, no npm) |
Protocol features (via a2a-sdk)
- message/stream: streaming SSE responses
- tasks/cancel: immediate or cross-replica cancellation
- tasks/resubscribe: reconnect to an in-flight stream after a network blip
- tasks/get: snapshot fallback for page-reload recovery
- .well-known/agent-card.json: agent discovery
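For readers who want to call these methods without the bundled UI, here is a minimal sketch of the JSON-RPC 2.0 request bodies a client might POST to the mounted prefix. The field names (kind, messageId, contextId, parts) follow the A2A JSON-RPC shape as I understand it from recent spec revisions; treat them as assumptions and check them against the a2a-sdk version you have installed. The helper names are hypothetical.

```python
import json
import uuid
from typing import Optional


def message_stream_request(text: str, context_id: Optional[str] = None) -> dict:
    """Build a JSON-RPC body for message/stream (assumed A2A field names)."""
    message = {
        "kind": "message",
        "role": "user",
        "messageId": uuid.uuid4().hex,
        "parts": [{"kind": "text", "text": text}],
    }
    if context_id is not None:
        # Reusing a context id ties this turn to an earlier conversation.
        message["contextId"] = context_id
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,
        "method": "message/stream",
        "params": {"message": message},
    }


def tasks_cancel_request(task_id: str) -> dict:
    """Build a JSON-RPC body for tasks/cancel (assumed params shape)."""
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,
        "method": "tasks/cancel",
        "params": {"id": task_id},
    }


body = message_stream_request("Tell me a programming joke", context_id="ctx-1")
print(json.dumps(body, indent=2))
```

The response to message/stream is a server-sent event stream, so a real client would read it incrementally rather than waiting for a single JSON document.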
Server features
- Multi-turn history: every turn is stored in Redis and injected as a "Conversation so far:" prefix, giving the agent continuity without client-side replay
- Cross-instance cancellation: cancel signals flow through Redis so any replica can stop a task running on another replica
- Live progress updates: call report_progress("step 2/5…") from any tool and the chat UI spinner updates in real time
- Lifecycle hooks: on_task_start / on_task_cancel callbacks for metrics, locks, or state resets
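To make the cross-instance cancellation idea concrete, here is a small sketch of one way such a signal could flow through a shared key-value store. It is not the library's implementation: FakeRedis is an in-memory stand-in so the example runs without a server, and run_task and request_cancel are hypothetical names. Only the a2a:cancel:{id} key pattern and the 5-minute TTL come from this README.

```python
import asyncio
import time


class FakeRedis:
    """In-memory stand-in for the two Redis commands this sketch needs."""

    def __init__(self) -> None:
        self._data: dict = {}

    async def set(self, key: str, value: str, ex: int) -> None:
        # Store the value together with its expiry deadline.
        self._data[key] = (value, time.monotonic() + ex)

    async def exists(self, key: str) -> bool:
        entry = self._data.get(key)
        return entry is not None and entry[1] > time.monotonic()


async def run_task(store: FakeRedis, task_id: str, steps: int = 50) -> str:
    """Worker replica: check the shared cancel key between units of work."""
    for _ in range(steps):
        if await store.exists(f"a2a:cancel:{task_id}"):
            return "canceled"
        await asyncio.sleep(0.01)  # stands in for real agent work
    return "completed"


async def request_cancel(store: FakeRedis, task_id: str) -> None:
    """Any replica: publish a short-lived cancel signal (5 min TTL)."""
    await store.set(f"a2a:cancel:{task_id}", "1", ex=300)


async def demo() -> str:
    store = FakeRedis()
    worker = asyncio.create_task(run_task(store, "t1"))
    await asyncio.sleep(0.03)
    await request_cancel(store, "t1")
    return await worker


result = asyncio.run(demo())
print(result)
```

The short TTL keeps stale cancel keys from accumulating if the target task has already finished.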
Storage
fast_a2a_app currently uses Redis for all server-side state:
| What is stored | Key pattern | TTL |
|---|---|---|
| Task JSON (full A2A task object) | a2a:task:{id} | 24 h |
| Conversation index (task_id → sequence) | a2a:context:{cid}:tasks | 24 h |
| Cross-instance cancel signal | a2a:cancel:{id} | 5 min |
Start a local Redis instance before running any example:
docker run -d -p 6379:6379 redis:7-alpine
Or point REDIS_URL at any managed Redis-compatible service (Redis Cloud, AWS ElastiCache, Azure Cache for Redis, etc.).
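When inspecting or debugging the stored state, it can help to have the key patterns and TTLs from the table in one place. The helper names and constants below are hypothetical and not part of fast_a2a_app; only the key patterns and TTL values are taken from the table.

```python
TASK_TTL_SECONDS = 24 * 60 * 60   # 24 h, per the storage table
CANCEL_TTL_SECONDS = 5 * 60       # 5 min, per the storage table


def task_key(task_id: str) -> str:
    """Key holding the full A2A task JSON."""
    return f"a2a:task:{task_id}"


def context_tasks_key(context_id: str) -> str:
    """Key holding the conversation index (task_id -> sequence)."""
    return f"a2a:context:{context_id}:tasks"


def cancel_key(task_id: str) -> str:
    """Key holding the cross-instance cancel signal."""
    return f"a2a:cancel:{task_id}"


print(task_key("abc"), context_tasks_key("ctx-1"), cancel_key("abc"))
```

With redis-cli, for example, `GET a2a:task:<id>` and `TTL a2a:cancel:<id>` would then show a task snapshot and the remaining lifetime of a cancel signal, assuming the task value is stored as a plain string.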
Roadmap: pluggable storage backends (MongoDB, PostgreSQL) are planned. The RedisTaskStore already implements the A2ATaskStore Protocol, so a Mongo or Postgres backend can be swapped in by passing a custom a2a_task_store to build_a2a_app() without any library changes.
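As an illustration of what a swappable backend could look like, here is a sketch of a store written against a structural interface. The README does not list the methods the task store interface requires, so the save/get/delete names below are assumptions modelled on the task store interface in a2a-sdk, and tasks are represented as plain dicts for brevity. Check the actual protocol definition in the package before writing a real backend.

```python
import asyncio
from typing import Any, Optional, Protocol


class TaskStoreLike(Protocol):
    """Hypothetical interface; the library's real protocol may differ."""

    async def save(self, task: dict) -> None: ...
    async def get(self, task_id: str) -> Optional[dict]: ...
    async def delete(self, task_id: str) -> None: ...


class InMemoryTaskStore:
    """Dict-backed store, useful only for tests and local experiments."""

    def __init__(self) -> None:
        self._tasks: dict = {}

    async def save(self, task: dict) -> None:
        # Copy so later mutation by the caller does not change stored state.
        self._tasks[task["id"]] = dict(task)

    async def get(self, task_id: str) -> Optional[dict]:
        task = self._tasks.get(task_id)
        return dict(task) if task is not None else None

    async def delete(self, task_id: str) -> None:
        self._tasks.pop(task_id, None)


async def demo() -> Any:
    store: TaskStoreLike = InMemoryTaskStore()
    await store.save({"id": "t1", "status": "working"})
    found = await store.get("t1")
    await store.delete("t1")
    return found, await store.get("t1")


found, after_delete = asyncio.run(demo())
print(found, after_delete)
```

A MongoDB or PostgreSQL backend would keep the same method surface and replace the dict with database calls.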
Framework-agnostic design
fast_a2a_app has no dependency on any AI framework. build_a2a_app accepts two
plain callables that you implement however you like:
| Callable | Signature |
|---|---|
| invoke | async (prompt: str) -> str |
| stream_invoke | async (prompt: str) -> AsyncIterable[str] |
Wrap them with the two helpers and pass to build_a2a_app:
invoke=build_invoke(my_async_fn)
stream_invoke=build_stream_invoke(my_async_generator_fn)
build_stream_invoke automatically sets up the report_progress() ContextVar,
so any code called during streaming can push live status updates to the chat UI —
regardless of which framework (or none) your agent uses.
You can also implement invoke and stream_invoke directly as bare callables
and pass them straight to build_a2a_app — no wrapper needed.
Quickstart
1. Install
pip install fast_a2a_app
2. Implement your agent
Any async (str) -> str function works as the non-streaming invoke.
Any async (str) -> AsyncIterable[str] generator works for streaming.
# agent.py: Azure OpenAI chat completions, no framework needed
import os
from collections.abc import AsyncIterable

from openai import AsyncOpenAI

AZURE_AI_ENDPOINT = os.environ.get("AZURE_AI_ENDPOINT", "")
AZURE_AI_DEPLOYMENT = os.environ.get("AZURE_AI_DEPLOYMENT", "gpt-4o")
AZURE_AI_KEY = os.environ.get("AZURE_AI_KEY", "")
AZURE_AI_BASE_URL = os.environ.get(
    "AZURE_AI_BASE_URL",
    f"{AZURE_AI_ENDPOINT.rstrip('/')}/" if AZURE_AI_ENDPOINT else "",
)

# Use the API key if one is set; otherwise fall back to Azure CLI credentials.
if AZURE_AI_KEY:
    api_key = AZURE_AI_KEY
else:
    from azure.identity import AzureCliCredential, get_bearer_token_provider

    api_key = get_bearer_token_provider(
        AzureCliCredential(), "https://ai.azure.com/.default"
    )

client = AsyncOpenAI(base_url=AZURE_AI_BASE_URL, api_key=api_key)


async def invoke(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model=AZURE_AI_DEPLOYMENT, max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return (resp.choices[0].message.content or "").strip()


async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model=AZURE_AI_DEPLOYMENT, max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        # Some chunks (e.g. content-filter metadata) carry no choices.
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text
3. Wire up the server
# main.py
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard
from fast_a2a_app import a2a_ui, build_a2a_app, build_invoke, build_stream_invoke

from agent import invoke, stream_invoke

app = FastAPI()

agent_card = AgentCard(
    name="My Agent",
    description="Does cool things",
    version="1.0.0",
    url="http://localhost:8000/a2a/",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(invoke),
    stream_invoke=build_stream_invoke(stream_invoke),
))
app.mount("/", a2a_ui)  # built-in chat UI at http://localhost:8000/
4. Run
# Start Redis (required for conversation history)
docker run -d -p 6379:6379 redis:7-alpine
# Run the app
uvicorn main:app --reload
Open http://localhost:8000/ — you're chatting.
API reference
build_a2a_app(...)
Assembles a Starlette ASGI app. Mount it at any path prefix.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_card | AgentCard | required | Pre-built A2A agent card (name, description, version, url, skills, capabilities) |
| invoke | Callable | required | Non-streaming callable; use build_invoke() to wrap |
| stream_invoke | Callable \| None | None | Streaming callable; use build_stream_invoke() to wrap |
| prompt_builder | Callable \| None | history+cid prefix | Custom prompt assembly function |
| on_task_start | Callable[[str], Awaitable] \| None | None | Called before each task |
| on_task_cancel | Callable[[str], Awaitable] \| None | None | Called on cancel |
| a2a_task_store | A2ATaskStore \| None | auto | Custom task store |
| redis_client | aioredis.Redis \| None | auto | Custom Redis client |
| redis_url | str | "redis://localhost:6379" | Redis connection string |
| debug | bool | False | Include exception details in failure messages |
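The lifecycle hooks are typed as Callable[[str], Awaitable], so any async function taking one string fits. Below is a minimal sketch of hooks that count tasks for metrics. The table does not say what the string argument carries; this sketch assumes it is the task id, and the metrics counter is a hypothetical stand-in for a real metrics client.

```python
import asyncio
from collections import Counter

# Hypothetical in-process metrics sink; swap in your own metrics client.
metrics: Counter = Counter()


async def on_task_start(task_id: str) -> None:
    """Assumed to receive the task id before each task runs."""
    metrics["started"] += 1


async def on_task_cancel(task_id: str) -> None:
    """Assumed to receive the task id when a task is cancelled."""
    metrics["canceled"] += 1


async def _demo() -> None:
    # Simulate the calls the server would make.
    await on_task_start("task-1")
    await on_task_start("task-2")
    await on_task_cancel("task-2")


asyncio.run(_demo())
print(dict(metrics))
```

The hooks would then be passed as build_a2a_app(..., on_task_start=on_task_start, on_task_cancel=on_task_cancel), using the parameter names from the table.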
build_invoke(run)
Wraps any async (prompt: str) -> str function as a non-streaming A2A invoke.
Works with any AI framework or plain API call.
build_stream_invoke(run)
Wraps any async (prompt: str) -> AsyncIterable[str] generator as a streaming A2A invoke.
Also sets up the report_progress() ContextVar so live progress updates work
out of the box — call report_progress("step 2/5…") anywhere during execution
and it will appear as a working-status event in the chat UI.
report_progress(message)
Call from any agent tool to push a status string to the chat UI spinner. Has no effect outside a streaming context (safe to call unconditionally).
@agent.tool_plain
async def long_computation(n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    # …
    report_progress(f"Computing step 2/{n}…")
    return result
a2a_ui
A Starlette ASGI app serving a self-contained single-page chat interface.
No build step, no npm. Mount it at "/" to serve the UI.
app.mount("/", a2a_ui)
The UI reads the agent card from /a2a/.well-known/agent-card.json to populate
the header name and the collapsible info panel.
Example: Holiday Planner
examples/holiday_planner/ is a complete example showing how to build a
domain-specific agent on fast_a2a_app.
examples/holiday_planner/
├── agent.py # pydantic-ai agent with 4 tools
├── main.py # FastAPI app + fast_a2a_app wiring
└── requirements.txt
Running the example
# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT
cd examples/holiday_planner
pip install -e ../../ # install fast_a2a_app from repo root
pip install -r requirements.txt
docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload
Open http://localhost:8000/ and ask:
"I want to plan a 10-day trip somewhere in Southeast Asia in September, moderate budget, interested in food, temples, and nature. Can you help?"
The agent will ask follow-up questions, then use its tools to recommend destinations, build a day-by-day itinerary, estimate costs, and provide travel essentials — all with live progress updates in the UI.
Holiday planner tools
| Tool | Description |
|---|---|
| recommend_destinations | 2-3 tailored destination suggestions with pros/cons |
| create_itinerary | Day-by-day plan with restaurants and local tips |
| estimate_budget | Cost breakdown table per person per day |
| get_travel_essentials | Visa, health, weather, and packing guide |
Example: Echo Agent (no LLM, no external dependencies)
examples/echo_agent/ is the minimal fast_a2a_app integration — pure Python, no API key, no AI framework.
examples/echo_agent/
├── agent.py # Two plain async functions, zero external imports
├── main.py # FastAPI app
└── requirements.txt # fast_a2a_app only
# agent.py
import asyncio
from collections.abc import AsyncIterable


async def invoke(prompt: str) -> str:
    return f"Echo: {prompt}"


async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    words = f"Echo: {prompt}".split()
    for i, word in enumerate(words):
        # Add a trailing space to every word except the last.
        yield word if i == len(words) - 1 else word + " "
        await asyncio.sleep(0.05)  # makes streaming visible in the UI
Running the echo agent
cd examples/echo_agent
pip install -e ../../
pip install -r requirements.txt
docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload
No .env needed: the echo agent requires no API key. A REDIS_URL can be set via examples/.env if you need a non-default Redis address.
Open http://localhost:8000/ and type anything.
Example: Joke Agent (raw chat completions, no agent framework)
examples/joke_agent/ shows fast_a2a_app wired to plain Azure OpenAI chat completions — no agent framework at all.
examples/joke_agent/
├── agent.py # Two plain async functions: run_joke_agent + stream_joke_agent
├── main.py # FastAPI app using build_invoke / build_stream_invoke
└── requirements.txt
agent.py defines two callables that satisfy the fast_a2a_app contract:
# Non-streaming: async (str) -> str
async def run_joke_agent(prompt: str) -> str:
    response = await client.chat.completions.create(model=..., messages=[...])
    return (response.choices[0].message.content or "").strip()

# Streaming: async (str) -> AsyncIterable[str]
async def stream_joke_agent(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(model=..., messages=[...], stream=True)
    async for chunk in stream:
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text
main.py wires them in with the helpers:
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from fast_a2a_app import build_a2a_app, build_invoke, build_stream_invoke, a2a_ui

from agent import run_joke_agent, stream_joke_agent

app = FastAPI()

agent_card = AgentCard(
    name="Joke Agent",
    description="Your AI stand-up comedian.",
    version="0.1.0",
    url="http://localhost:8000/a2a/",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="tell_joke", name="Tell a joke", description="Tells a joke on any topic.")],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(run_joke_agent),
    stream_invoke=build_stream_invoke(stream_joke_agent),
))
app.mount("/", a2a_ui)
Running the joke agent
# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT
cd examples/joke_agent
pip install -e ../../
pip install -r requirements.txt
docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload
Open http://localhost:8000/ and try:
"Tell me a programming joke" or "Give me your best dad joke"
Tokens stream directly from the Azure OpenAI API to the browser as they arrive.
Architecture
FastAPI app
├── /a2a ← Starlette ASGI app (build_a2a_app)
│ ├── POST / message/stream, tasks/cancel, …
│ └── GET /.well-known/agent-card.json
└── / ← a2a_ui (Starlette, single HTML file)
Redis
├── a2a:task:{id} Task JSON (24 h TTL)
├── a2a:context:{cid}:tasks Context index (task_id → sequence)
├── a2a:context:{cid}:sequence Sequence counter
└── a2a:cancel:{id} Cancel signal (5 min TTL)
Conversation history injection
Each A2A task has a context_id shared across all turns of a conversation.
ContextAwareRequestContextBuilder fetches all prior tasks for the same
context_id from Redis, extracts the last 12 lines of dialogue, and
prepends them as "Conversation so far:\n…" to each new prompt.
The agent therefore sees recent history without the client needing to replay it — multi-turn conversation just works.
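The history injection described here can be sketched as a small pure function. This is an illustration rather than the library's ContextAwareRequestContextBuilder: the function name, the "User:" / "Agent:" line labels, and the blank line before the new message are assumptions. Only the "Conversation so far:" prefix and the 12-line window come from the description.

```python
MAX_HISTORY_LINES = 12  # window size stated in the description


def build_prompt(history_lines: list, new_message: str,
                 max_lines: int = MAX_HISTORY_LINES) -> str:
    """Prepend the most recent dialogue lines to the new user message."""
    recent = history_lines[-max_lines:]
    if not recent:
        # First turn of a conversation: nothing to inject.
        return new_message
    history = "\n".join(recent)
    return f"Conversation so far:\n{history}\n\n{new_message}"


history = [
    "User: I want a 10-day trip in September.",
    "Agent: Which region are you considering?",
]
print(build_prompt(history, "Southeast Asia, moderate budget."))
```

A custom prompt_builder could apply a different window or summarise older turns instead of truncating them, though its exact signature is not documented here.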
How streaming works
build_stream_invoke wraps your generator in an asyncio.Queue-based
relay. Before starting the generator it sets a ContextVar callback that
report_progress() reads. Strings from report_progress() are placed in
the queue with a sentinel prefix; ConfigurableAgentExecutor routes them
to non-final working SSE events. All other yielded strings become
artifact-update events — the streaming text the user sees.
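The relay pattern described here can be sketched in plain asyncio. This is a simplified illustration, not the code inside build_stream_invoke: the sentinel prefix value, the relay function, and the local report_progress stand-in are all hypothetical, and the consumer at the end plays the role that ConfigurableAgentExecutor is described as playing.

```python
import asyncio
from contextvars import ContextVar
from typing import AsyncIterator, Callable, Optional

PROGRESS_PREFIX = "\x00progress:"  # hypothetical sentinel prefix
_DONE = object()                   # marks the end of the stream

_progress_cb: ContextVar[Optional[Callable[[str], None]]] = ContextVar(
    "progress_cb", default=None
)


def report_progress(message: str) -> None:
    """Local stand-in: forwards to the callback if one is set, else no-op."""
    cb = _progress_cb.get()
    if cb is not None:
        cb(message)


def relay(gen_fn):
    """Wrap an async generator so text and progress share one queue."""

    async def wrapped(prompt: str) -> AsyncIterator[str]:
        queue: asyncio.Queue = asyncio.Queue()

        async def pump() -> None:
            # Set the callback inside the pump task so it stays scoped to it.
            _progress_cb.set(lambda m: queue.put_nowait(PROGRESS_PREFIX + m))
            try:
                async for chunk in gen_fn(prompt):
                    queue.put_nowait(chunk)
            finally:
                queue.put_nowait(_DONE)

        task = asyncio.create_task(pump())
        try:
            while (item := await queue.get()) is not _DONE:
                yield item
            await task  # surface any exception raised by the generator
        finally:
            task.cancel()  # no-op if the task already finished

    return wrapped


async def agent(prompt: str):
    report_progress("thinking")
    yield "Hello, "
    report_progress("almost done")
    yield prompt


async def collect(prompt: str):
    text, progress = [], []
    async for item in relay(agent)(prompt):
        if item.startswith(PROGRESS_PREFIX):
            progress.append(item[len(PROGRESS_PREFIX):])  # status event
        else:
            text.append(item)                             # streamed text
    return "".join(text), progress


text, progress = asyncio.run(collect("world"))
print(text, progress)
```

Because the callback lives in a ContextVar, calling report_progress outside a relayed stream simply does nothing, which matches the "safe to call unconditionally" behaviour described for the real function.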
Publishing to PyPI
pip install hatch
hatch build
hatch publish
Or with twine:
pip install build twine
python -m build
twine upload dist/*
License
MIT