asyncio RabbitMQ client
Asyncio RabbitMQ client that handles all the edge cases.
Installation
pip install carehare
Usage
Consumer (async for):
import logging

import carehare

logger = logging.getLogger(__name__)


async def consume() -> None:
    try:
        async with carehare.connect("amqps://guest:guest@localhost") as connection:
            try:
                await connection.queue_declare("my-queue", exclusive=True)
                async with connection.acking_consumer("my-queue") as consumer:
                    async for message in consumer:
                        print(repr(message))
                        # ... if we raise an exception here, we won't ack.
                        #
                        # ... if we `break` from this loop, we won't ack.
                        #
                        # Call `consumer.close()` before (or instead of) `break`
                        # to abort iteration.
            except carehare.ChannelClosedByServer:
                logger.info("RabbitMQ told this one consumer to go away")
            except carehare.ConnectionClosed:
                # Either RabbitMQ is telling us an error (and the outer context
                # manager will throw it), or we called connection.close()
                # ourselves (so we want to close).
                pass
    except carehare.ConnectionClosedByServer:
        # str(error) will give the RabbitMQ error message
        logger.error("RabbitMQ closed our connection")
    except carehare.ConnectionClosedByHeartbeatMonitor:
        logger.error("RabbitMQ went away")
Consumer (next_delivery):
import logging

import carehare

logger = logging.getLogger(__name__)


async def consume() -> None:
    try:
        async with carehare.connect("amqps://guest:guest@localhost") as connection:
            try:
                await connection.queue_declare("my-queue", exclusive=True)
                async with connection.acking_consumer("my-queue") as consumer:
                    while True:
                        message, delivery_tag = await consumer.next_delivery()
                        # You must ack() (with no await). If RabbitMQ doesn't
                        # receive this ack, it may deliver the same message to
                        # another client.
                        consumer.ack(delivery_tag)
                        if message.startswith(b"okay, go away now"):
                            break
            except carehare.ChannelClosedByServer:
                logger.info("RabbitMQ told this one consumer to go away")
            except carehare.ConnectionClosed:
                # Either RabbitMQ is telling us an error (and the outer context
                # manager will throw it), or we called connection.close()
                # ourselves (so we want to close).
                pass
    except carehare.ConnectionClosedByServer:
        # str(error) will give the RabbitMQ error message
        logger.error("RabbitMQ closed our connection")
    except carehare.ConnectionClosedByHeartbeatMonitor:
        logger.error("RabbitMQ went away")
Publisher:
import logging

import carehare

logger = logging.getLogger(__name__)


async def publish() -> None:
    try:
        async with carehare.connect("amqps://guest:guest@localhost") as connection:
            try:
                await connection.publish(b"Hello, world!", routing_key="my-queue")
            except carehare.ServerSentNack:
                logger.warning("Failed to publish message")
            except carehare.ChannelClosedByServer:
                # str(err) will give the RabbitMQ error message -- for instance,
                # "404 NOT_FOUND" if the exchange does not exist
                logger.error("Problem with the exchange")
    except carehare.ConnectionClosedByServer:
        # str(error) will give the RabbitMQ error message
        logger.error("RabbitMQ closed our connection")
    except carehare.ConnectionClosedByHeartbeatMonitor:
        logger.error("RabbitMQ went away")
Design decisions
carehare is designed to turn RabbitMQ’s asynchronous error system into _understandable_ Python exceptions.
Channels
Carehare doesn’t let you control RabbitMQ Channels. They aren’t Pythonic. (In RabbitMQ, an exception on a channel closes the channel – and cancels all its pending operations.)
Instead, carehare uses channels to handle errors. For instance, Queue.Declare costs three operations: Channel.Open, Queue.Declare, Channel.Close. Since the operation has its own channel, it won’t interfere with other operations if it causes an exception.
There’s a speed-up for publishing: we lazily open a Channel per exchange. Error codes like “not found”, “access refused” and “not implemented” will make carehare raise an exception on all pending publishes on the same exchange. Don’t worry: a normal “Nack” (“message wasn’t delivered”) will only make your single message fail.
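To make the difference between a Nack and a channel error concrete, here is a toy model of the bookkeeping described above. It is only a sketch, not carehare's actual code: `ToyPublisher`, `ToyChannel`, `ToyNack` and `ToyChannelClosed` are hypothetical names invented for illustration, and no network traffic is involved.

```python
import asyncio
from typing import Dict, Tuple


class ToyNack(Exception):
    """Hypothetical: the broker refused one single message."""


class ToyChannelClosed(Exception):
    """Hypothetical: the broker closed the whole channel."""


class ToyChannel:
    def __init__(self) -> None:
        self.next_tag = 1
        # delivery tag -> Future that resolves when the broker answers
        self.pending: Dict[int, "asyncio.Future[None]"] = {}


class ToyPublisher:
    def __init__(self) -> None:
        self.channels: Dict[str, ToyChannel] = {}

    def publish(self, exchange: str) -> Tuple[int, "asyncio.Future[None]"]:
        channel = self.channels.get(exchange)
        if channel is None:
            # Lazily "open" a channel the first time an exchange is used.
            channel = self.channels[exchange] = ToyChannel()
        tag = channel.next_tag
        channel.next_tag += 1
        future = asyncio.get_running_loop().create_future()
        channel.pending[tag] = future
        return tag, future

    def on_ack(self, exchange: str, tag: int) -> None:
        self.channels[exchange].pending.pop(tag).set_result(None)

    def on_nack(self, exchange: str, tag: int) -> None:
        # A Nack fails exactly one pending publish.
        self.channels[exchange].pending.pop(tag).set_exception(ToyNack())

    def on_channel_closed(self, exchange: str, reason: str) -> None:
        # A channel error fails every pending publish on that exchange.
        channel = self.channels.pop(exchange)
        for future in channel.pending.values():
            future.set_exception(ToyChannelClosed(reason))


async def demo() -> list:
    publisher = ToyPublisher()
    tag_a, a = publisher.publish("logs")
    tag_b, b = publisher.publish("logs")
    _, c = publisher.publish("missing")
    _, d = publisher.publish("missing")
    publisher.on_nack("logs", tag_a)
    publisher.on_ack("logs", tag_b)
    publisher.on_channel_closed("missing", "404 NOT_FOUND")
    results = await asyncio.gather(a, b, c, d, return_exceptions=True)
    return [type(result).__name__ for result in results]


outcomes = asyncio.run(demo())
print(outcomes)
```

In this model the Nack touches only the first publish on "logs", while the close on "missing" fails both of its pending publishes and leaves "logs" alone.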
Exceptions
“Exceptions” are hardly exceptional: as a programmer, handling them is your job. Carehare’s exceptions are designed to help you do that.
Connection methods return asyncio.Future objects. You must await each one and handle its errors.
Even though you’re using Python async context managers, exceptions can’t happen everywhere. Carehare will only raise when you await a response from RabbitMQ.
In particular, consumer.ack() will never raise! You must call it from the main event loop, but you won’t await it.
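The "errors only surface when you await" rule is plain asyncio behaviour, and it can be seen without a broker. The sketch below uses hand-made Futures as hypothetical stand-ins for the ones connection methods return; the `RuntimeError` is a placeholder, not a carehare exception.

```python
import asyncio


async def demo() -> list:
    loop = asyncio.get_running_loop()

    # Hypothetical stand-ins for the Futures that connection methods return.
    confirmed = loop.create_future()
    rejected = loop.create_future()

    # Pretend the broker has answered: one success, one error.
    # Setting the exception does not raise anything here.
    confirmed.set_result(None)
    rejected.set_exception(RuntimeError("404 NOT_FOUND"))

    outcomes = []
    for future in (confirmed, rejected):
        try:
            await future  # the error surfaces here, and only here
            outcomes.append("ok")
        except RuntimeError as err:
            outcomes.append(f"failed: {err}")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

A Future that fails and is never awaited makes asyncio log "Future exception was never retrieved", which is one more reason to await every Future you are handed.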
To code safely, catch these exceptions religiously:
carehare.ChannelClosedByServer: RabbitMQ did not like the command you just ran. Read the exception message for details. After you catch this exception, you may continue using the RabbitMQ connection.
carehare.ConnectionClosed: When the connection shuts down, every pending Future will raise this. Only the actual Connection context manager will raise the underlying exception: a carehare.ConnectionClosedByServer with a descriptive error message.
Carehare won’t raise asyncio.CancelledError.
Back-pressure
The core logic is synchronous. It’s simpler to reason about. The downside: neither RabbitMQ nor users will wait for buffers to empty before sending more data.
Use application-level logic to make sure you don’t run out of memory:
Consuming? Don’t worry. Use prefetch_count to limit the number of messages RabbitMQ sends. Always ack: carehare won’t permit no-ack.
Publishing? Carehare forces “publisher confirms”, so each publish returns a Future. Your application is responsible for not calling publish() too many times simultaneously. Use an asyncio.Semaphore or reason about your specific use (for instance, “my server will host max 100 clients, and each client can only publish one message at a time”).
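As one way to apply the asyncio.Semaphore advice, here is a minimal sketch that caps the number of unconfirmed publishes. `publish_all` and `FakeBroker` are hypothetical helpers written for this example; `FakeBroker.publish` stands in for `connection.publish` so the snippet runs without a RabbitMQ server.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# Hypothetical stand-in for `connection.publish`: any callable returning an
# awaitable that completes once the broker confirms the message.
Publish = Callable[[bytes], Awaitable[None]]


async def publish_all(
    publish: Publish, messages: Sequence[bytes], max_in_flight: int
) -> None:
    """Publish every message with at most `max_in_flight` awaiting confirms."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def publish_one(message: bytes) -> None:
        async with semaphore:  # wait here while too many publishes are pending
            await publish(message)

    await asyncio.gather(*(publish_one(message) for message in messages))


class FakeBroker:
    """Test double that records how many publishes were pending at once."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0
        self.confirmed = 0

    async def publish(self, message: bytes) -> None:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0)  # pretend to wait for the broker's confirm
        self.in_flight -= 1
        self.confirmed += 1


broker = FakeBroker()
asyncio.run(publish_all(broker.publish, [b"msg"] * 20, max_in_flight=5))
print(broker.confirmed, broker.peak)
```

With a real connection you would pass a small wrapper around `connection.publish` (for instance one that fixes `routing_key`) instead of `broker.publish`. Note that `asyncio.gather` without `return_exceptions=True` re-raises the first failure, so add per-message error handling inside `publish_one` if one failed publish should not hide the others.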
Comparison to other async RabbitMQ clients
Other clients tend to try and achieve “RabbitMQ in Python”. They fail when it comes to exceptions:
aiormq: If your code generates an error during consume, aiormq (4.1.1) will catch it and ignore it – stalling your program.
aioamqp: If your connection produces an unexpected error, aioamqp will catch it and ignore it – stalling your program. Also, the latest release was in 2019.
This author believes it’s too confusing to model RabbitMQ’s API in Python. Instead, carehare models your intent in Python.
Dependencies
You’ll need Python 3.8+ and a RabbitMQ server.
If you have Docker, here’s how to start a development server:
test-server/prepare-certs.sh  # Create SSL certificates used in tests
docker run --rm -it \
    -p 5671:5671 \
    -p 5672:5672 \
    -p 15672:15672 \
    -v "/$(pwd)"/test-server:/test-server \
    -e RABBITMQ_SSL_CACERTFILE=/test-server/ca.cert \
    -e RABBITMQ_SSL_CERTFILE=/test-server/server.cert \
    -e RABBITMQ_SSL_KEYFILE=/test-server/server.key \
    -e RABBITMQ_SSL_VERIFY=verify_peer \
    -e RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT=true \
    -e RABBITMQ_CONFIG_FILE=/test-server/rabbitmq \
    rabbitmq:3.8.11-management-alpine
During testing, see the RabbitMQ management interface at http://localhost:15672.
Contributing
To add features and fix bugs
First, start a development RabbitMQ server (see above).
Now take on the development cycle:
tox # to ensure tests pass.
Write new tests in tests/ and make sure they fail.
Write new code in carehare/ to make the tests pass.
Submit a pull request.
To deploy
Use semver.
git push and make sure all Travis tests pass.
git tag vX.X.X
git push --tags
Travis CI will push to PyPI.