Skip to main content

The easiest way to create and orchestrate multi-agent fleets

Project description

Agent Orchestra: Production-Ready Multi-Agent Orchestration Platform

Agent Orchestra is a production-grade, open-source framework for building sophisticated multi-agent workflows with enterprise-level features. Built on top of the Model Context Protocol (MCP), it provides advanced orchestration, rate limiting, agent pooling, and comprehensive observability for real-world AI applications.

๐Ÿš€ Production-Ready Features

Agent Orchestra has been battle-tested and includes all the polish improvements needed for real-world deployment:

  • ๐ŸŠ Profile-Based Agent Pooling - Intelligent agent reuse with race-safe creation and no duplicate initialization
  • โšก Global Rate Limiting - Per-model RPM/RPD limits with 429-aware retries and jittered exponential backoff
  • ๐Ÿ”€ Multi-Server Routing - Single MCP client with dynamic server-name routing per workflow node
  • ๐Ÿ›ก๏ธ Security & Safety - Path validation, directory traversal prevention, and secure parameter handling
  • ๐ŸŽฏ Advanced Orchestration - DAG workflows with concurrent foreach, intelligent reduce, and conditional gate nodes
  • ๐Ÿ“Š Comprehensive Telemetry - Event-driven architecture with structured logging and performance metrics
  • ๐Ÿงน Clean Async Management - Proper resource lifecycle with graceful startup/shutdown

๐Ÿ—๏ธ Architecture Overview

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   Orchestrator  โ”‚โ”€โ”€โ”€โ–ถโ”‚   MCPExecutor    โ”‚โ”€โ”€โ”€โ–ถโ”‚   AgentPool     โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
         โ”‚                       โ”‚                       โ”‚
         โ”‚                       โ–ผ                       โ–ผ
         โ”‚              โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
         โ”‚              โ”‚   CallBroker     โ”‚    โ”‚ SidecarMCPAgent โ”‚
         โ”‚              โ”‚  (Rate Limiting) โ”‚    โ”‚ (with Telemetry)โ”‚
         โ”‚              โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
         โ”‚                       โ”‚                       โ”‚
         โ–ผ                       โ”‚                       โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”              โ”‚              โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   GraphSpec     โ”‚              โ”‚              โ”‚ SidecarMCPClientโ”‚
โ”‚   (Workflow)    โ”‚              โ”‚              โ”‚ (Multi-Server)  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚              โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                                 โ”‚                       โ”‚
                                 โ–ผ                       โ–ผ
                        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                        โ”‚   Broker Stats   โ”‚    โ”‚   MCP Servers   โ”‚
                        โ”‚   (Monitoring)   โ”‚    โ”‚  (fs, web, etc) โ”‚
                        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐ŸŽฏ Key Components

Orchestrator

The central workflow engine that executes DAG-based workflows with support for:

  • Task Nodes - Single agent operations
  • Foreach Nodes - Concurrent batch processing with configurable concurrency
  • Reduce Nodes - Intelligent aggregation of multiple results
  • Gate Nodes - Conditional workflow control

AgentPool (Profile-Based)

Production-grade agent management with:

  • Profile Keys - (server_name, model_key, policy_id) for precise agent categorization
  • Race-Safe Creation - Double-checked locking prevents duplicate agent initialization
  • Agent Reuse - Automatic sharing of agents across workflow nodes with same profile
  • Resource Limits - Configurable max agents per run with automatic cleanup

CallBroker (Rate Limiting)

Global rate limiting system with:

  • Per-Model Limits - Separate RPM, RPD, and concurrency limits per model
  • 429-Aware Retries - Automatic retry with jittered exponential backoff
  • Sliding Window Counters - Precise rate tracking with time-based windows
  • Request Queuing - Fair scheduling across multiple agents

MCPExecutor (Multi-Server)

Enhanced executor with:

  • Server-Name Routing - Dynamic routing to different MCP servers per node
  • Parameter Filtering - Clean parameter handling for backward compatibility
  • Output Capture - Enhanced result processing with fallback to text
  • Streaming Support - Real-time chunk processing with telemetry

SidecarMCPAgent (Enhanced)

Drop-in replacement for mcp-use MCPAgent with:

  • Telemetry Integration - Comprehensive event emission and performance tracking
  • Parameter Safety - Secure handling of server_name and other routing parameters
  • Enhanced Error Handling - Better error reporting and recovery
  • Full API Compatibility - 100% compatible with existing mcp-use code

๐Ÿ“ฆ Installation

Prerequisites

  • Python 3.11+
  • Node.js 18+ (for MCP servers)
  • OpenAI API key (or other LLM provider)

Install Agent Orchestra

pip install agent-orchestra

Install MCP Servers

# Filesystem server
npm install -g @modelcontextprotocol/server-filesystem

# Web browser server  
npm install -g @modelcontextprotocol/server-playwright

# Or use npx to run without global install
npx @modelcontextprotocol/server-filesystem --help

๐Ÿš€ Quick Start

Simple Agent Usage (Drop-in Replacement)

import asyncio
from agent_orchestra import SidecarMCPClient, SidecarMCPAgent
from langchain_openai import ChatOpenAI

async def simple_example():
    # Configure MCP client
    config = {
        "mcpServers": {
            "filesystem": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem", 
                        "--stdio", "--root", "/tmp"]
            }
        }
    }
    
    client = SidecarMCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4o-mini")
    agent = SidecarMCPAgent(llm=llm, client=client)
    
    result = await agent.run("List the files in the current directory")
    print(result)
    
    await client.close_all_sessions()

asyncio.run(simple_example())

Production Workflow with All Features

import asyncio
from agent_orchestra import SidecarMCPClient, SidecarMCPAgent
from agent_orchestra.orchestrator.core import Orchestrator
from agent_orchestra.orchestrator.types import GraphSpec, NodeSpec, RunSpec
from agent_orchestra.orchestrator.executors_mcp import MCPExecutor
from agent_orchestra.orchestrator.broker_config import create_development_broker
from agent_orchestra.orchestrator.agent_pool import AgentPool, create_default_agent_factory
from langchain_openai import ChatOpenAI

async def production_workflow():
    # Multi-server MCP configuration
    config = {
        "mcpServers": {
            "fs_business": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem", 
                        "--stdio", "--root", "/business/data"]
            },
            "fs_reports": {
                "command": "npx", 
                "args": ["-y", "@modelcontextprotocol/server-filesystem",
                        "--stdio", "--root", "/reports/output"]
            },
            "web": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-playwright", "--stdio"]
            }
        }
    }
    
    # Create production components
    client = SidecarMCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
    
    # Profile-based agent pool
    agent_factory = create_default_agent_factory(client, llm)
    agent_pool = AgentPool(agent_factory, max_agents_per_run=10)
    
    # Global rate limiting
    broker = create_development_broker()
    
    # Production-ready executor
    executor = MCPExecutor(
        agent=None,  # No template agent needed
        default_server="fs_business",
        broker=broker,
        agent_pool=agent_pool,
        model_key="openai:gpt-4o-mini"
    )
    
    orchestrator = Orchestrator(executor)
    
    # Define multi-server workflow
    workflow = GraphSpec(
        nodes=[
            # Concurrent data processing
            NodeSpec(
                id="read_sales_data",
                type="foreach",
                server_name="fs_business",  # Route to business filesystem
                inputs={
                    "items": ["sales.json", "marketing.json", "operations.json"],
                    "instruction": "Read and summarize each business file"
                },
                concurrency=3
            ),
            
            # Cross-department analysis  
            NodeSpec(
                id="analyze_trends",
                type="reduce",
                inputs={
                    "from_ids": ["read_sales_data"],
                    "instruction": "Analyze trends across all departments"
                }
            ),
            
            # Web research for market context
            NodeSpec(
                id="market_research",
                type="task",
                server_name="web",  # Route to web browser
                inputs={
                    "from": "analyze_trends",
                    "instruction": "Research current market trends for context"
                }
            ),
            
            # Save final report
            NodeSpec(
                id="save_report",
                type="task", 
                server_name="fs_reports",  # Route to reports filesystem
                inputs={
                    "from": "market_research",
                    "instruction": "Create executive summary and save as report.pdf"
                }
            )
        ],
        edges=[
            ("read_sales_data", "analyze_trends"),
            ("analyze_trends", "market_research"),
            ("market_research", "save_report")
        ]
    )
    
    run_spec = RunSpec(
        run_id="business_analysis_001",
        goal="Multi-department business analysis with market research"
    )
    
    # Execute with full observability
    print("๐Ÿš€ Starting production workflow...")
    async for event in orchestrator.run_streaming(workflow, run_spec):
        if event.type == "NODE_START":
            print(f"๐Ÿ”„ Starting {event.node_id}")
        elif event.type == "NODE_COMPLETE":
            print(f"โœ… Completed {event.node_id}")
        elif event.type == "AGENT_CHUNK":
            print(f"   ๐Ÿง  Agent progress: {event.data.get('content', '')[:50]}...")
        elif event.type == "RUN_COMPLETE":
            print(f"๐ŸŽ‰ Workflow completed successfully!")
    
    # Get production metrics
    broker_stats = await broker.get_stats()
    pool_stats = await agent_pool.get_pool_stats()
    
    print(f"\n๐Ÿ“Š Production Metrics:")
    print(f"   ๐ŸŠ Agent profiles created: {len(pool_stats['profiles'])}")
    for profile_key, profile_info in pool_stats['profiles'].items():
        server = profile_info['server_name'] or 'default'
        usage = profile_info['usage_count']
        print(f"      {server} server: {usage} uses")
    
    for model, stats in broker_stats.items():
        if stats['rpm_used'] > 0:
            print(f"   ๐Ÿ“ˆ {model}: {stats['rpm_used']}/{stats['rpm_limit']} RPM used")
    
    # Clean shutdown
    await broker.shutdown()
    await agent_pool.shutdown()
    await client.close_all_sessions()

# Run with proper error handling
if __name__ == "__main__":
    asyncio.run(production_workflow())

๐Ÿ“Š Examples

The examples/ directory contains production-ready demonstrations:

Production Examples

  • polished_part4_demo.py - Complete production workflow with all features
  • polished_simple_demo.py - Simple demo without complex MCP setup
  • polished_verification_demo.py - Verification of all polish improvements
  • part4_complete_demo.py - CallBroker + AgentPool integration

Core Feature Examples

  • basic_orchestration.py - Simple DAG workflow
  • foreach_example.py - Concurrent batch processing
  • reduce_example.py - Data aggregation patterns
  • gate_example.py - Conditional workflow control

Integration Examples

  • multi_server_example.py - Multiple MCP servers in one workflow
  • rate_limiting_example.py - CallBroker rate limiting demonstration
  • agent_pooling_example.py - AgentPool management patterns

๐Ÿงช Testing

Agent Orchestra includes comprehensive test coverage:

# Run all tests
python -m pytest tests/ -v

# Run specific test categories
python -m pytest tests/test_polish_improvements.py -v  # Production features
python -m pytest tests/test_orchestration.py -v       # Core orchestration
python -m pytest tests/test_agent_pool.py -v          # Agent management

Test Coverage:

  • Polish Improvements - All 10 production-ready improvements
  • Race Conditions - Concurrent agent creation safety
  • Path Validation - Security and directory traversal prevention
  • Rate Limiting - Global rate limiting across multiple agents
  • Multi-Server - Server routing and profile management

๐Ÿ”ง Configuration

CallBroker Configuration

from agent_orchestra.orchestrator.call_broker import CallBroker, ModelLimits

# Custom rate limits
limits = {
    "openai:gpt-4": ModelLimits(rpm=60, rpd=1000, max_concurrency=10),
    "openai:gpt-4o-mini": ModelLimits(rpm=200, rpd=5000, max_concurrency=20),
    "anthropic:claude-3": ModelLimits(rpm=50, rpd=800, max_concurrency=5)
}

broker = CallBroker(limits)

AgentPool Configuration

from agent_orchestra.orchestrator.agent_pool import AgentPool, AgentSpec

# Profile-based agent management
async def custom_factory(spec: AgentSpec):
    # Custom agent creation logic based on spec
    return SidecarMCPAgent(...)

pool = AgentPool(custom_factory, max_agents_per_run=15)

# Get agent for specific profile
spec = AgentSpec(
    server_name="fs_business",
    model_key="openai:gpt-4",
    policy_id="standard"
)
agent = await pool.get(spec, "run_123")

Multi-Server MCP Configuration

from agent_orchestra.orchestrator.fs_utils import create_multi_server_config

configs = {
    "fs_sales": {"root": "/data/sales"},
    "fs_reports": {"root": "/data/reports"},
    "playwright": {"type": "playwright"},
    "custom_server": {
        "command": "python",
        "args": ["-m", "my_custom_server", "--stdio"]
    }
}

mcp_config = create_multi_server_config(configs)

๐Ÿ›ก๏ธ Security Features

Path Validation

from agent_orchestra.orchestrator.fs_utils import fs_args

# Safe path handling with directory traversal prevention
root = Path("/safe/root")
try:
    safe_args = fs_args(root, "../../etc/passwd")  # Raises ValueError
except ValueError as e:
    print(f"Security violation prevented: {e}")

Parameter Filtering

Agent Orchestra automatically filters potentially unsafe parameters before passing them to underlying MCP agents, ensuring backward compatibility while maintaining security.

๐Ÿ“ˆ Performance & Monitoring

Built-in Metrics

# Get real-time broker statistics
broker_stats = await broker.get_stats()
print(f"RPM usage: {broker_stats['openai:gpt-4']['rpm_used']}")

# Get agent pool statistics  
pool_stats = await agent_pool.get_pool_stats()
print(f"Active agents: {pool_stats['total_agents']}")
print(f"Profile usage: {pool_stats['profiles']}")

Event-Driven Telemetry

# Access structured events during execution
async for event in orchestrator.run_streaming(workflow, run_spec):
    if event.type == "AGENT_CHUNK":
        # Log or emit to external monitoring
        telemetry_system.emit({
            "timestamp": event.timestamp,
            "node_id": event.node_id, 
            "content": event.data
        })

๐Ÿค Migration from mcp-use

Agent Orchestra is designed as a drop-in replacement. To migrate:

  1. Replace imports:

    # Old
    from mcp_use import MCPClient, MCPAgent
    
    # New  
    from agent_orchestra import SidecarMCPClient as MCPClient
    from agent_orchestra import SidecarMCPAgent as MCPAgent
    
  2. Optional: Add production features:

    # Add rate limiting
    from agent_orchestra.orchestrator.broker_config import create_development_broker
    broker = create_development_broker()
    
    # Add agent pooling
    from agent_orchestra.orchestrator.agent_pool import AgentPool
    pool = AgentPool(agent_factory)
    
  3. Optional: Use orchestration:

    # Define workflows instead of sequential calls
    from agent_orchestra.orchestrator import Orchestrator, GraphSpec, NodeSpec
    

๐Ÿ“š Documentation

๐ŸŽฏ Production Readiness Checklist

Agent Orchestra has been thoroughly tested and includes all features needed for production deployment:

  • โœ… Race-safe agent creation with double-checked locking
  • โœ… Global rate limiting with 429-aware retries
  • โœ… Profile-based agent pooling with automatic cleanup
  • โœ… Multi-server routing with parameter filtering
  • โœ… Security validations preventing directory traversal
  • โœ… Comprehensive error handling with graceful degradation
  • โœ… Resource lifecycle management with proper async cleanup
  • โœ… Production monitoring with structured events and metrics
  • โœ… Backward compatibility with existing mcp-use code
  • โœ… Comprehensive test coverage including race conditions

๐Ÿ› ๏ธ Development

Setup Development Environment

git clone https://github.com/your-org/agent-orchestra
cd agent-orchestra
pip install -e .
pip install -r requirements-dev.txt

Run Tests

python -m pytest tests/ -v --cov=agent_orchestra

Code Quality

ruff check .                    # Linting
ruff format .                   # Formatting  
mypy src/agent_orchestra/       # Type checking

๐Ÿ“„ License

Agent Orchestra is licensed under the MIT License.

๐Ÿ™ Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

Key Areas for Contribution:

  • Additional MCP server integrations
  • Enhanced telemetry and monitoring features
  • Performance optimizations
  • Documentation improvements
  • Example workflows and use cases

๐ŸŒŸ Roadmap

Upcoming Features:

  • OpenTelemetry integration for distributed tracing
  • Human-in-the-loop (HITL) workflow nodes
  • Advanced policy enforcement with RBAC
  • Workflow versioning and rollback
  • Distributed execution across multiple nodes
  • Enhanced security with request signing

Agent Orchestra: Production-Ready Multi-Agent Orchestration ๐ŸŽผ

Built for enterprises, loved by developers.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentic_orchestra-0.1.0.tar.gz (60.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentic_orchestra-0.1.0-py3-none-any.whl (48.9 kB view details)

Uploaded Python 3

File details

Details for the file agentic_orchestra-0.1.0.tar.gz.

File metadata

  • Download URL: agentic_orchestra-0.1.0.tar.gz
  • Upload date:
  • Size: 60.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for agentic_orchestra-0.1.0.tar.gz
Algorithm Hash digest
SHA256 00840d0f027833d664f31fa2515ff4b059c378df908ce6eb7c506f2e3681445c
MD5 c08242dc7683911954f0714bbacb278e
BLAKE2b-256 ac4bea5213e9533e7406adf6348317668a7a79042ac0a3eb4d3fa957278642bb

See more details on using hashes here.

File details

Details for the file agentic_orchestra-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for agentic_orchestra-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0fdbab686826606106aa62409def890f79843151bd48ce77ef51eca97a453d14
MD5 7096a3b764ca7ce84b0db42b866e8fce
BLAKE2b-256 a69a800c76bcdb6a60df381b37736a7f08f5a281a017a89a01bbf09eb3ed2769

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page