Skip to main content

fast_a2a_app — Drop-in A2A server and chat UI for any AI agent

Project description

fast_a2a_app

Drop-in A2A server and chat UI for any FastAPI application that runs ai agents — installable from PyPI.

fast_a2a_app packages the battle-tested A2A protocol adapter and self-contained browser chat UI into a standalone pip-installable library. Get a fully spec-compliant A2A server plus a ready-to-use chat interface in under 20 lines.

pip install fast_a2a_app

Why fast_a2a_app?

Pydantic AI ships its own FastA2A integration, which is excellent if you are already inside the Pydantic AI ecosystem. fast_a2a_app exists for a different set of needs:

  • Mount point, not a framework. fast_a2a_app is a plain Starlette app you mount into an existing FastAPI application at any path prefix. Everything outside that prefix — authentication middleware, custom routes, dependency injection, observability — is yours to own and compose however you like.
  • Framework-agnostic. The library has zero dependency on Pydantic AI. Wire in any agent: raw Anthropic/OpenAI API calls, LangChain, LlamaIndex, or plain Python — as long as it exposes an async (str) -> str function or an async generator.
  • Separation of concerns. Your FastAPI application stays in charge of the HTTP layer (auth, rate limiting, CORS, health checks). fast_a2a_app only handles the A2A protocol inside its mounted prefix, keeping agent logic cleanly decoupled from transport concerns.

We hope this contributes to a composable AI agent architecture where protocol adapters, agent frameworks, and application infrastructure are independent choices.


What's inside

Module What it does
fast_a2a_app.server A2A JSON-RPC server (streaming SSE, multi-turn history, cross-instance cancel)
fast_a2a_app.ui Self-contained browser chat UI — no build step, no npm

Protocol features (via a2a-sdk 1.0.x)

  • SendMessage — single-shot request/response (non-streaming)
  • SendStreamingMessage — streaming SSE responses
  • CancelTask — immediate or cross-replica cancellation
  • SubscribeToTask — reconnect to an in-flight stream after a network blip
  • GetTask — snapshot fallback for page-reload recovery
  • .well-known/agent-card.json — agent discovery

Server features

  • Multi-turn history — every turn is stored in Redis and injected as a "Conversation so far:" prefix, giving the agent continuity without client-side replay. History depth and a system prompt are configurable via history_max_lines and system_prompt on build_a2a_app; full custom prompt assembly is also supported.
  • Cross-instance cancellation — cancel signals flow through Redis so any replica can stop a task running on another replica
  • Live progress updates — call report_progress("step 2/5…") from any tool and the chat UI spinner updates in real time
  • Lifecycle hookson_task_start / on_task_cancel callbacks for metrics, locks, or state resets

UI features

  • Stream toggle — checkbox in the input bar switches between SendStreamingMessage (tokens arrive live) and SendMessage (full response rendered at once); preference persisted in localStorage. Hidden automatically when the agent card reports capabilities.streaming = false.
  • Data part widget — parts with media_type: application/json are rendered as a labeled key-value table with color-coded value types; no raw JSON brackets shown
  • File part widget — raw binary parts show a type icon, filename, and media type with a "Download" button that creates a temporary Blob URL
  • Page-reload recovery — active task is stored in localStorage and resubscribed on the next load
  • Markdown rendering — agent responses rendered as GitHub-Flavored Markdown with DOMPurify sanitisation
  • Collapsible agent card — name, version, capabilities, and skills pulled from .well-known/agent-card.json

Storage

fast_a2a_app currently uses Redis for all server-side state:

What is stored Key pattern TTL
Task JSON (full A2A task object) a2a:task:{id} 24 h
Conversation index (task_id → sequence) a2a:context:{cid}:tasks 24 h
Cross-instance cancel signal a2a:cancel:{id} 5 min

Start a local Redis instance before running any example:

docker run -d -p 6379:6379 redis:7-alpine

Or point REDIS_URL at any managed Redis-compatible service (Redis Cloud, AWS ElastiCache, Azure Cache for Redis, etc.).

Roadmap — pluggable storage backends (MongoDB, PostgreSQL) are planned. The RedisTaskStore already implements the A2ATaskStore Protocol, so a Mongo or Postgres backend can be swapped in by passing a custom a2a_task_store to build_a2a_app() without any library changes.


Framework-agnostic design

fast_a2a_app has no dependency on any AI framework. build_a2a_app accepts two plain callables that you implement however you like:

Callable Signature
invoke async (prompt: str) -> str or async (prompt: str) -> Artifact
stream_invoke async (prompt: str) -> AsyncIterable[str]

Wrap them with the two helpers and pass to build_a2a_app:

invoke=build_invoke(my_async_fn)
stream_invoke=build_stream_invoke(my_async_generator_fn)

build_invoke accepts both plain-text and multi-part agents — return a str for a single text response, or return an Artifact to send text, structured data, and file parts together in one response.

build_stream_invoke automatically sets up the report_progress() ContextVar, so any code called during streaming can push live status updates to the chat UI — regardless of which framework (or none) your agent uses.

You can also implement invoke and stream_invoke directly as bare callables and pass them straight to build_a2a_app — no wrapper needed.


Quickstart

1. Install

pip install fast_a2a_app

2. Implement your agent

Any async (str) -> str function works as the non-streaming invoke. Any async (str) -> AsyncIterable[str] generator works for streaming.

# agent.py
from collections.abc import AsyncIterable

client = ...  # any OpenAI-compatible async client

async def invoke(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model="gpt-4o", max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return (resp.choices[0].message.content or "").strip()

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model="gpt-4o", max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text

3. Wire up the server

# main.py
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentInterface
from fast_a2a_app import a2a_ui, build_a2a_app, build_invoke, build_stream_invoke
from agent import invoke, stream_invoke

app = FastAPI()

agent_card = AgentCard(
    name="My Agent",
    description="Does cool things",
    version="1.0.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(invoke),
    stream_invoke=build_stream_invoke(stream_invoke),
))

app.mount("/", a2a_ui)        # built-in chat UI at http://localhost:8000/

4. Run

# Start Redis (required for conversation history)
docker run -d -p 6379:6379 redis:7-alpine

# Run the app
uvicorn main:app --reload

Open http://localhost:8000/ — you're chatting.


Prompt management

fast_a2a_app injects conversation history automatically, but you can take as much or as little control over prompt construction as you need. The API follows Progressive Disclosure — use only the level that fits your use case.

Level 0 — zero config

Works out of the box. The last 12 lines of conversation history are prepended to the user's message as "Conversation so far:\n…". Nothing to set.

build_a2a_app(agent_card=card, stream_invoke=build_stream_invoke(my_fn))

Level 1 — keyword parameters

Tune the built-in prompt without writing any code:

build_a2a_app(
    agent_card=card,
    stream_invoke=build_stream_invoke(my_fn),
    system_prompt="You are a concise travel planner. Reply in JSON.",
    history_max_lines=6,   # default is 12; set to 0 for a stateless agent
)

system_prompt is prepended before the history block and the user message. history_max_lines=0 disables history injection entirely.

Level 2 — compose from helpers

Build a custom prompt from the exported building blocks:

from fast_a2a_app import build_conversation_prefix, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert planner.\n\n"
        + build_conversation_prefix(context, max_lines=4)
        + f"Respond in JSON:\n{get_user_input(context)}"
    )

build_a2a_app(..., prompt_builder=my_prompt)

Level 3 — full custom builder

Pass any (RequestContext) -> str as prompt_builder for complete control. system_prompt and history_max_lines are ignored when a custom prompt_builder is supplied.

def my_prompt(context) -> str:
    # context.get_user_input()   — current user message
    # context.related_tasks      — prior Task objects for this conversation
    # context.current_task       — task being executed now
    # context.message            — raw A2A Message object
    return f"Be concise.\n{context.get_user_input()}"

build_a2a_app(..., prompt_builder=my_prompt)

API reference

build_a2a_app(...)

Assembles a Starlette ASGI app. Mount it at any path prefix.

Parameter Type Default Description
agent_card AgentCard required Pre-built A2A agent card (name, description, version, url, skills, capabilities)
invoke Callable | None None Non-streaming callable — use build_invoke() to wrap
stream_invoke Callable | None None Streaming callable — use build_stream_invoke() to wrap
system_prompt str | None None Level 1 — prepended to every prompt before history and user input
history_max_lines int 12 Level 1 — number of prior conversation lines to inject; 0 disables history
prompt_builder Callable | None auto Level 2/3 — custom (RequestContext) -> str; overrides system_prompt and history_max_lines
on_task_start Callable[[str], Awaitable] | None None Called before each task
on_task_cancel Callable[[str], Awaitable] | None None Called on cancel
a2a_task_store A2ATaskStore | None auto Custom task store
redis_client aioredis.Redis | None auto Custom Redis client
redis_url str "redis://localhost:6379" Redis connection string
debug bool False Include exception details in failure messages

build_invoke(run)

Wraps any async (prompt: str) -> str | Artifact function as a non-streaming A2A invoke. Works with any AI framework or plain API call. Return a plain str for a text response, or return an Artifact to send multiple parts (text, JSON data, files) in one response:

from a2a.types import Artifact, Part
import json, uuid

async def my_agent(prompt: str) -> Artifact:
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Here is your data for: {prompt}"),
            Part(raw=json.dumps({"count": 42}).encode(), media_type="application/json"),
            Part(raw=b"file content", filename="out.txt", media_type="text/plain"),
        ],
    )

app.mount("/a2a", build_a2a_app(agent_card=card, invoke=build_invoke(my_agent)))

build_stream_invoke(run)

Wraps any async (prompt: str) -> AsyncIterable[str] generator as a streaming A2A invoke. Also sets up the report_progress() ContextVar so live progress updates work out of the box — call report_progress("step 2/5…") anywhere during execution and it will appear as a working-status event in the chat UI.

report_progress(message)

Call from any agent tool to push a status string to the chat UI spinner. Has no effect outside a streaming context (safe to call unconditionally).

@agent.tool_plain
async def long_computation(n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    # …
    report_progress(f"Computing step 2/{n}…")
    return result

get_user_input(context)

Returns the current user message text from a RequestContext. Use this in a custom prompt_builder so you don't need to know the internal SDK method name:

from fast_a2a_app import get_user_input

def my_prompt(context) -> str:
    return f"Respond in JSON:\n{get_user_input(context)}"

build_conversation_prefix(context, *, max_lines=12)

Returns prior conversation lines as a formatted "Conversation so far:\n…" string, capped at max_lines lines. Returns an empty string when there is no prior history. Use in a custom prompt_builder:

from fast_a2a_app import build_conversation_prefix, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert.\n\n"
        + build_conversation_prefix(context, max_lines=6)
        + get_user_input(context)
    )

a2a_ui

A Starlette ASGI app serving a self-contained single-page chat interface. No build step, no npm. Mount it at "/" to serve the UI.

app.mount("/", a2a_ui)

The UI reads the agent card from /a2a/.well-known/agent-card.json to populate the header name and the collapsible info panel.


Example: Holiday Planner

examples/holiday_planner/ is a complete example showing how to build a domain-specific agent on fast_a2a_app.

examples/holiday_planner/
├── agent.py          # pydantic-ai agent with 4 tools
├── main.py           # FastAPI app + fast_a2a_app wiring
└── requirements.txt

Running the example

# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/holiday_planner
pip install -e ../../          # install fast_a2a_app from repo root
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine

uvicorn main:app --reload

Open http://localhost:8000/ and ask:

"I want to plan a 10-day trip somewhere in Southeast Asia in September, moderate budget, interested in food, temples, and nature. Can you help?"

The agent will ask follow-up questions, then use its tools to recommend destinations, build a day-by-day itinerary, estimate costs, and provide travel essentials — all with live progress updates in the UI.

Holiday planner tools

Tool Description
recommend_destinations 2-3 tailored destination suggestions with pros/cons
create_itinerary Day-by-day plan with restaurants and local tips
estimate_budget Cost breakdown table per person per day
get_travel_essentials Visa, health, weather, and packing guide

Example: Echo Agent (no LLM, no external dependencies)

examples/echo_agent/ is the minimal fast_a2a_app integration — pure Python, no API key, no AI framework.

examples/echo_agent/
├── agent.py          # Two plain async functions, zero external imports
├── main.py           # FastAPI app
└── requirements.txt  # fast_a2a_app only
# agent.py
async def invoke(prompt: str) -> str:
    return f"Echo: {prompt}"

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    words = f"Echo: {prompt}".split()
    for i, word in enumerate(words):
        yield word if i == len(words) - 1 else word + " "
        await asyncio.sleep(0.05)   # makes streaming visible in the UI

Running the echo agent

cd examples/echo_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

No .env needed — the echo agent requires no API key. A REDIS_URL can be set via examples/.env if you need a non-default Redis address.

No API key needed. Open http://localhost:8000/ and type anything.


Example: Joke Agent (raw chat completions, no agent framework)

examples/joke_agent/ shows fast_a2a_app wired to plain Azure OpenAI chat completions — no agent framework at all.

examples/joke_agent/
├── agent.py          # Two plain async functions: run_joke_agent + stream_joke_agent
├── main.py           # FastAPI app using build_invoke / build_stream_invoke
└── requirements.txt

agent.py defines two callables that satisfy the fast_a2a_app contract:

# Non-streaming: async (str) -> str
async def run_joke_agent(prompt: str) -> str:
    response = await client.chat.completions.create(model=..., messages=[...])
    return (response.choices[0].message.content or "").strip()

# Streaming: async (str) -> AsyncIterable[str]
async def stream_joke_agent(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(model=..., messages=[...], stream=True)
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text

main.py wires them in with the helpers:

from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill
from fast_a2a_app import build_a2a_app, build_invoke, build_stream_invoke, a2a_ui
from agent import run_joke_agent, stream_joke_agent

agent_card = AgentCard(
    name="Joke Agent",
    description="Your AI stand-up comedian.",
    version="0.1.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="tell_joke", name="Tell a joke", description="Tells a joke on any topic.", tags=[])],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(run_joke_agent),
    stream_invoke=build_stream_invoke(stream_joke_agent),
))
app.mount("/", a2a_ui)

Running the joke agent

# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/joke_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

Open http://localhost:8000/ and try:

"Tell me a programming joke" or "Give me your best dad joke"

Tokens stream directly from the Azure OpenAI API to the browser as they arrive.


Example: Echo Multipart (multi-part responses, no LLM)

examples/echo_multipart/ demonstrates returning text, structured data, and a downloadable file in a single A2A response — no LLM, no API key, no protobuf boilerplate.

examples/echo_multipart/
├── agent.py          # Returns Artifact with 3 parts using only json.dumps
├── main.py           # FastAPI app wired with build_invoke
└── requirements.txt
# agent.py
import json, uuid
from a2a.types import Artifact, Part

async def invoke(prompt: str) -> Artifact:
    words = prompt.split()
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Echo: {prompt}"),
            Part(
                raw=json.dumps({"original": prompt, "word_count": len(words)}).encode(),
                media_type="application/json",
            ),
            Part(raw=f"Echo: {prompt}\n".encode(), filename="echo.txt", media_type="text/plain"),
        ],
    )

The UI renders the three parts as: a markdown bubble, a key-value data table, and a file download card.

Running the echo multipart agent

cd examples/echo_multipart
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload

Architecture

FastAPI app
├── /a2a    ← Starlette ASGI app (build_a2a_app)
│   ├── POST /            SendMessage, SendStreamingMessage, CancelTask, …
│   └── GET  /.well-known/agent-card.json
└── /       ← a2a_ui (Starlette, single HTML file)

Redis
├── a2a:task:{id}                 Task JSON (24 h TTL)
├── a2a:context:{cid}:tasks       Context index (task_id → sequence)
├── a2a:context:{cid}:sequence    Sequence counter
└── a2a:cancel:{id}               Cancel signal (5 min TTL)

Conversation history injection

Each A2A task has a context_id shared across all turns of a conversation. ContextAwareRequestContextBuilder fetches all prior tasks for the same context_id from Redis and attaches them to the RequestContext as related_tasks.

The default prompt_builder then calls build_conversation_prefix() to extract the most recent history_max_lines lines of dialogue (default: 12) and prepend them as "Conversation so far:\n…" before the user's message. An optional system_prompt is inserted first.

The agent therefore sees recent history without the client needing to replay it. Depth and format are fully configurable — see Prompt management above.

How streaming works

build_stream_invoke wraps your generator in an asyncio.Queue-based relay. Before starting the generator it sets a ContextVar callback that report_progress() reads. Strings from report_progress() are placed in the queue with a sentinel prefix; ConfigurableAgentExecutor routes them to non-final statusUpdate (state TASK_STATE_WORKING) SSE events. All other yielded strings become artifactUpdate events — the streaming text the user sees.


Publishing to PyPI

pip install hatch
hatch build
hatch publish

Or with twine:

pip install build twine
python -m build
twine upload dist/*

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fast_a2a_app-0.3.1.tar.gz (41.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fast_a2a_app-0.3.1-py3-none-any.whl (34.5 kB view details)

Uploaded Python 3

File details

Details for the file fast_a2a_app-0.3.1.tar.gz.

File metadata

  • Download URL: fast_a2a_app-0.3.1.tar.gz
  • Upload date:
  • Size: 41.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.5 cpython/3.12.4 HTTPX/0.28.1

File hashes

Hashes for fast_a2a_app-0.3.1.tar.gz
Algorithm Hash digest
SHA256 1a3764c7db7d8f30fadfaef9996ed91370b9481b020ba36c7c8eb0d2964c1194
MD5 e31a8ae550b628e04aecd173d299a875
BLAKE2b-256 3f17e00da7218ab7aaf0f2bfbcc917e48a88284ea846cc60e05d7703c6de74fd

See more details on using hashes here.

File details

Details for the file fast_a2a_app-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: fast_a2a_app-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 34.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.5 cpython/3.12.4 HTTPX/0.28.1

File hashes

Hashes for fast_a2a_app-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f09aa3bf6fe256c30683acd87c953cf5501cc0c89bb14cbbd6d888f042c53184
MD5 fdde94680001ffa24aeb63276b1bd2d5
BLAKE2b-256 84f8fb4e01d64b968a05f6f84df4d647d6645a0eca9bda04d3d4dbc43d23d530

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page