
celery-once-task

Prevent duplicate Celery task execution using Redis locks.

When the same task is called multiple times with the same arguments, celery-once-task makes sure only one instance gets queued and only one instance runs at a time. It uses Redis to coordinate locks across workers.

How It Works

The library provides two independent locks:

  • Queue lock — acquired when apply_async() / delay() is called. If a lock already exists for that task + arguments combination, the call is silently dropped (returns None). Released when the worker picks up the task.
  • Running lock — acquired when the worker starts executing the task. If another worker is already running the same task with the same arguments, the new execution is rejected. Released when the task finishes (success, failure, or revocation).

Both locks use Redis keys with a TTL, so they expire automatically if something goes wrong.

Lock keys are built from the task name and a SHA-256 hash of the arguments, so two calls with different arguments are treated as separate tasks.

Installation

pip install celery-once-task

For Django integration:

pip install celery-once-task[django]

Requirements

  • Python 3.9+
  • Celery 5.0+
  • Redis 4.0+ (Python client)
  • A running Redis server

Quick Start

Using Django? Skip to Django Integration; it handles configuration and signal setup for you automatically.

1. Configure

Call this once at app startup, before any tasks run. Add it to your Celery app module (e.g., celery.py or wherever you create your Celery() instance).

# celery.py
from celery_once_task import configure

configure(
    redis_url="redis://localhost:6379/3",
    queue_lock_timeout=3600,
    running_lock_timeout=3600,
)

All three settings are optional. These are the defaults.

2. Connect Signals

Add this right after your configure() call in the same file (e.g., celery.py).

# celery.py
from celery_once_task import setup_once_task_signals

setup_once_task_signals()

This hooks into Celery's task_revoked and task_internal_error signals to release locks when tasks are revoked or hit internal errors.

3. Register the Worker Boot Step

Also in celery.py, after creating your Celery app instance:

# celery.py
from celery_once_task import OnceTaskUnlockBootStep

app.steps["worker"].add(OnceTaskUnlockBootStep)

This releases running locks for any active tasks when a worker shuts down.

4. Use It

In your task modules (e.g., tasks.py):

# tasks.py
from celery import shared_task
from celery_once_task import OnceTask

@shared_task(base=OnceTask)
def my_task(arg):
    ...

That's it. Calling my_task.delay(42) multiple times will only queue one instance. If a worker is already running my_task(42), a second worker won't start another one.
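Because a duplicate delay() call is silently dropped and returns None, callers that expect an AsyncResult should guard for it. A minimal sketch of that calling pattern, using a stub in place of a real task (the stub mimics the documented behaviour; it is not the library's API):

```python
# Stub mimicking the documented behaviour: the first delay() for a given
# argument set returns a result handle, identical follow-ups return None.
_queued = set()

class FakeTask:
    def __init__(self, name):
        self.name = name

    def delay(self, *args):
        key = (self.name, args)
        if key in _queued:
            return None        # queue lock already held: call is dropped
        _queued.add(key)
        return f"{self.name}-result"

my_task = FakeTask("my_task")

result = my_task.delay(42)
if result is None:
    print("duplicate: already queued")
else:
    print("queued")                          # first call succeeds

if my_task.delay(42) is None:
    print("duplicate: already queued")       # second identical call is dropped
```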

Django Integration

If you use Django, the library provides an AppConfig that handles configuration and signal setup automatically. You only need two steps: add it to INSTALLED_APPS and register the boot step.

1. Add to INSTALLED_APPS

In your Django settings file (e.g., settings.py):

# settings.py
INSTALLED_APPS = [
    ...
    "celery_once_task.django.OnceTaskAppConfig",
]

2. Set Django Settings (optional)

In the same settings file (e.g., settings.py):

# settings.py
CELERY_ONCE_REDIS_URL = "redis://localhost:6379/3"
CELERY_ONCE_QUEUE_LOCK_TIMEOUT = 3600       # seconds, default: 3600
CELERY_ONCE_RUNNING_LOCK_TIMEOUT = 3600     # seconds, default: 3600

All three are optional. The defaults are shown above.

3. Register the Boot Step

In your Celery app module (e.g., celery.py):

# celery.py
from celery_once_task import OnceTaskUnlockBootStep

app.steps["worker"].add(OnceTaskUnlockBootStep)

4. Use It

In your task modules (e.g., tasks.py):

# tasks.py
from celery import shared_task
from celery_once_task import OnceTask

@shared_task(base=OnceTask)
def my_task(arg):
    ...

Configuration Reference

  • redis_url (str, default: redis://localhost:6379/3) — Redis server URL for storing locks
  • queue_lock_timeout (int, default: 3600) — TTL in seconds for queue locks
  • running_lock_timeout (int, default: 3600) — TTL in seconds for running locks

When using Django, prefix these with CELERY_ONCE_ and set them in your Django settings (e.g., CELERY_ONCE_REDIS_URL).

Per-Task Options

You can enable or disable each lock per task by passing queue_lock and running_lock directly in the decorator:

from celery import shared_task
from celery_once_task import OnceTask

@shared_task(base=OnceTask, queue_lock=False, running_lock=True)
def my_task():
    ...

  • queue_lock (bool, default: True) — enable or disable the queue lock for this task
  • running_lock (bool, default: True) — enable or disable the running lock for this task

Examples:

# Only prevent concurrent execution, allow multiple queued instances
@shared_task(base=OnceTask, queue_lock=False)
def allow_queue_duplicates():
    ...

# Only prevent duplicate queueing, allow concurrent execution
@shared_task(base=OnceTask, running_lock=False)
def allow_parallel_runs():
    ...

API

celery_once_task.configure(**kwargs)

Set the global configuration. Call this once at startup before any tasks run.

celery_once_task.setup_once_task_signals()

Connect Celery signals for lock cleanup on task revocation and internal errors.

celery_once_task.teardown_once_task_signals()

Disconnect the signals. Useful in tests.

celery_once_task.OnceTask

Celery Task subclass. Use as base=OnceTask in your task decorators.

celery_once_task.OnceTaskLocked

Exception raised (subclass of celery.exceptions.Reject) when a task is rejected because a running lock already exists.

celery_once_task.OnceTaskUnlockBootStep

Celery worker boot step that releases running locks on shutdown.

How Lock Keys Are Built

Lock keys follow this pattern:

once_task:{task_name}:{hash}:{lock_type}

  • task_name — the full dotted task name (e.g., myapp.tasks.my_task)
  • hash — first 16 characters of a SHA-256 hash of the serialized arguments
  • lock_type — either queue or running

Two calls with different arguments get different lock keys and run independently.
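The key construction can be sketched as follows. The helper name `build_lock_key` and the JSON serialization of arguments are assumptions for illustration — the library's actual serialization format may differ — but the resulting shape matches the pattern above:

```python
import hashlib
import json

def build_lock_key(task_name, args, kwargs, lock_type):
    # Serialize arguments deterministically, hash them, and keep the
    # first 16 hex characters. Hypothetical helper; the real library's
    # serialization may differ.
    payload = json.dumps([args, kwargs], sort_keys=True)
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"once_task:{task_name}:{digest}:{lock_type}"

key = build_lock_key("myapp.tasks.my_task", [42], {}, "queue")
print(key)

# Different arguments produce a different hash, hence a different key:
other = build_lock_key("myapp.tasks.my_task", [43], {}, "queue")
print(key != other)  # → True
```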

License

MIT


