Skip to main content

A lightweight, broker-agnostic message bus designed specifically for AI Agents

Project description

AMB - Agent Message Bus

PyPI version Python 3.8+ License: MIT CI

"The Nervous System" for AI Agents

A lightweight, broker-agnostic transport layer designed specifically for AI Agents. AMB allows agents to emit signals ("I am thinking," "I am stuck," "I need verification") without knowing who is listening. It decouples the "Sender" from the "Receiver" and handles the async nature of agent communication.

⚡ Performance Highlights

Pattern Latency (100B) Throughput
Fire-and-Forget 0.032 ms 30,989 msg/s
End-to-End Pub/Sub 0.091 ms 10,946 msg/s
Request-Response 0.096 ms 10,372 msg/s

Benchmarks run with InMemoryBroker, Python 3.13, seed=42. See experiments/ for full results.

Features

  • Broker-Agnostic Transport: Pure transport layer that doesn't care about message content
  • Decoupled Communication: Senders don't know who receives; receivers don't know who sends
  • Async-First: Built entirely on asyncio/anyio to prevent blocking agent thought loops
  • Strict PubSub Interface: Simple publish(), subscribe(), and acknowledgment patterns
  • Type-Safe: Full type hints and Pydantic message validation
  • Communication Patterns:
    • Fire and forget (fast, no guarantee)
    • Wait for acknowledgment (slower, with confirmation)
    • Request-response (wait for reply)
  • Minimal Core Dependencies: Only pydantic and anyio
  • No Business Logic: The bus stays dumb and fast - it just broadcasts
  • Adapters:
    • MemoryAdapter (InMemoryBroker): For local development/testing, no external dependencies
    • RedisAdapter: For production deployments
    • RabbitMQ & Kafka: Optional adapters for specific use cases

Installation

Core Installation

pip install amb-core

With Optional Adapters

# Redis support
pip install amb-core[redis]

# RabbitMQ support
pip install amb-core[rabbitmq]

# Kafka support
pip install amb-core[kafka]

# All adapters
pip install amb-core[all]

# Development dependencies
pip install amb-core[dev]

Quick Start

Basic Usage with In-Memory Broker (MemoryAdapter)

import asyncio
from amb_core import MessageBus, Message

async def main():
    # Create message bus (uses in-memory broker by default)
    # No external dependencies required!
    async with MessageBus() as bus:
        # Subscribe to a topic
        async def handle_message(msg: Message):
            print(f"Received: {msg.payload}")
        
        await bus.subscribe("agent.thoughts", handle_message)
        
        # Publish a message (fire and forget)
        await bus.publish("agent.thoughts", {"thought": "Hello World!"})
        
        # Wait a bit for async processing
        await asyncio.sleep(0.1)

asyncio.run(main())

Agent Signal Examples

AMB is designed for agent communication patterns. Here are typical signals agents might emit:

# Agent emits a thinking signal
await bus.publish("agent.thinking", {
    "agent_id": "agent-1",
    "thought": "Analyzing user request..."
})

# Agent emits a stuck signal
await bus.publish("agent.stuck", {
    "agent_id": "agent-1", 
    "reason": "Insufficient context",
    "needs": "user_clarification"
})

# Agent requests verification
response = await bus.request("agent.verification", {
    "agent_id": "agent-1",
    "action": "delete_file",
    "requires_approval": True
})

Fire and Forget Pattern

# Fast publishing without waiting for confirmation
await bus.publish(
    "agent.actions",
    {"action": "process", "data": "..."},
    wait_for_confirmation=False  # Default
)

Acknowledgment Pattern (Wait for Verification)

# Slower but ensures message was acknowledged by the broker
msg_id = await bus.publish(
    "critical.task",
    {"task": "important"},
    wait_for_confirmation=True  # Wait for broker acknowledgment
)
print(f"Message {msg_id} acknowledged by broker!")

Request-Response Pattern

# Set up a responder
async def handle_request(msg: Message):
    # Process request
    result = process(msg.payload)
    # Send response
    await bus.reply(msg, {"result": result})

await bus.subscribe("agent.query", handle_request)

# Send request and wait for response
response = await bus.request(
    "agent.query",
    {"query": "What is the status?"},
    timeout=10.0
)
print(f"Response: {response.payload}")

Using Redis Broker

from amb_core import MessageBus
from amb_core.adapters.redis_broker import RedisBroker

async def main():
    # Create Redis broker
    redis_broker = RedisBroker("redis://localhost:6379/0")
    
    # Use with message bus
    async with MessageBus(adapter=redis_broker) as bus:
        await bus.publish("agent.topic", {"data": "hello"})

asyncio.run(main())

Using RabbitMQ Broker

from amb_core import MessageBus
from amb_core.adapters.rabbitmq_broker import RabbitMQBroker

async def main():
    # Create RabbitMQ broker
    rabbitmq_broker = RabbitMQBroker("amqp://guest:guest@localhost/")
    
    async with MessageBus(adapter=rabbitmq_broker) as bus:
        await bus.publish("agent.topic", {"data": "hello"})

asyncio.run(main())

Using Kafka Broker

from amb_core import MessageBus
from amb_core.adapters.kafka_broker import KafkaBroker

async def main():
    # Create Kafka broker
    kafka_broker = KafkaBroker("localhost:9092")
    
    async with MessageBus(adapter=kafka_broker) as bus:
        await bus.publish("agent.topic", {"data": "hello"})

asyncio.run(main())

Message Model

Messages are defined using Pydantic models:

from amb_core import Message, MessagePriority

msg = Message(
    id="unique-id",
    topic="agent.topic",
    payload={"key": "value"},
    priority=MessagePriority.HIGH,
    sender="agent-1",
    correlation_id="request-123",  # For request-response
    reply_to="reply.topic",         # Where to send replies
    ttl=60,                         # Time to live in seconds
    metadata={"custom": "data"}     # Additional metadata
)

Message Priorities

  • MessagePriority.LOW - Low priority messages
  • MessagePriority.NORMAL - Normal priority (default)
  • MessagePriority.HIGH - High priority messages
  • MessagePriority.URGENT - Urgent messages

Architecture

AMB follows a strict broker-agnostic architecture with zero business logic:

┌─────────────────────┐
│    Agent / App      │  ← Your code decides what to send/receive
└──────────┬──────────┘
           │
    ┌──────▼──────┐
    │  MessageBus │        ← Thin wrapper (publish/subscribe/acknowledge)
    └──────┬──────┘
           │
    ┌──────▼──────────┐
    │ BrokerAdapter   │    ← Abstract interface
    └──────┬──────────┘
           │
      ┌────┴─────┐
      │          │
┌─────▼────┐  ┌─▼─────────┐
│  Memory  │  │   Redis   │  ← Concrete implementations
│ Adapter  │  │  Adapter  │
└──────────┘  └───────────┘

Design Principles

  1. No Business Logic: The bus never decides "if message X, send to Y". It just broadcasts.
  2. Dumb and Fast: The bus doesn't inspect payloads, validate schemas, or enforce policies.
  3. Broker Agnostic: Swap brokers without changing your code.
  4. Local-First: Must work with MemoryAdapter on a laptop without Docker.
  5. Separation of Concerns:
    • The Bus: Transports the envelope
    • The Receiver: Opens the envelope and decides what to do
    • NOT the Bus: Doesn't verify trust, validate permissions, or enforce rules

Dependencies

Core Dependencies (Always Installed)

  • pydantic>=2.0.0 - For message schema validation
  • anyio>=3.0.0 - For async support

Optional Dependencies (Install as extras)

  • redis>=4.0.0 - For Redis broker adapter
  • aio-pika>=9.0.0 - For RabbitMQ broker adapter
  • aiokafka>=0.8.0 - For Kafka broker adapter

Forbidden Dependencies

The following packages are explicitly NOT used to keep the bus pure and fast:

  • agent-control-plane - The bus must not import the agent. It doesn't know about agent logic.
  • iatp - The bus transports the envelope; it does not verify trust (that's the receiver's job)
  • scak - Not required for core functionality

Philosophy: Keep the pipe dumb and fast. No business logic in the bus.

Development

Running Tests

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=amb_core --cov-report=html

Running Benchmarks

# Run reproducible benchmarks
python experiments/reproduce_results.py --seed 42 --iterations 500

# Results saved to experiments/results.json

Building the Package

python -m build

Research

If you use AMB in your research, please cite:

@software{amb2026,
  author = {Siddique, Imran},
  title = {AMB: A Broker-Agnostic Message Bus for AI Agents},
  year = {2026},
  url = {https://github.com/imran-siddique/amb},
  version = {0.1.0}
}

See the paper/ directory for the research whitepaper.

License

MIT License - see LICENSE for details.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

Changelog

See CHANGELOG.md for version history.

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amb_core-0.1.0.tar.gz (24.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amb_core-0.1.0-py3-none-any.whl (24.1 kB view details)

Uploaded Python 3

File details

Details for the file amb_core-0.1.0.tar.gz.

File metadata

  • Download URL: amb_core-0.1.0.tar.gz
  • Upload date:
  • Size: 24.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amb_core-0.1.0.tar.gz
Algorithm Hash digest
SHA256 08319dbc864972f0a50c28b481491fd821506bb1804190e9e6efee4bd06c70d6
MD5 4f026d31d623c6fd5ed0d8425e9fdc4c
BLAKE2b-256 06a351adf331e80c2c728514125345c493fcd9f19d94d49492f34c77782bc059

See more details on using hashes here.

Provenance

The following attestation bundles were made for amb_core-0.1.0.tar.gz:

Publisher: publish.yml on imran-siddique/amb

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file amb_core-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: amb_core-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 24.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amb_core-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 61c387404a8a549f9e3ade87003ec1e13202903159fff81bfb2db02fc9696052
MD5 5b62b0fd5272531d8d0d9544102e5ea7
BLAKE2b-256 679024a0ddc405cb8be102b3cb96e1d178ec645da2438aebba3bb31000c21224

See more details on using hashes here.

Provenance

The following attestation bundles were made for amb_core-0.1.0-py3-none-any.whl:

Publisher: publish.yml on imran-siddique/amb

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page