Python SDK for agent-to-agent communication based on the Open Agent Mesh Protocol

OpenAgentMesh

The fabric for multi-agent systems, with the simplicity of a REST endpoint.

✨ Highlights

  • Decoupled multi-agent system. Run agents and tools however you want, and have them talk to each other as if they were running in the same process.
  • Bring your own agent. Works with any agentic framework (LangChain, CrewAI, PydanticAI, etc.).
  • Agents as functions. Put your agent in a handler function and add the @mesh.agent decorator. Done.
  • Batteries included. Agent catalog with runtime discovery, sync/async requests, pub/sub, shared KV and Object store, load balancing. All from a single infrastructure dependency.
  • Protocol-first. The protocol is the product. Python SDK and CLI facilitate use. Any NATS client in any language can participate by following the subject and envelope format.
  • No-effort scaling. Code remains the same. Just add instances.

📦 Installation

pip install openagentmesh

🎬 See it in action

oam demo

🚀 Quickstart

1. Start a local mesh:

oam mesh up

2. Register an agent (agent.py):

from openagentmesh import AgentMesh, AgentSpec

mesh = AgentMesh()

@mesh.agent(AgentSpec(name="echo", description="Echoes a message back."))
async def echo(req: str) -> str:
    return f"Echo: {req}"

mesh.run()

Then run it:

python agent.py

3. Discover and call it from the terminal:

oam mesh catalog               # list registered agents
oam agent contract echo        # view the full contract
oam agent call echo '"hello"'  # invoke it

No hardcoded addresses. The CLI discovers echo from the mesh, reads its contract, and calls it.

🔗 How OAM relates to MCP and A2A

OAM is not a replacement for MCP or A2A. It complements both.

MCP is the standard for connecting LLMs to tools. OAM adds value where MCP alone hits friction: context bloat from loading too many tool schemas, patterns beyond request/reply (pub/sub, async callbacks, shared context), and automatic cross-team discovery.

A2A is Google's protocol for cross-organization agent federation over HTTP. OAM contracts are an A2A-compatible superset: A2A fields at top level, OAM extensions under x-agentmesh. Use OAM internally, project to A2A at the boundary where internal meets external.
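The "project to A2A at the boundary" idea can be sketched in a few lines. This is only an illustration under assumptions: the contract below and the project_to_a2a helper are hypothetical, and the field names other than the x-agentmesh key are placeholders rather than the actual OAM or A2A schema.

```python
from copy import deepcopy

# Hypothetical contract. Only the "x-agentmesh" key comes from the text above;
# the other field names and values are illustrative placeholders.
oam_contract = {
    "name": "summarizer",
    "description": "Summarizes a document.",
    "version": "1.0.0",
    "x-agentmesh": {"channel": "nlp", "tags": ["text"]},
}


def project_to_a2a(contract: dict) -> dict:
    """Return a copy of the contract without the OAM extension block."""
    projected = deepcopy(contract)
    projected.pop("x-agentmesh", None)
    return projected


a2a_card = project_to_a2a(oam_contract)
print(a2a_card)
```

Because the extensions live under a single key, the projection is a plain key removal and the internal contract is left untouched.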

| Concern | MCP | OAM | A2A |
| --- | --- | --- | --- |
| Scope | Third-party toolsets | Agent-to-agent/tool (internal) | Agent-to-agent (cross-org) |
| Transport | stdio / SSE | NATS | HTTP |
| Discovery | Manual config | Automatic | Agent Card directory |
| Load balancing | N/A | NATS queue groups | External |
| Interaction | Streaming | Request/reply, streaming, async callback, pub/sub, reactive watcher | Request/reply, streaming |

📖 How to use

Any async function can be an agent: LLM chains, deterministic tools, data transformers, event publishers. The @mesh.agent decorator inspects the function signature to infer capabilities automatically.

Five handler patterns, determined by the function shape:

| Pattern | Signature | What it does |
| --- | --- | --- |
| Responder | async def f(req: TypeIn) -> TypeOut (returns) | Takes input, returns output |
| Streamer | async def f(req: TypeIn) -> TypeOut (yields) | Takes input, streams data back |
| Trigger | async def f() -> TypeOut (returns) | No input, returns output on call |
| Publisher | async def f() -> TypeOut (yields) | No input, emits events continuously |
| Watcher | async def f() | No input or output. Background task (KV watch, polling) |

No capability flags to set. The handler shape is the source of truth.
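To make "the handler shape is the source of truth" concrete, here is a minimal sketch of how a function's shape could be mapped to the five patterns using only the standard inspect module. The classify_handler function is hypothetical and is not the SDK's actual implementation. It assumes that "has input" means at least one parameter and that "has output" means a return annotation other than None.

```python
import inspect


def classify_handler(fn) -> str:
    """Name the handler pattern implied by an async function's shape."""
    signature = inspect.signature(fn)
    has_input = len(signature.parameters) > 0
    # An async function that contains `yield` is an async generator function.
    yields = inspect.isasyncgenfunction(fn)
    has_output = signature.return_annotation not in (inspect.Signature.empty, None)

    if yields:
        return "streamer" if has_input else "publisher"
    if has_input:
        return "responder"
    return "trigger" if has_output else "watcher"


async def responder(req: str) -> str:
    return req.upper()


async def streamer(req: str) -> str:
    for word in req.split():
        yield word


async def trigger() -> int:
    return 42


async def publisher() -> int:
    yield 1


async def watcher():
    pass


for fn in (responder, streamer, trigger, publisher, watcher):
    print(fn.__name__, "->", classify_handler(fn))
```

The return annotations on the two generators follow the table's convention of annotating the yielded type.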

📡 Invocation patterns

Four ways to interact with agents:

# Synchronous request/reply
result = await mesh.call("summarizer", {"text": doc})

# Streaming
async for chunk in mesh.stream("summarizer", {"text": doc}):
    print(chunk["delta"], end="")

# Async callback (non-blocking)
await mesh.send("summarizer", payload, on_reply=callback)

# Pub/sub events
async for event in mesh.subscribe(agent="price-feed"):
    print(event["price"])

🔍 Discovery

Two-tier discovery designed for efficient agent selection, including by LLMs:

# Tier 1: lightweight catalog (~20-30 tokens per agent)
catalog = await mesh.catalog() # Get summary of all agents on the mesh
catalog = await mesh.catalog(channel="finance.risk") # Filter the catalog

# Tier 2: full contract with JSON Schemas (only for the agent you need)
contract = await mesh.contract("summarizer")
contract.input_schema   # JSON Schema dict
contract.description    # LLM-consumable description

Catalog entries are compact enough for direct LLM consumption. The LLM picks from the catalog, then only the selected agent's full schema is fetched. Channels and tags narrow the candidate set further.
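The selection flow described above can be sketched without a running mesh. Everything here is a stand-in: the CONTRACTS registry, its field names, and the pick_agent function (a word-overlap heuristic in place of an LLM) are hypothetical and only illustrate why the full schema is fetched for one agent instead of all of them.

```python
# Hypothetical in-memory registry. Entries and field names are illustrative.
CONTRACTS = {
    "summarizer": {
        "name": "summarizer",
        "description": "Summarizes long text.",
        "input_schema": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    },
    "translator": {
        "name": "translator",
        "description": "Translates text between languages.",
        "input_schema": {
            "type": "object",
            "properties": {"text": {"type": "string"}, "target": {"type": "string"}},
            "required": ["text", "target"],
        },
    },
}


def catalog() -> list:
    """Tier 1: name and description only, cheap enough to put in a prompt."""
    return [{"name": c["name"], "description": c["description"]} for c in CONTRACTS.values()]


def contract(name: str) -> dict:
    """Tier 2: the full contract, fetched for a single agent."""
    return CONTRACTS[name]


def pick_agent(task: str, entries: list) -> str:
    """Stand-in for the LLM: pick the entry sharing the most words with the task."""
    task_words = set(task.lower().split())

    def score(entry: dict) -> int:
        words = set(entry["description"].lower().replace(".", "").split())
        return len(task_words & words)

    return max(entries, key=score)["name"]


entries = catalog()
chosen = pick_agent("summarize this long text", entries)
schema = contract(chosen)["input_schema"]
print(chosen, schema["required"])
```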

💾 Shared state

Agents can share context through KV Store and binary artifacts through Workspace, and watch for changes in both.

# KV: structured data with watch and compare-and-swap
await mesh.kv.put("config/threshold", "0.85")
value = await mesh.kv.get("config/threshold")

async for value in mesh.kv.watch("pipeline.*.status"):
    print(f"Status changed: {value}")

# Workspace: binary artifacts (files, images, embeddings)
await mesh.workspace.put("docs/report.pdf", pdf_bytes)
data = await mesh.workspace.get("docs/report.pdf")
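The snippet above mentions compare-and-swap without showing it. The sketch below illustrates the general semantics with an in-memory store: a write succeeds only if the caller has seen the latest revision. TinyKV, its update method, and RevisionMismatch are hypothetical names for illustration and are not the mesh.kv API.

```python
class RevisionMismatch(Exception):
    """Raised when a write is based on a stale revision."""


class TinyKV:
    """In-memory stand-in for a revisioned KV store (not the OAM API)."""

    def __init__(self):
        self._data = {}  # key -> (value, revision)

    def get(self, key):
        """Return (value, revision) for the key."""
        return self._data[key]

    def put(self, key, value):
        """Unconditional write. Bumps the revision and returns it."""
        _, revision = self._data.get(key, (None, 0))
        self._data[key] = (value, revision + 1)
        return revision + 1

    def update(self, key, value, expected_revision):
        """Compare-and-swap: write only if the revision is unchanged."""
        _, revision = self._data.get(key, (None, 0))
        if revision != expected_revision:
            raise RevisionMismatch(f"{key}: expected {expected_revision}, found {revision}")
        self._data[key] = (value, revision + 1)
        return revision + 1


kv = TinyKV()
kv.put("config/threshold", "0.85")
value, rev = kv.get("config/threshold")

# The first writer holds the current revision, so the update is applied.
kv.update("config/threshold", "0.90", expected_revision=rev)

# A second writer still holding the old revision is rejected.
try:
    kv.update("config/threshold", "0.95", expected_revision=rev)
    stale_write_rejected = False
except RevisionMismatch:
    stale_write_rejected = True

print(kv.get("config/threshold"), stale_write_rejected)
```

This is the usual way to keep two agents from silently overwriting each other's changes to shared context.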

🤝 Participation patterns

Two ways to participate: register agents, or just connect and call.

Registered processes use @mesh.agent to declare agents. They appear in the catalog, have contracts, and participate in liveness tracking.

Unregistered processes connect, discover, call, and disconnect. Scripts, notebooks, and CLI tools can use this pattern:

async with mesh:
    catalog = await mesh.catalog()
    result = await mesh.call("summarizer", {"text": "..."})

⚙️ Technology

OAM uses NATS as its single infrastructure dependency. One embedded binary provides everything, with no need for additional servers or services:

| NATS capability | OAM use |
| --- | --- |
| Pub/sub | Event fan-out, callbacks, streaming |
| Request/reply | mesh.call() |
| Queue groups | Automatic load balancing across agent instances |
| KV store | Contract registry, agent catalog, shared context |
| Object store | Shared workspace for artifacts |

No Consul for discovery. No Redis for state. No RabbitMQ or Kafka for messaging. One connection, all primitives.

Pydantic v2 generates JSON Schemas from type hints for every agent contract. Requests are validated at runtime at the mesh boundary, so malformed requests are rejected before they reach your handler.

Protocol-first. The protocol is the product; the Python SDK and CLI are the first implementation. Any NATS client in any language (JavaScript, Go, Rust, etc.) can participate by following the subject naming and envelope format. Dedicated SDKs will come in the future.

Scaling

Deploy multiple instances of the same agent. NATS queue groups distribute requests automatically:

# Same code, any number of instances. No config changes.
@mesh.agent(AgentSpec(name="summarizer", channel="nlp", description="..."))
async def summarize(req: SummarizeInput) -> SummarizeOutput:
    ...
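The effect of a queue group can be imitated locally with asyncio: several identical workers pull from one shared queue, so each request is handled by exactly one of them. This is an analogy only. It does not use NATS or the SDK, and the worker names and request IDs are made up for the example.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, handled: list) -> None:
    """One agent instance: take the next request and record who handled it."""
    while True:
        request = await queue.get()
        handled.append((name, request))
        await asyncio.sleep(0)  # yield control so other instances get a turn
        queue.task_done()


async def main(n_workers: int = 3, n_requests: int = 9) -> list:
    queue = asyncio.Queue()
    handled = []
    workers = [
        asyncio.create_task(worker(f"summarizer-{i}", queue, handled))
        for i in range(n_workers)
    ]
    for request_id in range(n_requests):
        queue.put_nowait(request_id)

    await queue.join()  # wait until every request has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


handled = asyncio.run(main())
for name, request in handled:
    print(f"{name} handled request {request}")
```

Adding capacity means raising n_workers. The handler code and the caller stay the same, which is the property the section describes.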

Local and production use the same architecture. The only thing that changes is the connection string:

mesh = AgentMesh()                                # local (oam mesh up)
mesh = AgentMesh("nats://mesh.company.com:4222")  # production

💻 CLI

The oam CLI is installed with the SDK:

| Command | Description |
| --- | --- |
| oam demo | Launch interactive demo with sample agents |
| oam mesh up | Start local dev server (NATS + JetStream + KV) |
| oam mesh down | Stop local server |
| oam mesh connect <url> | Point at a remote mesh |
| oam mesh listen <subject> | Subscribe to live traffic |
| oam mesh catalog | List registered agents |
| oam agent contract <name> | View an agent's contract |
| oam agent call <name> <json> | Invoke an agent |
| oam agent stream <name> <json> | Stream from an agent |
| oam agent subscribe <name> | Subscribe to publisher events |

See the full documentation for more details.

🤲 Contributing

Contributions are welcome. Open an issue for bugs or new features before submitting a pull request.

For bug fixes, include a failing test that demonstrates the issue. For new features, start with the use case: describe what you're trying to build and why the current API doesn't cover it.

⭐️ Hit the star if you like this! ⭐️

License

Distributed under the MIT License. See LICENSE for details.
