Redis components for the Agent-to-Agent (A2A) Python SDK

a2a-redis

Redis integrations for the Agent-to-Agent (A2A) Python SDK.

This package provides Redis-backed implementations of core A2A components: persistent task storage, reliable event queue management, and push notification configuration storage.

Features

  • RedisTaskStore & RedisJSONTaskStore: Redis-backed task storage using hashes or JSON
  • RedisStreamsQueueManager & RedisStreamsEventQueue: Persistent, reliable event queues with consumer groups
  • RedisPubSubQueueManager & RedisPubSubEventQueue: Real-time, low-latency event broadcasting
  • RedisPushNotificationConfigStore: Task-based push notification configuration storage
  • Consumer Group Strategies for Streams: Flexible load balancing and instance isolation patterns

Installation

pip install a2a-redis

Quick Start

from a2a_redis import RedisTaskStore, RedisStreamsQueueManager, RedisPushNotificationConfigStore
from a2a_redis.utils import create_redis_client
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication

# Create Redis client with connection management
redis_client = create_redis_client(url="redis://localhost:6379/0", max_connections=50)

# Initialize Redis components
task_store = RedisTaskStore(redis_client, prefix="myapp:tasks:")
queue_manager = RedisStreamsQueueManager(redis_client, prefix="myapp:queues:")
push_config_store = RedisPushNotificationConfigStore(redis_client, prefix="myapp:push:")

# Use with the A2A request handler
# (YourAgentExecutor is a placeholder for your own agent executor class)
request_handler = DefaultRequestHandler(
    agent_executor=YourAgentExecutor(),
    task_store=task_store,
    queue_manager=queue_manager,
    push_config_store=push_config_store,
)

# Create the A2A server (your_agent_card is a placeholder for your agent card)
server = A2AStarletteApplication(
    agent_card=your_agent_card,
    http_handler=request_handler
)

Queue Components

The package provides both high-level queue managers and direct queue implementations:

Queue Managers

  • RedisStreamsQueueManager - Manages Redis Streams-based queues
  • RedisPubSubQueueManager - Manages Redis Pub/Sub-based queues
  • Both implement the A2A SDK's QueueManager interface

Event Queues

  • RedisStreamsEventQueue - Direct Redis Streams queue implementation
  • RedisPubSubEventQueue - Direct Redis Pub/Sub queue implementation
  • Both implement the EventQueue interface through protocol compliance

Queue Manager Types: Streams vs Pub/Sub

RedisStreamsQueueManager

Key Features:

  • Persistent storage: Events remain in streams until explicitly trimmed
  • Guaranteed delivery: Consumer groups with acknowledgments prevent message loss
  • Load balancing: Multiple consumers can share work via consumer groups
  • Failure recovery: Unacknowledged messages can be reclaimed by other consumers
  • Event replay: Historical events can be re-read from any point in time
  • Ordering: Maintains strict insertion order with unique message IDs

Use Cases:

  • Task event queues requiring reliability
  • Audit trails and event history
  • Work distribution systems
  • Systems requiring failure recovery
  • Multi-consumer load balancing

Trade-offs:

  • Higher memory usage (events persist)
  • More complex setup (consumer groups)
  • Slightly higher latency than pub/sub
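The consumer group behavior listed above (shared work, acknowledgments, reclaiming unacknowledged messages, replay) can be illustrated with a small in-memory model. This is only a sketch of the semantics, not how a2a-redis or Redis implements them; `ToyStream` and its methods are hypothetical names invented for this example, and no Redis server is involved.

```python
from collections import OrderedDict


class ToyStream:
    """In-memory stand-in for one stream with a single consumer group (hypothetical)."""

    def __init__(self):
        self.entries = OrderedDict()  # entry_id -> event, kept until trimmed
        self.next_id = 0
        self.last_delivered = -1      # the group's read cursor
        self.pending = {}             # entry_id -> consumer that has not acked yet

    def add(self, event):
        """Append an event and return its ID; IDs increase in insertion order."""
        entry_id = self.next_id
        self.entries[entry_id] = event
        self.next_id += 1
        return entry_id

    def read(self, consumer):
        """Deliver the next undelivered entry to `consumer`, or return None."""
        for entry_id, event in self.entries.items():
            if entry_id > self.last_delivered:
                self.last_delivered = entry_id
                self.pending[entry_id] = consumer
                return entry_id, event
        return None

    def ack(self, entry_id):
        """Mark an entry as processed so it leaves the pending list."""
        self.pending.pop(entry_id, None)

    def claim(self, entry_id, new_consumer):
        """Reassign an unacknowledged entry to another consumer."""
        if entry_id not in self.pending:
            raise KeyError(entry_id)
        self.pending[entry_id] = new_consumer
        return entry_id, self.entries[entry_id]


stream = ToyStream()
stream.add({"type": "progress", "message": "Task started"})
stream.add({"type": "progress", "message": "50% complete"})

first_id, _ = stream.read("worker-a")    # worker-a takes the first entry
second_id, _ = stream.read("worker-b")   # worker-b takes the second (shared work)
stream.ack(second_id)                    # worker-b finishes its entry

# Suppose worker-a crashed before acknowledging: worker-b reclaims its entry.
reclaimed_id, reclaimed_event = stream.claim(first_id, "worker-b")
stream.ack(reclaimed_id)

replay = list(stream.entries.values())   # history is still readable after acks
```

In real Redis Streams, the corresponding commands are XADD, XREADGROUP, XACK, and XCLAIM or XAUTOCLAIM.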

RedisPubSubQueueManager

Key Features:

  • Real-time delivery: Events delivered immediately to active subscribers
  • No persistence: Events not stored, only delivered to active consumers
  • Fire-and-forget: No acknowledgments or delivery guarantees
  • Broadcasting: All subscribers receive all events
  • Low latency: Minimal overhead for immediate delivery
  • Minimal memory usage: No storage of events

Use Cases:

  • Live status updates and notifications
  • Real-time dashboard updates
  • System event broadcasting
  • Non-critical event distribution
  • Low-latency requirements
  • Simple fan-out scenarios

Not suitable for:

  • Critical event processing requiring guarantees
  • Systems requiring event replay or audit trails
  • Offline-capable applications
  • Work queues requiring load balancing
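The fire-and-forget and broadcast behavior described above can likewise be modeled in a few lines. This is an illustrative in-memory sketch, not the library's implementation; `ToyChannel` is a hypothetical name. It shows why events published while no subscriber is listening are lost, and why every active subscriber sees every event.

```python
class ToyChannel:
    """In-memory stand-in for one pub/sub channel (hypothetical)."""

    def __init__(self):
        self.subscribers = []  # one inbox list per active subscriber

    def subscribe(self):
        """Register a new subscriber and return its inbox."""
        inbox = []
        self.subscribers.append(inbox)
        return inbox

    def publish(self, event):
        """Deliver to every current subscriber; nothing is stored afterwards."""
        for inbox in self.subscribers:
            inbox.append(event)
        return len(self.subscribers)  # number of receivers, like Redis PUBLISH


channel = ToyChannel()

missed = channel.publish("event-1")      # nobody is listening: the event is dropped

dashboard = channel.subscribe()
logger = channel.subscribe()
delivered = channel.publish("event-2")   # both active subscribers receive it

late = channel.subscribe()               # joins afterwards and sees no history
```

A late subscriber starting with an empty inbox is the reason Pub/Sub is listed as unsuitable for replay, audit trails, and offline-capable consumers.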

Components

Task Storage

RedisTaskStore

Stores task data in Redis using hashes with JSON serialization. Works with any Redis server.

from a2a_redis import RedisTaskStore

task_store = RedisTaskStore(redis_client, prefix="mytasks:")

# A2A TaskStore interface methods
await task_store.save("task123", {"status": "pending", "data": {"key": "value"}})
task = await task_store.get("task123")
success = await task_store.delete("task123")

# List all task IDs (utility method)
task_ids = await task_store.list_task_ids()
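
To make "hashes with JSON serialization" concrete, here is a minimal sketch of one way a task dictionary could be mapped onto flat hash fields and back. The helper names and the exact field layout are assumptions for illustration only; the actual key and field scheme used by RedisTaskStore may differ.

```python
import json


def task_key(prefix, task_id):
    """Build the Redis key for a task (assumed scheme: prefix + task ID)."""
    return f"{prefix}{task_id}"


def to_hash_fields(task):
    """JSON-encode each top-level value, since hash field values are flat strings."""
    return {field: json.dumps(value) for field, value in task.items()}


def from_hash_fields(fields):
    """Decode the JSON-encoded field values back into Python objects."""
    return {field: json.loads(raw) for field, raw in fields.items()}


task = {"status": "pending", "data": {"key": "value"}}

key = task_key("mytasks:", "task123")
fields = to_hash_fields(task)        # what would be written with HSET
restored = from_hash_fields(fields)  # what would be rebuilt after HGETALL
```

Because nested values are stored as opaque JSON strings in this layout, updating a single nested field means rewriting the whole value, which is the gap the JSON-module-based store below addresses.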

RedisJSONTaskStore

Stores task data using Redis's JSON module for native JSON operations and complex nested data.

from a2a_redis import RedisJSONTaskStore

# Requires Redis 8 or later, or an older Redis server with the RedisJSON module loaded
json_task_store = RedisJSONTaskStore(redis_client, prefix="mytasks:")

# Same interface as RedisTaskStore but with native JSON support
await json_task_store.save("task123", {"complex": {"nested": {"data": "value"}}})

Queue Managers

Both queue managers implement the A2A QueueManager interface with full async support:

import asyncio
from a2a_redis import RedisStreamsQueueManager, RedisPubSubQueueManager
from a2a_redis.streams_consumer_strategy import ConsumerGroupConfig, ConsumerGroupStrategy

# Choose based on your requirements:

# For reliable, persistent processing
streams_manager = RedisStreamsQueueManager(redis_client, prefix="myapp:streams:")

# For real-time, low-latency broadcasting
pubsub_manager = RedisPubSubQueueManager(redis_client, prefix="myapp:pubsub:")

# With custom consumer group configuration (streams only);
# this replaces the streams_manager created above
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.SHARED_LOAD_BALANCING)
streams_manager = RedisStreamsQueueManager(redis_client, consumer_config=config)

async def main():
    # Same interface for both managers
    queue = await streams_manager.create_or_tap("task123")

    # Enqueue events
    await queue.enqueue_event({"type": "progress", "message": "Task started"})
    await queue.enqueue_event({"type": "progress", "message": "50% complete"})

    # Dequeue events
    try:
        event = await queue.dequeue_event(no_wait=True)  # Non-blocking
        print(f"Got event: {event}")
        await queue.task_done()  # Acknowledge the message (streams only)
    except RuntimeError:
        print("No events available")

    # Close queue when done
    await queue.close()

asyncio.run(main())

Consumer Group Strategies

The Streams queue manager supports different consumer group strategies:

from a2a_redis.streams_consumer_strategy import ConsumerGroupStrategy, ConsumerGroupConfig

# Multiple instances share work across a single consumer group
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.SHARED_LOAD_BALANCING)

# Each instance gets its own consumer group
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.INSTANCE_ISOLATED)

# Custom consumer group name
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.CUSTOM, group_name="my_group")

streams_manager = RedisStreamsQueueManager(redis_client, consumer_config=config)
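
The practical difference between the three strategies is which consumer group name each instance ends up using. The sketch below shows one plausible naming scheme; the `Strategy` enum, the `group_name_for` helper, and the `processors-` naming pattern are all hypothetical and are not taken from the a2a-redis source.

```python
from enum import Enum


class Strategy(Enum):
    """Hypothetical mirror of the three strategies named above."""
    SHARED_LOAD_BALANCING = "shared"
    INSTANCE_ISOLATED = "isolated"
    CUSTOM = "custom"


def group_name_for(strategy, task_id, instance_id=None, group_name=None):
    """Return the consumer group name an instance would join (assumed scheme)."""
    if strategy is Strategy.SHARED_LOAD_BALANCING:
        # Every instance joins the same group, so each event goes to one of them.
        return f"processors-{task_id}"
    if strategy is Strategy.INSTANCE_ISOLATED:
        # Each instance gets its own group, so each instance sees every event.
        if not instance_id:
            raise ValueError("INSTANCE_ISOLATED needs an instance_id")
        return f"processors-{task_id}-{instance_id}"
    if strategy is Strategy.CUSTOM:
        if not group_name:
            raise ValueError("CUSTOM needs an explicit group_name")
        return group_name
    raise ValueError(f"unknown strategy: {strategy!r}")


shared_a = group_name_for(Strategy.SHARED_LOAD_BALANCING, "task123", instance_id="a")
shared_b = group_name_for(Strategy.SHARED_LOAD_BALANCING, "task123", instance_id="b")
isolated_a = group_name_for(Strategy.INSTANCE_ISOLATED, "task123", instance_id="a")
isolated_b = group_name_for(Strategy.INSTANCE_ISOLATED, "task123", instance_id="b")
custom = group_name_for(Strategy.CUSTOM, "task123", group_name="my_group")
```

Under this scheme, two instances using the shared strategy compute the same group name and split the work, while isolated instances compute different names and each receive the full stream.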

RedisPushNotificationConfigStore

Stores push notification configurations per task. Implements the A2A PushNotificationConfigStore interface.

from a2a_redis import RedisPushNotificationConfigStore
from a2a.types import PushNotificationConfig

config_store = RedisPushNotificationConfigStore(redis_client, prefix="myapp:push:")

# Create push notification config
config = PushNotificationConfig(
    url="https://webhook.example.com/notify",
    token="secret_token",
    id="webhook_1"
)

# A2A interface methods
await config_store.set_info("task123", config)

# Get all configs for a task
configs = await config_store.get_info("task123")
for stored in configs:  # separate name so the config created above is not shadowed
    print(f"Config {stored.id}: {stored.url}")

# Delete specific config or all configs for a task
await config_store.delete_info("task123", "webhook_1")  # Delete specific
await config_store.delete_info("task123")  # Delete all

Requirements

  • Python 3.11+
  • redis >= 4.0.0
  • a2a-sdk >= 0.2.16 (Agent-to-Agent Python SDK)
  • uvicorn >= 0.35.0

Optional Dependencies

  • RedisJSON module for RedisJSONTaskStore (enhanced nested data support)
  • Redis Stack or Redis with modules for full feature support

Development

# Clone the repository
git clone https://github.com/a2aproject/a2a-redis.git
cd a2a-redis

# Create virtual environment and install dependencies
uv venv
source .venv/bin/activate  # or .venv\Scripts\activate on Windows
uv sync --dev

# Run tests with coverage
uv run pytest --cov=a2a_redis --cov-report=term-missing

# Run linting and formatting
uv run ruff check src/ tests/
uv run ruff format src/ tests/
uv run pyright src/

# Install pre-commit hooks
uv run pre-commit install

# Run examples
uv run python examples/basic_usage.py
uv run python examples/redis_travel_agent.py

Testing

Tests run against Redis database 15 for isolation and include both mocked tests and integration tests against a real Redis server:

# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/test_streams_queue_manager.py -v

# Run with coverage
uv run pytest --cov=a2a_redis --cov-report=term-missing

License

MIT License

