Python SDK for agent-to-agent communication based on the Open Agent Mesh Protocol
OpenAgentMesh
The fabric for multi-agent systems, with the simplicity of a REST endpoint.
✨ Highlights
- Decoupled multi-agent system. Run agents and tools however you want. Have them talk to each other as if they were running in the same process.
- Bring your own agent. Works with any agentic framework (LangChain, CrewAI, PydanticAI, etc.).
- Agents as functions. Just wrap your agents in a handler function and the @mesh.agent decorator. Done.
- Batteries included. Agent catalog with runtime discovery, sync/async requests, pub/sub, shared KV and Object store, load balancing. All from a single infrastructure dependency.
- Protocol-first. The protocol is the product. Python SDK and CLI facilitate use. Any NATS client in any language can participate by following the subject and envelope format.
- No-effort scaling. Code remains the same. Just add instances.
📦 Installation
pip install openagentmesh
🎬 See it in action
oam demo
🚀 Quickstart
1. Start a local mesh:
oam mesh up
2. Register an agent (agent.py):
from openagentmesh import AgentMesh, AgentSpec
mesh = AgentMesh()
@mesh.agent(AgentSpec(name="echo", description="Echoes a message back."))
async def echo(req: str) -> str:
    return f"Echo: {req}"
mesh.run()
python agent.py
3. Discover and call it from the terminal:
oam mesh catalog # list registered agents
oam agent contract echo # view the full contract
oam agent call echo '"hello"' # invoke it
No hardcoded addresses. The CLI discovers echo from the mesh, reads its contract, and calls it.
🔗 How OAM relates to MCP and A2A
OAM is not a replacement for MCP or A2A. It complements both.
MCP is the standard for connecting LLMs to tools. OAM adds value where MCP alone hits friction: context bloat from loading too many tool schemas, patterns beyond request/reply (pub/sub, async callbacks, shared context), and automatic cross-team discovery.
A2A is Google's protocol for cross-organization agent federation over HTTP. OAM contracts are an A2A-compatible superset: A2A fields sit at the top level, and OAM extensions live under x-agentmesh. Use OAM internally and project to A2A at the boundary where internal meets external.
| Concern | MCP | OAM | A2A |
|---|---|---|---|
| Scope | Third-party toolsets | Agent-to-agent/tool (internal) | Agent-to-agent (cross-org) |
| Transport | stdio / SSE | NATS | HTTP |
| Discovery | Manual config | Automatic | Agent Card directory |
| Load balancing | N/A | NATS queue groups | External |
| Interaction | Streaming | Request/reply, streaming, async callback, pub/sub, reactive watcher | Request/reply, streaming |
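The projection to A2A at the boundary can be pictured as dropping the extension namespace from a contract. This is a minimal sketch, assuming a contract is a plain dict. Only the `x-agentmesh` key comes from the text above; the `project_to_a2a` helper and the other field names are hypothetical and are not part of the SDK.

```python
from copy import deepcopy
from typing import Any

OAM_EXTENSION_KEY = "x-agentmesh"  # extension namespace named in this README


def project_to_a2a(contract: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the contract without the OAM extension block.

    Hypothetical helper: the real SDK may expose this differently, or not at all.
    """
    projected = deepcopy(contract)
    projected.pop(OAM_EXTENSION_KEY, None)
    return projected


# Illustrative contract; these field names are assumptions, not the real schema.
internal = {
    "name": "summarizer",
    "description": "Summarizes text.",
    "x-agentmesh": {"channel": "nlp"},
}
external = project_to_a2a(internal)
```

Because the A2A fields already sit at the top level, the projection only removes data and never renames it. The internal contract is left untouched.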
📖 How to use
Any async function can be an agent: LLM chains, deterministic tools, data transformers, event publishers. The @mesh.agent decorator inspects the function signature to infer capabilities automatically.
Five handler patterns, determined by the function shape:
| Pattern | Signature | What it does |
|---|---|---|
| Responder | async def f(req: TypeIn) -> TypeOut (returns) | Takes input, returns output |
| Streamer | async def f(req: In) -> TypeOut (yields) | Takes input, streams data back |
| Trigger | async def f() -> TypeOut (returns) | No input, returns output on call |
| Publisher | async def f() -> TypeOut (yields) | Emits events continuously |
| Watcher | async def f() | No input/output. Background task (KV watch, polling) |
No capability flags to set. The handler shape is the source of truth.
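To make "the handler shape is the source of truth" concrete, here is a sketch of how a function's shape could be classified with the standard `inspect` module. It illustrates the idea only and is not the SDK's implementation; `classify_handler` and the sample handlers are hypothetical names.

```python
import inspect


def classify_handler(fn) -> str:
    """Guess the handler pattern from the function's shape alone."""
    sig = inspect.signature(fn)
    has_input = len(sig.parameters) > 0
    # An async generator function contains `yield`; a plain coroutine does not.
    yields = inspect.isasyncgenfunction(fn)
    has_output = sig.return_annotation not in (inspect.Signature.empty, None)

    if yields:
        return "streamer" if has_input else "publisher"
    if has_input:
        return "responder"
    return "trigger" if has_output else "watcher"


# Sample handlers, annotated the way the table above writes them
# (the return annotation names the item type, even for generators).
async def echo(req: str) -> str:
    return f"Echo: {req}"


async def count_up(req: int) -> int:
    for i in range(req):
        yield i


async def current_status() -> str:
    return "ok"


async def price_feed() -> float:
    yield 1.0


async def background_watch():
    pass
```

The sketch needs only three facts: whether the function takes a parameter, whether it yields, and whether it declares a return type. That is why no separate capability flags are required.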
📡 Invocation patterns
Four ways to interact with agents:
# Synchronous request/reply
result = await mesh.call("summarizer", {"text": doc})
# Streaming
async for chunk in mesh.stream("summarizer", {"text": doc}):
    print(chunk["delta"], end="")
# Async callback (non-blocking)
await mesh.send("summarizer", payload, on_reply=callback)
# Pub/sub events
async for event in mesh.subscribe(agent="price-feed"):
    print(event["price"])
🔍 Discovery
Two-tier discovery designed for efficient agent selection, including by LLMs:
# Tier 1: lightweight catalog (~20-30 tokens per agent)
catalog = await mesh.catalog() # Get summary of all agents on the mesh
catalog = await mesh.catalog(channel="finance.risk") # Filter the catalog
# Tier 2: full contract with JSON Schemas (only for the agent you need)
contract = await mesh.contract("summarizer")
contract.input_schema # JSON Schema dict
contract.description # LLM-consumable description
Catalog entries are compact enough for direct LLM consumption. The LLM picks from the catalog, then only the selected agent's full schema is fetched. Channels and tags narrow the candidate set further.
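The two-tier flow can be sketched with in-memory data, without a running mesh. Everything here is hypothetical: the entry fields, the `filter_catalog`, `pick_agent`, and `fetch_contract` helpers, and the keyword match that stands in for an LLM's choice.

```python
from typing import Optional

# Tier 1: compact summaries, cheap enough to hand to an LLM in full.
CATALOG = [
    {"name": "summarizer", "channel": "nlp", "description": "Summarizes text."},
    {"name": "var-calc", "channel": "finance.risk", "description": "Computes value at risk."},
]

# Tier 2: full contracts, looked up one at a time.
CONTRACTS = {
    "summarizer": {"input_schema": {"type": "object", "properties": {"text": {"type": "string"}}}},
    "var-calc": {"input_schema": {"type": "object", "properties": {"portfolio": {"type": "string"}}}},
}


def filter_catalog(channel: Optional[str] = None) -> list:
    """Narrow the candidate set by channel before anything reads it."""
    return [e for e in CATALOG if channel is None or e["channel"] == channel]


def pick_agent(entries: list, keyword: str) -> Optional[str]:
    """Stand-in for the LLM: pick the first entry whose description mentions the keyword."""
    for entry in entries:
        if keyword.lower() in entry["description"].lower():
            return entry["name"]
    return None


def fetch_contract(name: str) -> dict:
    """Fetch the full schema only for the agent that was selected."""
    return CONTRACTS[name]


chosen = pick_agent(filter_catalog(), "summarizes")
schema = fetch_contract(chosen)["input_schema"]
```

Only one full contract is loaded per decision, so the cost of selection grows with the number of short summaries rather than with the size of every schema.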
💾 Shared state
Agents can share context through KV Store and binary artifacts through Workspace, and watch for changes in both.
# KV: structured data with watch and compare-and-swap
await mesh.kv.put("config/threshold", "0.85")
value = await mesh.kv.get("config/threshold")
async for value in mesh.kv.watch("pipeline.*.status"):
    print(f"Status changed: {value}")
# Workspace: binary artifacts (files, images, embeddings)
await mesh.workspace.put("docs/report.pdf", pdf_bytes)
data = await mesh.workspace.get("docs/report.pdf")
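The watch pattern `pipeline.*.status` looks like a NATS subject wildcard. Assuming `mesh.kv.watch` follows NATS wildcard rules (this README does not say so explicitly), the matching works as in the sketch below: `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens. `subject_matches` is a hypothetical helper written for illustration.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Match a dot-separated subject against a NATS-style wildcard pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # pattern is longer than the subject
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Without a trailing '>', both must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)
```

Under these rules `pipeline.*.status` matches `pipeline.ingest.status` but not `pipeline.ingest.step.status`, because `*` never spans more than one token.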
🤝 Participation patterns
Two ways to participate: register agents, or just connect and call.
Registered processes use @mesh.agent to declare agents. They appear in the catalog, have contracts, and participate in liveness tracking.
Unregistered processes connect, discover, call, and disconnect. Scripts, notebooks, CLI tools can use this pattern:
async with mesh:
    catalog = await mesh.catalog()
    result = await mesh.call("summarizer", {"text": "..."})
⚙️ Technology
OAM uses NATS as its single infrastructure dependency. One embedded binary provides everything, with no additional servers or services required:
| NATS capability | OAM use |
|---|---|
| Pub/sub | Event fan-out, callbacks, streaming |
| Request/reply | mesh.call() |
| Queue groups | Automatic load balancing across agent instances |
| KV store | Contract registry, agent catalog, shared context |
| Object store | Shared workspace for artifacts |
No Consul for discovery. No Redis for state. No RabbitMQ or Kafka for messaging. One connection, all primitives.
Pydantic v2 generates JSON Schemas from type hints for every agent contract. Runtime validation at the mesh boundary; malformed requests are rejected before reaching your handler.
Protocol-first. The protocol is the product; the Python SDK and CLI are the first implementation. Any NATS client in any language (JavaScript, Go, Rust, etc.) can participate by following the subject naming and envelope format. Dedicated SDKs will come in the future.
Scaling
Deploy multiple instances of the same agent. NATS queue groups distribute requests automatically:
# Same code, any number of instances. No config changes.
@mesh.agent(AgentSpec(name="summarizer", channel="nlp", description="..."))
async def summarize(req: SummarizeInput) -> SummarizeOutput:
    ...
Local and production use the same architecture. The only thing that changes is the connection string:
mesh = AgentMesh() # local (oam mesh up)
mesh = AgentMesh("nats://mesh.company.com:4222") # production
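The queue-group behavior behind "just add instances" can be simulated with the standard library alone. The sketch below is not NATS and makes no claim about how NATS chooses a member; it only shows the core guarantee, that each request is handled by exactly one of several identical workers. The `worker` and `simulate` names are hypothetical.

```python
import asyncio
from collections import Counter


async def worker(name: str, queue: asyncio.Queue, handled: Counter) -> None:
    """One agent instance: take a request, process it, repeat."""
    while True:
        await queue.get()
        await asyncio.sleep(0)  # yield control, as real I/O would
        handled[name] += 1
        queue.task_done()


async def simulate(instances: int, requests: int) -> Counter:
    """Run identical workers against one shared queue and count who handled what."""
    queue: asyncio.Queue = asyncio.Queue()
    handled: Counter = Counter()
    workers = [
        asyncio.create_task(worker(f"summarizer-{i}", queue, handled))
        for i in range(instances)
    ]
    for n in range(requests):
        queue.put_nowait(n)
    await queue.join()  # wait until every request has been processed once
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled
```

Calling `asyncio.run(simulate(3, 30))` returns per-instance counts that always sum to 30. Changing the number of instances changes how the work is shared, but not the worker code.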
💻 CLI
The oam CLI is installed with the SDK:
| Command | Description |
|---|---|
| oam demo | Launch interactive demo with sample agents |
| oam mesh up | Start local dev server (NATS + JetStream + KV) |
| oam mesh down | Stop local server |
| oam mesh connect <url> | Point at a remote mesh |
| oam mesh listen <subject> | Subscribe to live traffic |
| oam mesh catalog | List registered agents |
| oam agent contract <name> | View an agent's contract |
| oam agent call <name> <json> | Invoke an agent |
| oam agent stream <name> <json> | Stream from an agent |
| oam agent subscribe <name> | Subscribe to publisher events |
See the full documentation for more details.
🤲 Contributing
Contributions are welcome. Open an issue for bugs or new features before submitting a pull request.
For bug fixes, include a failing test that demonstrates the issue. For new features, start with the use case: describe what you're trying to build and why the current API doesn't cover it.
⭐️ Hit the star if you like this! ⭐️
License
Distributed under the MIT License. See LICENSE for details.