An IPC message bus based on the RabbitMQ message broker
EventBusClient - RabbitMQ Message Bus Library
EventBusClient is an event-driven messaging library for Python, designed to simplify distributed communication using RabbitMQ as the message broker. It provides a clean, pluggable architecture for robust inter-process messaging, topic management, and coordination in scalable applications.
Why EventBusClient?
The Problem
Building distributed systems with message queues typically involves:
- Writing boilerplate code for connection management, serialization, and error handling
- Implementing retry logic, reconnection, and graceful shutdown
- Coordinating startup order across multiple processes
- Handling different environments (dev, test, production) with different configurations
- Supporting both async and sync codebases
The Solution
EventBusClient abstracts away the complexity of RabbitMQ while remaining flexible:
| Challenge | EventBusClient Solution |
|---|---|
| Boilerplate code | Clean send() / on() API with automatic setup |
| Connection management | Auto-reconnection, robust lifecycle handling |
| Multi-process coordination | Built-in Rendezvous pattern for startup synchronization |
| Environment configuration | JSONP config files with environment variable support |
| Async vs Sync | Async-first API with sync wrappers for legacy code |
| Extensibility | Pluggable serializers, exchange handlers, and policies |
When to Use EventBusClient
Ideal for:
- Test Automation Systems - Coordinate multiple test runners, controllers, and reporters
- Multi-Process Applications - Decouple processes that need to communicate asynchronously
- Microservices - Event-driven communication between services
- Data Pipelines - Stream data between producers and consumers
- Distributed Systems - Any system requiring reliable message passing
Consider alternatives if:
- You need simple in-process pub/sub (use Python's built-in queue module)
- You're building a single monolithic application with no IPC needs
- You need guaranteed exactly-once delivery (RabbitMQ provides at-least-once)
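With at-least-once delivery, the same message can reach a consumer more than once, so handlers are usually written to tolerate duplicates. The following is a minimal sketch of one common approach, deduplicating by a message id. The names `IdempotentHandler` and `message_id` are hypothetical and are not part of EventBusClient; the in-memory set is an assumption that would need to be replaced by persistent storage in a real system.

```python
class IdempotentHandler:
    """Wraps a handler so that redelivered messages are processed only once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # assumption: in-memory only, lost on restart

    def __call__(self, message_id, payload):
        if message_id in self._seen:
            return False  # duplicate delivery, skip processing
        self._seen.add(message_id)
        self._handler(payload)
        return True


processed = []
handle = IdempotentHandler(processed.append)

# "m1" is delivered twice, as can happen after a reconnect or a lost ack
deliveries = [("m1", "create order"), ("m2", "ship order"), ("m1", "create order")]
results = [handle(message_id, body) for message_id, body in deliveries]
```

Here the wrapped handler runs once per distinct id, regardless of how often the broker redelivers.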
Key Features
- Async-First API - Native async/await support with sync wrappers for legacy code
- Pluggable Architecture - Extensible serializers, exchange handlers, message types, and startup policies
- Configuration-Driven - JSONP-based configuration with environment variable support
- Multiple Exchange Types - Topic, Fanout, Headers, and X-RTopic exchange handlers
- Coordinated Startup - Rendezvous pattern for multi-process synchronization
- Unroutable Message Handling - Configurable policies (drop, return, alternate-exchange)
- Thread-Safe Operations - Safe publishing from multiple threads
- Auto-Reconnection - Robust connection management with automatic recovery
Table of Contents
- Getting Started
- Real-World Scenarios
- Main APIs
- Configuration Reference
- Architecture
- Examples
- Documentation
- Feedback
- License
Getting Started
Installation
pip install eventbusclient
Prerequisites
- Python 3.8+
- RabbitMQ server running (default: localhost:5672)
Quick Start
1. Create a configuration file (config.jsonp):
{
"host": "localhost",
"port": 5672,
"serializer": "JsonSerializer",
"exchange_handler": "TopicExchangeHandler",
"auto_reconnect": true
}
2. Create a producer:
import asyncio
from EventBusClient import EventBusClient
from EventBusClient.message.base_message import BaseMessage

class MyMessage(BaseMessage):
    def __init__(self, content=None):
        self.content = content

async def main():
    client = await EventBusClient.from_config("config.jsonp")
    await client.send("my.topic", MyMessage("Hello, World!"))
    await client.close()

asyncio.run(main())
3. Create a consumer:
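import asyncio
from EventBusClient import EventBusClient
from EventBusClient.message.base_message import BaseMessage

# Same message class as in the producer; both sides must agree on it
class MyMessage(BaseMessage):
    def __init__(self, content=None):
        self.content = content

async def main():
    client = await EventBusClient.from_config("config.jsonp")

    async def handler(message, headers):
        print(f"Received: {message.content}")

    await client.on("my.topic", MyMessage, handler)
    await asyncio.sleep(60)  # Keep listening for 60 seconds
    await client.close()

asyncio.run(main())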
Real-World Scenarios
1. Test Automation Framework
Coordinate multiple test processes (Robot Framework, pytest) with a central controller:
# Controller process
client = await EventBusClient.from_config("config.jsonp")
await client.wait_until_ready({"worker": 3}, timeout=30) # Wait for 3 workers
await client.send("test.start", StartTestMessage(suite="regression"))
# Worker process
client = await EventBusClient.from_config("config.jsonp")
await client.announce_ready(["worker"])
await client.on("test.start", StartTestMessage, run_tests)
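The controller/worker handshake above follows the rendezvous idea: participants announce their roles, and a waiter blocks until enough of each role have checked in or a timeout expires. The sketch below illustrates that idea inside a single process with `asyncio`. `LocalRendezvous` is a hypothetical class; it borrows the method names from the snippet above for readability but is not the library's implementation, which coordinates across processes through the broker.

```python
import asyncio
from collections import Counter


class LocalRendezvous:
    """In-process illustration of role-based startup synchronization."""

    def __init__(self):
        self._ready = Counter()                # role -> number of announcements
        self._changed = asyncio.Condition()    # signalled on every announcement

    async def announce_ready(self, roles):
        async with self._changed:
            self._ready.update(roles)
            self._changed.notify_all()

    async def wait_until_ready(self, requirements, timeout):
        def satisfied():
            return all(self._ready[role] >= n for role, n in requirements.items())

        async def wait():
            async with self._changed:
                await self._changed.wait_for(satisfied)

        try:
            await asyncio.wait_for(wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False  # not enough participants showed up in time


async def demo():
    rendezvous = LocalRendezvous()
    workers = [rendezvous.announce_ready(["worker"]) for _ in range(3)]
    ok, *_ = await asyncio.gather(
        rendezvous.wait_until_ready({"worker": 3}, timeout=1), *workers
    )
    # Only three workers exist, so asking for four times out
    too_many = await rendezvous.wait_until_ready({"worker": 4}, timeout=0.05)
    return ok, too_many


ok, too_many = asyncio.run(demo())
```

Returning a boolean instead of raising on timeout lets the controller decide whether to abort or continue with fewer workers.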
2. Microservices Communication
Decouple services with topic-based messaging:
# Order Service - publishes order events
await client.send("orders.created", OrderCreatedMessage(order_id=123))
await client.send("orders.shipped", OrderShippedMessage(order_id=123))
# Notification Service - subscribes to order events
await client.on("orders.*", OrderMessage, send_notification)
# Inventory Service - subscribes to specific events
await client.on("orders.created", OrderCreatedMessage, update_inventory)
3. Sensor Data Pipeline
Stream sensor data from multiple sources:
# Sensor Publisher
while True:
    reading = sensor.read()
    await client.send(f"sensor.{sensor_id}.temperature", SensorMessage(reading))
    await asyncio.sleep(1)
# Data Aggregator - subscribes to all sensors
await client.on("sensor.*.temperature", SensorMessage, aggregate_data)
4. Request-Reply Pattern (RPC)
Implement synchronous-style RPC over async messaging:
# Server
async def handle_request(request, headers):
    result = process(request)
    reply_to = headers.get("reply_to")
    correlation_id = headers.get("correlation_id")
    await client.send(reply_to, ResultMessage(result),
                      headers={"correlation_id": correlation_id})
await client.on("rpc.requests", RequestMessage, handle_request)
# Client
from uuid import uuid4  # needed for the unique reply queue name
reply_queue = f"rpc.replies.{uuid4()}"
cache = await client.on(reply_queue, ResultMessage, lambda m, h: None)
await client.send("rpc.requests", request,
headers={"reply_to": reply_queue, "correlation_id": "123"})
result = cache.wait_for_one(lambda m, h: h.get("correlation_id") == "123", timeout=10)
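The key to request-reply over a message bus is the correlation id: the client tags each request, and the reply handler uses the tag to find the caller that is waiting. The sketch below shows that bookkeeping with plain `asyncio` futures and no broker. `PendingReplies` is a hypothetical helper, not an EventBusClient class, and the `call_soon` call merely stands in for the bus invoking a reply handler.

```python
import asyncio
import uuid


class PendingReplies:
    """Maps correlation ids to futures that are awaiting their reply."""

    def __init__(self):
        self._pending = {}

    def expect(self):
        """Register a new request; returns (correlation_id, future)."""
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id, result):
        """Deliver a reply; returns False for unknown or late replies."""
        future = self._pending.pop(correlation_id, None)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True


async def demo():
    pending = PendingReplies()
    correlation_id, future = pending.expect()
    # Stand-in for the reply handler being invoked by the bus
    asyncio.get_running_loop().call_soon(pending.resolve, correlation_id, "pong")
    return await asyncio.wait_for(future, timeout=1)


result = asyncio.run(demo())
```

Generating a fresh id per request, rather than a fixed string, keeps concurrent requests from picking up each other's replies.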
Main APIs
Async API (Primary)
# Factory methods
client = await EventBusClient.from_config(config_path)
# Connection
await client.connect(host, port, prefetch_count=10)
await client.close()
is_connected = client.is_connected()
# Publish/Subscribe (routing key based)
await client.send(routing_key, message, headers=None)
cache = await client.on(routing_key, message_cls, callback)
await client.off(routing_key, callback)
# Publish/Subscribe (headers based - for HeadersExchangeHandler)
cache = await client.on(
    routing_key="",  # Ignored in headers exchange
    message_cls=ReportMessage,
    callback=handler,
    binding_headers={"format": "pdf", "type": "report"},
    match_all=True  # AND logic (x-match=all)
)
# Coordination (Rendezvous)
await client.announce_ready(roles=["worker"])
success = await client.wait_until_ready(requirements={"worker": 2}, timeout=30)
# Unroutable messages
unroutables = client.pop_unroutables()
Sync API (Legacy Support)
# Factory methods
client = EventBusClient.from_config_sync(config_path)
# Connection
client.connect_sync(host, port, prefetch_count=10)
client.close_sync()
# Publish/Subscribe
client.send_sync(routing_key, message, headers=None)
cache = client.on_sync(routing_key, message_cls, callback)
client.off_sync(routing_key, callback)
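One common way to offer blocking wrappers around an async API is to run an event loop in a background thread and submit coroutines to it with `asyncio.run_coroutine_threadsafe`. The sketch below shows that technique in isolation. `LoopThread` and the stand-in `send` coroutine are hypothetical; how EventBusClient implements its `*_sync` methods internally is not described here, so treat this as an illustration of the general pattern only.

```python
import asyncio
import threading


class LoopThread:
    """Runs an event loop in a background thread for blocking callers."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_sync(self, coro, timeout=None):
        # Schedule the coroutine on the loop thread and block for its result
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def stop(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def send(routing_key, message):
    await asyncio.sleep(0)  # stand-in for network I/O
    return f"{routing_key}: {message}"


runner = LoopThread()
result = runner.run_sync(send("my.topic", "hello"), timeout=1)
runner.stop()
```

Because every coroutine runs on the one loop thread, several caller threads can publish through `run_sync` without sharing loop state directly.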
SubscriptionCache API
# Get messages from cache
message = cache.get(timeout=5.0) # Block until message or timeout
message = cache.pop(timeout=5.0) # Get and remove
message = cache.peek(timeout=5.0) # Get without removing
# Wait for specific messages
found = cache.wait_for_one(target_msg, timeout=10)
indices = cache.wait_for_many(
    targets=[msg1, msg2, msg3],
    mode=WaitMode.ALL_IN_GIVEN_ORDER,  # or ALL_IN_RANDOM_ORDER, ANY_OF_GIVEN_MSGS
    timeout=30
)
# Drain all messages
messages = cache.drain(max_items=100)
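To make the blocking-with-timeout behavior of such a cache concrete, here is a minimal sketch of a thread-safe buffer built on `threading.Condition`. `MiniCache` is hypothetical and much simpler than SubscriptionCache; in particular, returning `None` on timeout is an assumption of this sketch, not documented library behavior.

```python
import threading
from collections import deque


class MiniCache:
    """Thread-safe FIFO buffer with blocking pop and bounded drain."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()

    def put(self, message):
        with self._cond:
            self._items.append(message)
            self._cond.notify_all()

    def pop(self, timeout=None):
        with self._cond:
            # wait_for returns a falsy value if the timeout expires first
            if not self._cond.wait_for(lambda: self._items, timeout):
                return None
            return self._items.popleft()

    def drain(self, max_items=None):
        with self._cond:
            available = len(self._items)
            count = available if max_items is None else min(max_items, available)
            return [self._items.popleft() for _ in range(count)]


cache = MiniCache()
# A producer thread delivers a message shortly after the consumer starts waiting
threading.Timer(0.05, cache.put, args=("late message",)).start()
first = cache.pop(timeout=2.0)
empty = cache.pop(timeout=0.01)

for i in range(5):
    cache.put(i)
batch = cache.drain(max_items=3)
rest = cache.drain()
```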
Configuration Reference
Create a configuration file (config.jsonp) based on the template at EventBusClient/config/config.jsonp.template:
{
// ============== Connection Settings ==============
// RabbitMQ server hostname or IP address
"host": "localhost",
// RabbitMQ server port (default: 5672)
"port": 5672,
// Automatically reconnect if connection is lost
"auto_reconnect": true,
// Number of messages to prefetch (QoS)
"qos_prefetch": 10,
// ============== Plugin Selection ==============
// Serializer: "PickleSerializer", "JsonSerializer", "ProtobufSerializer"
"serializer": "PickleSerializer",
// Exchange handler: "TopicExchangeHandler", "FanoutExchangeHandler", "XRTopicExchangeHandler"
"exchange_handler": "TopicExchangeHandler",
// Custom exchange name (optional)
"exchange_name": "my_exchange",
// Path to custom plugins directory
"plugins_path": "./plugins",
// ============== General Cache Settings ==============
// Cache policy: "off", "on_connect", "on_demand"
"general_cache_policy": "off",
// Routing keys for general cache
"general_routing_keys": "general.*",
// Message class for general cache
"general_message_cls": "BaseMessage",
// ============== Logging Settings ==============
// Logger instance name
"logger_name": "event_bus_client",
// Log file path (use "console" for stdout, or path like "./logs/app.log")
"logfile": "console",
// Log level: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
"loglevel": "INFO",
// Log file mode: "w" (overwrite) or "a" (append)
"logger_mode": "a"
}
Configuration Options Table
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| host | str | No | "localhost" | RabbitMQ server hostname |
| port | int | No | 5672 | RabbitMQ server port |
| serializer | str | Yes | - | Serializer class name (see Serializers) |
| exchange_handler | str | Yes | - | Exchange handler class name (see Exchange Handlers) |
| exchange_name | str | No | auto | Custom exchange name |
| auto_reconnect | bool | No | true | Enable auto-reconnection |
| qos_prefetch | int | No | 10 | Prefetch count for QoS |
| plugins_path | str | No | "./plugins" | Custom plugins directory |
| general_cache_policy | str | No | "off" | General cache policy |
| general_routing_keys | str | No | "general" | Routing keys for cache |
| general_message_cls | str | No | "BaseMessage" | Message class for cache |
| logger_name | str | No | "event_bus_client" | Logger name |
| logfile | str | No | None | Log file path |
| loglevel | str | No | "INFO" | Log level |
| logger_mode | str | No | "a" | Log file mode |
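The table implies a simple resolution rule: the two required options must be present, and every other option falls back to its default. A minimal sketch of that rule follows, using a subset of the defaults listed above. `resolve_config`, `DEFAULTS`, and `REQUIRED` are hypothetical names for illustration; the library's own loading and validation logic may differ.

```python
# Subset of the defaults from the options table
DEFAULTS = {
    "host": "localhost",
    "port": 5672,
    "auto_reconnect": True,
    "qos_prefetch": 10,
    "plugins_path": "./plugins",
    "general_cache_policy": "off",
    "loglevel": "INFO",
    "logger_mode": "a",
}
REQUIRED = ("serializer", "exchange_handler")


def resolve_config(user_config: dict) -> dict:
    """Check required options, then overlay user values on the defaults."""
    missing = [key for key in REQUIRED if key not in user_config]
    if missing:
        raise ValueError(f"missing required option(s): {', '.join(missing)}")
    return {**DEFAULTS, **user_config}


config = resolve_config({
    "serializer": "JsonSerializer",
    "exchange_handler": "TopicExchangeHandler",
    "port": 5673,  # overrides the default
})
```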
Exchange Handlers
The exchange_handler configuration determines the RabbitMQ exchange type used for message routing:
| Handler Class | Exchange Type | Routing Behavior | Use Case |
|---|---|---|---|
| TopicExchangeHandler | topic | Pattern matching with * (one word) and # (zero or more words) | Most common - selective routing (e.g., sensor.*.temperature, order.#) |
| FanoutExchangeHandler | fanout | Broadcasts to all bound queues, ignores routing key | Notifications, system-wide events |
| HeadersExchangeHandler | headers | Routes based on message header attributes, not routing key | Complex routing based on multiple attributes |
| XRTopicExchangeHandler | x-rtopic | Reverse topic matching (requires broker plugin) | Publisher specifies pattern, subscriber specifies exact key |
Example routing patterns (Topic Exchange):
Routing Key: "sensor.living_room.temperature"
Subscription Pattern | Matches?
------------------------|----------
sensor.*.temperature | Yes (matches one word)
sensor.# | Yes (matches zero or more words)
sensor.living_room.* | Yes
*.*.temperature | Yes
sensor.bedroom.temperature | No (different room)
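The matching rules in the table can be reproduced with a few lines of Python. The sketch below is an illustrative reimplementation of topic matching, where `*` stands for exactly one dot-separated word and `#` for zero or more words. In practice the broker performs this matching; `topic_matches` is a hypothetical helper that is useful mainly for reasoning about patterns or for unit tests.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic pattern matches a dot-separated routing key."""

    def match(p, k):
        if not p:
            return not k  # pattern exhausted: match only if key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words; try every split point
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


key = "sensor.living_room.temperature"
table = {
    pattern: topic_matches(pattern, key)
    for pattern in (
        "sensor.*.temperature",
        "sensor.#",
        "sensor.living_room.*",
        "*.*.temperature",
        "sensor.bedroom.temperature",
    )
}
```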
When to use which handler:
- TopicExchangeHandler (recommended default) - When you need flexible routing with patterns
- FanoutExchangeHandler - When all subscribers should receive all messages
- HeadersExchangeHandler - When routing depends on multiple message attributes (AND/OR logic)
- XRTopicExchangeHandler - Advanced use cases requiring reverse matching (requires rabbitmq-rtopic-exchange plugin)
Headers Exchange Example:
# Subscribe to messages where format=pdf AND type=report
cache = await handler.subscribe_with_headers(
    binding_headers={"format": "pdf", "type": "report"},
    message_cls=ReportMessage,
    callback=process_report,
    match_all=True  # x-match=all (AND logic)
)
# Publish with matching headers
await handler.publish(
    message=report,
    routing_key="",  # Ignored for headers exchange
    headers={"format": "pdf", "type": "report", "author": "john"}
)
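The AND/OR semantics of a headers binding can be stated compactly in code. The sketch below mirrors the `x-match=all` and `x-match=any` rules: only the headers named in the binding are compared, and extra message headers (such as `author` above) are ignored. `headers_match` is a hypothetical helper for illustration; the actual matching is done by the broker.

```python
def headers_match(binding: dict, message_headers: dict, match_all: bool = True) -> bool:
    """Evaluate a headers binding against a message's headers."""
    checks = (
        key in message_headers and message_headers[key] == value
        for key, value in binding.items()
    )
    # match_all=True corresponds to x-match=all, False to x-match=any
    return all(checks) if match_all else any(checks)


binding = {"format": "pdf", "type": "report"}

full = headers_match(binding, {"format": "pdf", "type": "report", "author": "john"})
partial_all = headers_match(binding, {"format": "pdf", "type": "invoice"})
partial_any = headers_match(binding, {"format": "pdf", "type": "invoice"}, match_all=False)
none_any = headers_match(binding, {"format": "csv"}, match_all=False)
```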
Serializers
The serializer configuration determines how messages are encoded/decoded:
| Serializer Class | Format | Pros | Cons | Use Case |
|---|---|---|---|---|
| PickleSerializer | Python Pickle | Fast, supports any Python object | Python-only, security concerns | Internal Python-to-Python communication |
| JsonSerializer | JSON | Human-readable, cross-language | Limited types, larger size | Interoperability, debugging |
| ProtobufSerializer | Protocol Buffers | Compact, fast, schema-enforced | Requires .proto files | High-performance, strict contracts |
Recommendation:
- Use PickleSerializer for pure Python systems (fastest, most flexible)
- Use JsonSerializer for debugging or multi-language systems
- Use ProtobufSerializer for high-performance production systems
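The "limited types" trade-off between the two text-book formats is easy to see with the standard library alone. The sketch below round-trips the same payload through `pickle` and `json`; it uses the stdlib modules directly rather than the library's serializer classes, whose internals are not shown here. Note that unpickling data from an untrusted source can execute arbitrary code, which is the security concern mentioned in the table.

```python
import json
import pickle

payload = {"reading": (21.5, "C"), "tags": {"indoor"}}

# pickle preserves Python-specific types such as tuple and set
pickled = pickle.loads(pickle.dumps(payload))

# json has no set type, so the raw payload cannot be encoded as-is
try:
    json.dumps(payload)
    json_accepts_set = True
except TypeError:
    json_accepts_set = False

# After converting the set, the tuple still comes back as a list
encoded = json.dumps({"reading": payload["reading"], "tags": sorted(payload["tags"])})
from_json = json.loads(encoded)
```

Message classes intended for JSON transport are therefore best restricted to dicts, lists, strings, numbers, booleans, and None.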
JSONP Features
The configuration uses JsonPreprocessor which supports:
{
// Environment variable substitution
"host": "${RABBITMQ_HOST}",
"port": ${RABBITMQ_PORT:-5672}, // With default value
// Include other files
${include: "./env/${ENVIRONMENT}.jsonp"}
}
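To illustrate what environment variable substitution with a default means, the sketch below expands `${NAME}` and `${NAME:-default}` placeholders in a string. It is a hypothetical stand-in written with `re` and `os.environ`, not JsonPreprocessor's implementation, and it does not handle the include directive. One simplification to be aware of: it falls back to the default only when the variable is unset, not when it is set to an empty string.

```python
import os
import re

# ${NAME} or ${NAME:-default}
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand_env(text: str, env=None) -> str:
    """Replace placeholders with values from env (defaults to os.environ)."""
    env = os.environ if env is None else env

    def replace(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        raise KeyError(f"environment variable {name!r} is not set")

    return _PLACEHOLDER.sub(replace, text)


host = expand_env('"${RABBITMQ_HOST}"', {"RABBITMQ_HOST": "broker.internal"})
default_port = expand_env("${RABBITMQ_PORT:-5672}", {})
custom_port = expand_env("${RABBITMQ_PORT:-5672}", {"RABBITMQ_PORT": "5673"})
```

Raising on a missing variable without a default surfaces configuration mistakes at load time instead of at connection time.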
Architecture
EventBusClient follows a pluggable strategy pattern with four extensible interfaces:
+------------------+     +-------------------+
|     User App     |---->|    Public APIs    |
+------------------+     |  - Async API      |
                         |  - Sync API       |
                         |  - Factory API    |
                         |  - Rendezvous API |
                         +--------+----------+
                                  |
                         +--------v----------+
                         |  EventBusClient   |
                         |      (Core)       |
                         +--------+----------+
                                  |
              +-------------------+-------------------+
              |                   |                   |
    +---------v-------+  +--------v--------+   +------v--------+
    | ExchangeHandler |  |   Serializer    |   | StartupPolicy |
    |   (Interface)   |  |   (Interface)   |   |  (Interface)  |
    +-----------------+  +-----------------+   +---------------+
    | TopicExchange   |  | PickleSerializer|   | NoWait        |
    | FanoutExchange  |  | JsonSerializer  |   | FixedDelay    |
    | XRTopicExchange |  | ProtobufSerial. |   | HandshakeBar. |
    | [Your Handler]  |  | [Your Serial.]  |   | PolicyChain   |
    +-----------------+  +-----------------+   +---------------+
Key Components
| Component | Description |
|---|---|
| EventBusClient | Main facade providing async/sync APIs |
| ConnectionManager | Manages AMQP connection lifecycle |
| ExchangeHandler | Handles exchange declaration, publish, subscribe |
| AsyncPublisher | Publishes messages to exchange |
| AsyncSubscriber | Consumes messages from queues |
| SubscriptionCache | Thread-safe buffer for sync consumers |
| PluginLoader | Dynamically loads plugins from config |
| Rendezvous | Coordinates multi-process startup |
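The idea behind selecting a plugin by class name from configuration can be sketched as a name-to-class registry. The example below is a simplified illustration of that strategy-pattern lookup: `JsonCodec`, `PickleCodec`, `REGISTRY`, and `load_plugin` are hypothetical names, and the real PluginLoader, which also discovers classes from a plugins directory, is not reproduced here.

```python
import json
import pickle


class JsonCodec:
    def dumps(self, obj) -> bytes:
        return json.dumps(obj).encode("utf-8")

    def loads(self, data: bytes):
        return json.loads(data.decode("utf-8"))


class PickleCodec:
    def dumps(self, obj) -> bytes:
        return pickle.dumps(obj)

    def loads(self, data: bytes):
        return pickle.loads(data)


# Registry keyed by class name, mirroring how a config value selects a plugin
REGISTRY = {cls.__name__: cls for cls in (JsonCodec, PickleCodec)}


def load_plugin(name: str):
    """Instantiate the plugin registered under the given class name."""
    try:
        return REGISTRY[name]()
    except KeyError:
        raise ValueError(f"unknown plugin {name!r}; known: {sorted(REGISTRY)}") from None


codec = load_plugin("JsonCodec")  # the name would come from the config file
wire = codec.dumps({"topic": "my.topic"})
restored = codec.loads(wire)
```

Because both codecs expose the same `dumps`/`loads` pair, the calling code stays unchanged when the configured name changes.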
Examples
The examples/ folder contains comprehensive examples:
| Example | Description |
|---|---|
| basic_sample.py | Basic publish/subscribe |
| sync_sample.py | Synchronous API usage |
| wait_mode_sample.py | WaitMode options |
| rendezvous_sample.py | Multi-process coordination |
| request_reply_sample.py | RPC pattern |
| multiple_subscriptions_sample.py | Multiple topics |
| custom_message_sample.py | Custom message types |
| error_handling_sample.py | Error handling patterns |
| alternate_exchange_sample.py | Unroutable handling |
| headers_exchange_sample.py | Header-based message routing |
See examples/README.md for detailed documentation.
Documentation
Architecture Decision Records (ADRs)
The docs/adr/ folder contains ADRs documenting key design decisions:
| ADR | Title |
|---|---|
| ADR-001 | Standardize IPC / Message Bus API |
| ADR-002 | Async-First Public API with Sync Wrappers |
| ADR-003 | Plugin-based Strategy Pattern |
| ADR-004 | Configuration-Driven Library Setup |
| ADR-005 | Central ConnectionManager |
| ADR-006 | Multiple Exchange Types via Handlers |
| ADR-007 | StartupPolicy and Rendezvous |
| ADR-008 | SubscriptionCache for Sync Access |
| ADR-009 | Configurable Unroutable Handling |
Diagrams
The docs/diagrams/ folder contains PlantUML diagrams:
| Diagram | Description |
|---|---|
| overview.puml | Plugin strategy overview |
| architecture.puml | Package structure |
| component.puml | Component interfaces |
| class.puml | Full class diagram |
| sequence-lifecycle.puml | End-to-end lifecycle |
API Documentation
Detailed API documentation: EventBusClient.pdf
Feedback
To give us feedback, you can send an email to Thomas Pollerspöck.
To report bugs or request features, please raise a ticket on GitHub.
Maintainers
Contributors
License
Copyright 2020-2025 Robert Bosch GmbH
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.