Enhanced Kombu Redis transport for Celery
Project description
celery-redis-plus
Enhanced Redis/Valkey transport for Celery with native delayed delivery, improved reliability, full priority support, and reliable fanout.
Overview
celery-redis-plus is a replacement Redis/Valkey transport for Celery that addresses key limitations of the standard transport:
- Native Delayed Delivery - Tasks with long
countdownoretaare stored in Redis and delivered when due, instead of being held in worker memory. - Improved Reliability - Atomic message consumption via BZMPOP with improvements regarding visibility timeout ensures zero message loss.
- Full Priority Support - All 256 priority levels (0-255) with RabbitMQ-compatible semantics (higher number = higher priority).
- Reliable Fanout - Redis Streams replace lossy PUB/SUB for durable broadcast messaging.
Installation
uv add celery-redis-plus
Quick Start
from celery import Celery
from celery_redis_plus import DelayedDeliveryBootstep
app = Celery('myapp')
app.config_from_object({
'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
})
app.steps['consumer'].add(DelayedDeliveryBootstep)
@app.task
def my_task():
print("Hello!")
# Tasks with countdown/eta will use native Redis delayed delivery
my_task.apply_async(countdown=120)
# Priority support (RabbitMQ semantics: higher = more important)
my_task.apply_async(priority=90) # High priority
my_task.apply_async(priority=0) # Low priority (default)
How It Works
Sorted Set Queues
Queues use Redis sorted sets instead of lists. Messages are added with ZADD using a score that encodes priority and timestamp. Workers use BZMPOP to atomically pop the highest-priority, oldest message.
Message Persistence
Messages are stored in per-message hashes before being added to the queue. When consumed, the hash persists until explicitly acknowledged. Combined with visibility timeout tracking, this ensures messages are never lost - even if a worker crashes in the instant after the message is pop'ed from the queue, the message can be recovered and requeued.
Delayed Delivery
Delayed messages are stored in a sorted set with timestamps as scores. A background thread periodically checks for messages whose timestamp has passed and moves them to the normal queue.
Stream-based Fanout
Fanout exchanges use Redis Streams. Messages are added with XADD, and each consumer uses XREAD tracking their own position. Old messages are trimmed based on stream_maxlen.
Configuration
Transport Options
Configure via Celery's broker_transport_options:
app.config_from_object({
'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
'broker_transport_options': {
'global_keyprefix': 'myapp:', # Prefix for all Redis keys
'visibility_timeout': 300, # Seconds before unacked messages are reclaimed
'stream_max_length': 10000, # Max messages per stream (approximate)
},
})
SSL/TLS Connections
For secure connections to Redis, use the ssl transport option:
app.config_from_object({
'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
'broker_transport_options': {
'ssl': True, # Use default SSL settings
},
})
# Or with custom SSL options:
import ssl
app.config_from_object({
'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
'broker_transport_options': {
'ssl': {
'ssl_cert_reqs': ssl.CERT_REQUIRED,
'ssl_ca_certs': '/path/to/ca.crt',
'ssl_certfile': '/path/to/client.crt',
'ssl_keyfile': '/path/to/client.key',
},
},
})
API
celery_redis_plus.Transport- Custom transport with sorted set queues, priority encoding, delayed delivery, and Redis Streams fanout.celery_redis_plus.DelayedDeliveryBootstep- Worker bootstep for background message processing and recovery.
Requirements
We target recent versions for BZMPOP support and to simplify development.
- Python >= 3.13
- Celery >= 5.5.0
- Redis >= 7.0 (for BZMPOP) or Valkey (any version)
Development
# Clone the repository
git clone https://github.com/oliverhaas/celery-redis-plus.git
cd celery-redis-plus
# Create virtual environment and install with development dependencies
uv venv
uv sync --group dev
# Run tests (requires Docker for integration tests)
uv run pytest
# Run linter
uv run ruff check
# Run type checker
uv run ty check
Contributing
This package is intended as a temporary solution until these improvements are merged upstream into Celery/Kombu. Contributions are welcome! For consulting inquiries, contact ohaas@e1plus.de.
License
MIT License - see LICENSE for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file celery_redis_plus-0.2.0.tar.gz.
File metadata
- Download URL: celery_redis_plus-0.2.0.tar.gz
- Upload date:
- Size: 81.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
46c31f81d541ecba543357147169ec40fa020d67d5988e8d0344137c319db114
|
|
| MD5 |
ddf5e3b1b00ddca206860091fb11b2c9
|
|
| BLAKE2b-256 |
d005af08f34e852587d3f6fcc724b9debb37e1e6e8eee1141999cf0e508330ef
|
Provenance
The following attestation bundles were made for celery_redis_plus-0.2.0.tar.gz:
Publisher:
publish.yml on oliverhaas/celery-redis-plus
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
celery_redis_plus-0.2.0.tar.gz -
Subject digest:
46c31f81d541ecba543357147169ec40fa020d67d5988e8d0344137c319db114 - Sigstore transparency entry: 790554002
- Sigstore integration time:
-
Permalink:
oliverhaas/celery-redis-plus@4c1e525be9ccc214bfdbcc12e58cb9af6b940105 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/oliverhaas
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@4c1e525be9ccc214bfdbcc12e58cb9af6b940105 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file celery_redis_plus-0.2.0-py3-none-any.whl.
File metadata
- Download URL: celery_redis_plus-0.2.0-py3-none-any.whl
- Upload date:
- Size: 22.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
77d6c0cf31c83d2ebc0c63dd6424a5e7113fe86d2d731b1de9ba25b84db9d3ab
|
|
| MD5 |
f8e7630bb45400e48e0c06f8b00ceaa9
|
|
| BLAKE2b-256 |
fcd29e036f8a14c4a7b0843a10423c299275addbac7cee1b9725d1e27db687fb
|
Provenance
The following attestation bundles were made for celery_redis_plus-0.2.0-py3-none-any.whl:
Publisher:
publish.yml on oliverhaas/celery-redis-plus
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
celery_redis_plus-0.2.0-py3-none-any.whl -
Subject digest:
77d6c0cf31c83d2ebc0c63dd6424a5e7113fe86d2d731b1de9ba25b84db9d3ab - Sigstore transparency entry: 790554004
- Sigstore integration time:
-
Permalink:
oliverhaas/celery-redis-plus@4c1e525be9ccc214bfdbcc12e58cb9af6b940105 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/oliverhaas
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@4c1e525be9ccc214bfdbcc12e58cb9af6b940105 -
Trigger Event:
workflow_dispatch
-
Statement type: