fast_a2a_app — Drop-in A2A server and chat UI for any AI agent

fast_a2a_app

Drop-in A2A server and chat UI for any FastAPI application that runs AI agents, installable from PyPI.

fast_a2a_app packages the battle-tested A2A protocol adapter and self-contained browser chat UI into a standalone pip-installable library. Get a fully spec-compliant A2A server plus a ready-to-use chat interface in under 20 lines.

pip install fast_a2a_app

Why fast_a2a_app?

Pydantic AI ships its own FastA2A integration, which is excellent if you are already inside the Pydantic AI ecosystem. fast_a2a_app exists for a different set of needs:

  • Mount point, not a framework. fast_a2a_app is a plain Starlette app you mount into an existing FastAPI application at any path prefix. Everything outside that prefix — authentication middleware, custom routes, dependency injection, observability — is yours to own and compose however you like.
  • Framework-agnostic. The library has zero dependency on Pydantic AI. Wire in any agent: raw Anthropic/OpenAI API calls, LangChain, LlamaIndex, or plain Python — as long as it exposes an async (str) -> str function or an async generator.
  • Separation of concerns. Your FastAPI application stays in charge of the HTTP layer (auth, rate limiting, CORS, health checks). fast_a2a_app only handles the A2A protocol inside its mounted prefix, keeping agent logic cleanly decoupled from transport concerns.

We hope this contributes to a composable AI agent architecture where protocol adapters, agent frameworks, and application infrastructure are independent choices.


What's inside

Module               What it does
fast_a2a_app.server  A2A JSON-RPC server (streaming SSE, multi-turn history, cross-instance cancel)
fast_a2a_app.ui      Self-contained browser chat UI (no build step, no npm)

Protocol features (via a2a-sdk)

  • message/stream — streaming SSE responses
  • tasks/cancel — immediate or cross-replica cancellation
  • tasks/resubscribe — reconnect to an in-flight stream after a network blip
  • tasks/get — snapshot fallback for page-reload recovery
  • .well-known/agent-card.json — agent discovery

Server features

  • Multi-turn history — every turn is stored in Redis and injected as a "Conversation so far:" prefix, giving the agent continuity without client-side replay
  • Cross-instance cancellation — cancel signals flow through Redis so any replica can stop a task running on another replica
  • Live progress updates — call report_progress("step 2/5…") from any tool and the chat UI spinner updates in real time
  • Lifecycle hooks: on_task_start / on_task_cancel callbacks for metrics, locks, or state resets
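
The cross-instance cancellation idea can be illustrated without Redis. In the sketch below an in-memory dictionary stands in for the shared store, and a worker checks for a cancel flag before emitting each chunk. InMemoryCancelSignals and run_task are hypothetical names invented for this illustration; the library's actual mechanism is not shown in this README and may differ.

```python
import asyncio
import time


class InMemoryCancelSignals:
    """Stand-in for Redis: stores cancel flags that expire after a TTL."""

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self._ttl = ttl_seconds
        self._expires_at: dict[str, float] = {}

    def request_cancel(self, task_id: str) -> None:
        # With Redis, any replica could write this flag; here it is a dict entry.
        self._expires_at[f"a2a:cancel:{task_id}"] = time.monotonic() + self._ttl

    def is_cancelled(self, task_id: str) -> bool:
        expires = self._expires_at.get(f"a2a:cancel:{task_id}")
        return expires is not None and time.monotonic() < expires


async def run_task(task_id: str, chunks: list[str], signals: InMemoryCancelSignals) -> list[str]:
    """Emit chunks one by one, stopping as soon as a cancel flag appears."""
    emitted: list[str] = []
    for chunk in chunks:
        if signals.is_cancelled(task_id):
            break
        emitted.append(chunk)
        await asyncio.sleep(0)  # yield control, as a real stream would
    return emitted


async def demo() -> list[str]:
    signals = InMemoryCancelSignals()
    signals.request_cancel("task-2")
    return await run_task("task-2", ["a", "b"], signals)


print(asyncio.run(demo()))
```

The expiring flag mirrors the 5 min TTL on the cancel key: a stale signal cannot affect a task long after the request was made.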

Storage

fast_a2a_app currently uses Redis for all server-side state:

What is stored                            Key pattern               TTL
Task JSON (full A2A task object)          a2a:task:{id}             24 h
Conversation index (task_id → sequence)   a2a:context:{cid}:tasks   24 h
Cross-instance cancel signal              a2a:cancel:{id}           5 min
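
When inspecting Redis by hand, it can help to reproduce these key patterns and TTLs in code. The helpers below are a hypothetical sketch that mirrors the table; the function and constant names are not part of fast_a2a_app, and the library may build its keys differently internally.

```python
from datetime import timedelta

# TTLs as listed in the table above.
TASK_TTL = timedelta(hours=24)
CONTEXT_TTL = timedelta(hours=24)
CANCEL_TTL = timedelta(minutes=5)


def task_key(task_id: str) -> str:
    """Key holding the full task JSON."""
    return f"a2a:task:{task_id}"


def context_tasks_key(context_id: str) -> str:
    """Key holding the conversation index for one context."""
    return f"a2a:context:{context_id}:tasks"


def cancel_key(task_id: str) -> str:
    """Key holding the cross-instance cancel signal."""
    return f"a2a:cancel:{task_id}"


# Redis expiry commands take seconds, so convert before use.
print(task_key("abc"), int(TASK_TTL.total_seconds()))
print(cancel_key("abc"), int(CANCEL_TTL.total_seconds()))
```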

Start a local Redis instance before running any example:

docker run -d -p 6379:6379 redis:7-alpine

Or point REDIS_URL at any managed Redis-compatible service (Redis Cloud, AWS ElastiCache, Azure Cache for Redis, etc.).

Roadmap — pluggable storage backends (MongoDB, PostgreSQL) are planned. The RedisTaskStore already implements the A2ATaskStore Protocol, so a Mongo or Postgres backend can be swapped in by passing a custom a2a_task_store to build_a2a_app() without any library changes.


Framework-agnostic design

fast_a2a_app has no dependency on any AI framework. build_a2a_app accepts two plain callables that you implement however you like:

Callable       Signature
invoke         async (prompt: str) -> str
stream_invoke  async (prompt: str) -> AsyncIterable[str]

Wrap them with the two helpers and pass to build_a2a_app:

invoke=build_invoke(my_async_fn)
stream_invoke=build_stream_invoke(my_async_generator_fn)

build_stream_invoke automatically sets up the report_progress() ContextVar, so any code called during streaming can push live status updates to the chat UI — regardless of which framework (or none) your agent uses.

You can also implement invoke and stream_invoke directly as bare callables and pass them straight to build_a2a_app — no wrapper needed.
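
If your agent is synchronous, one way to satisfy both signatures is to run it in a worker thread and derive the streaming variant from the full answer. The sketch below uses only the standard library; blocking_agent is a hypothetical stand-in for whatever blocking call you already have, and streaming word by word is just one possible chunking choice.

```python
import asyncio
from collections.abc import AsyncIterator


def blocking_agent(prompt: str) -> str:
    """Hypothetical synchronous agent, e.g. a wrapper around a blocking SDK."""
    return f"You said: {prompt}"


async def invoke(prompt: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_agent, prompt)


async def stream_invoke(prompt: str) -> AsyncIterator[str]:
    # No native streaming available: yield the full answer word by word.
    answer = await invoke(prompt)
    words = answer.split(" ")
    for i, word in enumerate(words):
        yield word if i == len(words) - 1 else word + " "


async def demo() -> tuple[str, str]:
    streamed = "".join([chunk async for chunk in stream_invoke("hello world")])
    return await invoke("hello world"), streamed


print(asyncio.run(demo()))
```

Both functions match the signatures in the table, so they could be passed through build_invoke and build_stream_invoke, or used as bare callables.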


Quickstart

1. Install

pip install fast_a2a_app

2. Implement your agent

Any async (str) -> str function works as the non-streaming invoke. Any async (str) -> AsyncIterable[str] generator works for streaming.

# agent.py — Azure OpenAI chat completions, no framework needed
import os
from collections.abc import AsyncIterable
from openai import AsyncOpenAI

AZURE_AI_ENDPOINT = os.environ.get("AZURE_AI_ENDPOINT", "")
AZURE_AI_DEPLOYMENT = os.environ.get("AZURE_AI_DEPLOYMENT", "gpt-4o")
AZURE_AI_KEY = os.environ.get("AZURE_AI_KEY", "")
AZURE_AI_BASE_URL = os.environ.get(
    "AZURE_AI_BASE_URL",
    f"{AZURE_AI_ENDPOINT.rstrip('/')}/" if AZURE_AI_ENDPOINT else "",
)

if AZURE_AI_KEY:
    api_key = AZURE_AI_KEY
else:
    from azure.identity import AzureCliCredential, get_bearer_token_provider
    api_key = get_bearer_token_provider(
        AzureCliCredential(), "https://ai.azure.com/.default"
    )

client = AsyncOpenAI(base_url=AZURE_AI_BASE_URL, api_key=api_key)

async def invoke(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model=AZURE_AI_DEPLOYMENT, max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return (resp.choices[0].message.content or "").strip()

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model=AZURE_AI_DEPLOYMENT, max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:   # some chunks (e.g. Azure content-filter results) carry no choices
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text

3. Wire up the server

# main.py
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard
from fast_a2a_app import a2a_ui, build_a2a_app, build_invoke, build_stream_invoke
from agent import invoke, stream_invoke

app = FastAPI()

agent_card = AgentCard(
    name="My Agent",
    description="Does cool things",
    version="1.0.0",
    url="http://localhost:8000/a2a/",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(invoke),
    stream_invoke=build_stream_invoke(stream_invoke),
))

app.mount("/", a2a_ui)        # built-in chat UI at http://localhost:8000/

4. Run

# Start Redis (required for conversation history)
docker run -d -p 6379:6379 redis:7-alpine

# Run the app
uvicorn main:app --reload

Open http://localhost:8000/ — you're chatting.


API reference

build_a2a_app(...)

Assembles a Starlette ASGI app. Mount it at any path prefix.

Parameter       Type                               Default                   Description
agent_card      AgentCard                          required                  Pre-built A2A agent card (name, description, version, url, skills, capabilities)
invoke          Callable                           required                  Non-streaming callable; use build_invoke() to wrap
stream_invoke   Callable | None                    None                      Streaming callable; use build_stream_invoke() to wrap
prompt_builder  Callable | None                    history+cid prefix        Custom prompt assembly function
on_task_start   Callable[[str], Awaitable] | None  None                      Called before each task
on_task_cancel  Callable[[str], Awaitable] | None  None                      Called on cancel
a2a_task_store  A2ATaskStore | None                auto                      Custom task store
redis_client    aioredis.Redis | None              auto                      Custom Redis client
redis_url       str                                "redis://localhost:6379"  Redis connection string
debug           bool                               False                     Include exception details in failure messages

build_invoke(run)

Wraps any async (prompt: str) -> str function as a non-streaming A2A invoke. Works with any AI framework or plain API call.

build_stream_invoke(run)

Wraps any async (prompt: str) -> AsyncIterable[str] generator as a streaming A2A invoke. Also sets up the report_progress() ContextVar so live progress updates work out of the box — call report_progress("step 2/5…") anywhere during execution and it will appear as a working-status event in the chat UI.

report_progress(message)

Call from any agent tool to push a status string to the chat UI spinner. Has no effect outside a streaming context (safe to call unconditionally).

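from fast_a2a_app import report_progress  # assumed to be exported from the package root

@agent.tool_plain  # `agent` is a pydantic-ai Agent defined elsewhere
async def long_computation(n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    # … first part of the work …
    report_progress(f"Computing step 2/{n}…")
    result = f"Finished {n} steps"  # placeholder for the real result
    return result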

a2a_ui

A Starlette ASGI app serving a self-contained single-page chat interface. No build step, no npm. Mount it at "/" to serve the UI.

app.mount("/", a2a_ui)

The UI reads the agent card from /a2a/.well-known/agent-card.json to populate the header name and the collapsible info panel.


Example: Holiday Planner

examples/holiday_planner/ is a complete example showing how to build a domain-specific agent on fast_a2a_app.

examples/holiday_planner/
├── agent.py          # pydantic-ai agent with 4 tools
├── main.py           # FastAPI app + fast_a2a_app wiring
└── requirements.txt

Running the example

# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/holiday_planner
pip install -e ../../          # install fast_a2a_app from repo root
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine

uvicorn main:app --reload

Open http://localhost:8000/ and ask:

"I want to plan a 10-day trip somewhere in Southeast Asia in September, moderate budget, interested in food, temples, and nature. Can you help?"

The agent will ask follow-up questions, then use its tools to recommend destinations, build a day-by-day itinerary, estimate costs, and provide travel essentials — all with live progress updates in the UI.

Holiday planner tools

Tool                    Description
recommend_destinations  2-3 tailored destination suggestions with pros/cons
create_itinerary        Day-by-day plan with restaurants and local tips
estimate_budget         Cost breakdown table per person per day
get_travel_essentials   Visa, health, weather, and packing guide

Example: Echo Agent (no LLM, no external dependencies)

examples/echo_agent/ is the minimal fast_a2a_app integration — pure Python, no API key, no AI framework.

examples/echo_agent/
├── agent.py          # Two plain async functions, zero external imports
├── main.py           # FastAPI app
└── requirements.txt  # fast_a2a_app only

# agent.py
import asyncio
from collections.abc import AsyncIterable

async def invoke(prompt: str) -> str:
    return f"Echo: {prompt}"

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    words = f"Echo: {prompt}".split()
    for i, word in enumerate(words):
        yield word if i == len(words) - 1 else word + " "
        await asyncio.sleep(0.05)   # makes streaming visible in the UI

Running the echo agent

cd examples/echo_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

No .env needed — the echo agent requires no API key. A REDIS_URL can be set via examples/.env if you need a non-default Redis address.

Open http://localhost:8000/ and type anything.


Example: Joke Agent (raw chat completions, no agent framework)

examples/joke_agent/ shows fast_a2a_app wired to plain Azure OpenAI chat completions — no agent framework at all.

examples/joke_agent/
├── agent.py          # Two plain async functions: run_joke_agent + stream_joke_agent
├── main.py           # FastAPI app using build_invoke / build_stream_invoke
└── requirements.txt

agent.py defines two callables that satisfy the fast_a2a_app contract:

# Non-streaming: async (str) -> str
async def run_joke_agent(prompt: str) -> str:
    response = await client.chat.completions.create(model=..., messages=[...])
    return (response.choices[0].message.content or "").strip()

# Streaming: async (str) -> AsyncIterable[str]
async def stream_joke_agent(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(model=..., messages=[...], stream=True)
    async for chunk in stream:
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text

main.py wires them in with the helpers:

from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from fast_a2a_app import build_a2a_app, build_invoke, build_stream_invoke, a2a_ui
from agent import run_joke_agent, stream_joke_agent

app = FastAPI()

agent_card = AgentCard(
    name="Joke Agent",
    description="Your AI stand-up comedian.",
    version="0.1.0",
    url="http://localhost:8000/a2a/",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="tell_joke", name="Tell a joke", description="Tells a joke on any topic.")],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(run_joke_agent),
    stream_invoke=build_stream_invoke(stream_joke_agent),
))
app.mount("/", a2a_ui)

Running the joke agent

# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/joke_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

Open http://localhost:8000/ and try:

"Tell me a programming joke" or "Give me your best dad joke"

Tokens stream directly from the Azure OpenAI API to the browser as they arrive.


Architecture

FastAPI app
├── /a2a    ← Starlette ASGI app (build_a2a_app)
│   ├── POST /            message/stream, tasks/cancel, …
│   └── GET  /.well-known/agent-card.json
└── /       ← a2a_ui (Starlette, single HTML file)

Redis
├── a2a:task:{id}                 Task JSON (24 h TTL)
├── a2a:context:{cid}:tasks       Context index (task_id → sequence)
├── a2a:context:{cid}:sequence    Sequence counter
└── a2a:cancel:{id}               Cancel signal (5 min TTL)

Conversation history injection

Each A2A task has a context_id shared across all turns of a conversation. ContextAwareRequestContextBuilder fetches all prior tasks for the same context_id from Redis, extracts the last 12 lines of dialogue, and prepends them as "Conversation so far:\n…" to each new prompt.

The agent therefore sees recent history without the client needing to replay it — multi-turn conversation just works.
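
The prefixing step can be sketched in a few lines. build_prompt below is a hypothetical function, not the library's ContextAwareRequestContextBuilder: it assumes the history is already available as a list of dialogue lines, and the "User:" label and the blank line before the new prompt are illustrative choices rather than the library's exact format.

```python
def build_prompt(history: list[str], new_prompt: str, max_lines: int = 12) -> str:
    """Prepend the most recent dialogue lines to the new prompt."""
    # history[-0:] would return everything, so handle max_lines <= 0 explicitly.
    recent = history[-max_lines:] if max_lines > 0 else []
    if not recent:
        return new_prompt
    return "Conversation so far:\n" + "\n".join(recent) + "\n\n" + new_prompt


history = [f"User: message {i}" for i in range(14)]
print(build_prompt(history, "What did I ask first?"))
```

Capping the prefix at a fixed number of lines keeps prompt size bounded no matter how long the conversation runs, at the cost of dropping older turns.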

How streaming works

build_stream_invoke wraps your generator in an asyncio.Queue-based relay. Before starting the generator it sets a ContextVar callback that report_progress() reads. Strings from report_progress() are placed in the queue with a sentinel prefix; ConfigurableAgentExecutor routes them to non-final working SSE events. All other yielded strings become artifact-update events — the streaming text the user sees.
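
The relay described above can be approximated with the standard library alone. The sketch below is an illustration under assumptions, not the library's implementation: relay, classify, PROGRESS_PREFIX, and the local report_progress are hypothetical names defined here, and the sentinel value is invented for the example.

```python
import asyncio
import contextvars
from collections.abc import AsyncIterator, Callable

PROGRESS_PREFIX = "\x00progress:"  # hypothetical sentinel marking status strings
_DONE = object()                   # marks the end of the stream
_progress_cb: contextvars.ContextVar = contextvars.ContextVar("progress_cb", default=None)


def report_progress(message: str) -> None:
    """Push a status string if a streaming context is active, else do nothing."""
    callback = _progress_cb.get()
    if callback is not None:
        callback(message)


async def relay(gen_fn: Callable[[str], AsyncIterator[str]], prompt: str) -> AsyncIterator[str]:
    """Run gen_fn in a task and forward its chunks and progress messages."""
    queue: asyncio.Queue = asyncio.Queue()

    async def produce() -> None:
        # Set inside the task, so the callback is visible only to this stream.
        _progress_cb.set(lambda msg: queue.put_nowait(PROGRESS_PREFIX + msg))
        try:
            async for chunk in gen_fn(prompt):
                queue.put_nowait(chunk)
        finally:
            queue.put_nowait(_DONE)

    task = asyncio.create_task(produce())
    try:
        while (item := await queue.get()) is not _DONE:
            yield item
        await task  # re-raise any error from the generator
    finally:
        task.cancel()  # no effect if the producer already finished


def classify(item: str) -> tuple[str, str]:
    """Route prefixed strings to working events, the rest to artifact updates."""
    if item.startswith(PROGRESS_PREFIX):
        return "working", item[len(PROGRESS_PREFIX):]
    return "artifact-update", item


async def agent(prompt: str) -> AsyncIterator[str]:
    report_progress("thinking")
    yield "Hello, "
    report_progress("finishing")
    yield prompt


async def demo() -> list[tuple[str, str]]:
    return [classify(item) async for item in relay(agent, "world")]


print(asyncio.run(demo()))
```

Because the ContextVar defaults to None, calling report_progress outside relay does nothing, which matches the safe-to-call-unconditionally behaviour described in the API reference.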


Publishing to PyPI

pip install hatch
hatch build
hatch publish

Or with twine:

pip install build twine
python -m build
twine upload dist/*

License

MIT
