Django process orchestration with paginated logs and async workers
Project description
djprocess
Django process orchestration with paginated logs and Celery workers.
Track long-running jobs in the database, stream structured log lines, and dispatch work to Celery workers.
Features
- Abstract
BaseProcessandBaseProcessLogmodels with status lifecycle tracking - Monotonic, paginated log lines per process
- Per-process
timeout(default 20 minutes) enforced by a supervisor-queue watchdog - Escalating kill: SIGTERM retries, then SIGKILL
- Pluggable worker backend (Celery included)
- Eager-mode Celery for fast tests and local development
Installation
pip install -e ".[dev]"
Quick start
1. Define models
from datetime import timedelta
from django.db import models
from djprocess.models import BaseProcess, BaseProcessLog
class ExportProcess(BaseProcess):
path = models.CharField(max_length=255)
def run(self):
self.log(f"Exporting {self.path}")
# ... do work ...
self.log("Export complete")
class ExportProcessLog(BaseProcessLog):
process = models.ForeignKey(
ExportProcess,
on_delete=models.CASCADE,
related_name="logs",
)
2. Configure Django and Celery
INSTALLED_APPS = [
# ...
"djprocess",
"djprocess_celery",
]
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
DJPROCESS_PROCESS_QUEUE = "processes"
DJPROCESS_SUPERVISOR_QUEUE = "supervisor"
DJPROCESS_KILL_TERM_RETRIES = 3
DJPROCESS_KILL_TERM_WAIT_SECONDS = 2
Create myproject/celery.py:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
3. Run a process
process = ExportProcess.objects.create(path="/tmp/data.csv")
process.timeout = timedelta(minutes=45) # optional, default is 20 minutes
process.start() # dispatches process + per-job watchdog
page = process.paginate_logs(page=1, per_page=50)
for entry in page:
print(entry.sequence, entry.level, entry.message)
Each start() schedules a watchdog on the supervisor queue with countdown=process.timeout_seconds(). Different processes can have different timeouts on the same worker pool.
4. Start workers
Use the prefork pool so Celery can terminate individual child processes:
celery -A myproject worker -Q processes -c 4 -P prefork -l info
celery -A myproject worker -Q supervisor -c 2 -P prefork -l info
Do not use worker-level --time-limit; timeouts are per process, not per worker.
Process lifecycle
| Status | Meaning |
|---|---|
pending |
Created, not yet started |
running |
Worker is executing run() |
succeeded |
Finished without error |
failed |
Raised an exception |
cancelled |
Revoked before or during run |
timed_out |
Exceeded timeout and was killed |
When the watchdog fires on a still-running process, it sends SIGTERM up to DJPROCESS_KILL_TERM_RETRIES times (waiting DJPROCESS_KILL_TERM_WAIT_SECONDS between checks), then SIGKILL. The supervisor task retries immediately if the control command itself fails.
Call process.start() to dispatch, process.cancel() to kill the process task and revoke the watchdog, and process.execute() directly when running synchronously in tests.
Worker backend
The default backend is djprocess_celery.backend.CeleryWorkerBackend. Override with:
DJPROCESS_WORKER_BACKEND = "myapp.backends.CustomWorkerBackend"
Your backend must implement:
dispatch(process) -> (process_task_id, watchdog_task_id)kill(task_id)— escalating SIGTERM → SIGKILLrevoke_scheduled(task_id)— cancel a not-yet-run watchdog
Development
python manage.py makemigrations
python manage.py migrate
pytest
Example app
This repository includes djprocess_example with ExampleProcess and test settings configured for in-memory Celery (CELERY_TASK_ALWAYS_EAGER = True).
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file djprocess-0.0.1.tar.gz.
File metadata
- Download URL: djprocess-0.0.1.tar.gz
- Upload date:
- Size: 24.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4ce32683c845cbbccce139a310bbad6154d8db2171e36ec16c746871b14fb837
|
|
| MD5 |
f90428f912b242a47d36d6364514048a
|
|
| BLAKE2b-256 |
3d49abe2129899477e89d95187966ecd7cb5e9c48be0b09a05351c40b9388b69
|
File details
Details for the file djprocess-0.0.1-py3-none-any.whl.
File metadata
- Download URL: djprocess-0.0.1-py3-none-any.whl
- Upload date:
- Size: 27.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
268f4488ac08ea80ebe41e0ac8a85d44d8e84eb687700a954c0bfa763f164c17
|
|
| MD5 |
1567a6e326569315658df2bc6dc5185a
|
|
| BLAKE2b-256 |
52be6aec82597429b2f9b02f11096e35805f7f3c47d4f3d3f93eeb9ab12d4cbb
|