A framework for autonomous agents that collaborate via P2P communication
Distributed Agent Mesh (DAM)
A revolutionary framework for autonomous agents that collaborate via P2P communication
What is DAM?
Distributed Agent Mesh (DAM) is a peer-to-peer framework where autonomous AI agents:
- Decide autonomously if they can contribute to tasks
- Self-organize into teams without central control
- Execute in parallel for maximum performance
- Collaborate directly via P2P communication
- Scale dynamically by adding/removing agents anytime
- Tolerate failures: the system continues if agents fail
DAM vs MCP (Model Context Protocol)
Architecture Comparison
| Aspect | MCP | DAM |
|---|---|---|
| Architecture | Client-Server | P2P Mesh |
| Control | Centralized (LLM) | Distributed (Agents) |
| Decision Making | LLM decides everything | Agents decide autonomously |
| Execution | Sequential tool calls | Parallel agent collaboration |
| Scalability | Limited by LLM | Add agents anytime |
| Resilience | Single point of failure | Fault tolerant |
| Performance | Sequential (1x baseline) | 2.4-3.1x faster in the project's POC (parallel) |
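The Execution and Performance rows come down to sequential versus concurrent awaiting. The sketch below illustrates only that difference: it uses neither MCP nor DAM, and `fake_step`, `run_sequential`, `run_parallel`, and `timed` are hypothetical names, with `asyncio.sleep` standing in for I/O-bound work.

```python
import asyncio
import time


async def fake_step(name: str, seconds: float = 0.05) -> str:
    await asyncio.sleep(seconds)  # stand-in for an I/O-bound call
    return name


async def run_sequential(names):
    # Each step waits for the previous one to finish.
    return [await fake_step(n) for n in names]


async def run_parallel(names):
    # All steps are started together and awaited as a group.
    return list(await asyncio.gather(*(fake_step(n) for n in names)))


def timed(coro_fn, names):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(names))
    return result, time.perf_counter() - start


steps = ["extract", "analyze", "summarize"]
seq_result, seq_time = timed(run_sequential, steps)
par_result, par_time = timed(run_parallel, steps)
print(f"sequential: {seq_time:.2f}s, parallel: {par_time:.2f}s")
```

Both runs return the same results in the same order; only the wall-clock time differs, and only because the simulated steps are independent of each other.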
When to Use What?
Use MCP when:
- Simple tool integration
- Single-agent systems
- Well-defined sequential workflows
- Resource access (files, databases)
Use DAM when:
- Complex, multi-step tasks
- Tasks requiring parallelism
- Large-scale distributed systems
- Systems requiring high availability
- Dynamic, adaptive workflows
Best Approach: Hybrid
- Use MCP for resource access (data, tools)
- Use DAM for agent coordination
- Get the best of both worlds!
Installation
pip install distributed-agent-mesh
Or install from source:
git clone https://github.com/yourusername/distributed-agent-mesh.git
cd distributed-agent-mesh
pip install -e .
Quick Start
Basic Example
import asyncio

from distributed_agent_mesh import (
    DistributedAgentMesh,
    AutonomousAgent,
    AgentCapability,
    Task,
)

# Define custom executor
async def research_executor(task):
    # Your custom logic here
    return {
        'status': 'completed',
        'result': f'Research completed: {task.description}'
    }

# Create mesh
mesh = DistributedAgentMesh()

# Create autonomous agent
agent = AutonomousAgent(
    agent_id="research_agent",
    capabilities=[
        AgentCapability("research", "Conduct research")
    ],
    executor=research_executor
)

# Register agent
mesh.register_agent(agent)

# Create task
task = Task(
    id="task_001",
    description="Research AI trends",
    requirements=["research"],
    priority=1
)

# Agents collaborate autonomously!
async def main():
    result = await mesh.solve_complex_task(task)
    print(f"Result: {result}")

asyncio.run(main())
Real-World Example: Document Analysis
import asyncio

from distributed_agent_mesh import (
    DistributedAgentMesh,
    AutonomousAgent,
    AgentCapability,
    Task,
)

# Create specialized agents
async def text_extractor(task):
    # Extract text from documents
    return {'text': '...', 'pages': 10}

async def sentiment_analyzer(task):
    # Analyze sentiment
    return {'sentiment': 'positive', 'confidence': 0.89}

async def summarizer(task):
    # Generate summary
    return {'summary': '...', 'key_points': [...]}

# Setup mesh
mesh = DistributedAgentMesh()

# Register agents with capabilities
mesh.register_agent(AutonomousAgent(
    "text_agent",
    [AgentCapability("text_extraction", "Extract text")],
    text_extractor
))
mesh.register_agent(AutonomousAgent(
    "sentiment_agent",
    [AgentCapability("sentiment_analysis", "Analyze sentiment")],
    sentiment_analyzer
))
mesh.register_agent(AutonomousAgent(
    "summary_agent",
    [AgentCapability("summarization", "Summarize")],
    summarizer
))

# Process documents in parallel
async def analyze_documents():
    tasks = [
        Task(
            id=f"doc_{i}",
            description=f"Document {i}",
            requirements=["text_extraction", "sentiment_analysis", "summarization"],
            priority=1
        )
        for i in range(10)
    ]
    # All documents processed in parallel!
    results = await asyncio.gather(*[
        mesh.solve_complex_task(task) for task in tasks
    ])
    return results

# Run
results = asyncio.run(analyze_documents())
Key Features
1. Autonomous Decision Making
Agents decide on their own if they can contribute:
async def can_contribute(self, task: Task) -> bool:
    """Agent decides autonomously"""
    relevance = await self.analyze_relevance(task)
    availability = self.check_availability()
    capability = self.check_capabilities(task)
    return relevance > 0.7 and availability and capability
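To make the decision rule concrete, here is a minimal self-contained sketch. It does not use the library: `SketchTask` and `SketchAgent` are hypothetical stand-ins, and the relevance score (the share of an agent's capabilities that the task asks for) is an assumption chosen for illustration, not the scoring DAM actually uses.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task: only the fields this sketch needs."""
    description: str
    requirements: list


@dataclass
class SketchAgent:
    """Hypothetical stand-in for AutonomousAgent."""
    agent_id: str
    capabilities: set
    busy: bool = False

    async def analyze_relevance(self, task: SketchTask) -> float:
        # Assumed scoring rule: share of this agent's capabilities
        # that the task actually requires.
        if not self.capabilities:
            return 0.0
        overlap = self.capabilities.intersection(task.requirements)
        return len(overlap) / len(self.capabilities)

    async def can_contribute(self, task: SketchTask) -> bool:
        relevance = await self.analyze_relevance(task)
        return relevance > 0.7 and not self.busy


task = SketchTask("Research AI trends", ["research"])
researcher = SketchAgent("research_agent", {"research"})
visualizer = SketchAgent("viz_agent", {"visualization"})
print(asyncio.run(researcher.can_contribute(task)))  # True
print(asyncio.run(visualizer.can_contribute(task)))  # False
```

A busy agent declines even when the task is relevant, which mirrors the availability check in the snippet above.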
2. Self-Organizing Teams
No central controller - agents form teams automatically:
# Mesh broadcasts task
await mesh.solve_complex_task(task)
# Agents autonomously:
# 1. Decide if they can help
# 2. Self-organize into optimal team
# 3. Collaborate peer-to-peer
# 4. Execute in parallel
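The four steps above can be sketched with plain asyncio. This is an illustration under assumptions, not the library's implementation: `MiniAgent` and `broadcast_and_solve` are hypothetical names, and a single function plays the role of the broadcast that a real mesh would carry out with peer-to-peer messages.

```python
import asyncio


class MiniAgent:
    """Hypothetical agent with one capability and an async executor."""

    def __init__(self, agent_id, capability):
        self.agent_id = agent_id
        self.capability = capability

    async def can_contribute(self, requirements):
        return self.capability in requirements

    async def execute(self, description):
        await asyncio.sleep(0.01)  # stands in for real work
        return f"{self.agent_id} handled {self.capability} for {description!r}"


async def broadcast_and_solve(agents, description, requirements):
    # Steps 1-2: every agent decides for itself; the volunteers form the team.
    answers = await asyncio.gather(*(a.can_contribute(requirements) for a in agents))
    team = [a for a, yes in zip(agents, answers) if yes]
    # Steps 3-4: team members run concurrently.
    results = await asyncio.gather(*(a.execute(description) for a in team))
    return [a.agent_id for a in team], list(results)


agents = [
    MiniAgent("text_agent", "text_extraction"),
    MiniAgent("sentiment_agent", "sentiment_analysis"),
    MiniAgent("viz_agent", "visualization"),
]
team, results = asyncio.run(
    broadcast_and_solve(agents, "Document 0", ["text_extraction", "sentiment_analysis"])
)
print(team)  # ['text_agent', 'sentiment_agent']
```

The caller never names the team members; membership falls out of each agent's own answer.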
3. Dynamic Scalability
Add/remove agents anytime without restart:
# Start with 2 agents
mesh.register_agent(agent1)
mesh.register_agent(agent2)
# Process tasks
result1 = await mesh.solve_complex_task(task1)
# High load? Add more agents dynamically!
mesh.register_agent(agent3)
mesh.register_agent(agent4)
# System automatically uses new agents
result2 = await mesh.solve_complex_task(task2)
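One way such behavior can work is for the mesh to read its registry each time a task arrives. The sketch below assumes exactly that; `MiniMesh`, `make_executor`, and the round-robin assignment are hypothetical and may not match how DistributedAgentMesh distributes work.

```python
import asyncio


def make_executor(agent_id):
    async def executor(item):
        await asyncio.sleep(0.01)  # stands in for real work
        return (agent_id, item)
    return executor


class MiniMesh:
    """Hypothetical registry; the real DistributedAgentMesh may differ."""

    def __init__(self):
        self.agents = {}

    def register_agent(self, agent_id, executor):
        self.agents[agent_id] = executor

    async def solve(self, items):
        # Snapshot the registry per call, so newly added agents are used
        # by the next task without any restart.
        workers = list(self.agents.values())
        if not workers:
            raise RuntimeError("no agents registered")
        jobs = [workers[i % len(workers)](item) for i, item in enumerate(items)]
        return await asyncio.gather(*jobs)


async def demo():
    mesh = MiniMesh()
    for name in ("agent1", "agent2"):
        mesh.register_agent(name, make_executor(name))
    first = await mesh.solve(range(4))
    for name in ("agent3", "agent4"):
        mesh.register_agent(name, make_executor(name))
    second = await mesh.solve(range(4))
    return first, second


first, second = asyncio.run(demo())
print(sorted({agent for agent, _ in first}))   # ['agent1', 'agent2']
print(sorted({agent for agent, _ in second}))  # ['agent1', 'agent2', 'agent3', 'agent4']
```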
4. Fault Tolerance
System continues working even when agents fail:
# 5 agents working
mesh.register_agent(agent1)
mesh.register_agent(agent2)
# ... agent5
# 2 agents fail mid-execution
mesh.unregister_agent("agent1")
mesh.unregister_agent("agent2")
# System continues with remaining 3 agents!
result = await mesh.solve_complex_task(task) # Still completes
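A minimal sketch of one way to keep going when agents fail mid-task: collect exceptions with `asyncio.gather(..., return_exceptions=True)`, drop the agents that raised, and retry their items on the survivors. The names `healthy`, `crashing`, and `solve_with_failover` are hypothetical, and the retry policy is an assumption rather than DAM's documented behavior.

```python
import asyncio


async def healthy(item):
    await asyncio.sleep(0.01)
    return f"done:{item}"


async def crashing(item):
    await asyncio.sleep(0.01)
    raise ConnectionError("agent went offline")


async def solve_with_failover(agents, items):
    """agents maps a hypothetical agent id to its async executor."""
    alive = dict(agents)
    pending = list(items)
    finished = {}
    while pending:
        if not alive:
            raise RuntimeError("all agents failed")
        ids = list(alive)
        assigned = [(ids[i % len(ids)], item) for i, item in enumerate(pending)]
        outcomes = await asyncio.gather(
            *(alive[agent_id](item) for agent_id, item in assigned),
            return_exceptions=True,
        )
        pending = []
        for (agent_id, item), outcome in zip(assigned, outcomes):
            if isinstance(outcome, Exception):
                alive.pop(agent_id, None)  # drop the failed agent
                pending.append(item)       # retry on a survivor
            else:
                finished[item] = outcome
    return finished, sorted(alive)


agents = {
    "agent1": crashing,
    "agent2": crashing,
    "agent3": healthy,
    "agent4": healthy,
    "agent5": healthy,
}
finished, survivors = asyncio.run(solve_with_failover(agents, range(5)))
print(survivors)         # ['agent3', 'agent4', 'agent5']
print(sorted(finished))  # [0, 1, 2, 3, 4]
```

Every round either completes an item or removes a failed agent, so the loop ends with all items finished or a clear error once no agents remain.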
Performance Benchmarks
Based on our POC comparison:
Single Task Performance
- MCP (Sequential): 2.40s
- DAM (Parallel): 1.00s
- Speedup: 2.4x faster
Multiple Tasks (3 tasks)
- MCP (Sequential): 7.21s
- DAM (Parallel): 2.33s
- Speedup: 3.1x faster
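The speedup figures follow directly from the timings listed: sequential time divided by parallel time. The helper name `speedup` is just for illustration.

```python
def speedup(sequential_s: float, parallel_s: float) -> float:
    """How many times faster the parallel run finished."""
    return sequential_s / parallel_s


single = speedup(2.40, 1.00)  # single task
multi = speedup(7.21, 2.33)   # three tasks
print(f"{single:.1f}x, {multi:.1f}x")  # 2.4x, 3.1x
```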
Scalability Test
- Add agents dynamically: no restart needed
- Fault tolerance: continues despite failures
- Load balancing: automatic distribution
Architecture
┌─────────────────────────────────────────────────┐
│          Distributed Agent Mesh (DAM)           │
│                                                 │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│  │ Agent 1  │◄──►│ Agent 2  │◄──►│ Agent 3  │   │
│  │          │    │          │    │          │   │
│  │ Research │    │ Analysis │    │   Viz    │   │
│  └──────────┘    └──────────┘    └──────────┘   │
│       ▲               ▲               ▲         │
│       │               │               │         │
│       └───────────────┴───────────────┘         │
│               P2P Network Layer                 │
│                                                 │
│  • Autonomous decision making                   │
│  • Self-organizing teams                        │
│  • Parallel execution                           │
│  • Fault tolerance                              │
└─────────────────────────────────────────────────┘
Advanced Usage
Custom Agent Executor
import time
from typing import Any, Dict

# call_llm, use_tool, and process_results are placeholders for helpers
# you supply yourself (an LLM client, a tool wrapper, a post-processor).
async def advanced_executor(task: Task) -> Dict[str, Any]:
    """Custom logic with LLM, tools, etc."""
    # Use LLM
    llm_response = await call_llm(task.description)
    # Use tools (via MCP or direct)
    tool_result = await use_tool("search", {"query": task.description})
    # Process data
    processed = process_results(llm_response, tool_result)
    return {
        'status': 'completed',
        'result': processed,
        'metadata': {'time': time.time()}
    }

agent = AutonomousAgent(
    "advanced_agent",
    [AgentCapability("advanced_task", "Advanced processing")],
    advanced_executor
)
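To try this executor shape without an LLM or real tools, the helpers can be stubbed. Everything below is a hypothetical sketch: `call_llm`, `use_tool`, and `process_results` are fake implementations written for this example, and `stubbed_executor` takes a plain description string instead of a Task so that it runs without the library installed.

```python
import asyncio
import time
from typing import Any, Dict


async def call_llm(prompt: str) -> str:
    # Hypothetical stub: a real version would call your LLM client here.
    await asyncio.sleep(0.01)
    return f"LLM notes on: {prompt}"


async def use_tool(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stub: a real version might call an MCP tool or an HTTP API.
    await asyncio.sleep(0.01)
    return {"tool": name, "hits": [f"result for {args['query']}"]}


def process_results(llm_response: str, tool_result: Dict[str, Any]) -> Dict[str, Any]:
    return {"notes": llm_response, "sources": tool_result["hits"]}


async def stubbed_executor(description: str) -> Dict[str, Any]:
    # The two calls do not depend on each other, so they run concurrently.
    llm_response, tool_result = await asyncio.gather(
        call_llm(description),
        use_tool("search", {"query": description}),
    )
    return {
        "status": "completed",
        "result": process_results(llm_response, tool_result),
        "metadata": {"time": time.time()},
    }


output = asyncio.run(stubbed_executor("Research AI trends"))
print(output["status"])             # completed
print(output["result"]["sources"])  # ['result for Research AI trends']
```

Running the LLM call and the tool call together with `asyncio.gather` is a design choice of this sketch; it only applies when neither call needs the other's output.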
Monitoring and Stats
# Get mesh statistics
stats = mesh.get_mesh_stats()
print(f"Total Agents: {stats['total_agents']}")
print(f"Active Agents: {stats['active_agents']}")
print(f"Completed Tasks: {stats['completed_tasks']}")
print(f"Peer Connections: {stats['total_peers']}")
# Individual agent stats
for agent_stat in stats['agent_stats']:
    print(f"{agent_stat['agent_id']}:")
    print(f"  Completed: {agent_stat['completed_tasks']}")
    print(f"  Performance: {agent_stat['performance_score']}")
Research & POC
We compared DAM with MCP in a proof-of-concept (POC) benchmark.
Run the POC yourself:
cd dam_research
python poc_comparison.py
Results:
- DAM is 2-3x faster than MCP in the POC
- True parallelism vs sequential execution
- Fault tolerance and resilience
- Dynamic scalability without restart
Examples
Check out the examples/ directory for:
- Basic Usage (examples/basic.py)
- Document Analysis (examples/document_analysis.py)
- Real-time Streaming (examples/streaming.py)
- MCP vs DAM Comparison (examples/poc_comparison.py)
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Inspired by the concept of distributed autonomous agents
- Built on Python's asyncio for high-performance async operations
- Zero external dependencies for maximum portability
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: your.email@example.com
Star us on GitHub if you find this useful!
Made with ❤️ for the AI agent community
File details
Details for the file distributed_agent_mesh-0.1.0.tar.gz.
File metadata
- Download URL: distributed_agent_mesh-0.1.0.tar.gz
- Upload date:
- Size: 23.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 49a62b572f103b3d8e8bbba8be94ad44f3a4d1ff2fa58df172adc087f2b96274 |
| MD5 | 6ef29b8ab7f3278f19765067a1a38ada |
| BLAKE2b-256 | f1a401658d6423ed7a3a1d98021e3abf2e0a9eead00ce9cf28de373978cbdc46 |
File details
Details for the file distributed_agent_mesh-0.1.0-py3-none-any.whl.
File metadata
- Download URL: distributed_agent_mesh-0.1.0-py3-none-any.whl
- Upload date:
- Size: 12.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d70cb98b483b324d031d81426ccc939658aed89b974349176736f328f9cde101 |
| MD5 | db638995d7f5de597620d1b246c8d7fb |
| BLAKE2b-256 | 3fd419a285fa134b1c9282c54d6a85ca5091ccaa3e4b01d372fc62e7341d2238 |