A simple, framework-agnostic Celery health check mechanism using a heartbeat file.
Project description
Celery Pulse
A simple, framework-agnostic heartbeat mechanism for Celery.
celery-pulse helps you monitor the true health of your Celery workers and beat scheduler. A common problem with Celery monitoring is that a worker process can be running but "stuck" or frozen, unable to process tasks. Standard health checks that just check if the process exists will report a false positive.
This library solves the problem by running a lightweight background thread (or greenlet) inside each Celery worker process and the beat process. This thread's only job is to periodically "touch" a healthcheck file. If the file's modification timestamp is not recent, it means the process is unresponsive, allowing you to take action.
Key Features
- Framework-Agnostic: Works with Django, Flask, FastAPI, or any other Python application.
- Smart Pool Detection: Automatically uses the correct concurrency primitive:
gevent: A non-blocking greenlet via a Celery bootstep.prefork/solo: A standard background thread via Celery signals.
- Beat Support: Monitors the
celery beatscheduler process in addition to workers. - Lightweight & Safe: The heartbeat runs in a separate thread/greenlet and won't block your tasks.
- Simple Integration: Requires only a one-line function call to set up.
- Easy to Monitor: Compatible with any monitoring system that can check a file's timestamp (Docker
HEALTHCHECK, KuberneteslivenessProbe, etc.).
Installation
pip install celery-pulse
If you are using the gevent worker pool, you can install the required dependency as an extra:
pip install celery-pulse[gevent]
Quick Start
The only step is to import and call init_celery_pulse() with your Celery app instance after it has been configured.
Example: Standalone Celery (Flask, FastAPI, etc.)
In your Celery app file (e.g., tasks.py):
# tasks.py
from celery import Celery
from celery_pulse import init_celery_pulse
app = Celery('my_tasks', broker='redis://localhost:6379/0')
# Configure celery-pulse settings directly in Celery's config
app.conf.update(
pulse_heartbeat_interval=30, # Heartbeat every 30 seconds
pulse_healthcheck_file="/var/run/celery_health.txt" # Path to the heartbeat file
)
# Initialize the heartbeat mechanism
init_celery_pulse(app)
# --- Define your tasks as usual ---
@app.task
def add(x, y):
return x + y
Example: Django Integration
In your project's celery.py file:
# myproject/celery.py
import os
from celery import Celery
from celery_pulse import init_celery_pulse
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Use a namespace for all Celery-related settings in django's settings.py
app.config_from_object('django.conf:settings', namespace='CELERY')
# Initialize the heartbeat mechanism
# It will read its configuration from your Django settings
init_celery_pulse(app)
app.autodiscover_tasks()
Then, configure the settings in your myproject/settings.py:
# myproject/settings.py
# ... your other Django settings
# Celery settings
CELERY_BROKER_URL = "redis://localhost:6379/0"
# ... other celery settings
# Celery Pulse settings (must use the CELERY_ namespace)
CELERY_PULSE_HEARTBEAT_INTERVAL = 30
CELERY_PULSE_HEALTHCHECK_FILE = "/var/run/myproject/celery_health.txt"
Configuration
celery-pulse is configured through your Celery application's configuration.
| Parameter | app.conf key |
Django settings.py key |
Description | Default |
|---|---|---|---|---|
| Interval | pulse_heartbeat_interval |
CELERY_PULSE_HEARTBEAT_INTERVAL |
The interval in seconds at which the heartbeat file is touched. | 60 |
| File Path | pulse_healthcheck_file |
CELERY_PULSE_HEALTHCHECK_FILE |
The absolute path to the healthcheck file that will be created and updated. | "/tmp/celery_healthcheck" |
How to Use the Healthcheck File
Once celery-pulse is running, it will update the specified file every pulse_heartbeat_interval seconds. Your monitoring system should check if the file's last modification time is recent.
A good rule of thumb is to fail the health check if the file hasn't been updated in 2 * pulse_heartbeat_interval seconds. This provides a safe margin for delays.
Example: Docker HEALTHCHECK
In your Dockerfile, you can add a health check that verifies the file was modified within the last 65 seconds (assuming a 30-second interval).
# Assuming pulse_heartbeat_interval = 30
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD [ -f /var/run/celery_health.txt ] && [ $(($(date +%s) - $(stat -c %Y /var/run/celery_health.txt))) -lt 65 ] || exit 1
[ -f ... ]: Checks if the file exists.$(($(date +%s) - $(stat -c %Y ...))): Calculates the age of the file in seconds.-lt 65: Checks if the age is less than 65 seconds.
Example: Kubernetes livenessProbe
In your Kubernetes deployment YAML, you can add a livenessProbe to automatically restart a pod if its worker becomes unresponsive.
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker
spec:
# ...
template:
# ...
spec:
containers:
- name: worker
image: my-celery-app:latest
command: ["celery", "-A", "myproject", "worker", "-l", "info"]
livenessProbe:
exec:
command:
- /bin/sh
- -c
- "[ -f /var/run/celery_health.txt ] && [ $(($(date +%s) - $(stat -c %Y /var/run/celery_health.txt))) -lt 65 ]"
initialDelaySeconds: 60 # Give time for the worker to start
periodSeconds: 30 # Check every 30 seconds
failureThreshold: 2 # Fail after 2 consecutive failures
How It Works
celery-pulse intelligently adapts to the Celery execution environment:
- Gevent Pool: If
geventis detected as the worker pool,celery-pulseregisters a Celerybootstep. This starts a dedicated, non-blocking greenlet that runs the heartbeat loop. This is the most efficient method forgevent. - Prefork/Solo Pool & Beat: For the default
preforkpool, thesolopool, and thecelery beatscheduler, the library connects to Celery's startup signals (worker_process_init,beat_init). When a process starts, it launches a standard Python thread to manage the heartbeat. The thread is gracefully stopped on shutdown. - Pool Detection: The library uses the
on_after_configuresignal to inspect the configuredworker_pooland sets an environment variable. This variable is inherited by forked worker processes, allowing them to know which heartbeat strategy to use.
Contributing
Contributions are welcome! If you find a bug or have an idea for an improvement, please open an issue or submit a pull request on our GitHub repository.
License
This project is licensed under the MIT License. See the LICENSE file for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file celery_pulse-0.0.1.tar.gz.
File metadata
- Download URL: celery_pulse-0.0.1.tar.gz
- Upload date:
- Size: 7.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
86132cd8395d519baab892e54d24e838b4d8524495287e627e52ed05078e9219
|
|
| MD5 |
abfee5dbcc535e3594e14026cfc3e8cb
|
|
| BLAKE2b-256 |
fec8db6b36d578e33ca5e91cb18db5882598f1c172fd6c77c4d405eac9b7a67f
|
File details
Details for the file celery_pulse-0.0.1-py3-none-any.whl.
File metadata
- Download URL: celery_pulse-0.0.1-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a801c99b6d311d9b936a4b46445014043251e9fbced5cac8fa7749ac268ec6f2
|
|
| MD5 |
aa295eff2a5225fcf91fe6e7d784bf1c
|
|
| BLAKE2b-256 |
ebfdaddb68a917c2c6930c06050df334f579eac8d0b8219fd9c65acf7f7840cc
|