EggAI Multi-Agent Meta Framework is an async-first framework for building, deploying, and scaling multi-agent systems for modern enterprise environments.

Multi-Agent Meta Framework

Documentation: EggAI Docs

Python 3.x License: MIT PRs Welcome GitHub Issues GitHub Stars

EggAI Multi-Agent Meta Framework makes it easy to build enterprise-grade multi-agent systems with quality-controlled output, using an async-first, distributed and composable architecture. The framework includes:

  • EggAI SDK: A lightweight abstraction layer for building agents and enabling agent-to-agent communication.
  • Examples: Practical use cases showing how to use the SDK and integrate EggAI with leading AI frameworks.
  • Demo: A working multi-agent insurance support system showcasing the Meta Framework in action.

EggAI Meta Framework Architecture
EggAI Meta Framework design principles: framework-agnostic, async-first, distributed and composable

Demo: Multi-Agent Insurance Support System

The EggAI Meta Framework powers a multi-agent insurance support system demo. This interactive demo showcases how EggAI can orchestrate multiple specialized agents to provide personalized insurance support. It features billing inquiries, claims processing, policy information retrieval (RAG), and intelligent routing.

Multi-Agent Insurance Support System Demo Screenshot

Examples: AI Enablement in Action

EggAI's SDK is intentionally simple, lightweight, and framework-agnostic, making it easy to integrate with today's leading AI tools—and future-ready for what's next. Here we show practical implementation scenarios and integration guides with popular AI frameworks. Each example is self-contained and ready to run out of the box. We encourage you to explore and copy/paste from our examples for your projects.

If you're new to EggAI, we recommend starting with the Getting Started example to learn the basics of agent definition, communication flows and async orchestration.

  • Getting Started: Orchestrate two agents asynchronously. (Tags: Communication)
  • Coordinator: Bridge multiple communication channels. (Tags: Communication, Pattern)
  • Websocket Gateway: Real-time interaction via WebSockets. (Tags: Communication, Realtime)
  • DSPy ReAct Agent: Advanced agents with DSPy ReAct. (Tags: DSPy, Tool Calling, React)
  • LangChain Agent: Integrate tool calling with LangChain. (Tags: Tool Calling, LangChain)
  • LiteLLM Agent: Power agents with LiteLLM. (Tags: LiteLLM)
  • Agent Evaluation & Optimization with DSPy: Data-driven development with DSPy. (Tags: DSPy, Evaluation, Optimization)
  • Safe Agents with Guardrails AI: Guarding LLM agents against toxicity and PII leakage. (Tags: DSPy, Guardrails)
  • Triage Agent: Triage agent with classification and routing. (Tags: Classification, Routing)
  • Shared Context: Maintain shared context across agents. (Tags: Communication, Memory)
  • Multi-Agent Conversation: Context-aware multi-agent conversations. (Tags: Communication, Classification, Routing, Chat)

EggAI SDK

EggAI SDK includes components like Agent and Channel for decoupled communication in multi-agent systems. Its slim design offers flexibility for enterprise-grade applications and integrates with popular AI frameworks such as DSPy, LangChain, LiteLLM, and LlamaIndex. See the examples below.

AI Framework Integrations

DSPy Agent
# Install `eggai` and `dspy` and set OPENAI_API_KEY in the environment

import asyncio
import dspy
from eggai import Agent, Channel, eggai_main

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
qa_model = dspy.Predict("question -> answer")
agent, channel = Agent("QAAgent"), Channel()

@agent.subscribe(filter_by_message=lambda event: event.get("event_name") == "question_created")
async def handle_question(event):
    question = event["payload"]["question"]
    answer = qa_model(question=question).answer
    print(f"[QAAgent] Question: {question} | Answer: {answer}")

    await channel.publish({
        "event_name": "answer_generated",
        "payload": {"question": question, "answer": answer}
    })

@eggai_main
async def main():
    await agent.start()
    await channel.publish({
        "event_name": "question_created",
        "payload": {"question": "When was the Eiffel Tower built?"}
    })
    await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
LangChain Agent
# Install `eggai` and `langchain-openai` and set OPENAI_API_KEY in the environment

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from eggai import Agent, Channel, eggai_main

llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent, channel = Agent("QAAgent"), Channel()

@agent.subscribe(filter_by_message=lambda event: event.get("event_name") == "question_created")
async def handle_question(event):
    question = event["payload"]["question"]
    # Use the async API so the LLM call does not block the event loop.
    answer = (await llm.ainvoke([HumanMessage(content=question)])).content

    print(f"[QAAgent] Question: {question} | Answer: {answer}")

    await channel.publish({
        "event_name": "answer_generated",
        "payload": {"question": question, "answer": answer}
    })

@eggai_main
async def main():
    await agent.start()
    await channel.publish({
        "event_name": "question_created",
        "payload": {"question": "When was the Eiffel Tower built?"}
    })
    await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
LiteLLM Agent
# Install `eggai` and `litellm` and set OPENAI_API_KEY in the environment

import asyncio
import litellm
from eggai import Agent, Channel, eggai_main

MODEL = "gpt-4o"
agent, channel = Agent("QAAgent"), Channel()

@agent.subscribe(filter_by_message=lambda event: event.get("event_name") == "question_created")
async def handle_question(event):
    question = event["payload"]["question"]
    # acompletion is the async counterpart of completion, so the event loop is not blocked.
    response = await litellm.acompletion(model=MODEL, messages=[{"role": "user", "content": question}])
    answer = response.choices[0].message.content

    print(f"[QAAgent] Question: {question} | Answer: {answer}")

    await channel.publish({
        "event_name": "answer_generated",
        "payload": {"question": question, "answer": answer}
    })

@eggai_main
async def main():
    await agent.start()
    await channel.publish({
        "event_name": "question_created",
        "payload": {"question": "When was the Eiffel Tower built?"}
    })
    await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
LlamaIndex Agent
# Install `eggai` and `llama-index` and set OPENAI_API_KEY in the environment

import asyncio
from llama_index.llms.openai import OpenAI
from eggai import Agent, Channel, eggai_main

llm = OpenAI(model="gpt-4o")
agent, channel = Agent("QAAgent"), Channel()

@agent.subscribe(filter_by_message=lambda event: event.get("event_name") == "question_created")
async def handle_question(event):
    question = event["payload"]["question"]
    # Use the async variant so the LLM call does not block the event loop.
    answer = (await llm.acomplete(question)).text

    print(f"[QAAgent] Question: {question} | Answer: {answer}")

    await channel.publish({
        "event_name": "answer_generated",
        "payload": {"question": question, "answer": answer}
    })

@eggai_main
async def main():
    await agent.start()
    await channel.publish({
        "event_name": "question_created",
        "payload": {"question": "When was the Eiffel Tower built?"}
    })
    await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

AI Agent Evaluations

Agent Evaluation using LLM-as-a-Judge metrics
# Install `eggai` and `dspy` and set OPENAI_API_KEY in the environment
# Make sure the agent implementation is defined in the `agent.py` file

import asyncio
import pytest
import dspy
from agent import agent
from eggai import Agent, Channel

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

ground_truth = [
    {"question": "When was the Eiffel Tower built?", "answer": "The Eiffel Tower was built between 1887 and 1889."},
    {"question": "Who wrote Hamlet?", "answer": "Hamlet was written by William Shakespeare."},
    {"question": "What is the capital of France?", "answer": "The capital of France is Paris."},
]

class EvaluationSignature(dspy.Signature):
    question: str = dspy.InputField(desc="Ground truth question.")
    agent_answer: str = dspy.InputField(desc="Agent-generated answer.")
    ground_truth_answer: str = dspy.InputField(desc="Expected correct answer.")

    judgment: bool = dspy.OutputField(desc="Pass (True) or Fail (False).")
    reasoning: str = dspy.OutputField(desc="Detailed justification in Markdown.")
    precision_score: float = dspy.OutputField(desc="Precision score (0.0 to 1.0).")

test_agent = Agent("TestAgent")
test_channel = Channel()
event_received = asyncio.Event()
received_event = None

@test_agent.subscribe(filter_by_message=lambda event: event.get("event_name") == "answer_generated")
async def handle_answer(event):
    global received_event
    received_event = event
    event_received.set()

@pytest.mark.asyncio
async def test_qa_agent():
    await agent.start()
    await test_agent.start()

    for item in ground_truth:
        event_received.clear()

        await test_channel.publish({"event_name": "question_created", "payload": {"question": item["question"]}})

        try:
            await asyncio.wait_for(event_received.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            pytest.fail(f"Timeout: No 'answer_generated' event was published for question: {item['question']}")

        assert received_event is not None, "No 'answer_generated' event was received."
        assert received_event["event_name"] == "answer_generated", "Unexpected event type received."
        assert "answer" in received_event["payload"], "The 'answer' key is missing in the payload."

        agent_answer = received_event["payload"]["answer"]
        question = received_event["payload"]["question"]
        ground_truth_answer = item["answer"]

        assert question == item["question"], f"Incorrect question in the answer payload: {question}"

        eval_model = dspy.asyncify(dspy.Predict(EvaluationSignature))
        evaluation_result = await eval_model(
            question=question,
            agent_answer=agent_answer,
            ground_truth_answer=ground_truth_answer
        )

        assert evaluation_result.judgment, "Judgment must be True. " + evaluation_result.reasoning
        assert 0.8 <= evaluation_result.precision_score <= 1.0, "Precision score must be between 0.8 and 1.0."

Installation

Install eggai via pip:

pip install eggai

For local development, clone the repository and install the package in editable mode:

pip install -e .

Getting Started

For a quick start with a structured project, use the EggAI CLI to generate a new multi-agent application:

pipx run eggai[cli] init

This will create a complete project with agents, console interface, and configuration files.

Alternatively, here's how you can quickly set up an agent to handle events in an event-driven system:

import asyncio

from eggai import Agent, Channel, eggai_main

agent = Agent("OrderAgent")
channel = Channel()

@agent.subscribe(filter_by_message=lambda e: e.get("type") == "order_requested")
async def handle_order_requested(event):
    print(f"[ORDER AGENT]: Received order request. Event: {event}")
    await channel.publish({"type": "order_created", "payload": event})


@agent.subscribe(filter_by_message=lambda e: e.get("type") == "order_created")
async def handle_order_created(event):
    print(f"[ORDER AGENT]: Order created. Event: {event}")


@eggai_main
async def main():
    await agent.start()
    await channel.publish({
        "type": "order_requested",
        "payload": {
            "product": "Laptop",
            "quantity": 1
        }
    })

    await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

Copy this snippet into your project, customize it, and you're good to go!

Core Concepts

An Agent is an autonomous unit of business logic designed to orchestrate workflows, process events, and communicate with external systems such as Large Language Models (LLMs) and APIs. It reduces boilerplate code while supporting complex and long-running workflows. Key features include:

  • Event Handling: Use the subscribe decorator to bind user-defined handlers to specific events.
  • Workflow Orchestration: Manage long-running workflows and tasks efficiently.
  • External System Communication: Seamlessly interact with Large Language Models (LLMs), external APIs, and other systems.
  • Lifecycle Management: Automatically handle the lifecycle of Kafka consumers, producers, and other connected components.
  • Boilerplate Reduction: Focus on core business logic while leveraging built-in integrations for messaging and workflows.
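
To make the event-handling idea concrete, here is a minimal sketch of how a subscribe decorator with a message filter could bind handlers to events. It uses only the standard library; `MiniAgent` and its `dispatch` method are hypothetical names for illustration and are not the `eggai.Agent` implementation, which also manages transports and lifecycle.

```python
import asyncio
from typing import Any, Awaitable, Callable

Event = dict[str, Any]
Handler = Callable[[Event], Awaitable[None]]
Predicate = Callable[[Event], bool]


class MiniAgent:
    """Hypothetical stand-in for an agent; not the eggai.Agent class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._handlers: list[tuple[Predicate, Handler]] = []

    def subscribe(self, filter_by_message: Predicate = lambda event: True):
        """Register the decorated coroutine for events matching the filter."""
        def decorator(handler: Handler) -> Handler:
            self._handlers.append((filter_by_message, handler))
            return handler
        return decorator

    async def dispatch(self, event: Event) -> int:
        """Run every matching handler and return how many handlers ran."""
        matching = [h for predicate, h in self._handlers if predicate(event)]
        await asyncio.gather(*(h(event) for h in matching))
        return len(matching)


agent = MiniAgent("OrderAgent")
seen: list[str] = []


@agent.subscribe(filter_by_message=lambda e: e.get("type") == "order_requested")
async def handle_order_requested(event: Event) -> None:
    seen.append(f"requested:{event['payload']['product']}")


@agent.subscribe(filter_by_message=lambda e: e.get("type") == "order_created")
async def handle_order_created(event: Event) -> None:
    seen.append("created")


async def demo() -> int:
    return await agent.dispatch(
        {"type": "order_requested", "payload": {"product": "Laptop"}}
    )


handled = asyncio.run(demo())
```

Only the handler whose filter matches the event runs, which is the behavior the `filter_by_message` argument expresses in the examples in this README.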

A Channel is the foundational communication layer that facilitates both event publishing and subscription. It abstracts Kafka producers and consumers, enabling efficient and flexible event-driven operations. Key features include:

  • Event Communication: Publish events to Kafka topics with ease.
  • Event Subscription: Subscribe to Kafka topics and process events directly through the Channel.
  • Shared Resources: Optimize resource usage by managing singleton Kafka producers and consumers across multiple agents or channels.
  • Seamless Integration: Act as a communication hub, supporting both Agents and other system components.
  • Flexibility: Allow Agents to leverage Channels for both publishing and subscribing, reducing complexity and duplication.
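
The shared-resources idea can be sketched with an in-memory transport that is created once and reused by every channel. This is an illustration under stated assumptions, not how EggAI manages Kafka clients: `InMemoryTransport`, `MiniChannel`, and the channel names are hypothetical.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Event = dict[str, Any]
Handler = Callable[[Event], Awaitable[None]]


class InMemoryTransport:
    """Hypothetical stand-in for a shared Kafka producer/consumer pair."""

    _instance: "InMemoryTransport | None" = None

    def __init__(self) -> None:
        # Maps a channel (topic) name to the handlers subscribed to it.
        self.subscribers: dict[str, list[Handler]] = defaultdict(list)

    @classmethod
    def shared(cls) -> "InMemoryTransport":
        """Create the transport once and reuse it for every channel."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance


class MiniChannel:
    """Hypothetical channel; not the eggai.Channel class."""

    def __init__(self, name: str = "default") -> None:
        self.name = name
        self.transport = InMemoryTransport.shared()

    def subscribe(self, handler: Handler) -> None:
        self.transport.subscribers[self.name].append(handler)

    async def publish(self, event: Event) -> None:
        # Deliver to every handler subscribed under this channel's name.
        for handler in list(self.transport.subscribers[self.name]):
            await handler(event)


received: list[Event] = []


async def on_order(event: Event) -> None:
    received.append(event)


async def demo() -> None:
    consumer_side = MiniChannel("orders")
    producer_side = MiniChannel("orders")  # separate object, same topic
    audit = MiniChannel("audit")           # different topic, same transport
    consumer_side.subscribe(on_order)
    await producer_side.publish({"type": "order_created"})
    await audit.publish({"type": "ignored_by_orders"})


asyncio.run(demo())
```

Two channel objects with the same name reach the same subscribers because they share one transport, while events published on a different name are not delivered to them.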

Interoperability

In enterprise environments, diverse programming languages and frameworks create fragmentation. EggAI Agents serve as thin, flexible connectors, enabling seamless integration within the multi-agent system. This ensures enterprises can continuously enhance their AI capabilities without the need for costly re-platforming.

If you use the Kafka transport, you can integrate directly using the Kafka client libraries available for various programming languages.

For structured communication within the multi-agent system, we recommend using the EggAI Message Base Schema, which defines a standardized message format for consistency and interoperability.

View EggAI Message Base Schema
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "MessageBase",
  "description": "Base class for all messages in the communication protocol.",
  "type": "object",
  "properties": {
    "id": {
      "type": "string",
      "format": "uuid",
      "description": "Unique identifier for correlating requests and responses."
    },
    "type": {
      "type": "string",
      "description": "Type of the message (e.g., request, response, event)."
    },
    "metadata": {
      "type": "object",
      "additionalProperties": true,
      "description": "Additional metadata for the message."
    },
    "context": {
      "type": "object",
      "additionalProperties": true,
      "description": "Contextual information for the message."
    },
    "payload": {
      "type": "object",
      "additionalProperties": true,
      "description": "Message-specific data."
    }
  },
  "required": ["id", "type"]
}
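
As a rough illustration of the schema above, the following standard-library sketch builds a message with the required `id` and `type` fields and checks an incoming JSON document against the same constraints. The `Message` dataclass and `validate` helper are hypothetical names, not part of the EggAI SDK, and the UUID check is stricter than JSON Schema's `format` keyword, which validators may treat as an annotation only.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class Message:
    """Hypothetical container mirroring the MessageBase schema fields."""

    type: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    metadata: dict[str, Any] = field(default_factory=dict)
    context: dict[str, Any] = field(default_factory=dict)
    payload: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))


def validate(raw: str) -> dict[str, Any]:
    """Parse a JSON message and enforce the schema's basic constraints."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("message must be a JSON object")
    for key in ("id", "type"):  # the schema's "required" list
        if not isinstance(data.get(key), str):
            raise ValueError(f"missing or non-string required field: {key}")
    uuid.UUID(data["id"])  # raises ValueError if the id is not a UUID
    for key in ("metadata", "context", "payload"):
        if key in data and not isinstance(data[key], dict):
            raise ValueError(f"field must be an object: {key}")
    return data


message = Message(type="order_requested", payload={"product": "Laptop", "quantity": 1})
decoded = validate(message.to_json())
```

Because the message is plain JSON, a consumer written in another language can apply the same checks with its own JSON and UUID libraries.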

Why Copy/Paste?

1. Full Ownership and Control: By copying and pasting, you have direct access to the underlying implementation. Tweak or rewrite it as you see fit; the code is truly yours.

2. Separation of Concerns: Just like decoupling design from implementation, copying code (rather than installing a monolithic dependency) reduces friction if you want to restyle or refactor how agents are structured.

3. Flexibility: Not everyone wants a one-size-fits-all library. With copy/paste "recipes," you can integrate only the parts you need.

4. No Hidden Coupling: Sometimes, prepackaged frameworks lock in design decisions. By copying from examples, you choose exactly what gets included and how it's used.

Contribution

EggAI Multi-Agent Meta Framework is open-source and we welcome contributions. If you're looking to contribute, please refer to CONTRIBUTING.md.

License

This project is licensed under the MIT License. See the LICENSE.md file for details.
