Threadsafe implementation of pyamqp transport for kombu
kombu-pyamqp-threadsafe
TL;DR
kombu (pyamqp) is designed as "1 thread = 1 connection", with no connection sharing between threads.
This package makes a "1 thread = 1 channel" design possible and allows connection sharing between threads.
import kombu_pyamqp_threadsafe
# ... and then
kombu_pyamqp_threadsafe.monkeypatch_pyamqp_transport()  # patches the existing transports: 'pyamqp://', 'amqp://', 'amqps://'
# ... or
kombu_pyamqp_threadsafe.add_shared_amqp_transport()  # adds transports to use explicitly: 'shared+pyamqp://', 'shared+amqp://' or 'shared+amqps://'
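If you choose the explicit transports rather than the monkeypatch, existing broker URLs need the shared+ prefix. A minimal sketch of that rewrite follows; to_shared_url is a hypothetical helper written for this illustration and is not part of the package. It relies only on the scheme names listed in the comment above.

```python
# Hypothetical helper (not part of kombu-pyamqp-threadsafe): rewrite a broker
# URL so that it selects one of the explicit 'shared+...' transports.
SHARED_PREFIX = "shared+"
SUPPORTED_SCHEMES = {"pyamqp", "amqp", "amqps"}


def to_shared_url(url: str) -> str:
    """Return `url` with its scheme prefixed by 'shared+' (idempotent)."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"not a broker URL: {url!r}")
    # Accept URLs that already carry the prefix and leave them unchanged.
    base = scheme[len(SHARED_PREFIX):] if scheme.startswith(SHARED_PREFIX) else scheme
    if base not in SUPPORTED_SCHEMES:
        raise ValueError(f"unsupported scheme: {scheme!r}")
    return f"{SHARED_PREFIX}{base}://{rest}"


if __name__ == "__main__":
    # The resulting URL would be passed to kombu.Connection(...) after
    # add_shared_amqp_transport() has been called.
    print(to_shared_url("amqp://guest:guest@localhost:5672//"))
```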
Motivation
The best practice for working with RabbitMQ is to use 2 connections: 1 for consuming and 1 for publishing.
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels
But this is not possible with kombu (pyamqp) (https://github.com/celery/py-amqp/issues/420).
Without it we cannot efficiently consume from many queues at the same time.
And when we publish messages in a multithreaded application, we need to create a connection for each publisher (producer) thread.
Other solutions
Connection pool.
- This is the concept used by celery. You no longer create a lot of connections when publishing messages, but you still need the same number of connections as before to consume.
Use the same channel to consume from all queues.
- This is bad practice because any exception in the channel closes it and breaks all consumers.
Usage
Q: OK, I installed it. Now what?
A: You can use ConnectionHolder from this snippet: https://github.com/celery/py-amqp/issues/420#issuecomment-1858429922
Q: Is it production ready? How did you test it?
A: Yes, it is production ready. We also ran a stress test with 900 threads: 900 dramatiq actors, each consuming a message and sending a new one to the queue. Only 2 connections were used.
Q: Dramatiq?
A: Yes, dramatiq-kombu-broker will be released soon :)
Rules
Rule 1: Do not share a channel between threads.
- Do not use default_channel from different threads. Responses are bound to the channel, so one thread can receive an error caused by another thread, or cause an error that surfaces in another thread.
- Use default_channel_pool and acquire a channel for each thread.
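The "one channel per thread over one shared connection" idea can be sketched with the standard library alone. FakeConnection, FakeChannel and PerThreadChannels below are stand-ins invented for this illustration; they are not the package's classes and do not show how default_channel_pool is implemented.

```python
import threading
from itertools import count


class FakeChannel:
    """Stand-in for an AMQP channel (illustration only)."""

    def __init__(self, channel_id):
        self.channel_id = channel_id


class FakeConnection:
    """Stand-in for one shared connection that hands out numbered channels."""

    def __init__(self):
        self._ids = count(1)
        self._lock = threading.Lock()

    def channel(self):
        # Channel creation touches shared connection state, so guard it.
        with self._lock:
            return FakeChannel(next(self._ids))


class PerThreadChannels:
    """Give every thread its own channel on the same connection."""

    def __init__(self, connection):
        self._connection = connection
        self._local = threading.local()

    def get(self):
        channel = getattr(self._local, "channel", None)
        if channel is None:
            # First use in this thread: open a channel and remember it.
            channel = self._connection.channel()
            self._local.channel = channel
        return channel


def run_workers(n):
    """Start n threads; report whether each reused its channel, and its id."""
    channels = PerThreadChannels(FakeConnection())
    seen = {}

    def worker(index):
        first = channels.get()
        second = channels.get()
        seen[index] = (first is second, first.channel_id)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return seen


if __name__ == "__main__":
    print(run_workers(4))
```

Each thread sees the same channel on repeated calls, and no two threads ever receive the same channel, which is the property Rule 1 asks for.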
Rule 2: A channel is bound to the thread that created it.
- This is required because dispatching a frame can raise an error that the calling thread expects to handle (see Rule 1). One example is the queue_declare method.
- If you declare a queue in passive mode and the queue does not exist, RabbitMQ closes the channel, and the exception MAY be raised in a different thread: whichever one happens to be draining events.
- To ensure that every exception is raised in the expected context, channels are bound to threads and received frames are dispatched only in their own threads.
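The binding described in Rule 2 can be modelled with per-channel inboxes: whichever thread reads from the connection only enqueues a frame for the channel it belongs to, and the owning thread processes it. This is a simplified sketch using the standard library. SharedConnection, BoundChannel, route and the frame tuples are assumptions made for the illustration and do not reflect the package's internals.

```python
import queue
import threading


class ChannelClosed(Exception):
    """Raised in the owner thread when the broker closes its channel."""


class BoundChannel:
    """Toy channel that remembers which thread created it."""

    def __init__(self, channel_id):
        self.channel_id = channel_id
        self.owner = threading.get_ident()
        self.inbox = queue.Queue()

    def wait(self, timeout=5.0):
        """Process the next frame; only the owner thread may call this."""
        if threading.get_ident() != self.owner:
            raise RuntimeError("channel used outside its owner thread")
        kind, payload = self.inbox.get(timeout=timeout)
        if kind == "close":
            raise ChannelClosed(payload)
        return payload


class SharedConnection:
    """Toy connection shared by all threads."""

    def __init__(self):
        self._channels = {}
        self._lock = threading.Lock()

    def channel(self):
        with self._lock:
            channel = BoundChannel(len(self._channels) + 1)
            self._channels[channel.channel_id] = channel
            return channel

    def route(self, channel_id, kind, payload):
        # Called by whichever thread reads the socket: enqueue only,
        # never process the frame here.
        self._channels[channel_id].inbox.put((kind, payload))


def demo():
    conn = SharedConnection()
    channels, outcomes = {}, {}
    ready = threading.Barrier(3)  # two workers plus the routing thread

    def worker(name):
        channels[name] = conn.channel()  # opened, and so bound, in this thread
        ready.wait()
        try:
            outcomes[name] = channels[name].wait()
        except ChannelClosed as exc:
            outcomes[name] = f"closed: {exc}"

    threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
    for thread in threads:
        thread.start()
    ready.wait()
    # Simulate the broker closing channel "a" (e.g. a failed passive declare)
    # while channel "b" receives a normal reply.
    conn.route(channels["a"].channel_id, "close", "NOT_FOUND")
    conn.route(channels["b"].channel_id, "ok", "queue declared")
    for thread in threads:
        thread.join()
    return outcomes


if __name__ == "__main__":
    print(demo())
```

In this model the close frame for channel "a" can only raise inside the thread that opened "a"; the thread that owns "b" and the routing thread never see it.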
Install
To add and install this package as a dependency of your project, run poetry add kombu-pyamqp-threadsafe.
Contributing
Prerequisites
1. Set up Git to use SSH
- Generate an SSH key and add the SSH key to your GitHub account.
- Configure SSH to automatically load your SSH keys:
cat << EOF >> ~/.ssh/config
Host *
  AddKeysToAgent yes
  IgnoreUnknown UseKeychain
  UseKeychain yes
EOF
2. Install Docker
- Install Docker Desktop.
- Enable Use Docker Compose V2 in Docker Desktop's preferences window.
- Linux only:
- Export your user's user id and group id so that files created in the Dev Container are owned by your user:
cat << EOF >> ~/.bashrc
export UID=$(id --user)
export GID=$(id --group)
EOF
3. Install VS Code or PyCharm
- Install VS Code and VS Code's Dev Containers extension. Alternatively, install PyCharm.
- Optional: install a Nerd Font such as FiraCode Nerd Font and configure VS Code or configure PyCharm to use it.
Development environments
The following development environments are supported:
- ⭐️ GitHub Codespaces: click on Code and select Create codespace to start a Dev Container with GitHub Codespaces.
- ⭐️ Dev Container (with container volume): click on Open in Dev Containers to clone this repository in a container volume and create a Dev Container with VS Code.
- Dev Container: clone this repository, open it with VS Code, and run Ctrl/⌘ + ⇧ + P → Dev Containers: Reopen in Container.
- PyCharm: clone this repository, open it with PyCharm, and configure Docker Compose as a remote interpreter with the dev service.
- Terminal: clone this repository, open it with your terminal, and run docker compose up --detach dev to start a Dev Container in the background, and then run docker compose exec dev zsh to open a shell prompt in the Dev Container.
Developing
- This project follows the Conventional Commits standard to automate Semantic Versioning and Keep A Changelog with Commitizen.
- Run poe from within the development environment to print a list of Poe the Poet tasks available to run on this project.
- Run poetry add {package} from within the development environment to install a run time dependency and add it to pyproject.toml and poetry.lock. Add --group test or --group dev to install a CI or development dependency, respectively.
- Run poetry update from within the development environment to upgrade all dependencies to the latest versions allowed by pyproject.toml.
- Run cz bump to bump the package's version, update the CHANGELOG.md, and create a git tag.
Hashes for kombu_pyamqp_threadsafe-0.1.0.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 93969237c88ae198dc461fec361bf598c0078e6563486e64410b010cd096ea34
MD5 | 42735dddd3ba5e0c26c79b218c6dff99
BLAKE2b-256 | da0e1cea3f51eb1962ad0fbc1a3bc5b5d8b85a91ad2cfbbb16693d00afbb9785
Hashes for kombu_pyamqp_threadsafe-0.1.0-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | bc3e180257c1e4eb477bf588dca7eaa7ac9dbcbbae35c76dfabc24703d4e7c9a
MD5 | 0feef32f6408048db58990f84dc5a4a7
BLAKE2b-256 | c90628e067bf23049c202f5abd528935d17c8aaf3d225b73664f42d6ac17b004