An IPC message bus based on the RabbitMQ message broker

RabbitMqMessagebus Package Description

EventBusClient is an event-driven messaging library for Python, designed to simplify distributed communication using RabbitMQ as the message broker. It enables robust inter-process messaging, topic management, and coordination for scalable applications.

Getting Started

EventBusClient is available on PyPI. To install, run:

pip install eventbusclient

Ensure that a RabbitMQ server is installed and running before you start any client. Set the server's host and port in the client configuration file (see Config File Construction).
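Before starting a client, it can help to confirm that the broker port accepts connections. The sketch below is not part of EventBusClient. It uses only the Python standard library, and the helper name broker_reachable is hypothetical. It assumes the broker listens on localhost:5672, RabbitMQ's default AMQP port, and it only checks TCP reachability, not credentials or virtual hosts.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 5672,
                     timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical helper for illustration; it does not speak AMQP, so a
    True result only means that something is listening on that port.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print("RabbitMQ reachable:", broker_reachable())
```

If the check fails, verify that the RabbitMQ service is running and that the host and port match the values in your configuration file.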

Usage

The example below runs a producer and a consumer in separate processes. The producer publishes five messages to the topic test.topic, and the consumer prints each message it receives:

import asyncio
import time
from multiprocessing import Process
from EventBusClient.event_bus_client import EventBusClient
from EventBusClient.message.base_message import BaseMessage

# Define a custom message class
class TestMessage(BaseMessage):
    def __init__(self, content=None):
        super().__init__()
        self.content = content

# Producer process: sends messages to the topic exchange
async def producer_process(config_path):
    client = await EventBusClient.from_config(config_path)
    for i in range(5):
        msg = TestMessage(f"Message #{i} from producer")
        await client.send("test.topic", msg)
        await asyncio.sleep(1)
    await client.close()

# Consumer process: receives messages from the topic
async def consumer_process(config_path):
    client = await EventBusClient.from_config(config_path)
    async def message_handler(message):
        print(f"Received: {message.content}")
    await client.on("test.topic", TestMessage, message_handler)
    await asyncio.sleep(10)  # Keep the consumer alive while messages arrive
    await client.close()

# Helper to run async functions in a process
def run_process(target_func, config_path):
    asyncio.run(target_func(config_path))

# Main function to start producer and consumer processes
def main():
    config_path = "../config/config.jsonp"
    consumer = Process(target=run_process, args=(consumer_process, config_path))
    consumer.start()
    time.sleep(2)  # Give the consumer time to subscribe before publishing
    producer = Process(target=run_process, args=(producer_process, config_path))
    producer.start()
    producer.join()
    consumer.join()

if __name__ == "__main__":
    main()
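The example above uses the exact routing key test.topic. RabbitMQ topic exchanges also support binding patterns, where * matches exactly one dot-separated word and # matches zero or more words. The sketch below illustrates those matching rules in plain Python. The function topic_matches is hypothetical and not part of EventBusClient, and whether client.on accepts wildcard patterns is an assumption you should check against the package documentation.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check a routing key against a RabbitMQ-style topic binding pattern.

    '*' matches exactly one word, '#' matches zero or more words.
    Illustrative only; the broker performs this matching itself.
    """
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("test.*", "test.topic"))      # True
print(topic_matches("test.*", "test.topic.sub"))  # False
print(topic_matches("test.#", "test.topic.sub"))  # True
```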

Config File Construction

Create a configuration file (e.g. config.jsonp) with your RabbitMQ and client settings. Example:

{
  "plugins_path": "./plugins",           // Path to plugins directory
  "host": "localhost",                   // RabbitMQ server hostname
  "port": 5672,                          // RabbitMQ server port
  "serializer": "PickleSerializer",      // Message serialization method
  "exchange_handler": "TopicExchangeHandler", // Exchange handler type
  "message_class": "ListenerEventMsg",   // Default message class
  "threadsafe_publish": true,            // Enable thread-safe publishing
  "auto_reconnect": true,                // Automatically reconnect on failure
  "qos_prefetch": 10                     // Prefetch count for QoS
}

Parameter explanations:

  • plugins_path: Directory from which plugins are loaded.
  • host: Hostname or address of the RabbitMQ server.
  • port: Port of the RabbitMQ server (5672 is RabbitMQ's default AMQP port).
  • serializer: Serialization method used for messages (PickleSerializer in the example).
  • exchange_handler: Exchange handler type to use (TopicExchangeHandler in the example).
  • message_class: Default class used for messages.
  • threadsafe_publish: If true, enables thread-safe publishing.
  • auto_reconnect: If true, the client reconnects automatically when the connection is lost.
  • qos_prefetch: Prefetch count for QoS, i.e. the number of messages prefetched for a consumer.

Update the config_path in your code to point to this file.
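The example configuration contains // comments, which the standard json module rejects. To inspect or validate such a file outside of EventBusClient, one option is to strip the comments first. The sketch below is a stand-in under that assumption, not the loader the package uses internally. The names strip_line_comments and load_config are hypothetical, and only // line comments are handled.

```python
import json


def strip_line_comments(text: str) -> str:
    """Remove // comments that appear outside of JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line; the newline itself is kept.
            while i < len(text) and text[i] != "\n":
                i += 1
        else:
            out.append(ch)
            i += 1
    return "".join(out)


def load_config(text: str) -> dict:
    """Parse commented JSON text into a dictionary."""
    return json.loads(strip_line_comments(text))


SAMPLE = """
{
  "host": "localhost",          // RabbitMQ server hostname
  "port": 5672,                 // RabbitMQ server port
  "plugins_path": "./plugins",  // Path to plugins directory
  "auto_reconnect": true        // Automatically reconnect on failure
}
"""

config = load_config(SAMPLE)
print(config["host"], config["port"])  # localhost 5672
```

Tracking whether the parser is inside a string keeps values such as URLs, which may contain //, intact.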

Package Documentation

Detailed documentation of the RabbitMqMessagebus package can be found here: EventBusClient.pdf

Feedback

To give us feedback, you can send an email to Thomas Pollerspöck.

To report a bug or request a feature, please don't hesitate to raise a ticket.

Maintainers

Nguyen Huynh Tri Cuong

Contributors

Nguyen Huynh Tri Cuong

Thomas Pollerspöck

License

Copyright 2020-2025 Robert Bosch GmbH

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

License: Apache v2

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
