Threadsafe implementation of pyamqp transport for kombu

Open in Dev Containers

kombu-pyamqp-threadsafe

TL;DR

kombu (pyamqp) is designed around "1 thread = 1 connection": a connection cannot be shared between threads.

This package makes a "1 thread = 1 channel" design possible, so threads can share a connection.

import kombu_pyamqp_threadsafe
# ... and then
kombu_pyamqp_threadsafe.monkeypatch_pyamqp_transport()  # patch the existing transports: 'pyamqp://', 'amqp://', 'amqps://'
# ... or
kombu_pyamqp_threadsafe.add_shared_amqp_transport()  # explicitly use the 'shared+pyamqp://', 'shared+amqp://' or 'shared+amqps://' transports

Motivation

The best practice for working with RabbitMQ is to use 2 connections: 1 for consuming and 1 for publishing.

https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels

But this is not possible with kombu (pyamqp) (https://github.com/celery/py-amqp/issues/420).

Without connection sharing, we can't efficiently consume from many queues at the same time.

And when we publish messages in a multithreaded application, we need to create a connection for each publisher (producer) thread.

Other solutions

Connection pool.

  • This is the concept used by celery. You no longer create a lot of connections when publishing messages, but you still need the same number of connections to consume.

Use the same channel to consume from all queues.

  • This is bad practice, because any exception in the channel will close it and break all consumers.

Usage

Q: OK, I installed it. Now what?

A: You can use ConnectionHolder from this snippet: https://github.com/celery/py-amqp/issues/420#issuecomment-1858429922

Q: Is it production ready? How did you test it?

A: Yes, it's production ready. We also ran a stress test with 900 threads: 900 dramatiq actors, each consuming a message and sending a new one to the queue. Only 2 connections were used.

Q: Dramatiq?

A: Yes, dramatiq-kombu-broker will be released soon :)

Rules

Rule 1: Do not share a channel between threads.

  • Do not use default_channel from different threads. Responses are bound to the channel, so a thread can receive an error caused by another thread, or cause an error that surfaces in another thread.
  • Use default_channel_pool and acquire a channel for each thread.

Rule 2: A channel is bound to the thread in which it was created.

  • This is required because dispatching a frame can raise an error that is expected in the caller thread (see Rule 1), e.g. in the queue_declare method.
  • If you declare a queue in passive mode and the queue does not exist, RabbitMQ will close the channel, and the exception MAY be raised in a different thread when that thread drains events.
  • To ensure that all exceptions are raised in the expected context, we bind channels to threads and dispatch received frames only in their own threads.
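
The idea behind Rule 2 can be illustrated with a toy model built only on the standard library. This is a minimal sketch, not this package's implementation: SharedConnection, ThreadBoundChannel, on_frame and dispatch_pending are hypothetical names invented for the illustration. Whichever thread reads a frame only routes it to the channel's inbox, and only the thread that created the channel processes it, so errors surface in the expected thread.

```python
import queue
import threading


class ThreadBoundChannel:
    """Toy channel (hypothetical) that remembers the thread that created it."""

    def __init__(self, channel_id):
        self.channel_id = channel_id
        self.owner = threading.get_ident()  # thread that created the channel
        self.inbox = queue.Queue()  # frames waiting for the owner thread

    def dispatch_pending(self):
        """Process queued frames; only the owning thread may do this."""
        if threading.get_ident() != self.owner:
            raise RuntimeError("channel used outside its owner thread")
        handled = []
        while True:
            try:
                frame = self.inbox.get_nowait()
            except queue.Empty:
                return handled
            if isinstance(frame, Exception):
                raise frame  # the error surfaces in the owner thread
            handled.append(frame)


class SharedConnection:
    """Toy connection (hypothetical) shared by threads, one channel each."""

    def __init__(self):
        self._lock = threading.Lock()
        self._channels = {}
        self._next_id = 1

    def channel(self):
        """Create a channel bound to the calling thread."""
        with self._lock:
            ch = ThreadBoundChannel(self._next_id)
            self._channels[ch.channel_id] = ch
            self._next_id += 1
            return ch

    def on_frame(self, channel_id, frame):
        """Called by whichever thread reads the socket: route, never handle."""
        self._channels[channel_id].inbox.put(frame)
```

In this model, a thread that drains events for the whole connection calls on_frame for every channel, while each worker thread calls dispatch_pending on its own channel. A channel error is stored in the inbox as an exception object and raised only when the owner thread dispatches it.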

Install

To add and install this package as a dependency of your project, run poetry add kombu-pyamqp-threadsafe.

Contributing

Prerequisites

1. Set up Git to use SSH
  1. Generate an SSH key and add the SSH key to your GitHub account.
  2. Configure SSH to automatically load your SSH keys:
    cat << EOF >> ~/.ssh/config
    Host *
      AddKeysToAgent yes
      IgnoreUnknown UseKeychain
      UseKeychain yes
    EOF
    
2. Install Docker
  1. Install Docker Desktop.
3. Install VS Code or PyCharm
  1. Install VS Code and VS Code's Dev Containers extension. Alternatively, install PyCharm.
  2. Optional: install a Nerd Font such as FiraCode Nerd Font and configure VS Code or configure PyCharm to use it.

Development environments

The following development environments are supported:

  1. ⭐️ GitHub Codespaces: click on Code and select Create codespace to start a Dev Container with GitHub Codespaces.
  2. ⭐️ Dev Container (with container volume): click on Open in Dev Containers to clone this repository in a container volume and create a Dev Container with VS Code.
  3. Dev Container: clone this repository, open it with VS Code, and run Ctrl/⌘ + ⇧ + P → Dev Containers: Reopen in Container.
  4. PyCharm: clone this repository, open it with PyCharm, and configure Docker Compose as a remote interpreter with the dev service.
  5. Terminal: clone this repository, open it with your terminal, and run docker compose up --detach dev to start a Dev Container in the background, and then run docker compose exec dev zsh to open a shell prompt in the Dev Container.

Developing

  • This project follows the Conventional Commits standard to automate Semantic Versioning and Keep A Changelog with Commitizen.
  • Run poe from within the development environment to print a list of Poe the Poet tasks available to run on this project.
  • Run poetry add {package} from within the development environment to install a run time dependency and add it to pyproject.toml and poetry.lock. Add --group test or --group dev to install a CI or development dependency, respectively.
  • Run poetry update from within the development environment to upgrade all dependencies to the latest versions allowed by pyproject.toml.
  • Run cz bump to bump the package's version, update the CHANGELOG.md, and create a git tag.
