Skip to main content

Library to deal with RabbitMQ and Google PubSub

Project description

Library to deal with RabbitMQ and Google PubSub

Usage

QueueManager class

>>> from queue_manager.queue_manager import QueueManager
>>> conn_params = {
...     'host': '',
...     'port': '',
...     'username': '',
...     'password': ''
... }
>>> # or use multiple urls
>>> conn_params = ('amqp://host1', 'amqp://host2',)
qm = QueueManager(conn_params)
>>> qm.push('hello', 'Hello from QueueManager')
True
>>> qm.pop('hello')
Hello from QueueManager
>>> del(qm)

RabbitMqPublisher class

from queue_manager.rabbitmq_consumer import RabbitMqPublisher

producer = RabbitMqPublisher('amqp://username:password@hostname:port',
                             'exchange', 'exchange_type', 'queue_name', 'routing_key')

producer.publish_message('hello')
# or passing message property
producer.publish_message('hello', dict(priority=8))

RabbitMqConsumer class

This class is an async consumer class, that still connected.

Inspired on: asynchronous_consumer_example

single_url = 'amqp://username:password@hostname:port'
# or multiple urls
multiple_urls = ('amqp://username:password@hostnameone:port', 'amqp://username:password@hostnametwo:port')
consumer = RabbitMqConsumer(single_url, queue='queue_name')

try:
    def callback(body):
        print("message body", body)

    consumer.run(callback)
except KeyboardInterrupt:
    consumer.stop()

TornadoConsumer class

from tornado.ioloop import IOLoop
from queue_manager.tornado_consumer import TornadoConsumer

consumer = TornadoConsumer('amqp://username:password@hostname:port',
                           'exchange', 'exchange_type', 'queue_name', 'routing_key')


def callback(body):
    print("message body", body)

consumer.run(callback)
IOLoop.instance().start()

PubsubConsumer class

consumer = PubsubConsumer('project_id', 'path/to/sa.json', 'subscription_name', 'topic_name')

def callback(message):
    print("message", message)

try:
    consumer.start_listening(callback)
except KeyboardInterrupt:
    consumer.stop()

Running tests with tox

Install tox

$ pip install tox

Run tests

tox

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

PyQueueManager-1.12.1.tar.gz (8.1 kB view details)

Uploaded Source

File details

Details for the file PyQueueManager-1.12.1.tar.gz.

File metadata

  • Download URL: PyQueueManager-1.12.1.tar.gz
  • Upload date:
  • Size: 8.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Python-urllib/3.8

File hashes

Hashes for PyQueueManager-1.12.1.tar.gz
Algorithm Hash digest
SHA256 4ebf4f39bafc76a2f8535e8acbb31db37b5a0679fc98abd48abb71fc3dec1cd5
MD5 8e34b917d1bc04fd62a802bf23b85ee4
BLAKE2b-256 cc97e4da66cd665b37e2928ca4987bb0115490de1c16a37c88409a466bd8c078

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page