# celery-once-task

Prevent duplicate Celery task execution using Redis locks.
When the same task is called multiple times with the same arguments, celery-once-task makes sure only one instance gets queued and only one instance runs at a time. It uses Redis to coordinate locks across workers.
## How It Works

The library provides two independent locks:

- **Queue lock** — acquired when `apply_async()`/`delay()` is called. If a lock already exists for that task + arguments combination, the call is silently dropped (returns `None`). Released when the worker picks up the task.
- **Running lock** — acquired when the worker starts executing the task. If another worker is already running the same task with the same arguments, the new execution is rejected. Released when the task finishes (success, failure, or revocation).
Both locks use Redis keys with a TTL, so they expire automatically if something goes wrong.
Lock keys are built from the task name and a SHA-256 hash of the arguments, so two calls with different arguments are treated as separate tasks.
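Both locks reduce to the same Redis primitive: an atomic "create this key only if it does not already exist, with an expiry" (`SET key value NX EX ttl`). The sketch below emulates that primitive in memory to show why a second acquire on the same key fails until the lock is released or its TTL lapses. It is illustrative only, not this library's actual implementation, and `FakeRedis`/`acquire_lock` are hypothetical names.

```python
import time


class FakeRedis:
    """In-memory stand-in for the two Redis commands a TTL lock needs."""

    def __init__(self):
        self._store = {}  # key -> (value, expires_at)

    def set(self, key, value, nx=False, ex=None):
        # Mirrors Redis `SET key value NX EX ttl`: with nx=True the write
        # only succeeds if the key is absent (or its TTL has lapsed).
        now = time.monotonic()
        current = self._store.get(key)
        if nx and current is not None and current[1] > now:
            return None  # lock already held by someone else
        self._store[key] = (value, now + (ex if ex is not None else float("inf")))
        return True

    def delete(self, key):
        self._store.pop(key, None)


def acquire_lock(redis, key, ttl):
    return bool(redis.set(key, "1", nx=True, ex=ttl))


r = FakeRedis()
key = "once_task:myapp.tasks.my_task:deadbeefdeadbeef:queue"
assert acquire_lock(r, key, ttl=3600) is True   # first caller wins
assert acquire_lock(r, key, ttl=3600) is False  # duplicate is refused
r.delete(key)                                   # e.g., worker picked up the task
assert acquire_lock(r, key, ttl=3600) is True   # key is free again
```

The TTL is the safety net mentioned above: even if a release is never issued (crashed worker, lost connection), the key expires on its own and the task can run again.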
## Installation

```bash
pip install celery-once-task
```

For Django integration (quote the extras so shells like zsh don't expand the brackets):

```bash
pip install "celery-once-task[django]"
```
## Requirements
- Python 3.9+
- Celery 5.0+
- Redis 4.0+ (Python client)
- A running Redis server
## Quick Start

Using Django? Skip to Django Integration below; it handles configuration and signal setup for you automatically.
### 1. Configure

Call this once at app startup, before any tasks run. Add it to your Celery app module (e.g., `celery.py`, or wherever you create your `Celery()` instance).

```python
# celery.py
from celery_once_task import configure

configure(
    redis_url="redis://localhost:6379/3",
    queue_lock_timeout=3600,
    running_lock_timeout=3600,
)
```
All three settings are optional. These are the defaults.
### 2. Connect Signals

Add this right after your `configure()` call in the same file:

```python
# celery.py
from celery_once_task import setup_once_task_signals

setup_once_task_signals()
```

This hooks into Celery's `task_revoked` and `task_internal_error` signals to release locks when tasks are revoked or hit internal errors.
### 3. Register the Worker Boot Step

Also in `celery.py`, after creating your Celery app instance:

```python
# celery.py
from celery_once_task import OnceTaskUnlockBootStep

app.steps["worker"].add(OnceTaskUnlockBootStep)
```
This releases running locks for any active tasks when a worker shuts down.
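For reference, the three setup steps above typically live together in a single `celery.py`. A minimal sketch, assuming the package is installed; the app name and broker URL are placeholders, not part of this library:

```python
# celery.py — configure, connect signals, register the boot step
from celery import Celery
from celery_once_task import (
    OnceTaskUnlockBootStep,
    configure,
    setup_once_task_signals,
)

app = Celery("myapp", broker="redis://localhost:6379/0")  # placeholder broker

configure(
    redis_url="redis://localhost:6379/3",
    queue_lock_timeout=3600,
    running_lock_timeout=3600,
)
setup_once_task_signals()
app.steps["worker"].add(OnceTaskUnlockBootStep)
```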
### 4. Use It

In your task modules (e.g., `tasks.py`):

```python
# tasks.py
from celery import shared_task

from celery_once_task import OnceTask


@shared_task(base=OnceTask)
def my_task(arg1, arg2):
    ...
```
That's it. Calling `my_task.delay(42)` multiple times will only queue one instance. If a worker is already running `my_task(42)`, a second worker won't start another one.
## Django Integration

If you use Django, the library provides an `AppConfig` that handles configuration and signal setup automatically. Only two steps are required: add it to `INSTALLED_APPS` and register the boot step.
### 1. Add to INSTALLED_APPS

In your Django settings file (e.g., `settings.py`):

```python
# settings.py
INSTALLED_APPS = [
    ...
    "celery_once_task.django.OnceTaskAppConfig",
]
```
### 2. Set Django Settings (optional)

In the same settings file:

```python
# settings.py
CELERY_ONCE_REDIS_URL = "redis://localhost:6379/3"
CELERY_ONCE_QUEUE_LOCK_TIMEOUT = 3600    # seconds, default: 3600
CELERY_ONCE_RUNNING_LOCK_TIMEOUT = 3600  # seconds, default: 3600
```
All three are optional. The defaults are shown above.
### 3. Register the Boot Step

In your Celery app module (e.g., `celery.py`):

```python
# celery.py
from celery_once_task import OnceTaskUnlockBootStep

app.steps["worker"].add(OnceTaskUnlockBootStep)
```
### 4. Use It

In your task modules (e.g., `tasks.py`):

```python
# tasks.py
from celery import shared_task

from celery_once_task import OnceTask


@shared_task(base=OnceTask)
def my_task(arg1, arg2):
    ...
```
## Configuration Reference

| Setting | Type | Default | Description |
|---|---|---|---|
| `redis_url` | `str` | `redis://localhost:6379/3` | Redis server URL for storing locks |
| `queue_lock_timeout` | `int` | `3600` | TTL in seconds for queue locks |
| `running_lock_timeout` | `int` | `3600` | TTL in seconds for running locks |
When using Django, set these in your Django settings with the `CELERY_ONCE_` prefix (e.g., `CELERY_ONCE_REDIS_URL`).
## Per-Task Options

You can enable or disable each lock per task by passing `queue_lock` and `running_lock` directly in the decorator:

```python
from celery import shared_task

from celery_once_task import OnceTask


@shared_task(base=OnceTask, queue_lock=False, running_lock=True)
def my_task():
    ...
```
| Option | Type | Default | Description |
|---|---|---|---|
| `queue_lock` | `bool` | `True` | Enable/disable the queue lock for this task |
| `running_lock` | `bool` | `True` | Enable/disable the running lock for this task |
Examples:

```python
# Only prevent concurrent execution; allow multiple queued instances
@shared_task(base=OnceTask, queue_lock=False)
def allow_queue_duplicates():
    ...


# Only prevent duplicate queueing; allow concurrent execution
@shared_task(base=OnceTask, running_lock=False)
def allow_parallel_runs():
    ...
```
## API

### `celery_once_task.configure(**kwargs)`

Set the global configuration. Call this once at startup, before any tasks run.

### `celery_once_task.setup_once_task_signals()`

Connect Celery signals for lock cleanup on task revocation and internal errors.

### `celery_once_task.teardown_once_task_signals()`

Disconnect those signals. Useful in tests.

### `celery_once_task.OnceTask`

Celery `Task` subclass. Use as `base=OnceTask` in your task decorators.

### `celery_once_task.OnceTaskLocked`

Exception raised (a subclass of `celery.exceptions.Reject`) when a task is rejected because a running lock already exists.

### `celery_once_task.OnceTaskUnlockBootStep`

Celery worker boot step that releases running locks on worker shutdown.
## How Lock Keys Are Built

Lock keys follow this pattern:

```
once_task:{task_name}:{hash}:{lock_type}
```

- `task_name` — the full dotted task name (e.g., `myapp.tasks.my_task`)
- `hash` — the first 16 characters of a SHA-256 hash of the serialized arguments
- `lock_type` — either `queue` or `running`
Two calls with different arguments get different lock keys and run independently.
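The key pattern above can be reproduced in a few lines. The exact argument-serialization scheme is not documented here, so this sketch assumes a sorted-keys JSON dump; `build_lock_key` is a hypothetical helper name, not part of this library's API.

```python
import hashlib
import json


def build_lock_key(task_name, args, kwargs, lock_type):
    # Serialize the arguments deterministically (assumed scheme), hash them,
    # and keep the first 16 hex characters, per the documented key pattern.
    payload = json.dumps([list(args), kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"once_task:{task_name}:{digest}:{lock_type}"


k1 = build_lock_key("myapp.tasks.my_task", (42,), {}, "queue")
k2 = build_lock_key("myapp.tasks.my_task", (43,), {}, "queue")
assert k1.startswith("once_task:myapp.tasks.my_task:")
assert k1 != k2  # different arguments -> different lock keys
```

Whatever the real serialization is, the property that matters is determinism: the same arguments must always hash to the same key, so a duplicate call collides with the existing lock.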
## License

MIT