No project description provided
Project description
curv_amqp
Pika framework that handles reconnecting while using a blocking connection and has helpful defaults, building blocks, type hints, and a priority requeue method
Installing package
pip install curv-amqp
Installing for local development
git clone https://github.com/rep-ai/curv_amqp.git
cd curv_amqp
pip install -r requirements.txt
pip install -e .
Usage
from argparse import ArgumentParser
from curv_amqp.connection import Connection, URLParameters, ConnectionParameters
from curv_amqp.publisher import Publisher
from curv_amqp.consumer import Consumer, ConsumerMessage
from curv_amqp.exceptions import ChannelClosedError, ConnectionClosedError
def on_message_callback(message: ConsumerMessage):
print('message.body:', message.body)
message.ack()
message.consumer.stop_consuming()
def main():
parser = ArgumentParser()
parser.add_argument('--url', type=str, default='localhost', help='amqp url or localhost - '
'localhost assumes rabbitmq is installed - '
'"brew install rabbitmq"')
parser.add_argument('--queue', type=str, default='test-queue-name', help='amqp queue name')
parser.add_argument('--body', type=str, default='your message', help='amqp message body')
args = parser.parse_args()
# pass in URLParameters or ConnectionParameters
# its recommended that a single connection per process is used.
url: str = args.url
parameters = ConnectionParameters(url) if url is 'localhost' else URLParameters(url)
queue_name = args.queue
body = bytes(args.body, encoding='utf-8')
connection = Connection(parameters=parameters)
# testing auto reconnect for connection
connection.blocking_connection.close()
# its required that two different channels are used for a publisher and consumer
# NOTE: will automatically declare queue for you
publisher = Publisher(connection=connection)
# testing auto reconnect for channel / publisher
publisher.blocking_channel.close()
publisher.publish(routing_key=queue_name, body=body)
consumer = Consumer(connection=connection)
# testing auto reconnect for channel / consumer
consumer.blocking_channel.close()
consumer.consume(queue=queue_name, prefetch_count=1, on_message_callback=on_message_callback)
try:
# testing proper channel close
publisher.close()
publisher.publish(routing_key=queue_name, body=body)
except ChannelClosedError as e:
print(e)
try:
# testing proper connection close
publisher = Publisher(connection=connection)
connection.close()
publisher.publish(routing_key=queue_name, body=body)
except ConnectionClosedError as e:
print(e)
if __name__ == '__main__':
main()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
curv_amqp-1.0.1.tar.gz
(7.2 kB
view hashes)
Built Distribution
Close
Hashes for curv_amqp-1.0.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7c4fc5c2352ffc7d285d948b381809af39c30b56c55ded6b981433e623560a1a |
|
MD5 | 424804f0638f1b59ea0fd18539bb3c39 |
|
BLAKE2b-256 | 4ed6e7e899bff14bdd8a1e9a1f65c5b127d70de70c0287dda8b4636511ce9251 |