Peer-to-peer agent discovery and communication network
AgentMesh
A peer-to-peer protocol for AI agent discovery, negotiation, and reputation.
Agents register capabilities, discover each other semantically at runtime, negotiate task contracts over WebSocket, and earn trust scores that influence future routing — all without a central orchestrator.
Why AgentMesh
Most multi-agent frameworks hardwire agent relationships at build time. You write a graph, a crew, or a pipeline — and it's fixed. Adding a new agent means editing the orchestrator. A flaky agent silently degrades the whole pipeline. There's no way to ask "find the best available agent for this task right now."
AgentMesh treats agents like services on a network:
- Any agent can join or leave the mesh at runtime
- Tasks are routed to the best available agent by capability, trust, load, and cost
- Agents that fail consistently are automatically deprioritized via circuit breaking
- Overloaded agents can counter-propose a later deadline instead of failing silently
- Quality is measured, not assumed — an OutputEvaluator scores every result and feeds real scores back into trust
The registry is a phone book, not an orchestrator. Agents talk directly to each other.
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ AgentMesh Runtime │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Mesh Registry (port 8080) │ │
│ │ FastAPI · PostgreSQL · Semantic Search · Traces │ │
│ │ Registration · Discovery · Trust · Memory · Stats │ │
│ └──────────────────────────┬──────────────────────────────┘ │
│ │ POST /discover │
│ ┌──────────────────┼──────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Research │ │ Data │ │ Code │ ··· │
│ │ Agent │ │ Agent │ │ Agent │ │
│ │ :9001 │ │ :9002 │ │ :9003 │ │
│ └──────┬──────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ┌─────────────┐ ┌──────────────┐ │ │
│ │ Writer │ │ WebSearch │ │ │
│ │ Agent │ │ Agent │ │ │
│ │ :9004 │ │ :9005 (MCP) │ │ │
│ └─────────────┘ └──────────────┘ │ │
│ │ WebSocket task.request ──────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ React Dashboard (port 5173) │ │
│ │ Mesh Graph · Trace Timeline · Memory Inspector │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
How the Protocol Works
1. Register
An agent joins the mesh by posting a capability manifest:
{
"agent_id": "agent-data-7f3a",
"name": "Data Agent",
"endpoint": "ws://localhost:9002",
"max_concurrent_tasks": 3,
"capabilities": [{
"name": "analyze_csv",
"description": "Statistical analysis of CSV or tabular data",
"avg_latency_ms": 8000,
"cost_per_call_usd": 0.002
}],
"tags": ["data", "analysis", "csv"]
}
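As a rough illustration, the manifest above could be submitted with a plain HTTP POST to the /agents/register endpoint listed in the API Reference. The helper name build_register_request and the JSON content type are assumptions made for this sketch; in normal use the SDK's MeshAgent handles registration for you.

```python
import json
import urllib.request


def build_register_request(registry_url: str, manifest: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying the manifest as JSON.

    Hypothetical helper for illustration; not part of the AgentMesh SDK.
    """
    body = json.dumps(manifest).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/agents/register",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


manifest = {
    "agent_id": "agent-data-7f3a",
    "name": "Data Agent",
    "endpoint": "ws://localhost:9002",
    "max_concurrent_tasks": 3,
    "capabilities": [{
        "name": "analyze_csv",
        "description": "Statistical analysis of CSV or tabular data",
        "avg_latency_ms": 8000,
        "cost_per_call_usd": 0.002,
    }],
    "tags": ["data", "analysis", "csv"],
}

request = build_register_request("http://localhost:8080", manifest)
# With a registry running, the request could be sent with:
#   urllib.request.urlopen(request)
```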
2. Discover
Any agent queries the mesh with a natural-language description or an exact name:
agents = await self.discover(
description="analyze and summarize tabular data",
min_trust=0.4,
max_cost_usd=0.005,
)
The router ranks candidates by a composite score:
score = (semantic_match × 0.35)
+ (trust_score × 0.35)
+ (availability × 0.15) # based on current task load
+ (cost_factor × 0.15) # cheaper = higher score within budget
Degraded agents (circuit open) are included but scored at 30% availability.
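The weighting above can be sketched as a small function. This is a minimal illustration, not the router's actual code: it assumes all four factors are already normalized to the range 0 to 1, and it interprets "scored at 30% availability" as fixing availability at 0.3 for degraded agents. The function and parameter names are hypothetical.

```python
def composite_score(
    semantic_match: float,
    trust_score: float,
    availability: float,
    cost_factor: float,
    degraded: bool = False,
) -> float:
    """Combine the four routing factors using the documented weights."""
    if degraded:
        # Assumption: a degraded agent (circuit open) is scored as if
        # its availability were 0.3, whatever its real load is.
        availability = 0.3
    return (
        0.35 * semantic_match
        + 0.35 * trust_score
        + 0.15 * availability
        + 0.15 * cost_factor
    )


healthy = composite_score(0.8, 0.6, 1.0, 0.5)
degraded = composite_score(0.8, 0.6, 1.0, 0.5, degraded=True)
# The degraded agent stays in the candidate list but ranks lower.
```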
3. Negotiate
The requester opens a WebSocket to the chosen agent and sends a task.request. The agent responds with one of three outcomes:
| Response | Meaning |
|---|---|
| accepted | Agent takes the task, returns estimated latency |
| rejected | Agent can't do it (wrong capability, etc.) |
| countered | Agent is at capacity and proposes a later deadline |
On countered, the SDK automatically retries with the proposed deadline if it's within 1.5× the original. Otherwise it falls back to the next discovery candidate.
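The counter-proposal rule can be sketched as a single decision function. This assumes deadlines are expressed as durations in milliseconds and that a proposal exactly at 1.5× still counts as "within"; the function name and return values are hypothetical, not the SDK's API.

```python
MAX_DEADLINE_STRETCH = 1.5  # from the rule described above


def resolve_counter(original_deadline_ms: float, proposed_deadline_ms: float) -> str:
    """Decide what to do with a 'countered' response.

    Returns "retry" when the proposed deadline is acceptable,
    otherwise "next_candidate" to fall back to the next discovery result.
    """
    if proposed_deadline_ms <= original_deadline_ms * MAX_DEADLINE_STRETCH:
        return "retry"
    return "next_candidate"


# A 10 s task countered with 14 s is retried; 20 s is too late.
decision_ok = resolve_counter(10_000, 14_000)
decision_late = resolve_counter(10_000, 20_000)
```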
4. Execute
The agent runs the task using whatever it has — an LLM, a database, an MCP tool, or a subprocess. Result comes back as a structured TaskResult.
5. Trust Update
After every task, the requester evaluates the output and reports a quality score. Trust updates via an ELO-style algorithm:
new_score = old_score + 0.1 × (actual_quality − expected_quality)
Scores are bounded to [0.0, 1.0]. After 3 consecutive failures, the circuit opens and the agent moves to degraded status. It auto-recovers after 60 seconds (half-open probe).
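A minimal sketch of the update rule and the circuit breaker described above. How expected_quality is derived is not specified here, so the sketch takes it as an input. The class name, the status labels "active" and "half_open", and the injectable clock are assumptions for illustration.

```python
import time

LEARNING_RATE = 0.1
FAILURE_THRESHOLD = 3      # consecutive failures before the circuit opens
RECOVERY_SECONDS = 60.0    # time until a half-open probe is allowed


def update_trust(old_score: float, actual_quality: float, expected_quality: float) -> float:
    """Apply the documented update and clamp the result to [0.0, 1.0]."""
    new_score = old_score + LEARNING_RATE * (actual_quality - expected_quality)
    return max(0.0, min(1.0, new_score))


class CircuitBreaker:
    """Tracks consecutive failures for one agent (hypothetical sketch)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self.consecutive_failures = 0
        self.opened_at = None  # None means the circuit is closed

    def record(self, success: bool) -> None:
        if success:
            # Any success closes the circuit and resets the counter.
            self.consecutive_failures = 0
            self.opened_at = None
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= FAILURE_THRESHOLD:
            self.opened_at = self._clock()

    @property
    def status(self) -> str:
        if self.opened_at is None:
            return "active"
        if self._clock() - self.opened_at >= RECOVERY_SECONDS:
            return "half_open"
        return "degraded"
```

Passing a fake clock makes the 60-second recovery window easy to exercise in tests without sleeping.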
Quality is measured, not assumed. The OutputEvaluator scores each result deterministically where possible:
- analyze_csv: cross-checks reported numbers against actual pandas/numpy computed stats (±10% tolerance)
- fetch_code: checks subprocess exit code (0 = correct, syntax error = 0.0)
- web_search: checks non-empty result content
- write_report: LLM-as-judge via Groq (generic rubric, returns 0.5 if API key unavailable)
This closes the feedback loop: better agents accumulate trust over time, poor ones decline and get routed around.
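To make the ±10% cross-check concrete, here is a small sketch of a tolerance-based scorer. It uses the standard-library statistics module instead of pandas/numpy to stay dependency-free, and it assumes the score is the fraction of reported values that fall within tolerance. Both the aggregation rule and the function names are assumptions, not the OutputEvaluator's actual logic.

```python
import statistics


def within_tolerance(reported: float, actual: float, tolerance: float = 0.10) -> bool:
    """True when reported is within a relative tolerance of actual."""
    if actual == 0:
        # Relative tolerance is undefined at zero; require an exact match.
        return reported == 0
    return abs(reported - actual) <= tolerance * abs(actual)


def score_reported_stats(reported: dict, actual: dict, tolerance: float = 0.10) -> float:
    """Fraction of reported statistics that match the recomputed ones."""
    if not reported:
        return 0.0
    hits = sum(
        1
        for key, value in reported.items()
        if key in actual and within_tolerance(value, actual[key], tolerance)
    )
    return hits / len(reported)


values = [10, 20, 30, 40]
actual_stats = {"mean": statistics.mean(values), "max": max(values)}
# The agent reports a mean that is close enough and a max that is not.
quality = score_reported_stats({"mean": 26.0, "max": 50.0}, actual_stats)
```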
Quick Start
Requirements: Python 3.11+, a Groq API key (free tier works)
# 1. Install from PyPI
pip install "agentmesh-proto[all]"
# Or clone for local development
git clone https://github.com/arshadvani3/AgentMesh
cd AgentMesh
pip install -e ".[all,dev]"
# 2. Configure
cp .env.example .env
# Set GROQ_API_KEY=gsk_...
# 3. Run
python3 demo.py
Install options:
| Command | What you get |
|---|---|
| pip install agentmesh-proto | Registry + SDK only, no LLM deps |
| pip install "agentmesh-proto[agents]" | + LangChain, LangGraph, Groq, Pandas |
| pip install "agentmesh-proto[database]" | + asyncpg for PostgreSQL |
| pip install "agentmesh-proto[cache]" | + Redis for session memory |
| pip install "agentmesh-proto[mcp]" | + MCP tool client |
| pip install "agentmesh-proto[all]" | Everything above |
The demo starts the registry, launches 4 agents, submits a research task against the included startup funding dataset, and prints the final report. Everything — including semantic search model loading — is automatic.
Custom query:
python3 demo.py --query "Compare vector databases for RAG: Pinecone vs Weaviate vs Chroma"
Custom dataset:
python3 demo.py --csv-path /path/to/your/data.csv --query "Analyze the trends in this dataset"
Building an Agent
Subclass MeshAgent, decorate methods with @capability, call start().
from sdk.agent import MeshAgent, capability
import asyncio, os

class TranslationAgent(MeshAgent):
    @capability(
        name="translate_text",
        description="Translates text between languages. Supports 50+ languages.",
        input_schema={
            "type": "object",
            "properties": {
                "text": {"type": "string"},
                "source_lang": {"type": "string"},
                "target_lang": {"type": "string"},
            },
            "required": ["text", "target_lang"],
        },
        avg_latency_ms=1200,
        cost_per_call_usd=0.001,
    )
    async def translate_text(self, input_data: dict) -> dict:
        # your model / API call here
        return {"translated": "...", "detected_source": "en"}

async def main():
    agent = TranslationAgent(
        name="Translation Agent",
        registry_url=os.environ.get("REGISTRY_URL", "http://localhost:8080"),
        ws_port=9010,
        max_concurrent_tasks=5,
        tags=["translation", "nlp", "language"],
    )
    await agent.start()

asyncio.run(main())
Once running, any other mesh agent can find and use it:
# From any MeshAgent subclass:
agents = await self.discover(description="translate text between languages")
result = await self.delegate("translate_text", {
"text": "Hello, world!",
"target_lang": "es",
}, target=agents[0])
print(result.output["translated"]) # "¡Hola, mundo!"
Using MCP Tools
Agents can call MCP tools directly via the built-in client. Configure servers in ~/.agentmesh/mcp_servers.json:
{
"brave-search": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-brave-search"],
"env": {"BRAVE_API_KEY": "your-key"}
}
}
Then call from any agent:
result = await self.call_mcp_tool(
"brave-search",
"brave_web_search",
{"query": input_data["query"]}
)
A mock fallback activates automatically when the server isn't configured — so the mesh runs and tests pass without a Brave API key.
Example Use Cases
Autonomous Research Pipeline
A research agent receives a complex query, decomposes it into subtasks, discovers specialists dynamically, and synthesizes results — all without knowing in advance which agents exist.
Query: "Competitive analysis of vector databases"
→ discovers web_search_agent → delegates retrieval
→ discovers data_analysis_agent → delegates metric comparison
→ discovers report_writer_agent → delegates synthesis
→ merges results into final report
New specialists can join the mesh at any time and get discovered automatically on the next query.
Code Review Mesh
Push a PR → trigger a fleet of independent review agents:
- security_scanner: OWASP vulnerability checks
- style_checker: linting and formatting
- test_coverage: gap analysis
- documentation: missing docstring detection
Each agent is independent. Add or remove reviewers by registering/deregistering. No orchestrator rewrite needed.
Budget-Constrained Routing
Multiple LLM backends registered with different cost profiles:
# Find the best agent within a $0.005/call budget
agents = await self.discover(
description="generate structured JSON from unstructured text",
max_cost_usd=0.005,
min_trust=0.6,
)
The router excludes agents over budget and ranks cheaper ones higher within budget. As trust scores accumulate, the system learns which model performs best per capability.
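One way to picture the budget filter is the sketch below. The linear cost factor (1 minus cost divided by budget) is an assumption chosen for illustration; the README only states that cheaper agents score higher within budget. The function names and the candidate entries are hypothetical.

```python
from typing import Optional


def cost_factor(cost_usd: float, max_cost_usd: float) -> Optional[float]:
    """Return a 0..1 factor for in-budget agents, or None when over budget."""
    if cost_usd > max_cost_usd:
        return None
    if max_cost_usd == 0:
        return 1.0  # a free agent under a zero budget
    return 1.0 - cost_usd / max_cost_usd


def rank_within_budget(candidates: list, max_cost_usd: float) -> list:
    """Drop over-budget candidates and order the rest cheapest first."""
    scored = []
    for candidate in candidates:
        factor = cost_factor(candidate["cost_per_call_usd"], max_cost_usd)
        if factor is not None:
            scored.append((factor, candidate["name"]))
    return [name for _, name in sorted(scored, reverse=True)]


candidates = [
    {"name": "large", "cost_per_call_usd": 0.004},
    {"name": "small", "cost_per_call_usd": 0.001},
    {"name": "premium", "cost_per_call_usd": 0.02},
]
ranked = rank_within_budget(candidates, max_cost_usd=0.005)
```

In the full router this factor is only one of four inputs, so a cheaper agent with poor trust can still rank below a pricier one.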
Resilient Data Pipelines
Three ETL agents registered with the same capability. If the primary hits 3 consecutive failures, its circuit opens and the next-best agent takes over automatically — no manual failover configuration.
Enterprise Tool Composition
Teams register their internal tooling as agents:
- salesforce_agent: CRM queries
- analytics_agent: data warehouse SQL
- slack_agent: formatted channel notifications
An orchestrator composes them into workflows at runtime, discovering what's available rather than having it hardwired.
Comparison
| Feature | LangGraph / CrewAI | Google A2A | AgentMesh |
|---|---|---|---|
| Topology | Centralized graph | Spec only | Decentralized mesh |
| Discovery | Hardwired at build time | Static agent cards | Semantic search at runtime |
| Negotiation | None | Defined, not implemented | Accept / reject / counter |
| Trust & Reputation | None | None | ELO scoring, auto-updates |
| Circuit Breaker | None | None | Built-in, auto-recovery |
| Cost-Aware Routing | None | None | Budget filter + cost factor |
| Load-Aware Routing | None | None | Live task count per agent |
| Quality Evaluation | None | None | OutputEvaluator per-capability |
| MCP Tool Support | Via LangChain | None | Native MCPToolClient |
| Working Runtime | Yes | No (spec only) | Yes |
Project Structure
agentmesh/
│
├── mesh/
│ ├── models.py # Pydantic schemas — protocol source of truth
│ ├── registry.py # FastAPI service: register, discover, trust, traces, memory, stats
│ ├── router.py # 4-factor semantic routing (match/trust/load/cost)
│ ├── db.py # asyncpg + full in-memory fallback (no Docker needed)
│ ├── evaluator.py # OutputEvaluator: deterministic + LLM-as-judge quality scoring
│ ├── mcp_client.py # MCPToolClient: subprocess MCP server wrapper, mock fallback
│ ├── memory.py # AgentMemory: Redis-backed session state, in-memory fallback
│ ├── cli.py # Click CLI
│ └── migrations/ # SQL migrations 001–004
│
├── sdk/
│ └── agent.py # MeshAgent base class + @capability decorator
│ # Handles: registration, heartbeat, WebSocket task server,
│ # discover(), delegate(), counter-proposal retry, MCP tools
│
├── agents/
│ ├── research_agent.py # LangGraph orchestrator: plan→discover→delegate→synthesize
│ ├── data_agent.py # Real pandas/numpy CSV computation; LLM narrates, doesn't invent
│ ├── code_agent.py # Code generation via Groq + optional subprocess execution
│ ├── writer_agent.py # Markdown report synthesis via Groq
│ └── web_search_agent.py # Web search via brave-search MCP (mock fallback built-in)
│
├── dashboard/ # Vite + React 18 + TypeScript + Tailwind
│ └── src/
│ ├── components/
│ │ ├── MeshGraph.tsx # Live force-directed agent network graph
│ │ ├── TraceTimeline.tsx # Swimlane view of agent interactions
│ │ ├── AgentCard.tsx # Per-agent trust history (recharts)
│ │ └── MemoryPanel.tsx # Live session state inspector
│ └── hooks/
│ ├── useDashboardSocket.ts # Auto-reconnecting WebSocket for live events
│ ├── useAgents.ts # Registry polling hook
│ └── useStats.ts # Aggregate mesh stats polling
│
├── examples/
│ ├── hello_agent.py # Minimal 15-line agent — register and respond
│ ├── translation_agent.py # Full translation agent (runnable standalone)
│ └── two_agent_pipeline.py # 2-agent mini-mesh without LangGraph
│
├── tests/
│ ├── test_models.py # Schema validation, JSON roundtrips
│ ├── test_registry.py # Registration, discovery, trust endpoints
│ ├── test_router.py # Semantic match, cost filter, ranking
│ ├── test_trust.py # ELO convergence and bounds
│ ├── test_circuit_breaker.py # Degraded status, recovery, ranking penalty
│ ├── test_negotiation.py # Accept/reject/counter over real WebSocket
│ ├── test_e2e.py # Full register→discover→delegate→verify
│ ├── test_auth.py # JWT auth, token endpoint, protected endpoints
│ ├── test_memory.py # AgentMemory set/get/TTL/clear/sessions
│ ├── test_data_agent.py # Real CSV computation, SSRF/path protection
│ ├── test_code_agent.py # Print capture, syntax error, timeout, isolation
│ ├── test_evaluator.py # OutputEvaluator per-capability scoring
│ ├── test_mcp_client.py # MCPToolClient tool listing, calling, mock fallback
│ └── test_latency.py # Latency tracking and reporting
│
├── data/
│ └── startups.csv # 100-row demo dataset (company/sector/funding/year/country/stage)
│
├── demo.py # One-command demo: registry + 4 agents + research task + Rich UI
├── .env.example # Environment variable template
└── pyproject.toml # Package metadata + dependencies
API Reference
Registry Endpoints
| Method | Path | Description |
|---|---|---|
| POST | /agents/register | Register an agent with its capability manifest |
| GET | /agents | List all registered agents |
| GET | /agents/{agent_id} | Get a specific agent's record |
| DELETE | /agents/{agent_id} | Deregister an agent |
| POST | /discover | Find agents matching a DiscoveryQuery |
| POST | /trust/update | Report task outcome and update trust score |
| GET | /agents/{agent_id}/trust_history | Per-agent trust score history |
| GET | /traces | Query stored trace events |
| POST | /traces | Emit a trace event |
| GET | /stats | Aggregate mesh stats (agents, tasks, avg trust, sessions) |
| GET | /memory | List active session IDs |
| GET | /memory/{session_id} | Inspect session state (auth required) |
| POST | /auth/token | Issue a JWT for an agent |
| WS | /ws/heartbeat/{agent_id} | Agent heartbeat + task load reporting |
| WS | /ws/dashboard | Real-time event stream for the dashboard |
DiscoveryQuery Fields
DiscoveryQuery(
capability_name="analyze_csv", # exact match
capability_description="...", # semantic match
min_trust_score=0.4, # reputation filter
max_cost_usd=0.005, # budget filter
tags=["data", "analysis"], # tag intersection filter
top_k=5, # max results (1–20)
)
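The constraints on these fields can be illustrated with a standard-library stand-in. The real schema lives in mesh/models.py as a Pydantic model; the dataclass below is only a sketch, and its class name, default values, and error messages are assumptions. The range checks reflect limits stated elsewhere in this README (top_k between 1 and 20, trust scores in [0.0, 1.0], no negative costs).

```python
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class DiscoveryQuerySketch:
    """Illustrative stand-in for DiscoveryQuery; not the real model."""

    capability_name: Optional[str] = None          # exact match
    capability_description: Optional[str] = None   # semantic match
    min_trust_score: float = 0.0                   # assumed default
    max_cost_usd: Optional[float] = None           # assumed default: no budget
    tags: list[str] = field(default_factory=list)
    top_k: int = 5

    def __post_init__(self) -> None:
        if not 1 <= self.top_k <= 20:
            raise ValueError("top_k must be between 1 and 20")
        if not 0.0 <= self.min_trust_score <= 1.0:
            raise ValueError("min_trust_score must be in [0.0, 1.0]")
        if self.max_cost_usd is not None and self.max_cost_usd < 0:
            raise ValueError("max_cost_usd must not be negative")


query = DiscoveryQuerySketch(
    capability_name="analyze_csv",
    min_trust_score=0.4,
    max_cost_usd=0.005,
    tags=["data", "analysis"],
)
payload = asdict(query)  # a plain dict, suitable for JSON encoding
```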
Running Tests
pip install -e ".[all,dev]"
pytest tests/ -v
164 tests, no external dependencies required. Thanks to the in-memory database and memory fallbacks, no PostgreSQL, Redis, or Docker is needed.
Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| GROQ_API_KEY | Yes | - | Groq API key (get free at console.groq.com) |
| REGISTRY_URL | No | http://localhost:8080 | Registry base URL for agents |
| REGISTRY_PORT | No | 8080 | Port for the registry server |
| DATABASE_URL | No | in-memory | PostgreSQL connection string |
| REDIS_URL | No | in-memory | Redis URL for AgentMemory session state |
| AGENT_SECRET | No | dev mode | JWT signing secret; set to enable auth enforcement |
| CSV_SAFE_DIR | No | /data/uploads | Allowed base path for csv_path (path traversal protection) |
| LOG_LEVEL | No | INFO | Logging verbosity |
Tech Stack
| Layer | Technology |
|---|---|
| Registry API | FastAPI + asyncpg + PostgreSQL |
| LLM | Groq API — Llama 3.3 70B via langchain-groq |
| Orchestration | LangGraph StateGraph |
| Semantic Search | sentence-transformers (all-MiniLM-L6-v2) |
| Agent Transport | WebSocket (websockets) |
| MCP Tools | mcp>=1.0.0 — stdio subprocess client |
| Session Memory | Redis (in-memory fallback) |
| Dashboard | React 18 + TypeScript + Vite + Tailwind + Recharts |
| Testing | pytest + pytest-asyncio |
| CLI | Click |
Security
- JWT authentication: set AGENT_SECRET to enable. POST /auth/token issues tokens; register/deregister/trust endpoints validate ownership (token sub must match agent_id)
- SSRF protection: csv_url blocks RFC-1918, loopback, and link-local IPs before fetching
- Path traversal protection: csv_path restricted to CSV_SAFE_DIR; paths outside this dir are rejected with HTTP 400
- Session memory auth: GET /memory/{session_id} requires a valid JWT; session data may contain intermediate task state
- Input length limits: capability descriptions truncated to 1,000 chars before embedding (DoS prevention)
- Endpoint scheme validation: agents only connect to ws:// or wss:// endpoints
- Field length constraints: agent name (128 chars) and description (1,024 chars) capped in the data model
- Input validation: trust quality scores outside [0.0, 1.0] rejected with HTTP 422; negative costs rejected
- CodeAgent subprocess: process isolation only (same OS user). Not a true sandbox; wrap with Docker/bubblewrap before enabling execute=true on untrusted input
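The SSRF and path traversal checks in the list above can be approximated with the standard library. This is a sketch of the general technique, not the project's implementation: the function names are hypothetical, hostname resolution is left out, and ipaddress's is_private flag covers the RFC 1918 ranges along with some other reserved blocks.

```python
import ipaddress
from pathlib import Path


def is_blocked_ip(ip_text: str) -> bool:
    """True for private, loopback, or link-local addresses."""
    ip = ipaddress.ip_address(ip_text)
    return ip.is_private or ip.is_loopback or ip.is_link_local


def is_inside_safe_dir(candidate: str, safe_dir: str) -> bool:
    """True when candidate resolves to a location under safe_dir.

    Resolving first collapses ".." segments, so a path such as
    "../../etc/passwd" cannot escape the base directory.
    """
    base = Path(safe_dir).resolve()
    target = (base / candidate).resolve()
    return target.is_relative_to(base)


# A cloud metadata address is link-local, so it would be refused.
metadata_blocked = is_blocked_ip("169.254.169.254")
escape_allowed = is_inside_safe_dir("../../etc/passwd", "/data/uploads")
```

A real fetcher would resolve the URL's hostname and check every returned address before connecting, since a hostname can point at an internal IP.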
License
MIT — Arsh Advani