No project description provided
Project description
curv_amqp
Pika framework that handles reconnecting while using a blocking connection and has helpful defaults, building blocks, type hints, and a priority requeue method
Installing package
pip install curv-amqp
Installing for local development
git clone https://github.com/rep-ai/curv_amqp.git
cd curv_amqp
pip install -r requirements.txt
pip install -e .
Basic Usage
from curv_amqp.connection import Connection, ConnectionParameters from curv_amqp.publisher import Publisher from curv_amqp.consumer import Consumer, ConsumerMessage host = 'localhost' queue_name = 'test' connection = Connection(parameters=ConnectionParameters(host)) publisher = Publisher(connection=connection) publisher.publish(routing_key=queue_name, body=b'message') consumer = Consumer(connection=connection) def on_message_callback(message: ConsumerMessage): print('message.body:', message.body) message.ack() # you don't have to stop consuming here - but you do have to stop the consumer in this thread # eventually since consumer.consume is blocking message.consumer.stop_consuming() consumer.consume(queue=queue_name, prefetch_count=1, on_message_callback=on_message_callback)
Usage
from argparse import ArgumentParser from curv_amqp.connection import Connection, URLParameters, ConnectionParameters from curv_amqp.publisher import Publisher from curv_amqp.consumer import Consumer, ConsumerMessage from curv_amqp.exceptions import ChannelClosedError, ConnectionClosedError, RequeueRetryCountError def main(): parser = ArgumentParser() parser.add_argument('--url', type=str, default='localhost', help='amqp url or localhost - ' 'localhost assumes rabbitmq is installed - ' '"brew install rabbitmq"') parser.add_argument('--queue', type=str, default='test-queue-name', help='amqp queue name') parser.add_argument('--body', type=str, default='your message', help='amqp message body') args = parser.parse_args() # pass in URLParameters or ConnectionParameters # its recommended that a single connection per process is used. url: str = args.url parameters = ConnectionParameters(url) if url is 'localhost' else URLParameters(url) queue_name = args.queue body = bytes(args.body, encoding='utf-8') connection = Connection(parameters=parameters) # testing auto reconnect for connection connection.blocking_connection.close() # its required that two different channels are used for a publisher and consumer # NOTE: will automatically declare queue for you publisher = Publisher(connection=connection) # testing auto reconnect for channel / publisher publisher.blocking_channel.close() publisher.publish(routing_key=queue_name, body=body) consumer = Consumer(connection=connection) # testing auto reconnect for channel / consumer consumer.blocking_channel.close() def on_message_callback(message: ConsumerMessage): print('message.body:', message.body) print('message.properties.priority', message.properties.priority) try: message.priority_requeue(publisher) except RequeueRetryCountError as ex: print(ex) message.ack() message.consumer.stop_consuming() consumer.consume(queue=queue_name, prefetch_count=1, on_message_callback=on_message_callback) publisher.publish(routing_key=queue_name, body=body) for msg in consumer.consume_generator(queue=queue_name, prefetch_count=1, auto_ack=True, inactivity_timeout=1): print('msg.body:', msg.body) try: # testing proper channel close publisher.close() publisher.publish(routing_key=queue_name, body=body) except ChannelClosedError as e: print(e) try: # testing proper connection close publisher = Publisher(connection=connection) connection.close() publisher.publish(routing_key=queue_name, body=body) except ConnectionClosedError as e: print(e) if __name__ == '__main__': main()
Updating package
# update __version__ in __init__.py
python setup.py sdist bdist_wheel
twine check dist/*
twine upload dist/*
or using this script - note it will remove build and dist directories
bash deploy.sh
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Filename, size | File type | Python version | Upload date | Hashes |
---|---|---|---|---|
Filename, size curv_amqp-1.3.7-py3-none-any.whl (19.1 kB) | File type Wheel | Python version py3 | Upload date | Hashes View |
Filename, size curv_amqp-1.3.7.tar.gz (15.8 kB) | File type Source | Python version None | Upload date | Hashes View |
Close
Hashes for curv_amqp-1.3.7-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | b4132b60144b3d8918c40b8e8d581e0a8883e3f882715c7903e40a61b68846ef |
|
MD5 | 26fa505f3ce888c87ed590e3c7a1c044 |
|
BLAKE2-256 | a70744f5118f79a5d56093c20a8623e449f28bef7d34b38f5cf847f31dc5fbed |