The easiest way to create and orchestrate multi-agent fleets
Agent Orchestra: Production-Ready Multi-Agent Orchestration Platform
Agent Orchestra is a production-grade, open-source framework for building sophisticated multi-agent workflows with enterprise-level features. Built on top of the Model Context Protocol (MCP), it provides advanced orchestration, rate limiting, agent pooling, and comprehensive observability for real-world AI applications.
Production-Ready Features
Agent Orchestra has been battle-tested and includes all the polish improvements needed for real-world deployment:
- Profile-Based Agent Pooling - Intelligent agent reuse with race-safe creation and no duplicate initialization
- Global Rate Limiting - Per-model RPM/RPD limits with 429-aware retries and jittered exponential backoff
- Multi-Server Routing - Single MCP client with dynamic server-name routing per workflow node
- Security & Safety - Path validation, directory traversal prevention, and secure parameter handling
- Advanced Orchestration - DAG workflows with concurrent foreach, intelligent reduce, and conditional gate nodes
- Comprehensive Telemetry - Event-driven architecture with structured logging and performance metrics
- Clean Async Management - Proper resource lifecycle with graceful startup/shutdown
Architecture Overview
+-----------------+    +------------------+    +------------------+
|  Orchestrator   |--->|   MCPExecutor    |--->|    AgentPool     |
+-----------------+    +------------------+    +------------------+
         |                      |                       |
         |                      v                       v
         |             +------------------+    +------------------+
         |             |    CallBroker    |    | SidecarMCPAgent  |
         |             | (Rate Limiting)  |    | (with Telemetry) |
         |             +------------------+    +------------------+
         |                      |                       |
         v                      |                       v
+-----------------+             |              +------------------+
|    GraphSpec    |             |              | SidecarMCPClient |
|   (Workflow)    |             |              |  (Multi-Server)  |
+-----------------+             |              +------------------+
                                |                       |
                                v                       v
                       +------------------+    +------------------+
                       |   Broker Stats   |    |   MCP Servers    |
                       |   (Monitoring)   |    |  (fs, web, etc)  |
                       +------------------+    +------------------+
Key Components
Orchestrator
The central workflow engine that executes DAG-based workflows with support for:
- Task Nodes - Single agent operations
- Foreach Nodes - Concurrent batch processing with configurable concurrency
- Reduce Nodes - Intelligent aggregation of multiple results
- Gate Nodes - Conditional workflow control
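As a rough illustration of what executing a DAG-based workflow implies, the sketch below derives a valid execution order from a list of `(upstream, downstream)` edges using the standard library's `graphlib`. The helper `execution_order` is hypothetical and is not part of Agent Orchestra; the node ids are borrowed from the Quick Start workflow. How the Orchestrator actually schedules nodes is not documented here.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(node_ids, edges):
    """Return node ids in an order that respects every (upstream, downstream) edge.

    Hypothetical helper for illustration; raises CycleError if the edges form a cycle.
    """
    # Start with every node having no dependencies, then record each edge.
    sorter = TopologicalSorter({node_id: set() for node_id in node_ids})
    for upstream, downstream in edges:
        sorter.add(downstream, upstream)  # downstream depends on upstream
    return list(sorter.static_order())


nodes = ["read_sales_data", "analyze_trends", "market_research", "save_report"]
edges = [
    ("read_sales_data", "analyze_trends"),
    ("analyze_trends", "market_research"),
    ("market_research", "save_report"),
]
order = execution_order(nodes, edges)
print(order)
```

A cyclic edge list is rejected up front with `CycleError`, which is one reason to validate a graph before any agent call is made.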
AgentPool (Profile-Based)
Production-grade agent management with:
- Profile Keys - (server_name, model_key, policy_id) for precise agent categorization
- Race-Safe Creation - Double-checked locking prevents duplicate agent initialization
- Agent Reuse - Automatic sharing of agents across workflow nodes with same profile
- Resource Limits - Configurable max agents per run with automatic cleanup
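The double-checked locking pattern mentioned above can be sketched with `asyncio.Lock`. This is a minimal illustration under assumptions, not the AgentPool implementation: `ProfilePool` and the `factory` callable are hypothetical names, and the profile is simply a `(server_name, model_key, policy_id)` tuple.

```python
import asyncio


class ProfilePool:
    """Hypothetical pool that creates at most one agent per profile key."""

    def __init__(self, factory):
        self._factory = factory  # async callable: profile -> agent
        self._agents = {}
        self._lock = asyncio.Lock()

    async def get(self, profile):
        agent = self._agents.get(profile)  # first check, without the lock
        if agent is not None:
            return agent
        async with self._lock:
            agent = self._agents.get(profile)  # second check, under the lock
            if agent is None:
                agent = await self._factory(profile)
                self._agents[profile] = agent
            return agent


async def demo():
    created = []

    async def factory(profile):
        await asyncio.sleep(0.01)  # simulate slow agent initialization
        created.append(profile)
        return object()

    pool = ProfilePool(factory)
    profile = ("fs_business", "openai:gpt-4o-mini", "standard")
    agents = await asyncio.gather(*(pool.get(profile) for _ in range(20)))
    return created, agents


created, agents = asyncio.run(demo())
print(f"factory calls: {len(created)}")
```

The second lookup inside the lock is what stops concurrent callers, all of which missed on the first lookup, from each running the factory.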
CallBroker (Rate Limiting)
Global rate limiting system with:
- Per-Model Limits - Separate RPM, RPD, and concurrency limits per model
- 429-Aware Retries - Automatic retry with jittered exponential backoff
- Sliding Window Counters - Precise rate tracking with time-based windows
- Request Queuing - Fair scheduling across multiple agents
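A sliding-window counter of the kind described above can be sketched as a queue of request timestamps. The class below is a hypothetical illustration, not CallBroker's code; the injectable `clock` parameter is an assumption added so the behaviour can be checked without waiting.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical limiter: at most `limit` requests per `window_seconds`."""

    def __init__(self, limit, window_seconds, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self._clock = clock
        self._stamps = deque()  # timestamps of accepted requests, oldest first

    def _evict(self, now):
        # Drop timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.window:
            self._stamps.popleft()

    def try_acquire(self):
        now = self._clock()
        self._evict(now)
        if len(self._stamps) >= self.limit:
            return False
        self._stamps.append(now)
        return True

    def used(self):
        self._evict(self._clock())
        return len(self._stamps)


# Drive the limiter with a fake clock: 2 requests per 60 seconds.
fake_now = [0.0]
rpm = SlidingWindowLimiter(limit=2, window_seconds=60, clock=lambda: fake_now[0])
results = [rpm.try_acquire(), rpm.try_acquire(), rpm.try_acquire()]
fake_now[0] = 61.0  # both earlier requests have now left the window
results.append(rpm.try_acquire())
print(results)
```

An RPD limit would be the same structure with an 86,400-second window.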
MCPExecutor (Multi-Server)
Enhanced executor with:
- Server-Name Routing - Dynamic routing to different MCP servers per node
- Parameter Filtering - Clean parameter handling for backward compatibility
- Output Capture - Enhanced result processing with fallback to text
- Streaming Support - Real-time chunk processing with telemetry
SidecarMCPAgent (Enhanced)
Drop-in replacement for mcp-use MCPAgent with:
- Telemetry Integration - Comprehensive event emission and performance tracking
- Parameter Safety - Secure handling of server_name and other routing parameters
- Enhanced Error Handling - Better error reporting and recovery
- Full API Compatibility - 100% compatible with existing mcp-use code
Installation
Prerequisites
- Python 3.11+
- Node.js 18+ (for MCP servers)
- OpenAI API key (or other LLM provider)
Install Agent Orchestra
pip install agentic-orchestra
Install MCP Servers
# Filesystem server
npm install -g @modelcontextprotocol/server-filesystem
# Web browser server
npm install -g @modelcontextprotocol/server-playwright
# Or use npx to run without global install
npx @modelcontextprotocol/server-filesystem --help
Quick Start
Simple Agent Usage (Drop-in Replacement)
import asyncio

from agent_orchestra import SidecarMCPClient, SidecarMCPAgent
from langchain_openai import ChatOpenAI


async def simple_example():
    # Configure MCP client
    config = {
        "mcpServers": {
            "filesystem": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem",
                         "--stdio", "--root", "/tmp"]
            }
        }
    }
    client = SidecarMCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4o-mini")
    agent = SidecarMCPAgent(llm=llm, client=client)
    result = await agent.run("List the files in the current directory")
    print(result)
    await client.close_all_sessions()


asyncio.run(simple_example())
Production Workflow with All Features
import asyncio

from agent_orchestra import SidecarMCPClient, SidecarMCPAgent
from agent_orchestra.orchestrator.core import Orchestrator
from agent_orchestra.orchestrator.types import GraphSpec, NodeSpec, RunSpec
from agent_orchestra.orchestrator.executors_mcp import MCPExecutor
from agent_orchestra.orchestrator.broker_config import create_development_broker
from agent_orchestra.orchestrator.agent_pool import AgentPool, create_default_agent_factory
from langchain_openai import ChatOpenAI


async def production_workflow():
    # Multi-server MCP configuration
    config = {
        "mcpServers": {
            "fs_business": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem",
                         "--stdio", "--root", "/business/data"]
            },
            "fs_reports": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem",
                         "--stdio", "--root", "/reports/output"]
            },
            "web": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-playwright", "--stdio"]
            }
        }
    }

    # Create production components
    client = SidecarMCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)

    # Profile-based agent pool
    agent_factory = create_default_agent_factory(client, llm)
    agent_pool = AgentPool(agent_factory, max_agents_per_run=10)

    # Global rate limiting
    broker = create_development_broker()

    # Production-ready executor
    executor = MCPExecutor(
        agent=None,  # No template agent needed
        default_server="fs_business",
        broker=broker,
        agent_pool=agent_pool,
        model_key="openai:gpt-4o-mini"
    )
    orchestrator = Orchestrator(executor)

    # Define multi-server workflow
    workflow = GraphSpec(
        nodes=[
            # Concurrent data processing
            NodeSpec(
                id="read_sales_data",
                type="foreach",
                server_name="fs_business",  # Route to business filesystem
                inputs={
                    "items": ["sales.json", "marketing.json", "operations.json"],
                    "instruction": "Read and summarize each business file"
                },
                concurrency=3
            ),
            # Cross-department analysis
            NodeSpec(
                id="analyze_trends",
                type="reduce",
                inputs={
                    "from_ids": ["read_sales_data"],
                    "instruction": "Analyze trends across all departments"
                }
            ),
            # Web research for market context
            NodeSpec(
                id="market_research",
                type="task",
                server_name="web",  # Route to web browser
                inputs={
                    "from": "analyze_trends",
                    "instruction": "Research current market trends for context"
                }
            ),
            # Save final report
            NodeSpec(
                id="save_report",
                type="task",
                server_name="fs_reports",  # Route to reports filesystem
                inputs={
                    "from": "market_research",
                    "instruction": "Create executive summary and save as report.pdf"
                }
            )
        ],
        edges=[
            ("read_sales_data", "analyze_trends"),
            ("analyze_trends", "market_research"),
            ("market_research", "save_report")
        ]
    )
    run_spec = RunSpec(
        run_id="business_analysis_001",
        goal="Multi-department business analysis with market research"
    )

    # Execute with full observability
    print("Starting production workflow...")
    async for event in orchestrator.run_streaming(workflow, run_spec):
        if event.type == "NODE_START":
            print(f"Starting {event.node_id}")
        elif event.type == "NODE_COMPLETE":
            print(f"Completed {event.node_id}")
        elif event.type == "AGENT_CHUNK":
            print(f"  Agent progress: {event.data.get('content', '')[:50]}...")
        elif event.type == "RUN_COMPLETE":
            print("Workflow completed successfully!")

    # Get production metrics
    broker_stats = await broker.get_stats()
    pool_stats = await agent_pool.get_pool_stats()
    print("\nProduction Metrics:")
    print(f"  Agent profiles created: {len(pool_stats['profiles'])}")
    for profile_key, profile_info in pool_stats['profiles'].items():
        server = profile_info['server_name'] or 'default'
        usage = profile_info['usage_count']
        print(f"    {server} server: {usage} uses")
    for model, stats in broker_stats.items():
        if stats['rpm_used'] > 0:
            print(f"  {model}: {stats['rpm_used']}/{stats['rpm_limit']} RPM used")

    # Clean shutdown
    await broker.shutdown()
    await agent_pool.shutdown()
    await client.close_all_sessions()


# Entry point
if __name__ == "__main__":
    asyncio.run(production_workflow())
Examples
The examples/ directory contains production-ready demonstrations:
Production Examples
- polished_part4_demo.py - Complete production workflow with all features
- polished_simple_demo.py - Simple demo without complex MCP setup
- polished_verification_demo.py - Verification of all polish improvements
- part4_complete_demo.py - CallBroker + AgentPool integration
Core Feature Examples
- basic_orchestration.py - Simple DAG workflow
- foreach_example.py - Concurrent batch processing
- reduce_example.py - Data aggregation patterns
- gate_example.py - Conditional workflow control
Integration Examples
- multi_server_example.py - Multiple MCP servers in one workflow
- rate_limiting_example.py - CallBroker rate limiting demonstration
- agent_pooling_example.py - AgentPool management patterns
Testing
Agent Orchestra includes comprehensive test coverage:
# Run all tests
python -m pytest tests/ -v
# Run specific test categories
python -m pytest tests/test_polish_improvements.py -v # Production features
python -m pytest tests/test_orchestration.py -v # Core orchestration
python -m pytest tests/test_agent_pool.py -v # Agent management
Test Coverage:
- Polish Improvements - All 10 production-ready improvements
- Race Conditions - Concurrent agent creation safety
- Path Validation - Security and directory traversal prevention
- Rate Limiting - Global rate limiting across multiple agents
- Multi-Server - Server routing and profile management
Configuration
CallBroker Configuration
from agent_orchestra.orchestrator.call_broker import CallBroker, ModelLimits

# Custom rate limits
limits = {
    "openai:gpt-4": ModelLimits(rpm=60, rpd=1000, max_concurrency=10),
    "openai:gpt-4o-mini": ModelLimits(rpm=200, rpd=5000, max_concurrency=20),
    "anthropic:claude-3": ModelLimits(rpm=50, rpd=800, max_concurrency=5)
}
broker = CallBroker(limits)
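For intuition about the 429-aware retries with jittered exponential backoff that the broker provides, here is a minimal sketch. It is not CallBroker's code: `RateLimitError`, `backoff_delay`, and `call_with_retries` are hypothetical names, and the "full jitter" strategy (a uniform delay between zero and a capped exponential bound) is one common choice assumed for illustration.

```python
import asyncio
import random


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 error."""


def backoff_delay(attempt, base=0.5, cap=30.0):
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


async def call_with_retries(call, max_attempts=5, base=0.5, cap=30.0,
                            sleep=asyncio.sleep):
    """Await `call()`, retrying on RateLimitError with jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return await call()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # retries exhausted, surface the error
            await sleep(backoff_delay(attempt, base, cap))


async def demo():
    calls = {"count": 0}

    async def flaky():
        # Fails twice with a simulated 429, then succeeds.
        calls["count"] += 1
        if calls["count"] < 3:
            raise RateLimitError("429 Too Many Requests")
        return "ok"

    async def no_sleep(_delay):
        return None  # skip real waiting in the demo

    result = await call_with_retries(flaky, sleep=no_sleep)
    return result, calls["count"]


result, attempts = asyncio.run(demo())
print(result, attempts)
```

Jitter matters when many agents share one model limit: without it, callers that were throttled together retry together and collide again.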
AgentPool Configuration
from agent_orchestra import SidecarMCPAgent
from agent_orchestra.orchestrator.agent_pool import AgentPool, AgentSpec

# Profile-based agent management
async def custom_factory(spec: AgentSpec):
    # Custom agent creation logic based on spec
    # (replace ... with your own llm/client arguments)
    return SidecarMCPAgent(...)

pool = AgentPool(custom_factory, max_agents_per_run=15)

# Get agent for specific profile (call from inside an async function)
spec = AgentSpec(
    server_name="fs_business",
    model_key="openai:gpt-4",
    policy_id="standard"
)
agent = await pool.get(spec, "run_123")
Multi-Server MCP Configuration
from agent_orchestra.orchestrator.fs_utils import create_multi_server_config

configs = {
    "fs_sales": {"root": "/data/sales"},
    "fs_reports": {"root": "/data/reports"},
    "playwright": {"type": "playwright"},
    "custom_server": {
        "command": "python",
        "args": ["-m", "my_custom_server", "--stdio"]
    }
}
mcp_config = create_multi_server_config(configs)
Security Features
Path Validation
from pathlib import Path

from agent_orchestra.orchestrator.fs_utils import fs_args

# Safe path handling with directory traversal prevention
root = Path("/safe/root")
try:
    safe_args = fs_args(root, "../../etc/passwd")  # Raises ValueError
except ValueError as e:
    print(f"Security violation prevented: {e}")
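The general technique behind this kind of check can be sketched with `pathlib` alone: resolve the candidate path and confirm it still sits under the resolved root. The helper `resolve_inside` below is hypothetical and does not reflect how `fs_args` is implemented; it only illustrates the idea, assuming Python 3.9+ for `Path.is_relative_to`.

```python
from pathlib import Path


def resolve_inside(root, relative):
    """Return the resolved path for `relative` under `root`.

    Hypothetical helper for illustration; raises ValueError if the resolved
    path escapes the root (via "..", an absolute path, or a symlink).
    """
    root_resolved = Path(root).resolve()
    candidate = (root_resolved / relative).resolve()
    if not candidate.is_relative_to(root_resolved):
        raise ValueError(f"path escapes root: {relative!r}")
    return candidate


root = "/safe/root"
print(resolve_inside(root, "reports/q1.txt"))
try:
    resolve_inside(root, "../../etc/passwd")
except ValueError as exc:
    print(f"rejected: {exc}")
```

Comparing resolved paths, rather than scanning the string for "..", also covers absolute inputs and symlinks that point outside the root.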
Parameter Filtering
Agent Orchestra automatically filters potentially unsafe parameters before passing them to underlying MCP agents, ensuring backward compatibility while maintaining security.
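One way to picture this filtering is to keep only the keyword arguments that the target callable declares and set the rest aside. The sketch below uses `inspect.signature`; `filter_kwargs` and `legacy_run` are hypothetical names, and the library's actual filtering rules may differ.

```python
import inspect


def filter_kwargs(func, kwargs):
    """Split kwargs into (accepted, dropped) based on func's signature.

    Hypothetical helper for illustration only.
    """
    params = inspect.signature(func).parameters
    # A **kwargs parameter means the callable accepts any keyword.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return dict(kwargs), {}
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    accepted = {
        name: value
        for name, value in kwargs.items()
        if name in params and params[name].kind in keyword_kinds
    }
    dropped = {name: value for name, value in kwargs.items() if name not in accepted}
    return accepted, dropped


def legacy_run(query, max_steps=5):
    """Stand-in for an older agent method that knows nothing about routing."""
    return f"{query} (max_steps={max_steps})"


accepted, dropped = filter_kwargs(
    legacy_run,
    {"query": "list files", "max_steps": 3, "server_name": "fs_business"},
)
result = legacy_run(**accepted)
print(result, dropped)
```

Here the routing-only `server_name` argument is held back, so the older callable never sees a keyword it would reject.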
Performance & Monitoring
Built-in Metrics
# Get real-time broker statistics
broker_stats = await broker.get_stats()
print(f"RPM usage: {broker_stats['openai:gpt-4']['rpm_used']}")
# Get agent pool statistics
pool_stats = await agent_pool.get_pool_stats()
print(f"Active agents: {pool_stats['total_agents']}")
print(f"Profile usage: {pool_stats['profiles']}")
Event-Driven Telemetry
# Access structured events during execution
async for event in orchestrator.run_streaming(workflow, run_spec):
if event.type == "AGENT_CHUNK":
# Log or emit to external monitoring
telemetry_system.emit({
"timestamp": event.timestamp,
"node_id": event.node_id,
"content": event.data
})
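An event stream like this can also be folded into simple metrics. The sketch below computes per-node durations from start and complete events. The `Event` dataclass and `fake_stream` generator are hypothetical stand-ins so the example runs without Agent Orchestra; only the event type names and the `node_id`, `timestamp`, and `data` attributes are taken from the snippets above, and treating `timestamp` as a number of seconds is an assumption.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical event shape mirroring the attributes used above."""
    type: str
    node_id: str
    timestamp: float
    data: dict = field(default_factory=dict)


async def fake_stream():
    """Stand-in for orchestrator.run_streaming(...), yielding canned events."""
    for event in [
        Event("NODE_START", "read_sales_data", 0.0),
        Event("AGENT_CHUNK", "read_sales_data", 0.4, {"content": "reading"}),
        Event("NODE_COMPLETE", "read_sales_data", 1.5),
        Event("NODE_START", "analyze_trends", 1.5),
        Event("NODE_COMPLETE", "analyze_trends", 4.0),
    ]:
        yield event


async def node_durations(stream):
    """Return {node_id: seconds between NODE_START and NODE_COMPLETE}."""
    started, durations = {}, {}
    async for event in stream:
        if event.type == "NODE_START":
            started[event.node_id] = event.timestamp
        elif event.type == "NODE_COMPLETE" and event.node_id in started:
            durations[event.node_id] = event.timestamp - started.pop(event.node_id)
    return durations


durations = asyncio.run(node_durations(fake_stream()))
print(durations)
```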
Migration from mcp-use
Agent Orchestra is designed as a drop-in replacement. To migrate:
- Replace imports:
  # Old
  from mcp_use import MCPClient, MCPAgent
  # New
  from agent_orchestra import SidecarMCPClient as MCPClient
  from agent_orchestra import SidecarMCPAgent as MCPAgent
- Optional: Add production features:
  # Add rate limiting
  from agent_orchestra.orchestrator.broker_config import create_development_broker
  broker = create_development_broker()
  # Add agent pooling
  from agent_orchestra.orchestrator.agent_pool import AgentPool
  pool = AgentPool(agent_factory)
- Optional: Use orchestration:
  # Define workflows instead of sequential calls
  from agent_orchestra.orchestrator import Orchestrator, GraphSpec, NodeSpec
Documentation
- Architecture Guide - System design and component overview
- Production Deployment - Best practices for production use
- API Reference - Comprehensive API documentation
- Migration Guide - Detailed migration from mcp-use
- Performance Tuning - Optimization strategies
Production Readiness Checklist
Agent Orchestra has been thoroughly tested and includes all features needed for production deployment:
- Race-safe agent creation with double-checked locking
- Global rate limiting with 429-aware retries
- Profile-based agent pooling with automatic cleanup
- Multi-server routing with parameter filtering
- Security validations preventing directory traversal
- Comprehensive error handling with graceful degradation
- Resource lifecycle management with proper async cleanup
- Production monitoring with structured events and metrics
- Backward compatibility with existing mcp-use code
- Comprehensive test coverage including race conditions
Development
Setup Development Environment
git clone https://github.com/your-org/agent-orchestra
cd agent-orchestra
pip install -e .
pip install -r requirements-dev.txt
Run Tests
python -m pytest tests/ -v --cov=agent_orchestra
Code Quality
ruff check . # Linting
ruff format . # Formatting
mypy src/agent_orchestra/ # Type checking
License
Agent Orchestra is licensed under the MIT License.
Contributing
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
Key Areas for Contribution:
- Additional MCP server integrations
- Enhanced telemetry and monitoring features
- Performance optimizations
- Documentation improvements
- Example workflows and use cases
Roadmap
Upcoming Features:
- OpenTelemetry integration for distributed tracing
- Human-in-the-loop (HITL) workflow nodes
- Advanced policy enforcement with RBAC
- Workflow versioning and rollback
- Distributed execution across multiple nodes
- Enhanced security with request signing
Agent Orchestra: Production-Ready Multi-Agent Orchestration
Built for enterprises, loved by developers.