Skip to main content

Threadsafe implementation of pyamqp transport for kombu

Project description

Open in Dev Containers

kombu-pyamqp-threadsafe

Threadsafe implementation of pyamqp transport for kombu

TL;DR

kombu (pyamqp) designed as "1 thread = 1 connection", no connection sharing between threads

This package make possible design "1 thread = 1 channel", allow connection sharing between threads

import kombu
import kombu_pyamqp_threadsafe

# Use drop-in replacement thread-safe kombu.Connection variant: 
connection = kombu_pyamqp_threadsafe.KombuConnection(...)

# or construct from kombu.Connection
kombu_connection = kombu.Connection(...)
kombu_pyamqp_threadsafe.KombuConnection.from_kombu_connection(kombu_connection)

Motivation

The best practice to work with RabbitMQ is use 2 connections: 1 for consuming and 1 for producer.

https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels

But it's not possible with kombu (pyamqp) (https://github.com/celery/py-amqp/issues/420)

Without that we can't effectively consume many queues at same time

And when we publish messages in multithread application we need create connection for each publisher (producer) thread

Other solutions

Connection pool.

  • This is concept used by celery, now you do not create a lot of connections when publish messages but still need same connections count to consume

Use same channel to consume from all queues

  • It's bad practice cause any exception in channel will close it and broke all consumers

Usage

Q: Ok, i install it, and now what?

A: You can use ConnectionHolder from this snippet: https://github.com/celery/py-amqp/issues/420#issuecomment-1858429922

Q: It's production ready? How you test it?

A: Yes, it's production ready. We also make stress-test with 900 threads, when run 900 dramatiq actors which consume message and send new one to queue. Only 2 connections used.

Q: Dramatiq?

A: Yes, just use dramatiq-kombu-broker

Rules

Rule 1: Do not share channel between threads

  • Do not use default_channel from different threads, cause responses is channel bound, you can get error from other thread or produce error to different thread
  • Use default_channel_pool and acquire channel for each thread

Rule 2: Channel bound to thread where his created.

  • This required because dispatch frame can raise error which expected in caller thread (see Rule 1). E.g.: queue_declare method.
  • If you declare queue in passive mode RabbitMQ will close channel and exception MAY raise in different thread when his drain events.
  • To ensure all exceptions raised in expected context we bound channels to threads and dispatch received frames only in their own threads.

Install

To add and install this package as a dependency of your project, run poetry add kombu-pyamqp-threadsafe.

Developing
  • This project follows the Conventional Commits standard to automate Semantic Versioning and Keep A Changelog with Commitizen.
  • Run poe from within the development environment to print a list of Poe the Poet tasks available to run on this project.
  • Run poetry add {package} from within the development environment to install a run time dependency and add it to pyproject.toml and poetry.lock. Add --group test or --group dev to install a CI or development dependency, respectively.
  • Run poetry update from within the development environment to upgrade all dependencies to the latest versions allowed by pyproject.toml.
  • Run cz bump to bump the package's version, update the CHANGELOG.md, and create a git tag.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kombu_pyamqp_threadsafe-0.5.1.tar.gz (11.5 kB view details)

Uploaded Source

Built Distribution

kombu_pyamqp_threadsafe-0.5.1-py3-none-any.whl (9.9 kB view details)

Uploaded Python 3

File details

Details for the file kombu_pyamqp_threadsafe-0.5.1.tar.gz.

File metadata

  • Download URL: kombu_pyamqp_threadsafe-0.5.1.tar.gz
  • Upload date:
  • Size: 11.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.10.15 Linux/6.5.0-1025-azure

File hashes

Hashes for kombu_pyamqp_threadsafe-0.5.1.tar.gz
Algorithm Hash digest
SHA256 ed71ecf14fe19ae56ffd2f008275b32a0450cc7335448f2500490281ac01fbee
MD5 063b1d17e3107a75bdd75263da2684ca
BLAKE2b-256 3a692b8d30181b5575a6d57af7b2b131182ce8cc179e192c7fef0d7d758d83dd

See more details on using hashes here.

File details

Details for the file kombu_pyamqp_threadsafe-0.5.1-py3-none-any.whl.

File metadata

File hashes

Hashes for kombu_pyamqp_threadsafe-0.5.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c8510963bb59d6f8125ee78628889e8f757f97bd0c424fcf169a07c9d7ba75f0
MD5 8d11faf75159eacd2cf9b6d58cef6e42
BLAKE2b-256 aa17ba79f1aa2de9539409b37b5a10eb5052f95e2cce0ed8093ff449a0b59ad3

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page