Skip to main content

Enhanced Kombu Redis transport for Celery

Project description

celery-redis-plus

Enhanced Redis/Valkey transport for Celery with native delayed delivery, improved reliability, full priority support, and reliable fanout.

Overview

celery-redis-plus is a replacement Redis/Valkey transport for Celery that addresses key limitations of the standard transport:

  1. Native Delayed Delivery - Tasks with long countdown or eta are stored in Redis and delivered when due, instead of being held in worker memory.
  2. Improved Reliability - Atomic message consumption via BZMPOP with improvements regarding visibility timeout ensures zero message loss.
  3. Full Priority Support - All 256 priority levels (0-255) with RabbitMQ-compatible semantics (higher number = higher priority).
  4. Reliable Fanout - Redis Streams replace lossy PUB/SUB for durable broadcast messaging.

Installation

uv add celery-redis-plus

Quick Start

from celery import Celery
from celery_redis_plus import DelayedDeliveryBootstep

app = Celery('myapp')
app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
})
app.steps['consumer'].add(DelayedDeliveryBootstep)

@app.task
def my_task():
    print("Hello!")

# Tasks with countdown/eta will use native Redis delayed delivery
my_task.apply_async(countdown=120)

# Priority support (RabbitMQ semantics: higher = more important)
my_task.apply_async(priority=90)   # High priority
my_task.apply_async(priority=0)    # Low priority (default)

How It Works

Sorted Set Queues

Queues use Redis sorted sets instead of lists. Messages are added with ZADD using a score that encodes priority and timestamp. Workers use BZMPOP to atomically pop the highest-priority, oldest message.

Message Persistence

Messages are stored in per-message hashes before being added to the queue. When consumed, the hash persists until explicitly acknowledged. Combined with visibility timeout tracking, this ensures messages are never lost - even if a worker crashes in the instant after the message is pop'ed from the queue, the message can be recovered and requeued.

Delayed Delivery

Delayed messages are stored in a sorted set with timestamps as scores. A background thread periodically checks for messages whose timestamp has passed and moves them to the normal queue.

Stream-based Fanout

Fanout exchanges use Redis Streams. Messages are added with XADD, and each consumer uses XREAD tracking their own position. Old messages are trimmed based on stream_maxlen.

Configuration

Transport Options

Configure via Celery's broker_transport_options:

app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'global_keyprefix': 'myapp:',        # Prefix for all Redis keys
        'visibility_timeout': 300,          # Seconds before unacked messages are reclaimed
        'stream_max_length': 10000,          # Max messages per stream (approximate)
    },
})

SSL/TLS Connections

For secure connections to Redis, use the ssl transport option:

app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'ssl': True,  # Use default SSL settings
    },
})

# Or with custom SSL options:
import ssl
app.config_from_object({
    'broker_url': 'celery_redis_plus.transport:Transport://localhost:6379/0',
    'broker_transport_options': {
        'ssl': {
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
            'ssl_ca_certs': '/path/to/ca.crt',
            'ssl_certfile': '/path/to/client.crt',
            'ssl_keyfile': '/path/to/client.key',
        },
    },
})

API

  • celery_redis_plus.Transport - Custom transport with sorted set queues, priority encoding, delayed delivery, and Redis Streams fanout.
  • celery_redis_plus.DelayedDeliveryBootstep - Worker bootstep for background message processing and recovery.

Requirements

We target recent versions for BZMPOP support and to simplify development.

  • Python >= 3.13
  • Celery >= 5.5.0
  • Redis >= 7.0 (for BZMPOP) or Valkey (any version)

Development

# Clone the repository
git clone https://github.com/oliverhaas/celery-redis-plus.git
cd celery-redis-plus

# Create virtual environment and install with development dependencies
uv venv
uv sync --group dev

# Run tests (requires Docker for integration tests)
uv run pytest

# Run linter
uv run ruff check

# Run type checker
uv run ty check

Contributing

This package is intended as a temporary solution until these improvements are merged upstream into Celery/Kombu. Contributions are welcome! For consulting inquiries, contact ohaas@e1plus.de.

License

MIT License - see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery_redis_plus-0.2.0.tar.gz (81.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celery_redis_plus-0.2.0-py3-none-any.whl (22.7 kB view details)

Uploaded Python 3

File details

Details for the file celery_redis_plus-0.2.0.tar.gz.

File metadata

  • Download URL: celery_redis_plus-0.2.0.tar.gz
  • Upload date:
  • Size: 81.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for celery_redis_plus-0.2.0.tar.gz
Algorithm Hash digest
SHA256 46c31f81d541ecba543357147169ec40fa020d67d5988e8d0344137c319db114
MD5 ddf5e3b1b00ddca206860091fb11b2c9
BLAKE2b-256 d005af08f34e852587d3f6fcc724b9debb37e1e6e8eee1141999cf0e508330ef

See more details on using hashes here.

Provenance

The following attestation bundles were made for celery_redis_plus-0.2.0.tar.gz:

Publisher: publish.yml on oliverhaas/celery-redis-plus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file celery_redis_plus-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for celery_redis_plus-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 77d6c0cf31c83d2ebc0c63dd6424a5e7113fe86d2d731b1de9ba25b84db9d3ab
MD5 f8e7630bb45400e48e0c06f8b00ceaa9
BLAKE2b-256 fcd29e036f8a14c4a7b0843a10423c299275addbac7cee1b9725d1e27db687fb

See more details on using hashes here.

Provenance

The following attestation bundles were made for celery_redis_plus-0.2.0-py3-none-any.whl:

Publisher: publish.yml on oliverhaas/celery-redis-plus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page