
EggAI SDK

EggAI Multi-Agent Meta Framework is an async-first framework for building, deploying, and scaling multi-agent systems for modern enterprise environments.

[Figure: EggAI Meta Framework Architecture]

Features

  • Multi-Transport Support: In-Memory, Redis Streams, Kafka, and extensible for custom transports
  • Async-First: Built on asyncio for high-performance concurrent processing
  • Type-Safe: Full type hints and Pydantic validation with CloudEvents-compliant message protocol
  • Production-Ready: Comprehensive error handling, logging, and monitoring
  • Flexible Architecture: Easily extensible with custom transports and middleware
  • Protocol Interoperability: A2A for agent collaboration, MCP for LLM tools, CloudEvents for messaging

Installation

pip install eggai

Redis Streams and Kafka support are included by default.

Optional extras:

pip install eggai[cli]    # CLI tools for scaffolding
pip install eggai[a2a]    # A2A (Agent-to-Agent) SDK integration
pip install eggai[mcp]    # MCP (Model Context Protocol) support

Quick Start

Using In-Memory Transport (Testing & Development)

Perfect for unit tests, prototyping, and local development.

from eggai import Agent, Channel
from eggai.transport.memory import InMemoryTransport

# Create an in-memory transport
transport = InMemoryTransport()

# Create an agent
agent = Agent(name="my-agent", transport=transport)

# Define message handler
@agent.subscribe(channel=Channel("input-channel", transport=transport))
async def handle_message(message):
    # Process message
    result = await process(message)
    # Publish to output channel
    output_channel = Channel("output-channel", transport=transport)
    await output_channel.publish(result)

# Start the agent (run inside an async context, e.g. via asyncio.run)
await agent.start()

Using Redis Streams (Recommended for Production)

Redis Streams provides durable message delivery with consumer groups, making it well suited for microservices and production deployments.

from eggai import Agent, Channel
from eggai.transport.redis import RedisTransport

# Create a Redis transport
transport = RedisTransport(url="redis://localhost:6379")

# Create an agent
agent = Agent(name="my-agent", transport=transport)

# Define message handler
@agent.subscribe(channel=Channel("input-channel", transport=transport))
async def handle_message(message):
    # Process message
    result = await process(message)
    # Publish to output channel
    output_channel = Channel("output-channel", transport=transport)
    await output_channel.publish(result)

# Start the agent (run inside an async context, e.g. via asyncio.run)
await agent.start()

Using Kafka (Production - High Throughput)

Kafka is ideal for high-throughput, large-scale distributed streaming workloads.

from eggai import Agent, Channel
from eggai.transport.kafka import KafkaTransport

# Create a Kafka transport
transport = KafkaTransport(
    bootstrap_servers="localhost:9092"
)

# Create an agent
agent = Agent(name="my-agent", transport=transport)

# Define message handler
@agent.subscribe(channel=Channel("input-topic", transport=transport))
async def handle_message(message):
    # Process message
    result = await process(message)
    # Publish to output topic
    output_channel = Channel("output-topic", transport=transport)
    await output_channel.publish(result)

# Start the agent (run inside an async context, e.g. via asyncio.run)
await agent.start()

Transport Comparison

Feature             In-Memory               Redis Streams                Kafka
Use Case            Testing, prototyping    Production microservices     Large-scale production
Persistence         ❌ No                   ✅ Yes (AOF/RDB)             ✅ Yes (highly durable)
Setup Complexity    None                    Simple                       Moderate
Throughput          Very High               High (100K+ msgs/sec)        Very High (1M+ msgs/sec)
Consumer Groups     ❌ No                   ✅ Native support            ✅ Native support
Message Retention   N/A                     Time or count-based          Time or size-based
Production Ready    ❌ No                   Recommended                  Recommended
Best For            Unit tests, local dev   Most production workloads    High-throughput distributed systems
Operational Cost    None                    Lower                        Higher
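The comparison above suggests choosing the transport per deployment tier. A minimal sketch of such a switch, where the class names and constructor arguments are the ones shown in the Quick Start examples; the helper itself and the REDIS_URL / KAFKA_SERVERS environment variable names are illustrative, not part of the SDK:

```python
import os

# Illustrative helper (not part of the SDK): map a deployment tier to the
# transport class name and constructor kwargs shown in the Quick Start above.
def transport_config(tier: str) -> tuple[str, dict]:
    """Return (transport_class_name, constructor_kwargs) for a tier."""
    if tier == "memory":   # unit tests, local development
        return "InMemoryTransport", {}
    if tier == "redis":    # recommended production default
        return "RedisTransport", {
            "url": os.environ.get("REDIS_URL", "redis://localhost:6379"),
        }
    if tier == "kafka":    # very high-throughput production
        return "KafkaTransport", {
            "bootstrap_servers": os.environ.get("KAFKA_SERVERS", "localhost:9092"),
        }
    raise ValueError(f"unknown transport tier: {tier!r}")
```

Because agent code only depends on the Channel/Agent abstractions, swapping the transport this way leaves handlers untouched.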

Reliable Message Delivery (Redis Streams)

Pending Entries List (PEL) and Retry Streams

With NACK_ON_ERROR (the default), a handler exception leaves the message in Redis's Pending Entries List (PEL). FastStream only reads new messages, so a failed message would be stuck forever without intervention.

Enable SDK-managed retry by setting retry_on_idle_ms on any subscription:

from eggai import Agent, Channel
from eggai.transport import RedisTransport

transport = RedisTransport(url="redis://localhost:6379")
agent = Agent("order-service", transport=transport)
orders = Channel("orders", transport=transport)

@agent.subscribe(
    channel=orders,
    retry_on_idle_ms=30_000,          # reclaim after 30s idle (required to enable retry)
    max_retries=5,                    # route to DLQ after 5 retries (default: 5, None = unlimited)
    retry_reclaim_interval_s=15.0,    # PEL scan interval (default: 15.0)
    on_dlq=None,                      # optional async/sync callback(fields, msg_id, count)
)
async def handle_order(message):
    # If this raises, the message stays in the PEL.
    # After 30s idle the reclaimer moves it to eggai.orders.retry
    # and this same handler is called again.
    # After 5 failed retries, the message is routed to eggai.orders.dlq.
    await process_order(message)

await agent.start()

The SDK automatically:

  1. Starts a background reclaimer that scans the PEL every 15 seconds (configurable via retry_reclaim_interval_s).
  2. Moves idle messages (older than retry_on_idle_ms) to a dedicated {channel}.retry stream.
  3. Subscribes the same handler to the retry stream.
  4. Runs a second reclaimer on the retry stream that re-queues back to itself (no .retry.retry chain).
  5. After max_retries retry attempts (default 5), routes the message to a {channel}.dlq Dead Letter Queue instead of retrying again.

Delivery guarantee: at-least-once. XADD and XACK are not atomic — a crash between them will re-deliver the message on the next cycle. Handlers must be idempotent. Two fields are injected on retry delivery to aid deduplication:

Field                 Value
_retry_count          "1", "2", … (incremented on each reclaim cycle)
_original_message_id  Redis stream ID of the original message
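These fields make deduplication straightforward. A minimal in-memory sketch, assuming the handler can see the raw field dict; a real deployment would track IDs in shared storage (e.g. Redis SET NX) rather than a process-local set:

```python
# Illustrative dedup sketch keyed on the SDK-injected _original_message_id.
# A process-local set only works for a single consumer; use shared storage
# (Redis SET NX, a database unique key) in real deployments.
_seen_ids: set[str] = set()

def is_duplicate(fields: dict) -> bool:
    """True if a retry delivery with this original stream ID was already handled."""
    msg_id = fields.get("_original_message_id")
    if msg_id is None:        # first delivery: the field is only set on retries
        return False
    if msg_id in _seen_ids:
        return True
    _seen_ids.add(msg_id)
    return False
```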

Dead Letter Queue (DLQ): Messages that exceed max_retries (default 5) are routed to {channel}.dlq. The DLQ is terminal — no automatic reclaimer. Set max_retries=None for unlimited retries. An optional on_dlq callback fires when a message lands in the DLQ.
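A hypothetical on_dlq callback matching the callback(fields, msg_id, count) shape described above might look like this; here it only logs, while a real one could page an operator or persist the message for manual re-drive:

```python
import logging

logger = logging.getLogger("orders.dlq")

# Hypothetical callback: the (fields, msg_id, count) signature follows the
# on_dlq description above; the logger name and message are illustrative.
async def alert_on_dlq(fields: dict, msg_id: str, count: int) -> None:
    logger.error("message %s dead-lettered after %d retries: %r",
                 msg_id, count, fields)
```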

Automatic recovery from Redis stream loss (NOGROUP): If Redis loses streams (restart without persistence, failover, memory eviction), the SDK auto-recovers. A background monitor periodically ensures consumer groups exist via XGROUP CREATE with MKSTREAM, and the reclaimer recreates groups on NOGROUP errors. No configuration needed — always active with RedisTransport.
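The recovery step can be pictured with the following sketch. The SDK performs this automatically; ensure_group is illustrative, and client stands in for any redis-py-style async client exposing an xgroup_create method:

```python
# Illustrative sketch only: the SDK runs this logic for you in a background
# monitor. `client` is a redis-py-style async client with xgroup_create().
async def ensure_group(client, stream: str, group: str) -> None:
    try:
        # MKSTREAM recreates the stream if Redis lost it (restart, eviction)
        await client.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:
        if "BUSYGROUP" not in str(exc):  # group already exists: not an error
            raise
```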

Constraints:

  • min_idle_time (FastStream XAUTOCLAIM) and retry_on_idle_ms are mutually exclusive on the same subscription — mixing them raises ValueError.
  • Binary (non-UTF-8) field values are not supported; use JSON-serialisable payloads.

Retry delivery reference

Main stream  (eggai.orders)
    │
    ├── FastStream consumer  (group/consumer: order-service-handle_order-1)
    │       on exception → NACK → message stays in main PEL
    │
    └── Reclaimer            (consumer: order-service-handle_order-1-reclaimer)
            every 15s: XPENDING → idle > 30s → XCLAIM
            _retry_count ≤ max_retries → XADD orders.retry → XACK
            _retry_count > max_retries → XADD orders.dlq   → XACK

Retry stream (eggai.orders.retry)
    │
    ├── FastStream consumer  (group: order-service-handle_order-1-retry)
    │       same handler — on exception → NACK → message stays in retry PEL
    │
    └── Reclaimer            (target: same retry stream — no .retry.retry chain)
            every 15s: XPENDING → idle > 30s → XCLAIM
            _retry_count ≤ max_retries → XADD orders.retry → XACK
            _retry_count > max_retries → XADD orders.dlq   → XACK

DLQ stream   (eggai.orders.dlq)
        terminal — no reclaimer, manual re-drive only

Production Recommendations

For production deployments, we recommend:

  • Redis Streams: Best for most production workloads. Simpler to operate, lower cost, excellent performance for microservices and event-driven architectures.
  • Kafka: Best for very high-throughput requirements (1M+ messages/sec), complex stream processing, or when you need advanced features like exactly-once semantics.

The In-Memory transport should only be used for testing and development.

Interoperability

EggAI is designed as a meta-framework that enables seamless integration with other agent systems and protocols:

Agent-to-Agent (A2A) Protocol

Connect EggAI agents with other agent frameworks and systems using the A2A protocol:

pip install eggai[a2a]

A2A enables:

  • Cross-framework agent communication
  • Standardized message formats for multi-agent systems
  • Integration with existing agent ecosystems (AutoGen, LangChain, etc.)
  • HTTP-based service discovery via AgentCards

Read full A2A documentation →

Model Context Protocol (MCP)

Integrate with LLM tools and services through MCP:

pip install eggai[mcp]

MCP support enables:

  • LLM tool calling and function execution
  • Context sharing across agent conversations
  • Integration with Claude, GPT, and other LLM providers
  • Standardized tool interfaces via FastMCP

Read full MCP documentation →

Transport Flexibility

EggAI's transport abstraction allows agents to communicate regardless of underlying infrastructure:

  • Hybrid deployments: Mix local (in-memory) and distributed (Redis/Kafka) agents
  • Migration paths: Start with Redis, scale to Kafka without changing agent code
  • Multi-cloud: Deploy agents across different cloud providers using different transports
  • Custom transports: Extend with your own transport implementations

Extensibility

Build custom integrations through:

  • Custom Transport API: Implement your own message broker backends
  • Middleware system: Add cross-cutting concerns (logging, metrics, auth)
  • Message adapters: Transform messages between different protocols
  • Plugin architecture: Extend agent capabilities with reusable components

Documentation

Core Documentation

  • Message Protocol: CloudEvents-based message structure for agent communication
  • Agent: Core agent abstraction for building autonomous units
  • Channel: Communication layer for event publishing and subscription

Additional Resources

For full documentation, visit: https://docs.egg-ai.com/

License

MIT License - see LICENSE file for details
