Skip to main content

Django process orchestration with paginated logs and async workers

Project description

djprocess

Django process orchestration with paginated logs and Celery workers.

Track long-running jobs in the database, stream structured log lines, and dispatch work to Celery workers.

Features

  • Abstract BaseProcess and BaseProcessLog models with status lifecycle tracking
  • Monotonic, paginated log lines per process
  • Per-process timeout (default 20 minutes) enforced by a supervisor-queue watchdog
  • Escalating kill: SIGTERM retries, then SIGKILL
  • Pluggable worker backend (Celery included)
  • Eager-mode Celery for fast tests and local development

Installation

pip install -e ".[dev]"

Quick start

1. Define models

from datetime import timedelta

from django.db import models
from djprocess.models import BaseProcess, BaseProcessLog


class ExportProcess(BaseProcess):
    path = models.CharField(max_length=255)

    def run(self):
        self.log(f"Exporting {self.path}")
        # ... do work ...
        self.log("Export complete")


class ExportProcessLog(BaseProcessLog):
    process = models.ForeignKey(
        ExportProcess,
        on_delete=models.CASCADE,
        related_name="logs",
    )

2. Configure Django and Celery

INSTALLED_APPS = [
    # ...
    "djprocess",
    "djprocess_celery",
]

CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

DJPROCESS_PROCESS_QUEUE = "processes"
DJPROCESS_SUPERVISOR_QUEUE = "supervisor"
DJPROCESS_KILL_TERM_RETRIES = 3
DJPROCESS_KILL_TERM_WAIT_SECONDS = 2

Create myproject/celery.py:

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

3. Run a process

process = ExportProcess.objects.create(path="/tmp/data.csv")
process.timeout = timedelta(minutes=45)  # optional, default is 20 minutes
process.start()  # dispatches process + per-job watchdog

page = process.paginate_logs(page=1, per_page=50)
for entry in page:
    print(entry.sequence, entry.level, entry.message)

Each start() schedules a watchdog on the supervisor queue with countdown=process.timeout_seconds(). Different processes can have different timeouts on the same worker pool.

4. Start workers

Use the prefork pool so Celery can terminate individual child processes:

celery -A myproject worker -Q processes -c 4 -P prefork -l info
celery -A myproject worker -Q supervisor -c 2 -P prefork -l info

Do not use worker-level --time-limit; timeouts are per process, not per worker.

Process lifecycle

Status Meaning
pending Created, not yet started
running Worker is executing run()
succeeded Finished without error
failed Raised an exception
cancelled Revoked before or during run
timed_out Exceeded timeout and was killed

When the watchdog fires on a still-running process, it sends SIGTERM up to DJPROCESS_KILL_TERM_RETRIES times (waiting DJPROCESS_KILL_TERM_WAIT_SECONDS between checks), then SIGKILL. The supervisor task retries immediately if the control command itself fails.

Call process.start() to dispatch, process.cancel() to kill the process task and revoke the watchdog, and process.execute() directly when running synchronously in tests.

Worker backend

The default backend is djprocess_celery.backend.CeleryWorkerBackend. Override with:

DJPROCESS_WORKER_BACKEND = "myapp.backends.CustomWorkerBackend"

Your backend must implement:

  • dispatch(process) -> (process_task_id, watchdog_task_id)
  • kill(task_id) — escalating SIGTERM → SIGKILL
  • revoke_scheduled(task_id) — cancel a not-yet-run watchdog

Development

python manage.py makemigrations
python manage.py migrate
pytest

Example app

This repository includes djprocess_example with ExampleProcess and test settings configured for in-memory Celery (CELERY_TASK_ALWAYS_EAGER = True).

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

djprocess-0.0.1.tar.gz (24.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

djprocess-0.0.1-py3-none-any.whl (27.4 kB view details)

Uploaded Python 3

File details

Details for the file djprocess-0.0.1.tar.gz.

File metadata

  • Download URL: djprocess-0.0.1.tar.gz
  • Upload date:
  • Size: 24.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.5

File hashes

Hashes for djprocess-0.0.1.tar.gz
Algorithm Hash digest
SHA256 4ce32683c845cbbccce139a310bbad6154d8db2171e36ec16c746871b14fb837
MD5 f90428f912b242a47d36d6364514048a
BLAKE2b-256 3d49abe2129899477e89d95187966ecd7cb5e9c48be0b09a05351c40b9388b69

See more details on using hashes here.

File details

Details for the file djprocess-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: djprocess-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 27.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.5

File hashes

Hashes for djprocess-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 268f4488ac08ea80ebe41e0ac8a85d44d8e84eb687700a954c0bfa763f164c17
MD5 1567a6e326569315658df2bc6dc5185a
BLAKE2b-256 52be6aec82597429b2f9b02f11096e35805f7f3c47d4f3d3f93eeb9ab12d4cbb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page