Distributed Agent Mesh (DAM)

🚀 A revolutionary framework for autonomous agents that collaborate via P2P communication

Python 3.8+ | License: MIT

🎯 What is DAM?

Distributed Agent Mesh (DAM) is a peer-to-peer framework where autonomous AI agents:

  • 🤖 Decide autonomously if they can contribute to tasks
  • 🤝 Self-organize into teams without central control
  • ⚡ Execute in parallel for maximum performance
  • 🔄 Collaborate directly via P2P communication
  • 📈 Scale dynamically by adding/removing agents anytime
  • 💪 Tolerate failures - system continues if agents fail

🆚 DAM vs MCP (Model Context Protocol)

Architecture Comparison

| Aspect          | MCP                      | DAM                                      |
|-----------------|--------------------------|------------------------------------------|
| Architecture    | Client-Server            | P2P Mesh                                 |
| Control         | Centralized (LLM)        | Distributed (Agents)                     |
| Decision Making | LLM decides everything   | Agents decide autonomously               |
| Execution       | Sequential tool calls    | Parallel agent collaboration             |
| Scalability     | Limited by LLM           | Add agents anytime                       |
| Resilience      | Single point of failure  | Fault tolerant                           |
| Performance     | Sequential (1x baseline) | 2.4-3.1x faster in our POC (parallel)    |

When to Use What?

Use MCP when:

  • ✅ Simple tool integration
  • ✅ Single-agent systems
  • ✅ Well-defined sequential workflows
  • ✅ Resource access (files, databases)

Use DAM when:

  • ✅ Complex, multi-step tasks
  • ✅ Tasks requiring parallelism
  • ✅ Large-scale distributed systems
  • ✅ Systems requiring high availability
  • ✅ Dynamic, adaptive workflows

💡 Best Approach: Hybrid

  • Use MCP for resource access (data, tools)
  • Use DAM for agent coordination
  • Get the best of both worlds!

🚀 Installation

pip install distributed-agent-mesh

Or install from source:

git clone https://github.com/yourusername/distributed-agent-mesh.git
cd distributed-agent-mesh
pip install -e .

📖 Quick Start

Basic Example

import asyncio
from distributed_agent_mesh import (
    DistributedAgentMesh,
    AutonomousAgent,
    AgentCapability,
    Task
)

# Define custom executor
async def research_executor(task):
    # Your custom logic here
    return {
        'status': 'completed',
        'result': f'Research completed: {task.description}'
    }

# Create mesh
mesh = DistributedAgentMesh()

# Create autonomous agent
agent = AutonomousAgent(
    agent_id="research_agent",
    capabilities=[
        AgentCapability("research", "Conduct research")
    ],
    executor=research_executor
)

# Register agent
mesh.register_agent(agent)

# Create task
task = Task(
    id="task_001",
    description="Research AI trends",
    requirements=["research"],
    priority=1
)

# Agents collaborate autonomously!
async def main():
    result = await mesh.solve_complex_task(task)
    print(f"Result: {result}")

asyncio.run(main())

🌟 Real-World Example: Document Analysis

import asyncio
from distributed_agent_mesh import (
    DistributedAgentMesh,
    AutonomousAgent,
    AgentCapability,
    Task
)

# Create specialized agents
async def text_extractor(task):
    # Extract text from documents
    return {'text': '...', 'pages': 10}

async def sentiment_analyzer(task):
    # Analyze sentiment
    return {'sentiment': 'positive', 'confidence': 0.89}

async def summarizer(task):
    # Generate summary
    return {'summary': '...', 'key_points': [...]}

# Setup mesh
mesh = DistributedAgentMesh()

# Register agents with capabilities
mesh.register_agent(AutonomousAgent(
    "text_agent",
    [AgentCapability("text_extraction", "Extract text")],
    text_extractor
))

mesh.register_agent(AutonomousAgent(
    "sentiment_agent",
    [AgentCapability("sentiment_analysis", "Analyze sentiment")],
    sentiment_analyzer
))

mesh.register_agent(AutonomousAgent(
    "summary_agent",
    [AgentCapability("summarization", "Summarize")],
    summarizer
))

# Process documents in parallel
async def analyze_documents():
    tasks = [
        Task(
            id=f"doc_{i}",
            description=f"Document {i}",
            requirements=["text_extraction", "sentiment_analysis", "summarization"],
            priority=1
        )
        for i in range(10)
    ]
    
    # All documents processed in parallel!
    results = await asyncio.gather(*[
        mesh.solve_complex_task(task) for task in tasks
    ])
    
    return results

# Run
results = asyncio.run(analyze_documents())

🎯 Key Features

1. Autonomous Decision Making

Agents decide on their own if they can contribute:

async def can_contribute(self, task: Task) -> bool:
    """Agent decides autonomously"""
    relevance = await self.analyze_relevance(task)
    availability = self.check_availability()
    capability = self.check_capabilities(task)
    
    return relevance > 0.7 and availability and capability
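
The helper methods used above are not shown in this README. As a rough, self-contained sketch of how such a decision could work, the code below uses hypothetical stand-ins (`SketchTask`, `SketchAgent`) and an assumed relevance score: the share of the agent's own capabilities that the task asks for. It is not the library's implementation; only the 0.7 threshold and the method names come from the snippet above.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Set


@dataclass
class SketchTask:
    """Hypothetical stand-in for the library's Task."""
    description: str
    requirements: List[str]


@dataclass
class SketchAgent:
    """Hypothetical agent; not the library's AutonomousAgent."""
    capabilities: Set[str]
    busy: bool = False

    async def analyze_relevance(self, task: SketchTask) -> float:
        # Assumed scoring: share of this agent's capabilities the task needs.
        if not self.capabilities:
            return 0.0
        needed = sum(1 for c in self.capabilities if c in task.requirements)
        return needed / len(self.capabilities)

    def check_availability(self) -> bool:
        # Assumed rule: an agent is available when it is not busy.
        return not self.busy

    def check_capabilities(self, task: SketchTask) -> bool:
        # Assumed rule: at least one requirement matches a capability.
        return any(r in self.capabilities for r in task.requirements)

    async def can_contribute(self, task: SketchTask) -> bool:
        relevance = await self.analyze_relevance(task)
        return (
            relevance > 0.7
            and self.check_availability()
            and self.check_capabilities(task)
        )
```

Under these assumptions, a specialist whose only capability is required by the task volunteers, while a busy agent or one with unrelated capabilities declines.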

2. Self-Organizing Teams

No central controller - agents form teams automatically:

# Mesh broadcasts task
await mesh.solve_complex_task(task)

# Agents autonomously:
# 1. Decide if they can help
# 2. Self-organize into optimal team
# 3. Collaborate peer-to-peer
# 4. Execute in parallel
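
To make those four steps concrete, here is a minimal asyncio sketch of a broadcast followed by parallel execution. `MiniAgent` and `solve` are illustrative names, not part of the package, and the sketch leaves out the peer-to-peer messaging step entirely; it only shows agents opting in and then running concurrently.

```python
import asyncio
from typing import List, Set


class MiniAgent:
    """Hypothetical agent used only for this sketch."""

    def __init__(self, agent_id: str, capabilities: Set[str]) -> None:
        self.agent_id = agent_id
        self.capabilities = capabilities

    async def can_contribute(self, requirements: List[str]) -> bool:
        # Each agent decides for itself; here it simply matches capabilities.
        return any(r in self.capabilities for r in requirements)

    async def execute(self, description: str) -> dict:
        await asyncio.sleep(0)  # yield control, standing in for real work
        return {"agent": self.agent_id, "status": "completed"}


async def solve(
    agents: List[MiniAgent], description: str, requirements: List[str]
) -> List[dict]:
    # Broadcast: ask every agent concurrently whether it can help.
    decisions = await asyncio.gather(
        *(agent.can_contribute(requirements) for agent in agents)
    )
    # The team is whoever volunteered.
    team = [agent for agent, ok in zip(agents, decisions) if ok]
    # Team members execute in parallel; gather keeps the team order.
    return list(await asyncio.gather(*(a.execute(description) for a in team)))
```

With a text agent, a visualization agent, and a summary agent registered, a task that requires `text_extraction` and `summarization` would be picked up by the first and third agents only.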

3. Dynamic Scalability

Add/remove agents anytime without restart:

# Start with 2 agents
mesh.register_agent(agent1)
mesh.register_agent(agent2)

# Process tasks
result1 = await mesh.solve_complex_task(task1)

# High load? Add more agents dynamically!
mesh.register_agent(agent3)
mesh.register_agent(agent4)

# System automatically uses new agents
result2 = await mesh.solve_complex_task(task2)
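
One way to get this behavior, sketched under the assumption that the mesh keeps agents in an in-memory registry and reads it each time a task is solved. `MiniMesh` and `make_executor` are hypothetical names for illustration, not the package's classes.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Executor = Callable[[str], Awaitable[str]]


class MiniMesh:
    """Hypothetical registry; not the library's DistributedAgentMesh."""

    def __init__(self) -> None:
        self._agents: Dict[str, Executor] = {}

    def register_agent(self, agent_id: str, executor: Executor) -> None:
        self._agents[agent_id] = executor

    def unregister_agent(self, agent_id: str) -> None:
        self._agents.pop(agent_id, None)

    async def solve(self, description: str) -> List[str]:
        # Snapshot the registry at call time, so agents registered after an
        # earlier task are used by the next one without any restart.
        executors = list(self._agents.values())
        return list(await asyncio.gather(*(run(description) for run in executors)))


def make_executor(name: str) -> Executor:
    async def run(description: str) -> str:
        return f"{name} handled {description}"

    return run
```

Because `solve` looks at the registry only when it is called, registering `agent2` between two calls means the second call fans out to both agents.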

4. Fault Tolerance

System continues working even when agents fail:

# Register 5 agents
mesh.register_agent(agent1)
mesh.register_agent(agent2)
# ... register agent3 through agent5 the same way

# Simulate 2 agents failing by removing them from the mesh
mesh.unregister_agent("agent1")
mesh.unregister_agent("agent2")

# The mesh continues with the remaining 3 agents
result = await mesh.solve_complex_task(task)  # Still completes
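
The snippet above removes agents before the task starts. For a failure that happens while agents are running, a common asyncio pattern is `asyncio.gather(..., return_exceptions=True)`, which collects exceptions instead of cancelling the whole batch. The sketch below shows that pattern with hypothetical executors (`healthy`, `crashing`) and a hypothetical `run_tolerant` helper; whether DAM handles failures this way internally is not stated in this README.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Executor = Callable[[str], Awaitable[str]]


async def healthy(description: str) -> str:
    return f"done: {description}"


async def crashing(description: str) -> str:
    raise RuntimeError("agent crashed mid-execution")


async def run_tolerant(
    executors: Dict[str, Executor], description: str
) -> Dict[str, List[str]]:
    # return_exceptions=True turns a raised exception into a returned value,
    # so one failing agent does not abort the others.
    outcomes = await asyncio.gather(
        *(run(description) for run in executors.values()),
        return_exceptions=True,
    )
    results: List[str] = []
    failed: List[str] = []
    for agent_id, outcome in zip(executors, outcomes):
        if isinstance(outcome, Exception):
            failed.append(agent_id)
        else:
            results.append(outcome)
    return {"results": results, "failed": failed}
```

The caller gets the surviving agents' results plus the IDs of the agents that failed, and can decide whether to retry or unregister them.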

📊 Performance Benchmarks

Based on our POC comparison:

Single Task Performance

  • MCP (Sequential): 2.40s
  • DAM (Parallel): 1.00s
  • 🚀 Speedup: 2.4x faster

Multiple Tasks (3 tasks)

  • MCP (Sequential): 7.21s
  • DAM (Parallel): 2.33s
  • 🚀 Speedup: 3.1x faster
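
The speedup figures are the ratio of sequential to parallel wall-clock time. A quick check of that arithmetic, using only the timings reported in this section (the `speedup` helper is an illustrative name, not part of the package):

```python
def speedup(sequential_s: float, parallel_s: float) -> float:
    """Ratio of sequential to parallel wall-clock time."""
    return sequential_s / parallel_s


single_task = speedup(2.40, 1.00)   # single-task timings from above
three_tasks = speedup(7.21, 2.33)   # three-task timings from above

print(f"{single_task:.1f}x, {three_tasks:.1f}x")  # prints "2.4x, 3.1x"
```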

Scalability Test

  • Add agents dynamically: ✅ No restart needed
  • Fault tolerance: ✅ Continues despite failures
  • Load balancing: ✅ Automatic distribution

🏗️ Architecture

┌─────────────────────────────────────────────────┐
│         Distributed Agent Mesh (DAM)            │
│                                                 │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│  │ Agent 1  │◄──►│ Agent 2  │◄──►│ Agent 3  │   │
│  │          │    │          │    │          │   │
│  │ Research │    │ Analysis │    │   Viz    │   │
│  └──────────┘    └──────────┘    └──────────┘   │
│       ▲               ▲               ▲         │
│       │               │               │         │
│       └───────────────┴───────────────┘         │
│              P2P Network Layer                  │
│                                                 │
│  • Autonomous decision making                   │
│  • Self-organizing teams                        │
│  • Parallel execution                           │
│  • Fault tolerance                              │
└─────────────────────────────────────────────────┘

🛠️ Advanced Usage

Custom Agent Executor

import time
from typing import Any, Dict

# call_llm, use_tool, and process_results are placeholders for your own helpers
async def advanced_executor(task: Task) -> Dict[str, Any]:
    """Custom logic with LLM, tools, etc."""
    
    # Use LLM
    llm_response = await call_llm(task.description)
    
    # Use tools (via MCP or direct)
    tool_result = await use_tool("search", {"query": task.description})
    
    # Process data
    processed = process_results(llm_response, tool_result)
    
    return {
        'status': 'completed',
        'result': processed,
        'metadata': {'time': time.time()}
    }

agent = AutonomousAgent(
    "advanced_agent",
    [AgentCapability("advanced_task", "Advanced processing")],
    advanced_executor
)

Monitoring and Stats

# Get mesh statistics
stats = mesh.get_mesh_stats()

print(f"Total Agents: {stats['total_agents']}")
print(f"Active Agents: {stats['active_agents']}")
print(f"Completed Tasks: {stats['completed_tasks']}")
print(f"Peer Connections: {stats['total_peers']}")

# Individual agent stats
for agent_stat in stats['agent_stats']:
    print(f"{agent_stat['agent_id']}:")
    print(f"  Completed: {agent_stat['completed_tasks']}")
    print(f"  Performance: {agent_stat['performance_score']}")
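
If you want the same numbers as a compact report, a small helper along these lines could work. It assumes the stats dictionary has exactly the keys used above and that `performance_score` is numeric; `format_mesh_report` is a hypothetical name, and the sample values further down are made up for illustration.

```python
from typing import Any, Dict, List


def format_mesh_report(stats: Dict[str, Any]) -> List[str]:
    """Render mesh stats as report lines, best-performing agent first."""
    lines = [
        f"Agents: {stats['active_agents']}/{stats['total_agents']} active",
        f"Completed tasks: {stats['completed_tasks']}",
    ]
    # Assumption: a higher performance_score means a better agent.
    ranked = sorted(
        stats["agent_stats"],
        key=lambda s: s["performance_score"],
        reverse=True,
    )
    for s in ranked:
        lines.append(
            f"{s['agent_id']}: {s['completed_tasks']} tasks, "
            f"score {s['performance_score']:.2f}"
        )
    return lines


# Made-up sample data in the assumed shape, for illustration only.
sample_stats = {
    "total_agents": 2,
    "active_agents": 1,
    "completed_tasks": 5,
    "agent_stats": [
        {"agent_id": "text_agent", "completed_tasks": 2, "performance_score": 0.5},
        {"agent_id": "summary_agent", "completed_tasks": 3, "performance_score": 0.9},
    ],
}
report = format_mesh_report(sample_stats)
```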

🔬 Research & POC

We compared DAM with MCP in a proof-of-concept (POC) benchmark.

Run the POC yourself:

cd dam_research
python poc_comparison.py

Results:

  • ✅ DAM ran 2.4-3.1x faster than the sequential MCP baseline
  • ✅ True parallelism vs sequential execution
  • ✅ Fault tolerance and resilience
  • ✅ Dynamic scalability without restart

📚 Examples

Check out the examples/ directory for:

  1. Basic Usage (examples/basic.py)
  2. Document Analysis (examples/document_analysis.py)
  3. Real-time Streaming (examples/streaming.py)
  4. MCP vs DAM Comparison (examples/poc_comparison.py)

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Inspired by the concept of distributed autonomous agents
  • Built on Python's asyncio for high-performance async operations
  • Zero external dependencies for maximum portability

📞 Support

⭐ Star us on GitHub if you find this useful!

Made with ❤️ for the AI agent community
