Enhanced Kombu Redis transport for Celery

celery-redis-plus

Enhanced Redis/Valkey transport for Celery with native delayed delivery, improved reliability, full priority support, and reliable fanout.

Overview

celery-redis-plus is a replacement Redis/Valkey transport for Celery that addresses key limitations of the standard transport:

  1. Native Delayed Delivery - Tasks with long countdown or eta are stored in Redis and delivered when due, instead of being held in worker memory.
  2. Improved Reliability - Atomic message consumption via BZMPOP, together with improved visibility timeout handling, ensures that no messages are lost.
  3. Full Priority Support - All 256 priority levels (0-255) with RabbitMQ-compatible semantics (higher number = higher priority).
  4. Reliable Fanout - Redis Streams replace lossy PUB/SUB for durable broadcast messaging.

Installation

uv add celery-redis-plus

Quick Start

from celery import Celery
from celery_redis_plus import DelayedDeliveryBootstep

app = Celery('myapp')
app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
})
app.steps['consumer'].add(DelayedDeliveryBootstep)

@app.task
def my_task():
    print("Hello!")

# Tasks with countdown/eta will use native Redis delayed delivery
my_task.apply_async(countdown=120)

# Priority support (RabbitMQ semantics: higher = more important)
my_task.apply_async(priority=90)   # High priority
my_task.apply_async(priority=0)    # Low priority (default)

How It Works

Sorted Set Queues

Queues use Redis sorted sets instead of lists. Messages are added with ZADD using a score that encodes priority and timestamp. Workers use BZMPOP to atomically pop the highest-priority, oldest message.
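One way to picture a score that encodes both priority and timestamp is sketched below. This is a minimal illustration and not necessarily the encoding the transport uses: the function name queue_score, the constant PRIORITY_MULTIPLIER, and the choice of millisecond timestamps are assumptions made for this example. It inverts the priority so that popping the lowest score (as BZMPOP does with its MIN modifier) yields the highest-priority, oldest message.

```python
MAX_PRIORITY = 255
# Hypothetical constant: leaves room for millisecond timestamps below 10**13.
PRIORITY_MULTIPLIER = 10**13


def queue_score(priority: int, timestamp_ms: int) -> int:
    """Return a sorted-set score where lower scores should be popped first."""
    if not 0 <= priority <= MAX_PRIORITY:
        raise ValueError("priority must be between 0 and 255")
    if not 0 <= timestamp_ms < PRIORITY_MULTIPLIER:
        raise ValueError("timestamp_ms out of range for this encoding")
    # Invert the priority so that the highest priority gets the lowest score;
    # within one priority level, older messages have smaller timestamps.
    return (MAX_PRIORITY - priority) * PRIORITY_MULTIPLIER + timestamp_ms


# Sorting by score puts high priority first, then older messages first.
messages = [("low", 0, 1000), ("high-new", 90, 3000), ("high-old", 90, 2000)]
ordered = sorted(messages, key=lambda m: queue_score(m[1], m[2]))
print([name for name, _, _ in ordered])
```

With these constants the largest possible score stays below 2**53, so it is represented exactly as the double-precision float that Redis uses for sorted-set scores.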

Message Persistence

Messages are stored in per-message hashes before being added to the queue. When a message is consumed, its hash persists until the message is explicitly acknowledged. Combined with visibility timeout tracking, this ensures messages are never lost: even if a worker crashes in the instant after a message is popped from the queue, the message can be recovered and requeued.
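The lifecycle described above can be modelled without Redis. The sketch below is an in-memory toy, not the transport's code: the class InMemoryBroker and its method names are hypothetical, a plain list stands in for the sorted-set queue, and dictionaries stand in for the per-message hashes and the visibility timeout bookkeeping.

```python
from typing import Optional


class InMemoryBroker:
    """Toy model: payloads outlive the pop and are only deleted on ack."""

    def __init__(self, visibility_timeout: float) -> None:
        self.visibility_timeout = visibility_timeout
        self.queue: list[str] = []             # stands in for the sorted-set queue
        self.messages: dict[str, bytes] = {}   # stands in for per-message hashes
        self.unacked: dict[str, float] = {}    # message id -> time it was consumed

    def publish(self, message_id: str, payload: bytes) -> None:
        # Store the payload first, then make the id visible to consumers.
        self.messages[message_id] = payload
        self.queue.append(message_id)

    def consume(self, now: float) -> Optional[tuple[str, bytes]]:
        if not self.queue:
            return None
        message_id = self.queue.pop(0)
        # The payload is not deleted here; it survives until ack().
        self.unacked[message_id] = now
        return message_id, self.messages[message_id]

    def ack(self, message_id: str) -> None:
        # Only an explicit acknowledgement removes the stored payload.
        self.unacked.pop(message_id, None)
        self.messages.pop(message_id, None)

    def restore_expired(self, now: float) -> list[str]:
        # Requeue messages whose consumer has not acked within the timeout.
        expired = [
            message_id
            for message_id, consumed_at in self.unacked.items()
            if now - consumed_at >= self.visibility_timeout
        ]
        for message_id in expired:
            del self.unacked[message_id]
            self.queue.append(message_id)
        return expired


broker = InMemoryBroker(visibility_timeout=300)
broker.publish("m1", b"payload")
broker.consume(now=0)              # worker takes the message, then "crashes"
broker.restore_expired(now=300)    # the message goes back on the queue
```

Because the payload lives outside the queue, a crash between popping and processing loses only the worker's in-flight state, never the message itself.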

Delayed Delivery

Delayed messages are stored in a sorted set with timestamps as scores. A background thread periodically checks for messages whose timestamp has passed and moves them to the normal queue.
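The periodic check can be sketched as a single function that a background thread would call on each tick. This is an illustrative model only: move_due_messages is a hypothetical name, a dictionary of message id to due time stands in for the delayed sorted set, and a list stands in for the normal queue.

```python
def move_due_messages(delayed: dict[str, float], queue: list[str], now: float) -> list[str]:
    """Move every message whose due time has passed from `delayed` to `queue`."""
    # Select due messages and order them by due time, earliest first.
    due = sorted(
        (message_id for message_id, due_at in delayed.items() if due_at <= now),
        key=lambda message_id: delayed[message_id],
    )
    for message_id in due:
        del delayed[message_id]
        queue.append(message_id)
    return due


delayed = {"a": 10.0, "b": 5.0, "c": 20.0}
queue: list[str] = []
moved = move_due_messages(delayed, queue, now=12.0)
print(moved, delayed)
```

In this model, "c" stays in the delayed set until a later tick with now >= 20.0, so a long countdown occupies storage rather than worker memory.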

Stream-based Fanout

Fanout exchanges use Redis Streams. Messages are added with XADD, and each consumer reads with XREAD while tracking its own position. Old messages are trimmed based on stream_maxlen.
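The difference from PUB/SUB is that the stream keeps entries around, so every consumer can read at its own pace. The following toy model shows that idea in plain Python; ToyStream, its integer entry ids, and the maxlen parameter are assumptions for this sketch and only loosely mirror what XADD and XREAD do in Redis.

```python
class ToyStream:
    """Append-only log with a length cap; readers keep their own position."""

    def __init__(self, maxlen: int) -> None:
        if maxlen < 1:
            raise ValueError("maxlen must be at least 1")
        self.maxlen = maxlen
        self.entries: list[tuple[int, str]] = []  # (entry id, payload), oldest first
        self.next_id = 1

    def add(self, payload: str) -> int:
        # Append a new entry, then trim the oldest ones beyond maxlen.
        entry_id = self.next_id
        self.next_id += 1
        self.entries.append((entry_id, payload))
        overflow = len(self.entries) - self.maxlen
        if overflow > 0:
            del self.entries[:overflow]
        return entry_id

    def read(self, last_seen_id: int) -> list[tuple[int, str]]:
        # Return every entry newer than the caller's own position.
        return [(i, p) for i, p in self.entries if i > last_seen_id]


stream = ToyStream(maxlen=2)
stream.add("x")
stream.add("y")
fast_position = stream.read(0)[-1][0]   # the fast consumer has seen "x" and "y"
stream.add("z")
print(stream.read(fast_position))       # only "z" is new for the fast consumer
print(stream.read(0))                   # a consumer starting late misses trimmed "x"
```

The last line also shows the trade-off of trimming: a consumer that falls further behind than the length cap will not see the entries that were already removed.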

Configuration

Transport Options

Configure via Celery's broker_transport_options:

app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'global_keyprefix': 'myapp:',        # Prefix for all Redis keys
        'visibility_timeout': 300,           # Seconds before unacked messages are reclaimed
        'stream_max_length': 10000,          # Max messages per stream (approximate)
    },
})

SSL/TLS Connections

For secure connections to Redis, use the ssl transport option:

app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'ssl': True,  # Use default SSL settings
    },
})

# Or with custom SSL options:
import ssl
app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'ssl': {
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
            'ssl_ca_certs': '/path/to/ca.crt',
            'ssl_certfile': '/path/to/client.crt',
            'ssl_keyfile': '/path/to/client.key',
        },
    },
})

API

  • celery_redis_plus.Transport - Custom transport with sorted set queues, priority encoding, delayed delivery, and Redis Streams fanout.
  • celery_redis_plus.DelayedDeliveryBootstep - Worker bootstep for background message processing and recovery.

Requirements

We target recent versions for BZMPOP support and to simplify development.

  • Python >= 3.13
  • Celery >= 5.5.0
  • Redis >= 7.0 (for BZMPOP) or Valkey (any version)

Development

# Clone the repository
git clone https://github.com/oliverhaas/celery-redis-plus.git
cd celery-redis-plus

# Create virtual environment and install with development dependencies
uv venv
uv sync --group dev

# Run tests (requires Docker for integration tests)
uv run pytest

# Run linter
uv run ruff check

# Run type checker
uv run ty check

Contributing

This package is intended as a temporary solution until these improvements are merged upstream into Celery/Kombu. Contributions are welcome! For consulting inquiries, contact ohaas@e1plus.de.

License

MIT License - see LICENSE for details.
