A lightweight, broker-agnostic message bus designed specifically for AI Agents
AMB - Agent Message Bus
"The Nervous System" for AI Agents
A lightweight, broker-agnostic transport layer designed specifically for AI Agents. AMB allows agents to emit signals ("I am thinking," "I am stuck," "I need verification") without knowing who is listening. It decouples the "Sender" from the "Receiver" and handles the async nature of agent communication.
⚡ Performance Highlights
| Pattern | Latency (100B) | Throughput |
|---|---|---|
| Fire-and-Forget | 0.032 ms | 30,989 msg/s |
| End-to-End Pub/Sub | 0.091 ms | 10,946 msg/s |
| Request-Response | 0.096 ms | 10,372 msg/s |
Benchmarks run with InMemoryBroker, Python 3.13, seed=42. See experiments/ for full results.
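As a rough sanity check on the table, the sketch below compares each reported throughput with the reciprocal of its latency. This assumes messages are sent strictly one at a time, which the benchmark description does not state; under that assumption the two columns agree to within about 1%.

```python
# Reported figures from the table above: (latency in ms, throughput in msg/s).
reported = {
    "Fire-and-Forget": (0.032, 30_989),
    "End-to-End Pub/Sub": (0.091, 10_946),
    "Request-Response": (0.096, 10_372),
}


def implied_throughput(latency_ms: float) -> float:
    """Messages per second if messages are sent strictly one after another."""
    return 1000.0 / latency_ms


for pattern, (latency_ms, measured) in reported.items():
    implied = implied_throughput(latency_ms)
    # Relative gap between the reciprocal estimate and the reported number.
    gap = abs(implied - measured) / measured
    print(f"{pattern}: implied {implied:,.0f} msg/s, reported {measured:,} msg/s, gap {gap:.2%}")
```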
Features
- Broker-Agnostic Transport: Pure transport layer that doesn't care about message content
- Decoupled Communication: Senders don't know who receives; receivers don't know who sends
- Async-First: Built entirely on asyncio/anyio to prevent blocking agent thought loops
- Strict PubSub Interface: Simple publish(), subscribe(), and acknowledgment patterns
- Type-Safe: Full type hints and Pydantic message validation
- Communication Patterns:
  - Fire and forget (fast, no guarantee)
  - Wait for acknowledgment (slower, with confirmation)
  - Request-response (wait for reply)
- Minimal Core Dependencies: Only pydantic and anyio
- No Business Logic: The bus stays dumb and fast - it just broadcasts
- Adapters:
  - MemoryAdapter (InMemoryBroker): For local development/testing, no external dependencies
  - RedisAdapter: For production deployments
  - RabbitMQ & Kafka: Optional adapters for specific use cases
Installation
Core Installation
pip install amb-core
With Optional Adapters
# Redis support
pip install amb-core[redis]
# RabbitMQ support
pip install amb-core[rabbitmq]
# Kafka support
pip install amb-core[kafka]
# All adapters
pip install amb-core[all]
# Development dependencies
pip install amb-core[dev]
Quick Start
Basic Usage with In-Memory Broker (MemoryAdapter)
import asyncio
from amb_core import MessageBus, Message

async def main():
    # Create message bus (uses in-memory broker by default)
    # No external dependencies required!
    async with MessageBus() as bus:
        # Subscribe to a topic
        async def handle_message(msg: Message):
            print(f"Received: {msg.payload}")

        await bus.subscribe("agent.thoughts", handle_message)

        # Publish a message (fire and forget)
        await bus.publish("agent.thoughts", {"thought": "Hello World!"})

        # Wait a bit for async processing
        await asyncio.sleep(0.1)

asyncio.run(main())
Agent Signal Examples
AMB is designed for agent communication patterns. Here are typical signals agents might emit:
# Agent emits a thinking signal
await bus.publish("agent.thinking", {
    "agent_id": "agent-1",
    "thought": "Analyzing user request..."
})

# Agent emits a stuck signal
await bus.publish("agent.stuck", {
    "agent_id": "agent-1",
    "reason": "Insufficient context",
    "needs": "user_clarification"
})

# Agent requests verification
response = await bus.request("agent.verification", {
    "agent_id": "agent-1",
    "action": "delete_file",
    "requires_approval": True
})
Fire and Forget Pattern
# Fast publishing without waiting for confirmation
await bus.publish(
    "agent.actions",
    {"action": "process", "data": "..."},
    wait_for_confirmation=False  # Default
)
Acknowledgment Pattern (Wait for Verification)
# Slower but ensures message was acknowledged by the broker
msg_id = await bus.publish(
    "critical.task",
    {"task": "important"},
    wait_for_confirmation=True  # Wait for broker acknowledgment
)
print(f"Message {msg_id} acknowledged by broker!")
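To make the trade-off between the two publishing modes concrete, here is a minimal sketch using only asyncio. ToyBroker, its store() method, and this publish() function are hypothetical stand-ins written for illustration; they are not amb_core's implementation, and the real broker adapters may confirm messages differently.

```python
import asyncio


class ToyBroker:
    """Hypothetical stand-in for a broker; not part of amb_core."""

    def __init__(self) -> None:
        self.stored: list = []

    async def store(self, payload: dict) -> int:
        await asyncio.sleep(0.01)  # simulate broker I/O
        self.stored.append(payload)
        return len(self.stored)  # use the position as a toy message id


# Keep references to background tasks so they are not garbage-collected.
_pending: set = set()


async def publish(broker: ToyBroker, payload: dict, wait_for_confirmation: bool = False):
    task = asyncio.create_task(broker.store(payload))
    if wait_for_confirmation:
        # Slower: return only after the broker has stored the message.
        return await task
    # Faster: let the task finish in the background and return immediately.
    _pending.add(task)
    task.add_done_callback(_pending.discard)
    return None


async def demo() -> tuple:
    broker = ToyBroker()
    ff_result = await publish(broker, {"action": "process"})
    count_after_ff = len(broker.stored)  # nothing stored yet: no confirmation was awaited
    msg_id = await publish(broker, {"task": "important"}, wait_for_confirmation=True)
    count_after_ack = len(broker.stored)
    for task in list(_pending):  # drain any background work before exiting
        await task
    return ff_result, count_after_ff, msg_id, count_after_ack


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy version the fire-and-forget call returns before the broker has done any work, which is why it is faster and why it offers no delivery guarantee.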
Request-Response Pattern
# Set up a responder
async def handle_request(msg: Message):
    # Process request (process() is a placeholder for your own logic)
    result = process(msg.payload)
    # Send response
    await bus.reply(msg, {"result": result})

await bus.subscribe("agent.query", handle_request)

# Send request and wait for response
response = await bus.request(
    "agent.query",
    {"query": "What is the status?"},
    timeout=10.0
)
print(f"Response: {response.payload}")
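Request-response over a pub/sub transport is commonly built from the correlation_id and reply_to fields that the Message model carries. The sketch below shows one way this can work with plain asyncio. ToyBus and its dict-based messages are hypothetical and written only for illustration; they are not how amb_core necessarily implements request() and reply().

```python
import asyncio
import uuid
from collections import defaultdict


class ToyBus:
    """Hypothetical in-process bus used only to illustrate the pattern."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)  # topic -> list of async handlers

    async def subscribe(self, topic: str, handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, message: dict) -> None:
        for handler in list(self._handlers[topic]):
            await handler(message)

    async def request(self, topic: str, payload, timeout: float = 1.0) -> dict:
        # A unique correlation id ties the eventual reply back to this request.
        correlation_id = str(uuid.uuid4())
        reply_topic = f"reply.{correlation_id}"
        future = asyncio.get_running_loop().create_future()

        async def on_reply(message: dict) -> None:
            if message["correlation_id"] == correlation_id and not future.done():
                future.set_result(message)

        await self.subscribe(reply_topic, on_reply)
        try:
            await self.publish(topic, {
                "payload": payload,
                "correlation_id": correlation_id,
                "reply_to": reply_topic,
            })
            # Raises asyncio.TimeoutError if nobody replies in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            self._handlers.pop(reply_topic, None)  # clean up the temporary topic

    async def reply(self, request: dict, payload) -> None:
        await self.publish(request["reply_to"], {
            "payload": payload,
            "correlation_id": request["correlation_id"],
        })


async def demo() -> dict:
    bus = ToyBus()

    async def responder(message: dict) -> None:
        await bus.reply(message, {"status": "ok", "echo": message["payload"]})

    await bus.subscribe("agent.query", responder)
    return await bus.request("agent.query", {"query": "What is the status?"})


if __name__ == "__main__":
    print(asyncio.run(demo())["payload"])
```

The requester never learns who answered; it only waits on a temporary reply topic, which keeps sender and receiver decoupled.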
Using Redis Broker
import asyncio
from amb_core import MessageBus
from amb_core.adapters.redis_broker import RedisBroker

async def main():
    # Create Redis broker
    redis_broker = RedisBroker("redis://localhost:6379/0")
    # Use with message bus
    async with MessageBus(adapter=redis_broker) as bus:
        await bus.publish("agent.topic", {"data": "hello"})

asyncio.run(main())
Using RabbitMQ Broker
import asyncio
from amb_core import MessageBus
from amb_core.adapters.rabbitmq_broker import RabbitMQBroker

async def main():
    # Create RabbitMQ broker
    rabbitmq_broker = RabbitMQBroker("amqp://guest:guest@localhost/")
    async with MessageBus(adapter=rabbitmq_broker) as bus:
        await bus.publish("agent.topic", {"data": "hello"})

asyncio.run(main())
Using Kafka Broker
import asyncio
from amb_core import MessageBus
from amb_core.adapters.kafka_broker import KafkaBroker

async def main():
    # Create Kafka broker
    kafka_broker = KafkaBroker("localhost:9092")
    async with MessageBus(adapter=kafka_broker) as bus:
        await bus.publish("agent.topic", {"data": "hello"})

asyncio.run(main())
Message Model
Messages are defined using Pydantic models:
from amb_core import Message, MessagePriority

msg = Message(
    id="unique-id",
    topic="agent.topic",
    payload={"key": "value"},
    priority=MessagePriority.HIGH,
    sender="agent-1",
    correlation_id="request-123",  # For request-response
    reply_to="reply.topic",        # Where to send replies
    ttl=60,                        # Time to live in seconds
    metadata={"custom": "data"}    # Additional metadata
)
Message Priorities
- MessagePriority.LOW - Low priority messages
- MessagePriority.NORMAL - Normal priority (default)
- MessagePriority.HIGH - High priority messages
- MessagePriority.URGENT - Urgent messages
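Since the bus itself carries no business logic, acting on priority is left to whoever receives the message. The sketch below shows one way a receiver might order its own backlog by priority. The Priority enum, its numeric values, and the PriorityInbox class are all assumptions for illustration; amb_core's MessagePriority may use different values.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    """Hypothetical numeric values; amb_core's MessagePriority may differ."""
    LOW = 0
    NORMAL = 1
    HIGH = 2
    URGENT = 3


class PriorityInbox:
    """Receiver-side backlog that hands out the highest-priority payload first."""

    def __init__(self) -> None:
        self._heap: list = []
        # Monotonic counter: keeps FIFO order within one priority level and
        # ensures payload dicts are never compared with each other.
        self._counter = itertools.count()

    def put(self, priority: Priority, payload: dict) -> None:
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), payload))

    def get(self) -> dict:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


inbox = PriorityInbox()
inbox.put(Priority.NORMAL, {"thought": "Analyzing user request..."})
inbox.put(Priority.URGENT, {"reason": "Insufficient context"})
while inbox:
    print(inbox.get())
```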
Architecture
AMB follows a strict broker-agnostic architecture with zero business logic:
┌─────────────────────┐
│     Agent / App     │  ← Your code decides what to send/receive
└──────────┬──────────┘
           │
    ┌──────▼──────┐
    │ MessageBus  │  ← Thin wrapper (publish/subscribe/acknowledge)
    └──────┬──────┘
           │
    ┌──────▼──────────┐
    │ BrokerAdapter   │  ← Abstract interface
    └──────┬──────────┘
           │
      ┌────┴─────┐
      │          │
┌─────▼────┐ ┌───▼───────┐
│  Memory  │ │   Redis   │  ← Concrete implementations
│ Adapter  │ │  Adapter  │
└──────────┘ └───────────┘
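The layering in the diagram can be sketched in a few lines of Python. The class and method names below (BrokerAdapter, InMemoryAdapter, Bus, and their publish/subscribe signatures) are assumptions chosen to mirror the diagram, not the actual amb_core interface.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class BrokerAdapter(ABC):
    """Hypothetical abstract interface; the real amb_core class may differ."""

    @abstractmethod
    async def publish(self, topic: str, message: dict) -> None: ...

    @abstractmethod
    async def subscribe(self, topic: str, handler: Handler) -> None: ...


class InMemoryAdapter(BrokerAdapter):
    """Concrete implementation that keeps subscribers in a local dict."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(list)

    async def publish(self, topic: str, message: dict) -> None:
        # Broadcast to every subscriber of the topic without inspecting the message.
        for handler in list(self._subscribers[topic]):
            await handler(message)

    async def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)


class Bus:
    """Thin wrapper: delegates everything to whichever adapter it was given."""

    def __init__(self, adapter: BrokerAdapter) -> None:
        self._adapter = adapter

    async def publish(self, topic: str, payload: dict) -> None:
        await self._adapter.publish(topic, {"topic": topic, "payload": payload})

    async def subscribe(self, topic: str, handler: Handler) -> None:
        await self._adapter.subscribe(topic, handler)


async def demo() -> list:
    seen = []

    async def handler(message: dict) -> None:
        seen.append(message)

    bus = Bus(InMemoryAdapter())  # swapping the adapter would not change the calls below
    await bus.subscribe("agent.thoughts", handler)
    await bus.publish("agent.thoughts", {"thought": "Hello World!"})
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because Bus depends only on the abstract interface, a Redis-backed adapter could be passed in its place without touching agent code.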
Design Principles
- No Business Logic: The bus never decides "if message X, send to Y". It just broadcasts.
- Dumb and Fast: The bus doesn't inspect payloads, validate schemas, or enforce policies.
- Broker Agnostic: Swap brokers without changing your code.
- Local-First: Must work with MemoryAdapter on a laptop without Docker.
- Separation of Concerns:
  - The Bus: Transports the envelope
  - The Receiver: Opens the envelope and decides what to do
  - NOT the Bus: Doesn't verify trust, validate permissions, or enforce rules
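The separation of concerns above means any policy check lives in the receiver. A minimal sketch of such a receiver-side decision follows; the ALLOWED_ACTIONS set, the decide() function, and the dict-shaped envelope are hypothetical and only illustrate where the rules would sit.

```python
# Hypothetical policy owned by the receiver, not by the bus.
ALLOWED_ACTIONS = {"read_file", "list_files"}


def decide(envelope: dict) -> str:
    """Receiver-side policy: the bus delivered the envelope as-is; rules live here."""
    payload = envelope.get("payload")
    if not isinstance(payload, dict) or "action" not in payload:
        return "reject: malformed payload"
    if payload["action"] not in ALLOWED_ACTIONS:
        return "escalate: needs approval"
    return "accept"


print(decide({"payload": {"agent_id": "agent-1", "action": "delete_file"}}))
print(decide({"payload": {"agent_id": "agent-1", "action": "read_file"}}))
```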
Dependencies
Core Dependencies (Always Installed)
- pydantic>=2.0.0 - For message schema validation
- anyio>=3.0.0 - For async support
Optional Dependencies (Install as extras)
- redis>=4.0.0 - For Redis broker adapter
- aio-pika>=9.0.0 - For RabbitMQ broker adapter
- aiokafka>=0.8.0 - For Kafka broker adapter
Forbidden Dependencies
The following packages are explicitly NOT used to keep the bus pure and fast:
- agent-control-plane - The bus must not import the agent. It doesn't know about agent logic.
- iatp - The bus transports the envelope; it does not verify trust (that's the receiver's job)
- scak - Not required for core functionality
Philosophy: Keep the pipe dumb and fast. No business logic in the bus.
Development
Running Tests
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run with coverage
pytest --cov=amb_core --cov-report=html
Running Benchmarks
# Run reproducible benchmarks
python experiments/reproduce_results.py --seed 42 --iterations 500
# Results saved to experiments/results.json
Building the Package
python -m build
Research
If you use AMB in your research, please cite:
@software{amb2026,
  author = {Siddique, Imran},
  title = {AMB: A Broker-Agnostic Message Bus for AI Agents},
  year = {2026},
  url = {https://github.com/imran-siddique/amb},
  version = {0.1.0}
}
See the paper/ directory for the research whitepaper.
License
MIT License - see LICENSE for details.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
Changelog
See CHANGELOG.md for version history.