Skip to main content

An IPC message-bus based on RabbitMQ message broker

Project description

RabbitMqMessagebus Package Description

EventBusClient is an event-driven messaging library for Python, designed to simplify distributed communication using RabbitMQ as the message broker. It enables robust inter-process messaging, topic management, and coordination for scalable applications.

Table of Contents

Getting Started

EventBusClient is available on PyPI. To install, run:

pip install eventbusclient

Ensure RabbitMQ is installed and running. Configure your RabbitMQ server connection in your application settings as required.

Usage

To use the EventBusClient in a real-world scenario with producer and consumer processes, see the example below:

import asyncio
import logging
import time
from multiprocessing import Process
from EventBusClient.event_bus_client import EventBusClient
from EventBusClient.message.base_message import BaseMessage

# Define a custom message class
class TestMessage(BaseMessage):
    def __init__(self, content=None):
        super().__init__()
        self.content = content

# Producer process: sends messages to the topic exchange
async def producer_process(config_path):
    client = await EventBusClient.from_config(config_path)
    for i in range(5):
        msg = TestMessage(f"Message #{i} from producer")
        await client.send("test.topic", msg)
        await asyncio.sleep(1)
    await client.close()

# Consumer process: receives messages from the topic
async def consumer_process(config_path):
    client = await EventBusClient.from_config(config_path)
    async def message_handler(message):
        print(f"Received: {message.content}")
    await client.on("test.topic", TestMessage, message_handler)
    await asyncio.sleep(10)
    await client.close()

# Helper to run async functions in a process
def run_process(target_func, config_path):
    asyncio.run(target_func(config_path))

# Main function to start producer and consumer processes
def main():
    config_path = "../config/config.jsonp"
    consumer = Process(target=run_process, args=(consumer_process, config_path))
    consumer.start()
    time.sleep(2)
    producer = Process(target=run_process, args=(producer_process, config_path))
    producer.start()
    producer.join()
    consumer.join()

if __name__ == "__main__":
    main()

Config File Construction

Create a configuration file (e.g. [config.jsonp]{.title-ref}) with your RabbitMQ and client settings. Example:

{
  "plugins_path": "./plugins",           // Path to plugins directory
  "host": "localhost",                   // RabbitMQ server hostname
  "port": 5672,                          // RabbitMQ server port
  "serializer": "PickleSerializer",      // Message serialization method
  "exchange_handler": "TopicExchangeHandler", // Exchange handler type
  "message_class": "ListenerEventMsg",   // Default message class
  "threadsafe_publish": true,            // Enable thread-safe publishing
  "auto_reconnect": true,                // Automatically reconnect on failure
  "qos_prefetch": 10                     // Prefetch count for QoS
}

Parameter explanations:

  • plugins_path: Directory for loading plugins.
  • host: RabbitMQ server address.
  • port: RabbitMQ server port.
  • serializer: Serialization method for messages.
  • exchange_handler: Handler for exchange type.
  • message_class: Class used for messages.
  • threadsafe_publish: If true, enables thread-safe publishing.
  • auto_reconnect: If true, client will auto-reconnect on connection loss.
  • qos_prefetch: Number of messages to prefetch for consumers.

Update the config_path in your code to point to this file.

Package Documentation

A detailed documentation of the RabbitMqMessagebus package can be found here: EventBusClient.pdf

Feedback

To give us a feedback, you can send an email to Thomas Pollerspöck

In case you want to report a bug or request any interesting feature, please don't hesitate to raise a ticket.

Maintainers

Nguyen Huynh Tri Cuong

Contributors

Nguyen Huynh Tri Cuong

Thomas Pollerspöck

License

Copyright 2020-2025 Robert Bosch GmbH

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

License: Apache v2

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventbusclient-0.1.2.tar.gz (326.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventbusclient-0.1.2-py3-none-any.whl (351.2 kB view details)

Uploaded Python 3

File details

Details for the file eventbusclient-0.1.2.tar.gz.

File metadata

  • Download URL: eventbusclient-0.1.2.tar.gz
  • Upload date:
  • Size: 326.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventbusclient-0.1.2.tar.gz
Algorithm Hash digest
SHA256 d9f0ae37d26a8fcdd01ff478e6c94e7b61c5ae0c426a165aa70268771fc32d68
MD5 62644987d641bd24c9f687a2722b14e4
BLAKE2b-256 50554f89a3681080bf4fa87bd89d4f9f8e9b6fd4d8115e39ac996549a935d12c

See more details on using hashes here.

File details

Details for the file eventbusclient-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: eventbusclient-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 351.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventbusclient-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 77b76d38e2dbbcf20a2436f48902e8f08b27824ff30ce77c8ab07f4ba7070bfc
MD5 fc37965d5ef8a7559c9fc1fae36b3b62
BLAKE2b-256 85f71d50027872401e09ba19ef55ab568fd57d925973a5dc4378d266930af8a5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page