Threadsafe implementation of pyamqp transport for kombu
kombu-pyamqp-threadsafe
TL;DR
kombu (pyamqp) is designed as "1 thread = 1 connection": connections cannot be shared between threads.
This package makes a "1 thread = 1 channel" design possible, so a single connection can be shared between threads.
import kombu
import kombu_pyamqp_threadsafe
# Use drop-in replacement thread-safe kombu.Connection variant:
connection = kombu_pyamqp_threadsafe.KombuConnection(...)
# or construct from kombu.Connection
kombu_connection = kombu.Connection(...)
connection = kombu_pyamqp_threadsafe.KombuConnection.from_kombu_connection(kombu_connection)
Motivation
The best practice for working with RabbitMQ is to use 2 connections: 1 for consuming and 1 for publishing.
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels
But this is not possible with kombu (pyamqp) (https://github.com/celery/py-amqp/issues/420).
Without it, we can't effectively consume from many queues at the same time.
And when we publish messages in a multithreaded application, we need to create a connection for each publisher (producer) thread.
Other solutions
Connection pool.
- This is the concept used by celery. You no longer create a lot of connections to publish messages, but you still need the same number of connections to consume.
Use the same channel to consume from all queues.
- This is bad practice, because any exception in the channel will close it and break all consumers.
Usage
Q: OK, I installed it. Now what?
A: You can use ConnectionHolder from this snippet: https://github.com/celery/py-amqp/issues/420#issuecomment-1858429922
Q: Is it production ready? How do you test it?
A: Yes, it's production ready. We also ran a stress test with 900 threads: 900 dramatiq actors, each consuming a message and sending a new one to the queue. Only 2 connections were used.
Q: Dramatiq?
A: Yes, just use dramatiq-kombu-broker
Rules
Rule 1: Do not share a channel between threads
- Do not use default_channel from different threads. Responses are bound to the channel, so one thread can receive an error caused by another thread, or cause an error that surfaces in a different thread.
- Use default_channel_pool and acquire a channel for each thread.
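The idea behind Rule 1 can be sketched with the standard library alone. This is only an illustration of "one channel per thread via a pool", not this package's implementation: FakeChannel, ChannelPool, and run_workers are hypothetical names, and no AMQP traffic is involved.

```python
import queue
import threading
from contextlib import contextmanager


class FakeChannel:
    """Hypothetical stand-in for an AMQP channel (not the library's class)."""

    def __init__(self, channel_id):
        self.channel_id = channel_id


class ChannelPool:
    """Toy pool: each acquire() hands one channel to exactly one caller."""

    def __init__(self, size):
        self._free = queue.LifoQueue()
        for channel_id in range(1, size + 1):
            self._free.put(FakeChannel(channel_id))

    @contextmanager
    def acquire(self, timeout=5.0):
        channel = self._free.get(timeout=timeout)  # blocks while the pool is empty
        try:
            yield channel
        finally:
            self._free.put(channel)  # hand the channel back for reuse


def run_workers(pool, worker_count):
    """Start worker_count threads and return the channel id each one held."""
    barrier = threading.Barrier(worker_count)
    held = {}
    lock = threading.Lock()

    def worker(name):
        with pool.acquire() as channel:
            with lock:
                held[name] = channel.channel_id
            # Keep the channel until every worker holds one.
            barrier.wait(timeout=5.0)

    threads = [
        threading.Thread(target=worker, args=(f"w{i}",))
        for i in range(worker_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return held


held_channels = run_workers(ChannelPool(size=4), worker_count=4)
```

While the four workers run concurrently, each holds a different channel, so a response or error on one channel cannot be picked up by a thread working on another.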
Rule 2: A channel is bound to the thread where it was created
- This is required because dispatching a frame can raise an error that is expected in the caller thread (see Rule 1), e.g. from the queue_declare method.
- If you declare a queue in passive mode and the queue does not exist, RabbitMQ will close the channel, and the exception MAY be raised in a different thread when that thread drains events.
- To ensure all exceptions are raised in the expected context, we bind channels to threads and dispatch received frames only in their own threads.
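A minimal sketch of the thread-bound dispatch described in Rule 2, using only the standard library. It is a simplified model under assumptions, not the package's code: BoundChannel, ChannelClosed, and route_frame are hypothetical names, and a frame is just a string.

```python
import queue
import threading


class ChannelClosed(Exception):
    """Hypothetical error standing in for a broker-initiated channel close."""


class BoundChannel:
    """Toy channel that remembers which thread created it."""

    def __init__(self, channel_id):
        self.channel_id = channel_id
        self.owner = threading.get_ident()
        self.inbox = queue.Queue()  # frames waiting for the owner thread

    def dispatch_pending(self, timeout=5.0):
        """Handle one frame; must be called from the owner thread."""
        if threading.get_ident() != self.owner:
            raise RuntimeError("channel used outside the thread that created it")
        frame = self.inbox.get(timeout=timeout)
        if frame == "close":
            raise ChannelClosed(f"channel {self.channel_id} closed by broker")
        return frame


def route_frame(channels, channel_id, frame):
    """Called by whichever thread reads the socket: queue the frame, do not handle it."""
    channels[channel_id].inbox.put(frame)


channels = {}
raised_in = {}
ready = threading.Event()


def owner_thread():
    channel = BoundChannel(channel_id=1)
    channels[1] = channel
    ready.set()
    try:
        channel.dispatch_pending()
    except ChannelClosed:
        raised_in["thread"] = threading.current_thread().name


owner = threading.Thread(target=owner_thread, name="declaring-thread")
owner.start()
ready.wait(timeout=5.0)
# The main thread plays the role of "another thread draining events".
route_frame(channels, 1, "close")
owner.join()
```

The thread that reads the frame only queues it. The error is raised inside dispatch_pending, which runs in the thread that created the channel, so the exception surfaces where the caller expects it.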
Install
To add and install this package as a dependency of your project, run poetry add kombu-pyamqp-threadsafe.
Developing
- This project follows the Conventional Commits standard to automate Semantic Versioning and Keep A Changelog with Commitizen.
- Run poe from within the development environment to print a list of Poe the Poet tasks available to run on this project.
- Run poetry add {package} from within the development environment to install a runtime dependency and add it to pyproject.toml and poetry.lock. Add --group test or --group dev to install a CI or development dependency, respectively.
- Run poetry update from within the development environment to upgrade all dependencies to the latest versions allowed by pyproject.toml.
- Run cz bump to bump the package's version, update the CHANGELOG.md, and create a git tag.