An IPC message bus based on the RabbitMQ message broker

RabbitMqMessagebus Package Description

EventBusClient is an event-driven messaging library for Python, designed to simplify distributed communication using RabbitMQ as the message broker. It enables robust inter-process messaging, topic management, and coordination for scalable applications.

Getting Started

EventBusClient is available on PyPI. To install, run:

pip install eventbusclient

Ensure that a RabbitMQ server is installed and running, and that its host and port are set in the client configuration file (see Config File Construction).
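Before starting a client, it can help to confirm that the broker port accepts TCP connections. The following is a minimal sketch using only the Python standard library; broker_reachable is a hypothetical helper and not part of EventBusClient, and the defaults assume a local broker on the standard AMQP port 5672. A successful connection only shows that something is listening on the port, not that it is a correctly configured RabbitMQ server.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 5672,
                     timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration; it does not speak AMQP and
    therefore cannot verify credentials or virtual hosts.
    """
    try:
        # create_connection resolves the host and applies the timeout
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors
        return False
```

Calling broker_reachable("localhost", 5672) before EventBusClient.from_config(...) gives an early, readable failure when the broker is not running.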

Usage

The example below runs a producer and a consumer in separate processes that exchange messages of a custom type over the topic test.topic:

import asyncio
import logging
import time
from multiprocessing import Process
from EventBusClient.event_bus_client import EventBusClient
from EventBusClient.message.base_message import BaseMessage

# Define a custom message class
class TestMessage(BaseMessage):
    def __init__(self, content=None):
        super().__init__()
        self.content = content

# Producer process: sends messages to the topic exchange
async def producer_process(config_path):
    client = await EventBusClient.from_config(config_path)
    for i in range(5):
        msg = TestMessage(f"Message #{i} from producer")
        await client.send("test.topic", msg)
        await asyncio.sleep(1)
    await client.close()

# Consumer process: receives messages from the topic
async def consumer_process(config_path):
    client = await EventBusClient.from_config(config_path)

    # Handler invoked for each TestMessage received on the topic
    async def message_handler(message):
        print(f"Received: {message.content}")

    await client.on("test.topic", TestMessage, message_handler)
    # Stay subscribed long enough for the producer to send its five messages
    await asyncio.sleep(10)
    await client.close()

# Helper to run async functions in a process
def run_process(target_func, config_path):
    asyncio.run(target_func(config_path))

# Main function to start producer and consumer processes
def main():
    config_path = "../config/config.jsonp"
    consumer = Process(target=run_process, args=(consumer_process, config_path))
    consumer.start()
    # Give the consumer time to connect and subscribe before publishing starts
    time.sleep(2)
    producer = Process(target=run_process, args=(producer_process, config_path))
    producer.start()
    # Wait for both processes to finish
    producer.join()
    consumer.join()

if __name__ == "__main__":
    main()
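The example above publishes and subscribes with the exact routing key test.topic. On a RabbitMQ topic exchange, binding patterns may also contain wildcards: * matches exactly one dot-separated word and # matches zero or more words. Whether the on() method of EventBusClient accepts such patterns is not stated here, so the sketch below only illustrates the broker-side matching rule. topic_matches is a hypothetical helper and not part of the package.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check a routing key against an AMQP topic binding pattern.

    Illustrative only: '*' stands for exactly one word,
    '#' stands for zero or more words, words are separated by '.'.
    """
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        # Pattern exhausted: succeed only if the key is exhausted too
        if i == len(p_words):
            return j == len(k_words)
        if p_words[i] == "#":
            # Try letting '#' absorb 0, 1, 2, ... of the remaining words
            return any(match(i + 1, jj) for jj in range(j, len(k_words) + 1))
        if j == len(k_words):
            return False
        if p_words[i] == "*" or p_words[i] == k_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

For instance, the pattern test.* would match test.topic but not test.topic.sub, while test.# would match both.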

Config File Construction

Create a configuration file (e.g. config.jsonp) with your RabbitMQ and client settings. Example:

{
  "plugins_path": "./plugins",           // Path to plugins directory
  "host": "localhost",                   // RabbitMQ server hostname
  "port": 5672,                          // RabbitMQ server port
  "serializer": "PickleSerializer",      // Message serialization method
  "exchange_handler": "TopicExchangeHandler", // Exchange handler type
  "message_class": "ListenerEventMsg",   // Default message class
  "threadsafe_publish": true,            // Enable thread-safe publishing
  "auto_reconnect": true,                // Automatically reconnect on failure
  "qos_prefetch": 10                     // Prefetch count for QoS
}
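The example file contains // comments, which Python's standard json module rejects. How EventBusClient itself parses the file is not shown here. The sketch below is one way to inspect such a file with the standard library alone; strip_line_comments and load_commented_json are hypothetical helper names, and only // line comments are handled (no block comments).

```python
import json


def strip_line_comments(text: str) -> str:
    """Remove // comments that are outside of JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False          # character after a backslash
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False        # closing quote
            i += 1
        elif ch == '"':
            in_string = True             # opening quote
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line, keeping the newline itself
            while i < len(text) and text[i] != "\n":
                i += 1
        else:
            out.append(ch)
            i += 1
    return "".join(out)


def load_commented_json(path: str) -> dict:
    """Read a JSON file with // comments and return the parsed content."""
    with open(path, encoding="utf-8") as fh:
        return json.loads(strip_line_comments(fh.read()))
```

Tracking whether the scanner is inside a string keeps values such as URLs containing // intact.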

Parameter explanations:

  • plugins_path: Directory for loading plugins.
  • host: RabbitMQ server address.
  • port: RabbitMQ server port.
  • serializer: Serialization method for messages.
  • exchange_handler: Handler for exchange type.
  • message_class: Class used for messages.
  • threadsafe_publish: If true, enables thread-safe publishing.
  • auto_reconnect: If true, client will auto-reconnect on connection loss.
  • qos_prefetch: Number of messages to prefetch for consumers.

Update the config_path in your code to point to this file.
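A quick sanity check of the parsed settings can catch typos before a client is created. The sketch below is an assumption-laden illustration, not the package's own validation: the key names come from the example above, but the expected types, the requirement that every key be present, and the range checks are assumptions. validate_config is a hypothetical helper.

```python
# Expected Python type for each key in the example config (assumed)
EXPECTED_TYPES = {
    "plugins_path": str,
    "host": str,
    "port": int,
    "serializer": str,
    "exchange_handler": str,
    "message_class": str,
    "threadsafe_publish": bool,
    "auto_reconnect": bool,
    "qos_prefetch": int,
}


def validate_config(cfg: dict) -> list:
    """Return a list of human-readable problems; empty if none were found."""
    problems = []
    for key, expected in EXPECTED_TYPES.items():
        if key not in cfg:
            problems.append(f"missing key: {key}")
            continue
        value = cfg[key]
        # bool is a subclass of int, so reject True/False for numeric keys
        wrong_bool = expected is int and isinstance(value, bool)
        if wrong_bool or not isinstance(value, expected):
            problems.append(f"{key}: expected {expected.__name__}")
            continue
        if key == "port" and not 1 <= value <= 65535:
            problems.append("port: must be between 1 and 65535")
        if key == "qos_prefetch" and value < 0:
            problems.append("qos_prefetch: must not be negative")
    return problems
```

An empty result means the settings passed these basic checks; it does not guarantee that the named serializer, exchange handler, or message class exists.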

Package Documentation

Detailed documentation of the RabbitMqMessagebus package is available in EventBusClient.pdf.

Feedback

To give us feedback, send an email to Thomas Pollerspöck.

To report a bug or request a feature, please raise a ticket.

Maintainers

Nguyen Huynh Tri Cuong

Contributors

Nguyen Huynh Tri Cuong

Thomas Pollerspöck

License

Copyright 2020-2025 Robert Bosch GmbH

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

License: Apache v2

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
