A lightweight, broker-agnostic message bus designed specifically for AI Agents
AMB - Agent Message Bus
Part of Agent OS - Kernel-level governance for AI agents
Broker-agnostic message transport for decoupled agent communication.
Why AMB?
In multi-agent systems, tight coupling between agents creates dependency graphs that grow quadratically with system size. When Agent A must know about Agents B, C, and D to communicate, the system becomes rigid and unmaintainable.
We built amb because direct agent coupling creates spaghetti code. The solution: Scale by Subtraction.
By removing the requirement for agents to know about each other, we eliminate O(n²) dependencies and replace them with O(1) broadcast semantics. Agents emit signals ("I am thinking", "I need verification") without knowing who listens. The bus stays dumb and fast: it just transports the envelope.
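The scaling claim can be checked with simple arithmetic. The sketch below is illustrative only and is not part of amb; `direct_links` and `bus_links` are hypothetical helper names. It counts pairwise links among n agents against one bus connection per agent, so each agent holds a single dependency regardless of system size.

```python
def direct_links(n: int) -> int:
    """Pairwise links when every agent may talk to every other agent."""
    return n * (n - 1) // 2


def bus_links(n: int) -> int:
    """Links when each agent connects only to the bus."""
    return n


for n in (4, 10, 100):
    print(f"{n} agents: {direct_links(n)} direct links vs {bus_links(n)} bus links")
```

At 100 agents this is 4950 pairwise links against 100 bus connections.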
Installation
pip install agentmesh-message-bus
For production deployments with Redis, RabbitMQ, or Kafka:
pip install agentmesh-message-bus[redis] # Redis support
pip install agentmesh-message-bus[rabbitmq] # RabbitMQ support
pip install agentmesh-message-bus[kafka] # Kafka support
pip install agentmesh-message-bus[all] # All adapters
Quick Start
import asyncio
from amb_core import MessageBus, Message

async def main():
    async with MessageBus() as bus:
        async def handler(msg: Message):
            print(msg.payload)

        await bus.subscribe("agent.events", handler)
        await bus.publish("agent.events", {"status": "ready"})
        await asyncio.sleep(0.1)  # give the handler time to run before the bus closes

asyncio.run(main())
Features
Priority Lanes
Tag messages as CRITICAL (Security/Governance) vs BACKGROUND (Memory consolidation). Critical messages jump the queue.
# Assumes `bus` is an open MessageBus and MessagePriority has been imported.
# Critical security alert - jumps ahead
await bus.publish(
    "agent.alerts",
    {"alert": "Security anomaly detected"},
    priority=MessagePriority.CRITICAL
)

# Background task - processed when system is idle
await bus.publish(
    "agent.tasks",
    {"task": "Memory consolidation"},
    priority=MessagePriority.BACKGROUND
)
Priority Levels: CRITICAL > URGENT > HIGH > NORMAL > LOW > BACKGROUND
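One way to picture how critical messages jump the queue is a min-heap keyed by priority and arrival order. This is a minimal standalone sketch, not amb's implementation: `Priority` and `PriorityLane` are hypothetical names, and the numeric values are an assumption chosen only to match the ordering listed above.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    """Hypothetical stand-in for MessagePriority; lower value is served first."""
    CRITICAL = 0
    URGENT = 1
    HIGH = 2
    NORMAL = 3
    LOW = 4
    BACKGROUND = 5


class PriorityLane:
    """Min-heap keyed by (priority, arrival order)."""

    def __init__(self) -> None:
        self._heap = []
        self._arrival = itertools.count()

    def put(self, payload: dict, priority: Priority = Priority.NORMAL) -> None:
        # The arrival counter breaks ties, so equal priorities stay FIFO
        # and the dict payloads are never compared with each other.
        heapq.heappush(self._heap, (int(priority), next(self._arrival), payload))

    def get(self) -> dict:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


lane = PriorityLane()
lane.put({"task": "Memory consolidation"}, Priority.BACKGROUND)
lane.put({"event": "heartbeat"})  # defaults to NORMAL
lane.put({"alert": "Security anomaly detected"}, Priority.CRITICAL)

drained = [lane.get() for _ in range(len(lane))]
print(drained)
```

The alert is published last but drained first, and the background task is published first but drained last.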
Backpressure Protocols
Implements Reactive Streams-style flow control. If a consumer is slow, the producer automatically slows down.
# Configure backpressure parameters
broker = InMemoryBroker(
    max_queue_size=1000,         # Max messages per topic
    backpressure_threshold=0.8,  # Activate at 80% capacity
    backpressure_delay=0.01      # 10ms delay when active
)
bus = MessageBus(adapter=broker)

# If 100 agents spam the bus, backpressure prevents crashes
for agent_id in range(100):
    await bus.publish("agent.events", {"agent": agent_id})
# Producer automatically throttles when consumer is overwhelmed
Scale by Subtraction: No external load balancer needed. The bus handles flow control automatically.
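The threshold-and-delay idea can be sketched with a bounded `asyncio.Queue`. This is an assumption about how such throttling might work, not amb's actual code; `ThrottledQueue` and its parameters are hypothetical and only mirror the three settings shown above at a smaller scale.

```python
import asyncio


class ThrottledQueue:
    """Bounded queue that delays producers once it passes a fill threshold."""

    def __init__(self, max_size: int, threshold: float, delay: float) -> None:
        self._queue = asyncio.Queue(maxsize=max_size)
        self._max_size = max_size
        self._threshold = threshold
        self._delay = delay
        self.throttled = 0  # number of publishes that were slowed down

    async def publish(self, item: object) -> None:
        if self._queue.qsize() >= self._threshold * self._max_size:
            self.throttled += 1
            await asyncio.sleep(self._delay)  # yield so the consumer can catch up
        await self._queue.put(item)  # waits if the queue is completely full

    async def consume(self) -> object:
        return await self._queue.get()


async def demo() -> tuple[int, int]:
    q = ThrottledQueue(max_size=10, threshold=0.8, delay=0.001)
    received = 0

    async def consumer() -> None:
        nonlocal received
        while received < 100:
            await q.consume()
            received += 1
            await asyncio.sleep(0.0005)  # simulate a slow consumer

    task = asyncio.create_task(consumer())
    for agent_id in range(100):
        await q.publish({"agent": agent_id})
    await task
    return received, q.throttled


received, throttled = asyncio.run(demo())
print(f"received={received}, throttled publishes={throttled}")
```

All 100 messages arrive, and the producer is delayed whenever the queue is at or above 80% of its capacity. The exact number of throttled publishes depends on timing.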
OpenTelemetry Tracing (The "X-Ray")
Built-in distributed tracing for debugging multi-agent workflows. When an SDLC agent fails, trace the flow: Thought → Message → Tool Call → Error across all agents.
from amb_core import MessageBus, get_tracer, initialize_tracing

# Initialize tracing (usually done once at startup)
initialize_tracing("my-agent-system")

# Get a tracer for creating spans
tracer = get_tracer("agent-workflow")

async with MessageBus() as bus:
    # Messages published within a span automatically get the trace_id
    with tracer.start_as_current_span("agent-thinking"):
        await bus.publish("agent.thoughts", {"thought": "Processing data"})

    # Or explicitly set trace_id for cross-system tracing
    await bus.publish(
        "agent.action",
        {"action": "execute"},
        trace_id="custom-trace-id-from-upstream"
    )
Key Features:
- Automatic Injection: trace_id automatically injected from active OpenTelemetry span
- Cross-Agent Tracing: Same trace_id flows through request-response patterns
- Explicit Control: Can manually set trace_id for integration with external systems
- Zero Config: Works out of the box with InMemoryBroker, scales to production backends
See examples/tracing_demo.py for a complete multi-agent tracing example.
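The injection rule described above (an explicit trace_id wins, otherwise inherit from the active span) can be illustrated without OpenTelemetry. The sketch below uses the standard-library `contextvars` module as a stand-in for the active span context; `span`, `Envelope`, and `make_envelope` are hypothetical names and do not reflect amb's internals.

```python
import contextvars
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional

# Stand-in for the active OpenTelemetry span context.
_current_trace = contextvars.ContextVar("current_trace", default=None)


@contextmanager
def span(name: str):
    """Open a hypothetical span; `name` is only a label in this sketch."""
    # Nested spans reuse the enclosing trace id, as spans within one trace do.
    trace_id = _current_trace.get() or uuid.uuid4().hex
    token = _current_trace.set(trace_id)
    try:
        yield trace_id
    finally:
        _current_trace.reset(token)


@dataclass
class Envelope:
    topic: str
    payload: dict
    trace_id: Optional[str]


def make_envelope(topic: str, payload: dict, trace_id: Optional[str] = None) -> Envelope:
    # An explicit trace_id wins; otherwise inherit from the active span, if any.
    return Envelope(topic, payload, trace_id or _current_trace.get())


with span("agent-thinking"):
    first = make_envelope("agent.thoughts", {"thought": "Processing data"})
    second = make_envelope("agent.action", {"action": "execute"})

explicit = make_envelope(
    "agent.action", {"action": "execute"}, trace_id="custom-trace-id-from-upstream"
)
outside = make_envelope("agent.idle", {})

print(first.trace_id == second.trace_id, explicit.trace_id, outside.trace_id)
```

Both envelopes created inside the span share one trace id, the explicit id passes through unchanged, and an envelope created outside any span carries no trace id.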
Architecture
amb sits in Layer 2 (Infrastructure) of the Agent OS stack. It transports message envelopes without inspecting content or enforcing policy.
┌─────────────────────────────────────┐
│  Layer 3: Framework                 │  agent-control-plane, scak
│  (Orchestration & Self-Correction)  │
└─────────────────┬───────────────────┘
                  │
┌─────────────────▼───────────────────┐
│  Layer 2: Infrastructure            │  AMB, iatp (Trust), atr (Registry)
│  (Transport & Discovery)            │
└─────────────────┬───────────────────┘
                  │
┌─────────────────▼───────────────────┐
│  Layer 1: Primitives                │  caas (Context), cmvk (Verification),
│  (State & Identity)                 │  emk (Memory)
└─────────────────────────────────────┘
Design Principles:
- No Business Logic: The bus never decides routing based on message content.
- Broker Agnostic: Swap Redis for RabbitMQ without changing application code.
- Local-First: Works on a laptop with InMemoryBroker; no Docker required.
- Separation of Concerns: The bus transports. The receiver validates trust (via iatp), not the bus.
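The broker-agnostic principle can be sketched as application code that depends on a small contract rather than on a concrete backend. The example is synchronous and standalone for brevity; `BrokerAdapter`, `InMemoryAdapter`, `LoggingAdapter`, and `run_agents` are hypothetical names and are not amb's adapter API, which is asynchronous.

```python
from collections import defaultdict
from typing import Callable, Protocol


class BrokerAdapter(Protocol):
    """Hypothetical minimal contract a transport backend must satisfy."""

    def publish(self, topic: str, payload: dict) -> None: ...
    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None: ...


class InMemoryAdapter:
    """Delivers synchronously inside the process; no external services."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: str, payload: dict) -> None:
        # Routing uses the topic only; the payload is never inspected.
        for handler in self._handlers[topic]:
            handler(payload)


class LoggingAdapter(InMemoryAdapter):
    """A second backend, standing in for Redis or RabbitMQ."""

    def __init__(self) -> None:
        super().__init__()
        self.log = []

    def publish(self, topic: str, payload: dict) -> None:
        self.log.append(topic)
        super().publish(topic, payload)


def run_agents(broker: BrokerAdapter) -> list:
    """Application code: depends on the contract, not on a backend."""
    seen = []
    broker.subscribe("agent.events", seen.append)
    broker.publish("agent.events", {"status": "ready"})
    return seen


print(run_agents(InMemoryAdapter()))
print(run_agents(LoggingAdapter()))
```

`run_agents` is unchanged between the two calls; only the object passed in differs.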
The Agent OS Ecosystem
amb is one component of a modular Agent Operating System. Each layer solves a specific problem.
Layer 1: Primitives (State & Identity)
- caas - Context as a Service: Manages agent context and state
- cmvk - Context Verification Kit: Cryptographic verification of context
- emk - Episodic Memory Kit: Persistent memory for agents
Layer 2: Infrastructure (Transport & Discovery)
- iatp - Inter-Agent Trust Protocol: Trust verification for agent messages
- amb - Agent Message Bus: Broker-agnostic transport (you are here)
- atr - Agent Tool Registry: Decentralized tool discovery
Layer 3: Framework (Orchestration & Self-Correction)
- agent-control-plane - The orchestration core
- scak - Self-Correction & Alignment Kit: Runtime safety and alignment
Citation
If you use AMB in research, please cite:
@software{amb2026,
  author = {Siddique, Imran},
  title = {AMB: Agent Message Bus for Decoupled Multi-Agent Systems},
  year = {2026},
  url = {https://github.com/microsoft/agent-governance-toolkit},
  version = {0.1.0}
}
License: MIT | Contributing: CONTRIBUTING.md | Changelog: CHANGELOG.md
File details
Details for the file agentmesh_message_bus-3.6.0.tar.gz.
File metadata
- Download URL: agentmesh_message_bus-3.6.0.tar.gz
- Upload date:
- Size: 42.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: RestSharp/106.13.0.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b75e9348dc5d73aae08a9f3bf16e0b1f6b5c4a89b3bf83e18703545cf89cd5a2 |
| MD5 | 0354c0ca1af45a1ddb52ee6129cd3ae9 |
| BLAKE2b-256 | 4968653fd0e3fe2b752d5b3d168fb874d94c1ac8f87ce3025efdc961b106cd86 |
File details
Details for the file agentmesh_message_bus-3.6.0-py3-none-any.whl.
File metadata
- Download URL: agentmesh_message_bus-3.6.0-py3-none-any.whl
- Upload date:
- Size: 55.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: RestSharp/106.13.0.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ebe8a32af9045468e61368fffdc1ed89c7d96a8ccb2cc71045bd8f9b4ec9aed1 |
| MD5 | 1c819efb5835453f4f6d4c2b5acbca14 |
| BLAKE2b-256 | 8dfa21781b5ad728b478ca16f64b3b4092f3cdb37e1656380ed88bf908b433c3 |