Skip to main content

Enhanced Kombu Redis transport for Celery

Project description

celery-redis-plus

Enhanced Redis/Valkey transport for Celery with native delayed delivery, improved reliability, full priority support, and reliable fanout.

Overview

celery-redis-plus is a replacement Redis/Valkey transport for Celery that addresses key limitations of the standard transport:

  1. Native Delayed Delivery - Tasks with long countdown or eta are stored in Redis and delivered when due, instead of being held in worker memory.
  2. Improved Reliability - Atomic message consumption via BZMPOP with improvements regarding visibility timeout ensures zero message loss.
  3. Full Priority Support - All 256 priority levels (0-255) with RabbitMQ-compatible semantics (higher number = higher priority).
  4. Reliable Fanout - Redis Streams replace lossy PUB/SUB for durable broadcast messaging.

Installation

uv add celery-redis-plus

Quick Start

from celery import Celery
from celery_redis_plus import DelayedDeliveryBootstep

app = Celery('myapp')
app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
})
app.steps['consumer'].add(DelayedDeliveryBootstep)

@app.task
def my_task():
    print("Hello!")

# Tasks with countdown/eta will use native Redis delayed delivery
my_task.apply_async(countdown=120)

# Priority support (RabbitMQ semantics: higher = more important)
my_task.apply_async(priority=90)   # High priority
my_task.apply_async(priority=0)    # Low priority (default)

How It Works

Sorted Set Queues

Queues use Redis sorted sets instead of lists. Messages are added with ZADD using a score that encodes priority and timestamp. Workers use BZMPOP to atomically pop the highest-priority, oldest message.

Message Persistence

Messages are stored in per-message hashes before being added to the queue. When consumed, the hash persists until explicitly acknowledged. Combined with visibility timeout tracking, this ensures messages are never lost - even if a worker crashes in the instant after the message is pop'ed from the queue, the message can be recovered and requeued.

Delayed Delivery

Delayed messages are stored in a sorted set with timestamps as scores. A background thread periodically checks for messages whose timestamp has passed and moves them to the normal queue.

Stream-based Fanout

Fanout exchanges use Redis Streams. Messages are added with XADD, and each consumer uses XREAD tracking their own position. Old messages are trimmed based on stream_maxlen.

Configuration

Transport Options

Configure via Celery's broker_transport_options:

app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'global_keyprefix': 'myapp:',        # Prefix for all Redis keys
        'visibility_timeout': 300,          # Seconds before unacked messages are reclaimed
        'stream_max_length': 10000,          # Max messages per stream (approximate)
    },
})

SSL/TLS Connections

For secure connections to Redis, use the ssl transport option:

app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'ssl': True,  # Use default SSL settings
    },
})

# Or with custom SSL options:
import ssl
app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'ssl': {
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
            'ssl_ca_certs': '/path/to/ca.crt',
            'ssl_certfile': '/path/to/client.crt',
            'ssl_keyfile': '/path/to/client.key',
        },
    },
})

API

  • celery_redis_plus.Transport - Custom transport with sorted set queues, priority encoding, delayed delivery, and Redis Streams fanout.
  • celery_redis_plus.DelayedDeliveryBootstep - Worker bootstep for background message processing and recovery.

Requirements

We target recent versions for BZMPOP support and to simplify development.

  • Python >= 3.13
  • Celery >= 5.5.0
  • Redis >= 7.0 (for BZMPOP) or Valkey (any version)

Development

# Clone the repository
git clone https://github.com/oliverhaas/celery-redis-plus.git
cd celery-redis-plus

# Create virtual environment and install with development dependencies
uv venv
uv sync --group dev

# Run tests (requires Docker for integration tests)
uv run pytest

# Run linter
uv run ruff check

# Run type checker
uv run ty check

Contributing

This package is intended as a temporary solution until these improvements are merged upstream into Celery/Kombu. Contributions are welcome! For consulting inquiries, contact ohaas@e1plus.de.

License

MIT License - see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery_redis_plus-0.2.1.tar.gz (81.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celery_redis_plus-0.2.1-py3-none-any.whl (22.7 kB view details)

Uploaded Python 3

File details

Details for the file celery_redis_plus-0.2.1.tar.gz.

File metadata

  • Download URL: celery_redis_plus-0.2.1.tar.gz
  • Upload date:
  • Size: 81.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for celery_redis_plus-0.2.1.tar.gz
Algorithm Hash digest
SHA256 36d6cb892808fa8279e0d52cba0aeaa047a3ef7c05dcf680b6ad6a2cc767ca53
MD5 f53f52ca5479d4519566594fca98cf28
BLAKE2b-256 9a86f9722f0396e0421dac298c06cb7d5eb2a8382eb3c6fe9d02a08c5a89923f

See more details on using hashes here.

Provenance

The following attestation bundles were made for celery_redis_plus-0.2.1.tar.gz:

Publisher: publish.yml on oliverhaas/celery-redis-plus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file celery_redis_plus-0.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for celery_redis_plus-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ada457e0b668345dbe5a4626a8662333d49b2fe4e86ebb61ff6f4d179dce5366
MD5 2328515fde3bfcbe6a6f5eca2149256c
BLAKE2b-256 87507884579b2b18e847c65f673c8335225af31ec70cc4babbdc7bea57130a41

See more details on using hashes here.

Provenance

The following attestation bundles were made for celery_redis_plus-0.2.1-py3-none-any.whl:

Publisher: publish.yml on oliverhaas/celery-redis-plus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page