EggAI SDK
EggAI Multi-Agent Meta Framework is an async-first framework for building, deploying, and scaling multi-agent systems for modern enterprise environments.
Features
- Multi-Transport Support: In-Memory, Redis Streams, Kafka, and extensible for custom transports
- Async-First: Built on asyncio for high-performance concurrent processing
- Type-Safe: Full type hints and Pydantic validation with CloudEvents-compliant message protocol
- Production-Ready: Comprehensive error handling, logging, and monitoring
- Flexible Architecture: Easily extensible with custom transports and middleware
- Protocol Interoperability: A2A for agent collaboration, MCP for LLM tools, CloudEvents for messaging
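To make the CloudEvents-compliant message protocol more concrete, here is a minimal sketch of a CloudEvents-style envelope built with the standard library only. The `Event` class and its field defaults are hypothetical and are not EggAI's own message class. The required attributes (`id`, `source`, `specversion`, `type`) come from the CloudEvents 1.0 specification.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class Event:
    """Hypothetical CloudEvents-style envelope (not EggAI's own message class)."""

    # Required CloudEvents 1.0 context attributes.
    source: str
    type: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    specversion: str = "1.0"
    # Optional attributes.
    time: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    datacontenttype: str = "application/json"
    data: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialise the envelope to a JSON string."""
        return json.dumps(asdict(self))


event = Event(source="order-service", type="order.created", data={"order_id": 42})
decoded = json.loads(event.to_json())
print(decoded["specversion"], decoded["type"])
```

Keeping routing metadata (`type`, `source`) outside `data` lets consumers filter messages without parsing the payload.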
Installation
pip install eggai
Redis Streams and Kafka support are included by default.
Optional extras:
pip install eggai[cli] # CLI tools for scaffolding
pip install eggai[a2a] # A2A (Agent-to-Agent) SDK integration
pip install eggai[mcp] # MCP (Model Context Protocol) support
Quick Start
Using In-Memory Transport (Testing & Development)
Perfect for unit tests, prototyping, and local development.
from eggai import Agent, Channel
from eggai.transport.memory import InMemoryTransport
# Create an in-memory transport
transport = InMemoryTransport()
# Create an agent
agent = Agent(name="my-agent", transport=transport)
# Define message handler
@agent.subscribe(channel=Channel("input-channel", transport=transport))
async def handle_message(message):
    # Process message
    result = await process(message)
    # Publish to output channel
    output_channel = Channel("output-channel", transport=transport)
    await output_channel.publish(result)
# Start the agent
await agent.start()
Using Redis Streams (Recommended for Production)
Redis Streams provides durable message delivery with consumer groups, perfect for microservices and production deployments.
from eggai import Agent, Channel
from eggai.transport.redis import RedisTransport
# Create a Redis transport
transport = RedisTransport(url="redis://localhost:6379")
# Create an agent
agent = Agent(name="my-agent", transport=transport)
# Define message handler
@agent.subscribe(channel=Channel("input-channel", transport=transport))
async def handle_message(message):
    # Process message
    result = await process(message)
    # Publish to output channel
    output_channel = Channel("output-channel", transport=transport)
    await output_channel.publish(result)
# Start the agent
await agent.start()
Using Kafka (Production - High Throughput)
Kafka is ideal for high-throughput, large-scale distributed streaming workloads.
from eggai import Agent, Channel
from eggai.transport.kafka import KafkaTransport
# Create a Kafka transport
transport = KafkaTransport(
    bootstrap_servers="localhost:9092"
)
# Create an agent
agent = Agent(name="my-agent", transport=transport)
# Define message handler
@agent.subscribe(channel=Channel("input-topic", transport=transport))
async def handle_message(message):
    # Process message
    result = await process(message)
    # Publish to output topic
    output_channel = Channel("output-topic", transport=transport)
    await output_channel.publish(result)
# Start the agent
await agent.start()
Transport Comparison
| Feature | In-Memory | Redis Streams | Kafka |
|---|---|---|---|
| Use Case | Testing, prototyping | Production microservices | Large-scale production |
| Persistence | ❌ No | ✅ Yes (AOF/RDB) | ✅ Yes (highly durable) |
| Setup Complexity | None | Simple | Moderate |
| Throughput | Very High | High (100K+ msgs/sec) | Very High (1M+ msgs/sec) |
| Consumer Groups | ❌ No | ✅ Native support | ✅ Native support |
| Message Retention | N/A | Time or count-based | Time or size-based |
| Production Ready | ❌ No | ✅ Recommended | ✅ Recommended |
| Best For | Unit tests, local dev | Most production workloads | High-throughput distributed systems |
| Operational Cost | None | Lower | Higher |
Reliable Message Delivery (Redis Streams)
Pending Entries List (PEL) and Retry Streams
With NACK_ON_ERROR (the default), a handler exception leaves the message in Redis's Pending
Entries List (PEL). FastStream only reads new messages, so a failed message would be stuck
forever without intervention.
Enable SDK-managed retry by setting retry_on_idle_ms on any subscription:
from eggai import Agent, Channel
from eggai.transport import RedisTransport
transport = RedisTransport(url="redis://localhost:6379")
agent = Agent("order-service", transport=transport)
orders = Channel("orders", transport=transport)
@agent.subscribe(
    channel=orders,
    retry_on_idle_ms=30_000,        # reclaim after 30s idle (required to enable retry)
    max_retries=5,                  # route to DLQ after 5 retries (default: 5, None = unlimited)
    retry_reclaim_interval_s=15.0,  # PEL scan interval (default: 15.0)
    on_dlq=None,                    # optional async/sync callback(fields, msg_id, count)
)
async def handle_order(message):
    # If this raises, the message stays in the PEL.
    # After 30s idle the reclaimer moves it to eggai.orders.retry
    # and this same handler is called again.
    # After 5 failed retries, the message is routed to eggai.orders.dlq.
    await process_order(message)
await agent.start()
The SDK automatically:
- Starts a background reclaimer that scans the PEL every 15 seconds (configurable via retry_reclaim_interval_s).
- Moves idle messages (older than retry_on_idle_ms) to a dedicated {channel}.retry stream.
- Subscribes the same handler to the retry stream.
- Runs a second reclaimer on the retry stream that re-queues back to itself (no .retry.retry chain).
- After max_retries retry attempts (default 5), routes the message to a {channel}.dlq Dead Letter Queue instead of retrying again.
Delivery guarantee: at-least-once. XADD and XACK are not atomic — a crash between
them will re-deliver the message on the next cycle. Handlers must be idempotent.
Two fields are injected on retry delivery to aid deduplication:
| Field | Value |
|---|---|
| _retry_count | "1", "2", … (incremented on each reclaim cycle) |
| _original_message_id | Redis stream ID of the original message |
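Because delivery is at-least-once, a handler can use `_original_message_id` as a deduplication key. The sketch below is a minimal illustration under stated assumptions: the message is modelled as a plain dict, the `delivery_id` argument and the `dedup_key` helper are hypothetical, and an in-process set stands in for a durable store such as Redis or a database.

```python
import asyncio

# In-process stand-in for a durable store (hypothetical; not suitable for production).
processed_ids: set[str] = set()
side_effects: list[str] = []


def dedup_key(message: dict, delivery_id: str) -> str:
    """Use the original stream ID on retries, otherwise the current delivery's ID."""
    return message.get("_original_message_id", delivery_id)


async def handle_order(message: dict, delivery_id: str) -> bool:
    """Return True if the message was processed, False if it was a duplicate."""
    key = dedup_key(message, delivery_id)
    if key in processed_ids:
        return False
    side_effects.append(message["order_id"])  # the work that must not repeat
    processed_ids.add(key)
    return True


async def main() -> list[bool]:
    first = {"order_id": "A-1"}
    retry = {
        "order_id": "A-1",
        "_retry_count": "1",
        "_original_message_id": "1700000000000-0",
    }
    return [
        await handle_order(first, delivery_id="1700000000000-0"),
        await handle_order(retry, delivery_id="1700000030000-0"),
    ]


results = asyncio.run(main())
print(results, side_effects)
```

The second delivery carries a new stream ID but the same `_original_message_id`, so it is recognised as a duplicate and the side effect runs once.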
Dead Letter Queue (DLQ): Messages that exceed max_retries (default 5) are routed to
{channel}.dlq. The DLQ is terminal — no automatic reclaimer. Set max_retries=None for
unlimited retries. An optional on_dlq callback fires when a message lands in the DLQ.
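A possible shape for an on_dlq callback is sketched below. The argument order follows the `callback(fields, msg_id, count)` signature noted in the subscription example. The argument types, the logger name, and the `dead_letters` list are assumptions for illustration, and the call at the end only simulates what the SDK would do.

```python
import asyncio
import logging

logger = logging.getLogger("dlq")
dead_letters: list[dict] = []


async def on_dlq(fields: dict, msg_id: str, count: int) -> None:
    """Record a dead-lettered message so an operator can inspect or re-drive it."""
    logger.error("message %s dead-lettered after %d retries", msg_id, count)
    dead_letters.append({"msg_id": msg_id, "retries": count, "fields": fields})


# Simulated invocation; in a real deployment the SDK would call the callback.
asyncio.run(on_dlq({"order_id": "A-1"}, "1700000000000-0", 5))
print(dead_letters)
```

Since the DLQ has no automatic reclaimer, a callback like this is a natural place to raise an alert or write the message to a table for manual re-drive.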
Automatic recovery from Redis stream loss (NOGROUP):
If Redis loses streams (restart without persistence, failover, memory eviction), the SDK
auto-recovers. A background monitor periodically ensures consumer groups exist via
XGROUP CREATE with MKSTREAM, and the reclaimer recreates groups on NOGROUP errors.
No configuration needed — always active with RedisTransport.
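The recovery step can be pictured as an idempotent "ensure group" call. This is a sketch of the general Redis pattern, not the SDK's actual code: `ensure_group` and `FakeRedis` are hypothetical names, the start ID `"0"` is an assumption, and the fake client only mimics the `xgroup_create(name, groupname, id, mkstream)` call that redis-py exposes so the example runs without a server.

```python
import asyncio


async def ensure_group(client, stream: str, group: str) -> bool:
    """Create the consumer group (and the stream) if missing. Returns True if created."""
    try:
        # mkstream=True creates the stream when it does not exist yet.
        await client.xgroup_create(stream, group, id="0", mkstream=True)
        return True
    except Exception as exc:  # redis-py raises ResponseError in this case
        if "BUSYGROUP" in str(exc):
            return False  # group already exists; nothing to do
        raise


class FakeRedis:
    """Tiny in-memory stand-in so the sketch runs without a Redis server."""

    def __init__(self) -> None:
        self.groups: set[tuple[str, str]] = set()

    async def xgroup_create(self, name, groupname, id="$", mkstream=False):
        if (name, groupname) in self.groups:
            raise RuntimeError("BUSYGROUP Consumer Group name already exists")
        self.groups.add((name, groupname))


async def main() -> list[bool]:
    client = FakeRedis()
    return [await ensure_group(client, "eggai.orders", "order-service") for _ in range(2)]


outcome = asyncio.run(main())
print(outcome)
```

Calling the function repeatedly is safe, which is what allows a periodic monitor to run it on a timer.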
Constraints:
- min_idle_time (FastStream XAUTOCLAIM) and retry_on_idle_ms are mutually exclusive on the same subscription; mixing them raises ValueError.
- Binary (non-UTF-8) field values are not supported; use JSON-serialisable payloads.
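Both constraints can be checked before a subscription is registered. The helpers below are a hypothetical pre-flight check written for illustration. They are not part of the SDK, and the error message is not the SDK's wording.

```python
import json
from typing import Any, Optional


def validate_subscription(
    min_idle_time: Optional[int], retry_on_idle_ms: Optional[int]
) -> None:
    """Reject subscriptions that set both reclaim mechanisms at once."""
    if min_idle_time is not None and retry_on_idle_ms is not None:
        raise ValueError("min_idle_time and retry_on_idle_ms are mutually exclusive")


def is_json_serialisable(payload: Any) -> bool:
    """Return True if the payload survives json.dumps (raw bytes do not)."""
    try:
        json.dumps(payload)
        return True
    except (TypeError, ValueError):
        return False


validate_subscription(min_idle_time=None, retry_on_idle_ms=30_000)  # accepted
print(is_json_serialisable({"order_id": 42}), is_json_serialisable({"blob": b"\xff"}))
```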
Retry delivery reference
Main stream (eggai.orders)
│
├── FastStream consumer (group/consumer: order-service-handle_order-1)
│ on exception → NACK → message stays in main PEL
│
└── Reclaimer (consumer: order-service-handle_order-1-reclaimer)
every 15s: XPENDING → idle > 30s → XCLAIM
_retry_count ≤ max_retries → XADD orders.retry → XACK
_retry_count > max_retries → XADD orders.dlq → XACK
Retry stream (eggai.orders.retry)
│
├── FastStream consumer (group: order-service-handle_order-1-retry)
│ same handler — on exception → NACK → message stays in retry PEL
│
└── Reclaimer (target: same retry stream — no .retry.retry chain)
every 15s: XPENDING → idle > 30s → XCLAIM
_retry_count ≤ max_retries → XADD orders.retry → XACK
_retry_count > max_retries → XADD orders.dlq → XACK
DLQ stream (eggai.orders.dlq)
terminal — no reclaimer, manual re-drive only
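The routing decision in the diagram reduces to one comparison. The sketch below is one reading of it, not the SDK's code: `route` is a hypothetical helper, and `retry_count` is assumed to be the value the message would carry after the current reclaim (1 for the first retry).

```python
from typing import Optional


def route(retry_count: int, max_retries: Optional[int], channel: str = "eggai.orders") -> str:
    """Pick the target stream for a reclaimed message.

    max_retries=None means unlimited retries, so the DLQ is never used.
    """
    if max_retries is None or retry_count <= max_retries:
        return f"{channel}.retry"
    return f"{channel}.dlq"


# With max_retries=5, retries 1 to 5 go to the retry stream; the sixth reclaim goes to the DLQ.
targets = [route(n, max_retries=5) for n in range(1, 8)]
print(targets)
```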
Production Recommendations
For production deployments, we recommend:
- Redis Streams: Best for most production workloads. Simpler to operate, lower cost, excellent performance for microservices and event-driven architectures.
- Kafka: Best for very high-throughput requirements (1M+ messages/sec), complex stream processing, or when you need advanced features like exactly-once semantics.
The In-Memory transport should only be used for testing and development.
Interoperability
EggAI is designed as a meta-framework that enables seamless integration with other agent systems and protocols:
Agent-to-Agent (A2A) Protocol
Connect EggAI agents with other agent frameworks and systems using the A2A protocol:
pip install eggai[a2a]
A2A enables:
- Cross-framework agent communication
- Standardized message formats for multi-agent systems
- Integration with existing agent ecosystems (AutoGen, LangChain, etc.)
- HTTP-based service discovery via AgentCards
Model Context Protocol (MCP)
Integrate with LLM tools and services through MCP:
pip install eggai[mcp]
MCP support enables:
- LLM tool calling and function execution
- Context sharing across agent conversations
- Integration with Claude, GPT, and other LLM providers
- Standardized tool interfaces via FastMCP
Transport Flexibility
EggAI's transport abstraction allows agents to communicate regardless of underlying infrastructure:
- Hybrid deployments: Mix local (in-memory) and distributed (Redis/Kafka) agents
- Migration paths: Start with Redis, scale to Kafka without changing agent code
- Multi-cloud: Deploy agents across different cloud providers using different transports
- Custom transports: Extend with your own transport implementations
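One way to keep agent code unchanged across transports is to pick the transport from configuration. The sketch below assumes the import paths and constructor arguments shown in the Quick Start examples. The `TRANSPORTS` table and the `transport_spec` and `build_transport` helpers are hypothetical, and `build_transport` needs the eggai package installed.

```python
import importlib

# Module path, class name, and constructor arguments per transport,
# taken from the Quick Start examples (connection values are placeholders).
TRANSPORTS = {
    "memory": ("eggai.transport.memory", "InMemoryTransport", {}),
    "redis": ("eggai.transport.redis", "RedisTransport", {"url": "redis://localhost:6379"}),
    "kafka": ("eggai.transport.kafka", "KafkaTransport", {"bootstrap_servers": "localhost:9092"}),
}


def transport_spec(name: str):
    """Look up the import details for a transport name (case-insensitive)."""
    try:
        return TRANSPORTS[name.lower()]
    except KeyError:
        raise ValueError(f"unknown transport {name!r}") from None


def build_transport(name: str):
    """Import the transport class lazily and instantiate it."""
    module_path, class_name, kwargs = transport_spec(name)
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls(**kwargs)


print(transport_spec("redis")[1])
```

An agent would then be constructed with `Agent(name="my-agent", transport=build_transport(...))`, where the name comes from an environment variable or a settings file.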
Extensibility
Build custom integrations through:
- Custom Transport API: Implement your own message broker backends
- Middleware system: Add cross-cutting concerns (logging, metrics, auth)
- Message adapters: Transform messages between different protocols
- Plugin architecture: Extend agent capabilities with reusable components
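As an illustration of a cross-cutting concern, a plain decorator can wrap any async handler without touching its body. This is a generic Python pattern shown under the assumption that handlers are coroutines taking a single message. It does not use EggAI's own middleware API, and `timed` and `timings` are hypothetical names.

```python
import asyncio
import functools
import time

timings: list[tuple[str, float]] = []


def timed(handler):
    """Wrap an async handler and record how long each call takes."""

    @functools.wraps(handler)
    async def wrapper(message):
        start = time.perf_counter()
        try:
            return await handler(message)
        finally:
            # Recorded even when the handler raises.
            timings.append((handler.__name__, time.perf_counter() - start))

    return wrapper


@timed
async def handle_message(message):
    await asyncio.sleep(0)  # stand-in for real work
    return {"echo": message}


result = asyncio.run(handle_message({"id": 1}))
print(result, timings[0][0])
```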
Documentation
Core Documentation
- Message Protocol: CloudEvents-based message structure for agent communication
- Agent: Core agent abstraction for building autonomous units
- Channel: Communication layer for event publishing and subscription
Additional Resources
For full documentation, visit: https://docs.egg-ai.com/
License
MIT License - see LICENSE file for details