Persistent, pod-restart-safe A2A task store for Google Agent ADK. Drop-in replacement for InMemoryTaskStore.
Project description
adk-task-persistence
Persistent, pod-restart-safe A2A task store for Google Agent ADK.
Drop-in replacement for ADK's InMemoryTaskStore. Implements the real a2a.server.tasks.TaskStore ABC — backed by Postgres, MySQL, or SQLite. Works with RemoteA2aAgent, SSE streaming, and multi-pod Kubernetes deployments today.
pip install adk-task-persistence
The Problem
Google ADK's get_fast_api_app(a2a=True) hardcodes InMemoryTaskStore() with no injection point. In any real deployment this causes:
| Symptom | Root cause |
|---|---|
| In-flight tasks vanish on pod restart | State lives in process memory |
| Status polls return 404 across pods | Each pod has its own store |
| Can't scale horizontally without sticky sessions | Defeats stateless pod design |
ADK already solved the same problem for session state via DatabaseSessionService. This library applies the same pattern to A2A task state.
Features
- Native ADK interface — implements
a2a.server.tasks.TaskStoreABC exactly; zero glue code - Any SQL database — Postgres, MySQL, SQLite via async SQLAlchemy
- Auto schema — table created on first use; no migrations needed
- Multi-pod safe — shared task state across all replicas
- Pod-restart safe — tasks survive container restarts and rolling deploys
- SSE / streaming preserved —
RemoteA2aAgentcallers see identical A2A protocol responses - Forward-compatible — ready for ADK PRs #3839 and #4970 with zero code changes
- Optional Celery layer — for agent runs that must survive an HTTP pod crash mid-execution
- Typed —
py.typedmarker, full type annotations
Quick Start
from sqlalchemy.ext.asyncio import create_async_engine
from adk_task_persistence import SqlAlchemyTaskStore, create_a2a_app
engine = create_async_engine("postgresql+asyncpg://user:pass@db/mydb")
task_store = SqlAlchemyTaskStore(engine)
app = create_a2a_app(
runner=runner, # google.adk.runners.Runner
agent_card=agent_card, # a2a.types.AgentCard
task_store=task_store,
)
Run with:
uvicorn mymodule:app --host 0.0.0.0 --port 8000
Installation
Core (task persistence only)
pip install adk-task-persistence
Requires an async driver for your database:
# Postgres
pip install asyncpg
# SQLite (dev / testing)
pip install aiosqlite
With Celery (agent-run survival)
pip install "adk-task-persistence[celery]"
Usage
1. Basic — SQLite for local development
from sqlalchemy.ext.asyncio import create_async_engine
from adk_task_persistence import SqlAlchemyTaskStore, create_a2a_app
engine = create_async_engine("sqlite+aiosqlite:///./tasks.db")
task_store = SqlAlchemyTaskStore(engine)
app = create_a2a_app(
runner=runner,
agent_card=agent_card,
task_store=task_store,
title="My Agent",
)
2. Production — Postgres on Kubernetes
import os
from sqlalchemy.ext.asyncio import create_async_engine
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService
from a2a.types import AgentCard
from adk_task_persistence import SqlAlchemyTaskStore, create_a2a_app
DB_URL = os.environ["DB_URL"] # postgresql+asyncpg://user:pass@host/db
agent = LlmAgent(name="my_agent", model="gemini-2.0-flash")
runner = Runner(
agent=agent,
app_name="my_app",
session_service=DatabaseSessionService(db_url=DB_URL),
)
agent_card = AgentCard(name="My Agent", url="http://localhost:8000", version="1.0.0")
engine = create_async_engine(DB_URL)
task_store = SqlAlchemyTaskStore(engine)
app = create_a2a_app(runner=runner, agent_card=agent_card, task_store=task_store)
3. After ADK PR #4970 merges
Once upstream support lands, pass the store directly to get_fast_api_app — no helper needed:
from google.adk.cli.fast_api import get_fast_api_app
from adk_task_persistence import SqlAlchemyTaskStore
app = get_fast_api_app(
agents_dir="./agents",
a2a=True,
a2a_task_store=SqlAlchemyTaskStore(engine),
)
SqlAlchemyTaskStore is already interface-compatible — no changes required.
Optional: Celery for Agent-Run Survival
The core library solves task state persistence (state survives pod restart; polls succeed cross-pod). If your agents run for minutes and you also need execution to survive an HTTP pod crash mid-run, add the Celery extension.
from adk_task_persistence.celery import AdkAgentRunner, registry
registry.register(
"my_agent",
agent_factory=lambda: AdkAgentRunner(my_runner),
session_service_factory=lambda: None,
task_store_factory=lambda: SqlAlchemyTaskStore(engine),
)
Start a worker:
celery -A adk_task_persistence.celery worker --loglevel=info
The agent run executes inside the Celery worker. If the HTTP pod dies, the worker completes the task and writes the result to the shared store. Callers polling any pod will find the result.
Multi-Pod Architecture
┌─────────────────────────────┐
│ Load Balancer │
└──────────┬──────────┬────────┘
│ │
┌──────┘ └──────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Agent Pod 1 │ │ Agent Pod 2 │
│ │ │ │
│ FastAPI + │ │ FastAPI + │
│ ADK Runner │ │ ADK Runner │
└──────┬──────┘ └──────┬──────┘
│ │
└──────────┬─────────────┘
▼
┌─────────────────┐
│ Postgres DB │
│ │
│ adk_a2a_tasks │ ← SqlAlchemyTaskStore
│ adk_sessions │ ← DatabaseSessionService
└─────────────────┘
Both pods share the same task and session state. Polls routed to any pod return the correct result.
API Reference
SqlAlchemyTaskStore
SqlAlchemyTaskStore(
engine: AsyncEngine,
table_name: str = "adk_a2a_tasks",
create_table: bool = True,
)
Implements a2a.server.tasks.TaskStore:
| Method | Signature |
|---|---|
save |
async def save(task, context) -> None |
get |
async def get(task_id, context) -> Task | None |
list |
async def list(params, context) -> ListTasksResponse |
delete |
async def delete(task_id, context) -> None |
create_a2a_app
create_a2a_app(
*,
runner: Runner,
agent_card: AgentCard,
task_store: TaskStore,
push_config_store=None,
url_prefix: str = "",
**fastapi_kwargs,
) -> FastAPI
Builds ADK's native A2A stack (A2aAgentExecutor → DefaultRequestHandler → A2AStarletteApplication) with your task store injected.
Requirements
- Python 3.10+
google-adk >= 1.0.0sqlalchemy[asyncio] >= 2.0fastapi >= 0.110.1pydantic >= 2.6.4- Async DB driver:
asyncpg(Postgres),aiomysql(MySQL),aiosqlite(SQLite)
License
Apache 2.0 — © Sthitaprajna Sahoo
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file adk_task_persistence-0.2.0.tar.gz.
File metadata
- Download URL: adk_task_persistence-0.2.0.tar.gz
- Upload date:
- Size: 19.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2405f5da4419b86674664a0c547f428e385efbcb765529efed82ea420f2cc390
|
|
| MD5 |
5fd5ae5f1e366682026c59ccf8f30b2d
|
|
| BLAKE2b-256 |
53edbdac683cc930da1025e768728cba0810abc04517643e8ee3c56f25c6dc17
|
Provenance
The following attestation bundles were made for adk_task_persistence-0.2.0.tar.gz:
Publisher:
publish.yml on STHITAPRAJNAS/adk-task-persistence
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
adk_task_persistence-0.2.0.tar.gz -
Subject digest:
2405f5da4419b86674664a0c547f428e385efbcb765529efed82ea420f2cc390 - Sigstore transparency entry: 1431505123
- Sigstore integration time:
-
Permalink:
STHITAPRAJNAS/adk-task-persistence@ce47c1f85dbee67867afc011a74451403c384942 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/STHITAPRAJNAS
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@ce47c1f85dbee67867afc011a74451403c384942 -
Trigger Event:
push
-
Statement type:
File details
Details for the file adk_task_persistence-0.2.0-py3-none-any.whl.
File metadata
- Download URL: adk_task_persistence-0.2.0-py3-none-any.whl
- Upload date:
- Size: 19.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b835f74dc70026a0d09d0e2c2f9f01fb2927444594600284aa03b956e6081f5d
|
|
| MD5 |
9eb0840b651bc7f5f3b94365e4c16c98
|
|
| BLAKE2b-256 |
713d35d5ecc2faf456f6c598fc5d0e5ffab69e39db8f8d50a66ea9e87dc6c0d9
|
Provenance
The following attestation bundles were made for adk_task_persistence-0.2.0-py3-none-any.whl:
Publisher:
publish.yml on STHITAPRAJNAS/adk-task-persistence
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
adk_task_persistence-0.2.0-py3-none-any.whl -
Subject digest:
b835f74dc70026a0d09d0e2c2f9f01fb2927444594600284aa03b956e6081f5d - Sigstore transparency entry: 1431505292
- Sigstore integration time:
-
Permalink:
STHITAPRAJNAS/adk-task-persistence@ce47c1f85dbee67867afc011a74451403c384942 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/STHITAPRAJNAS
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@ce47c1f85dbee67867afc011a74451403c384942 -
Trigger Event:
push
-
Statement type: