Skip to main content

Easily send and receive messages to and from RabbitMQ message broker

Project description

RabbitMQ Decorator

  • Work in progress

Easily send and receive messages to and from RabbitMQ message broker.

Installation

pip install rabbitmq-decorator

Usage

Optional:

export RABBITMQ_HOST=localhost
export RABBITMQ_PORT=5672
export RABBITMQ_USERNAME=username
export RABBITMQ_PASSWORD=password

Example:

import asyncio
from typing import Dict, Any

from rabbitmq_decorator import RabbitMQHandler, RabbitMQConsumer, RabbitMQProducer, RabbitMQMessage, Exchange

event_loop = asyncio.get_event_loop()

# If environment variables was set
rabbit_handler = RabbitMQHandler(event_loop=event_loop)

# Or if environment variables wasn't set 
rabbit_handler = RabbitMQHandler(
    event_loop=event_loop,
    rabbitmq_host="localhost",
    rabbitmq_port=5672,
    rabbitmq_username="username",
    rabbitmq_password="password"
)


@rabbit_handler
class SomeClass:
    EXCHANGE = Exchange("test_exchange", durable=True)
    _producer: RabbitMQProducer = rabbit_handler.get_new_producer(EXCHANGE)

    def some_unrelated_function(self):
        pass

    @RabbitMQConsumer(EXCHANGE)
    async def consumer_1(self, body: Dict[str, Any]):
        print("I'm getting all the messages on test_exchange")
        self._producer.publish(RabbitMQMessage(routing_key="response.consumer1", body={"status": "Success"}))

    @RabbitMQConsumer(EXCHANGE, route="*.*.*")
    async def consumer_2(self, body: Dict[str, Any]):
        print("I'm getting only the messages on test_exchange with *.*.* as routing-key ")
        response_exchange = Exchange("response_exchanges", durable=True)
        self._producer.publish(
            exchange=response_exchange,
            message=RabbitMQMessage(routing_key="response.consumer2", body={"status": "Success"})
        )


# ================ main ================

async def main():
    """do something"""


if __name__ == '__main__':
    some_class = SomeClass()
    rabbit_handler.start(some_class)
    event_loop.run_until_complete(main())
    rabbit_handler.stop_consume(some_class)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rabbitmq-decorator-0.0.0.post3.tar.gz (11.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rabbitmq_decorator-0.0.0.post3-py3-none-any.whl (14.9 kB view details)

Uploaded Python 3

File details

Details for the file rabbitmq-decorator-0.0.0.post3.tar.gz.

File metadata

  • Download URL: rabbitmq-decorator-0.0.0.post3.tar.gz
  • Upload date:
  • Size: 11.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.6

File hashes

Hashes for rabbitmq-decorator-0.0.0.post3.tar.gz
Algorithm Hash digest
SHA256 633bb5d60be15af929aa9b108874253bc10638156fbdca507b6a6b12525a5c47
MD5 2f1b70a85bbcac1337491c3dbdcc2953
BLAKE2b-256 22d88d1871e573d93e30f066fbad9ee9f7310c797affec2aa4cee914f8581c8b

See more details on using hashes here.

File details

Details for the file rabbitmq_decorator-0.0.0.post3-py3-none-any.whl.

File metadata

File hashes

Hashes for rabbitmq_decorator-0.0.0.post3-py3-none-any.whl
Algorithm Hash digest
SHA256 3e71369992e5301991a0c86b08fef15a0e8a35935af2f4751a53433e4dab01d6
MD5 86a77f67fb71c3b8a65703236b991172
BLAKE2b-256 5490980e86049f3fa02b702f86acadcf5c42acf6fb1aa0e58f257b4f27d1818d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page