
Background job tracking for Django — typed job types, Redis progress caching, pluggable executors, and cancellation.


django-task-ferry

A reusable Django package for background job management with real-time progress tracking.

Jobs are polymorphic (each type has its own model with extra fields), progress is written to Redis mid-transaction so API clients can poll without waiting for a DB commit, and cancellation is cooperative — a running job checks for cancellation on every progress tick.


Features

  • Pluggable executors — Celery, django-tasks (DEP-0014), or synchronous (great for tests)
  • Redis progress cache — progress updates are visible immediately, even inside a long transaction
  • Hierarchical progress — create child progress objects for pipeline stages with unequal weights
  • Cooperative cancellation — jobs check for cancellation on every progress update
  • DRF API — built-in list / detail / cancel endpoints, ready to mount
  • Type registry — job types self-register; no central list to maintain

Installation

# Quote the extras so shells such as zsh do not treat the brackets as a glob.
# Base (ImmediateExecutor only; good for tests/scripts)
pip install django-task-ferry

# With Celery support
pip install "django-task-ferry[celery]"

# With django-tasks support (DEP-0014)
pip install "django-task-ferry[django-tasks]"

# Development / test
pip install "django-task-ferry[dev]"

Quick start

1. Add to INSTALLED_APPS

# settings.py
INSTALLED_APPS = [
    ...
    "task_ferry",
]

2. Configure the executor

All configuration lives under a single TASK_FERRY dict in your Django settings.

Settings reference

| Key | Default | Description |
| --- | --- | --- |
| EXECUTOR | "task_ferry.executors.immediate.ImmediateExecutor" | Dotted path to the executor class. See Executors below. |
| CELERY_QUEUE | "default" | Global Celery queue for all jobs. Individual job types can override this with their queue attribute. |
| PROGRESS_CACHE_TIMEOUT | 3600 | Seconds to keep progress data in the cache. Should be longer than your longest expected job. |
| JOB_EXPIRY_DAYS | 7 | Ended jobs older than this many days are removed by JobHandler.cleanup_old_jobs(). |
| MAX_JOBS_PER_USER_PER_TYPE | 5 | Global default for JobType.max_count. Individual job types can override this. |
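
To picture how a single TASK_FERRY dict relates to these defaults, here is a minimal sketch. It assumes that keys you set override the defaults above and that EXECUTOR is imported from its dotted path; DEFAULTS, resolve_settings and load_executor_class are hypothetical names, not part of task_ferry. In a Django project, django.utils.module_loading.import_string performs the same dotted-path import.

```python
import importlib
from typing import Any, Optional

# Defaults taken from the settings reference table.
DEFAULTS: dict = {
    "EXECUTOR": "task_ferry.executors.immediate.ImmediateExecutor",
    "CELERY_QUEUE": "default",
    "PROGRESS_CACHE_TIMEOUT": 3600,
    "JOB_EXPIRY_DAYS": 7,
    "MAX_JOBS_PER_USER_PER_TYPE": 5,
}


def resolve_settings(user_settings: Optional[dict]) -> dict:
    """Effective config: keys present in TASK_FERRY override the defaults."""
    return {**DEFAULTS, **(user_settings or {})}


def load_executor_class(dotted_path: str) -> Any:
    """Import "package.module.ClassName" and return the named attribute."""
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```

Under this assumption, a configuration that sets only EXECUTOR keeps every other key at its default.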

Executors

Three executors are included:

| Executor | When to use |
| --- | --- |
| task_ferry.executors.celery.CeleryExecutor | Production. Requires pip install django-task-ferry[celery] |
| task_ferry.executors.django_tasks.DjangoTasksExecutor | Production. Requires pip install django-task-ferry[django-tasks] |
| task_ferry.executors.immediate.ImmediateExecutor | Tests and scripts. Runs jobs synchronously in the current process |

# settings.py: use ONE of the TASK_FERRY configurations below (a later assignment replaces an earlier one)

# Celery (recommended for production)
TASK_FERRY = {
    "EXECUTOR": "task_ferry.executors.celery.CeleryExecutor",
    "CELERY_QUEUE": "default",
    "PROGRESS_CACHE_TIMEOUT": 3600,
    "JOB_EXPIRY_DAYS": 7,
    "MAX_JOBS_PER_USER_PER_TYPE": 5,
}

# django-tasks (DEP-0014)
TASK_FERRY = {
    "EXECUTOR": "task_ferry.executors.django_tasks.DjangoTasksExecutor",
}
TASKS = {
    "default": {
        "BACKEND": "django_tasks.backends.database.DatabaseBackend",
    }
}

# Synchronous — tests / management commands
TASK_FERRY = {
    "EXECUTOR": "task_ferry.executors.immediate.ImmediateExecutor",
}
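
Whichever executor you choose, its role is the same: given the id of a saved job, arrange for it to run. The cancellation test at the end of this page patches ImmediateExecutor.enqueue(self, job_id, queue=None), so the sketch below borrows that signature. Everything else (BaseExecutor, InProcessExecutor, RecordingExecutor, and the run_job callback that stands in for loading the row and calling JobType.run()) is a hypothetical pure-Python illustration, not the package's executor code.

```python
from abc import ABC, abstractmethod
from typing import Callable, Optional


class BaseExecutor(ABC):
    """Hypothetical executor contract: schedule a saved job by its id."""

    def __init__(self, run_job: Callable[[int], None]):
        # run_job stands in for "load the Job row and call JobType.run()".
        self.run_job = run_job

    @abstractmethod
    def enqueue(self, job_id: int, queue: Optional[str] = None) -> None:
        ...


class InProcessExecutor(BaseExecutor):
    """Runs the job synchronously, like ImmediateExecutor; queue is ignored."""

    def enqueue(self, job_id: int, queue: Optional[str] = None) -> None:
        self.run_job(job_id)


class RecordingExecutor(BaseExecutor):
    """Only records what would be handed to a broker such as Celery."""

    def __init__(self, run_job: Callable[[int], None]):
        super().__init__(run_job)
        self.sent: list = []

    def enqueue(self, job_id: int, queue: Optional[str] = None) -> None:
        self.sent.append((job_id, queue))
```

Because callers only pass an id, switching between deferred and synchronous execution in this sketch is a matter of which class is configured, not of changing the calling code.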

Per-job-type queue routing (Celery only)

Set queue on a JobType to send that type to a specific Celery queue instead of the global CELERY_QUEUE default. This has no effect when using DjangoTasksExecutor or ImmediateExecutor.

from task_ferry.registry import JobType
from .models import HeavyExportJob, QuickReportJob  # your Job subclasses


class HeavyExportJobType(JobType):
    type = "heavy_export"
    model_class = HeavyExportJob
    queue = "heavy"  # routed to the "heavy" worker pool


class QuickReportJobType(JobType):
    type = "quick_report"
    model_class = QuickReportJob
    # queue not set: falls back to TASK_FERRY["CELERY_QUEUE"]
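
The fallback rule reduces to a single expression. A minimal sketch, assuming an unset or empty queue counts as "not set" (resolve_queue is a hypothetical helper, not part of the package):

```python
from typing import Optional


def resolve_queue(job_type_queue: Optional[str], settings: dict) -> str:
    """Per-type queue wins; otherwise TASK_FERRY["CELERY_QUEUE"] ("default" if unset)."""
    return job_type_queue or settings.get("CELERY_QUEUE", "default")
```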

3. Run migrations

python manage.py migrate

4. Mount the API (optional)

# urls.py
from django.urls import include, path

urlpatterns = [
    ...
    path("api/jobs/", include("task_ferry.api.urls")),
]

Defining a job type

Step 1 — Create the model

Add domain-specific fields by subclassing Job:

# myapp/models.py
from django.db import models

from task_ferry.models import Job


class ExportJob(Job):
    table_id = models.IntegerField()
    output_path = models.CharField(max_length=500, blank=True)

    class Meta:
        app_label = "myapp"

Create and apply a migration:

python manage.py makemigrations myapp
python manage.py migrate

Step 2 — Implement the JobType

# myapp/job_types.py
from task_ferry.registry import JobType
from .models import ExportJob


class ExportJobType(JobType):
    type = "export_table"  # unique string identifier
    model_class = ExportJob
    max_count = 2  # max concurrent jobs per user
    queue = "exports"  # optional: route to a specific Celery queue
    
    def prepare_values(self, values: dict, user) -> dict:
        """Validate and transform kwargs before the Job row is created."""
        if "table_id" not in values:
            raise ValueError("table_id is required")
        return values
    
    def run(self, job: ExportJob, progress) -> None:
        """
        Do the actual work. Called inside the executor task.
        progress is a Progress object rooted at 100.
        """
        rows = fetch_rows(job.table_id)  # your code here
        progress.increment(10, state="Fetched rows")
        
        # Use a child for a sub-stage with its own step count.
        write_stage = progress.create_child(represents=80, total=len(rows))
        for row in rows:
            write_to_file(row)
            write_stage.increment(state=f"Writing row {row.id}...")
        
        progress.increment(10, state="Finalising")
        job.output_path = "/exports/result.csv"
        job.save(update_fields=["output_path"])

Step 3 — Register in AppConfig.ready()

# myapp/apps.py
from django.apps import AppConfig


class MyAppConfig(AppConfig):
    name = "myapp"
    
    def ready(self):
        from task_ferry.registry import job_type_registry
        from .job_types import ExportJobType
        
        job_type_registry.register(ExportJobType())
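
A registry of this kind is essentially a dictionary keyed by each JobType's type string, which is why type must be unique. The sketch below is a hypothetical stand-in for job_type_registry (JobTypeRegistry, DuplicateJobType and get() are assumptions, not the documented API), with a trimmed ExportJobType that carries only its type attribute.

```python
class DuplicateJobType(Exception):
    """Raised when two job types claim the same type string."""


class JobTypeRegistry:
    """Hypothetical registry mapping JobType.type strings to instances."""

    def __init__(self):
        self._types = {}

    def register(self, job_type) -> None:
        key = job_type.type
        if key in self._types:
            raise DuplicateJobType(f"Job type {key!r} is already registered")
        self._types[key] = job_type

    def get(self, type_name: str):
        try:
            return self._types[type_name]
        except KeyError:
            raise LookupError(f"No job type registered as {type_name!r}") from None


class ExportJobType:  # trimmed stand-in for the JobType subclass above
    type = "export_table"
```

Registering inside ready() means registration runs once Django's app registry is fully populated, so the job type module can import models safely.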

Job model reference

Every concrete job model inherits the following fields from Job:

| Field | Type | Description |
| --- | --- | --- |
| state | str | Current lifecycle state. One of pending, started, finished, failed, cancelled. |
| progress_percentage | int | 0–100. Written to Redis mid-run; persisted to the DB at completion. |
| progress_state | str | Human-readable description of the current step. |
| error | str | Short error string (exception message). Empty unless the job failed. |
| human_readable_error | str | Longer, user-facing error description. |
| user | User | The user who triggered the job. None for system jobs. |
| created_at | datetime | When the job was created. |
| updated_at | datetime | Last state change timestamp. |

Convenience properties (read from cache, falling back to the DB):

| Property | Description |
| --- | --- |
| is_pending | True if state is pending. |
| is_running | True if state is started. |
| is_finished | True if state is finished. |
| is_failed | True if state is failed. |
| is_cancelled | True if state is cancelled. |
| has_ended | True if state is finished, failed, or cancelled. |
| specific | Returns the concrete subclass instance. Use this when you have a base Job queryset and need type-specific fields. |
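
The "read from cache, falling back to the DB" behaviour can be modelled as below. This is a sketch under stated assumptions: a dict with expiry times stands in for Redis, a second dict stands in for the committed Job rows, and ProgressCache, read_progress and the key format are hypothetical. It also illustrates why PROGRESS_CACHE_TIMEOUT should outlast your longest job: once an entry expires, readers only see what was last committed.

```python
import time
from typing import Callable, Optional


class ProgressCache:
    """Dict-backed stand-in for a Redis progress cache with per-key expiry."""

    def __init__(self, timeout: float, clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self._data: dict = {}

    @staticmethod
    def key(job_id: int) -> str:
        return f"task_ferry:progress:{job_id}"  # hypothetical key format

    def set(self, job_id: int, progress: dict) -> None:
        # Store the value together with its expiry time.
        self._data[self.key(job_id)] = (self.clock() + self.timeout, progress)

    def get(self, job_id: int) -> Optional[dict]:
        entry = self._data.get(self.key(job_id))
        if entry is None or entry[0] <= self.clock():
            return None  # missing or expired
        return entry[1]


def read_progress(job_id: int, cache: ProgressCache, db_rows: dict) -> dict:
    """Prefer the live cached value; otherwise use the last committed DB row."""
    return cache.get(job_id) or db_rows[job_id]
```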

Dispatching a job

from task_ferry.handler import JobHandler

# Authenticated user context (e.g. from a DRF view)
job = JobHandler.create_and_start(
    user=request.user,
    job_type_name="export_table",
    table_id=42,
)

# System / automated context (no user)
job = JobHandler.create_and_start(
    user=None,
    job_type_name="export_table",
    table_id=42,
)

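create_and_start returns the saved Job instance. With CeleryExecutor or DjangoTasksExecutor it returns immediately and the work runs asynchronously in a worker. With ImmediateExecutor the job runs synchronously, so it has already ended by the time the call returns (call refresh_from_db() on the instance to see its final state).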

Listing jobs for a user

jobs = JobHandler.get_jobs_for_user(
    user=request.user,
    states=["pending", "started"],  # optional filter
    job_type_name="export_table",  # optional filter
    limit=20,
    offset=0,
)

Polling progress

Poll GET /api/jobs/<id>/ from your frontend. The response is served from Redis so it reflects mid-transaction progress without waiting for a DB commit:

{
  "id": 17,
  "state": "started",
  "progress_percentage": 43,
  "progress_state": "Writing row 430...",
  "error": "",
  "human_readable_error": "",
  "created_at": "2026-05-28T10:00:00Z",
  "updated_at": "2026-05-28T10:00:05Z"
}

Possible state values: pending, started, finished, failed, cancelled.

A simple polling loop in JavaScript:

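async function pollJob(jobId, onProgress, intervalMs = 1500) {
    while (true) {
        const res = await fetch(`/api/jobs/${jobId}/`);
        if (!res.ok) throw new Error(`Polling job ${jobId} failed: HTTP ${res.status}`);
        const job = await res.json();
        onProgress(job);
        // Stop polling once the job reaches a terminal state.
        if (["finished", "failed", "cancelled"].includes(job.state)) return job;
        await new Promise(r => setTimeout(r, intervalMs));  // default 1.5 s
    }
}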

Cancelling a job

# From Python
from task_ferry.handler import JobHandler

JobHandler.cancel(user=request.user, job_id=job.id)

Or via the API:

POST /api/jobs/<id>/cancel/

Cancellation is cooperative — a running job checks for cancellation on every progress.increment() call and raises JobCancelled when detected. The handler catches this and calls JobType.on_cancelled(job) so you can clean up.

Override on_cancelled to remove partial output files, release locks, etc.:

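import os

def on_cancelled(self, job: ExportJob) -> None:  # a method on ExportJobType
    # Remove partial output; the file may not have been written yet.
    if job.output_path and os.path.exists(job.output_path):
        os.unlink(job.output_path)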
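
Cooperative cancellation means nothing interrupts the worker from outside: the job itself notices the request on its next progress tick. Here is a minimal sketch of that pattern. Only JobCancelled and on_cancelled come from the text above; TickingProgress, run_with_cancellation and the is_cancelled callable (which in the package would presumably be a cache or database lookup) are hypothetical.

```python
from typing import Callable


class JobCancelled(Exception):
    """Raised from inside increment() when cancellation has been requested."""


class TickingProgress:
    def __init__(self, is_cancelled: Callable[[], bool]):
        self.is_cancelled = is_cancelled
        self.done = 0

    def increment(self, by: int = 1) -> None:
        self.done = min(100, self.done + by)
        # In this sketch the check only happens here, so a long step
        # between two increment() calls is never interrupted.
        if self.is_cancelled():
            raise JobCancelled()


def run_with_cancellation(run: Callable[[TickingProgress], None],
                          is_cancelled: Callable[[], bool],
                          on_cancelled: Callable[[], None]) -> str:
    progress = TickingProgress(is_cancelled)
    try:
        run(progress)
    except JobCancelled:
        on_cancelled()  # clean up partial output, release locks, ...
        return "cancelled"
    return "finished"
```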

JobType reference

Attributes

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| type | str | (required) | Unique string identifier, e.g. "export_table". |
| model_class | Type[Job] | (required) | The Job subclass whose DB table stores this type's fields. |
| max_count | int | TASK_FERRY["MAX_JOBS_PER_USER_PER_TYPE"] (5) | Maximum pending-or-running jobs of this type per user. Set to 1 for types where duplicates make no sense. |
| queue | str or None | None | Celery queue to route this job type to. When None, falls back to the global TASK_FERRY["CELERY_QUEUE"]. |
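
As a rough sketch of the max_count rule, assuming the limit is checked by counting the user's jobs of that type in the pending or started state before a new row is created (check_max_count, TooManyJobs and the (job_type, state) tuples are hypothetical):

```python
from typing import Iterable, Optional

ACTIVE_STATES = {"pending", "started"}


class TooManyJobs(Exception):
    pass


def check_max_count(existing: Iterable[tuple], job_type: str,
                    max_count: Optional[int], default_max: int = 5) -> None:
    """existing: (job_type, state) pairs for one user's jobs.

    max_count=None means "use the global MAX_JOBS_PER_USER_PER_TYPE".
    """
    limit = default_max if max_count is None else max_count
    active = sum(1 for t, s in existing if t == job_type and s in ACTIVE_STATES)
    if active >= limit:
        raise TooManyJobs(f"{active} active {job_type!r} jobs; limit is {limit}")
```

Ended jobs (finished, failed, cancelled) do not count towards the limit in this sketch, which matches the "pending-or-running" wording above.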

Hooks

All hooks have no-op defaults. Override only what you need:

| Hook | Called when |
| --- | --- |
| prepare_values(values, user) | Before the Job row is created. Validate and transform kwargs. Must return the dict of model field values. |
| after_job_creation(job, values) | Immediately after the row is saved, before dispatch. |
| run(job, progress) | Inside the executor task. Implement your work here. |
| on_error(job, exc) | After the job is marked failed. Log or alert. |
| on_cancelled(job) | After the job is marked cancelled. Clean up partial state. |
| before_delete(job) | Before cleanup_old_jobs deletes an expired job row. |
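
To make the order of these hooks concrete, here is a hypothetical driver that calls them in the sequence the table describes and records the state transitions. It is a pure-Python sketch: a dict stands in for the Job row, dispatch and the executor are collapsed into a direct call, and drive_job and RecordingJobType are illustrative names rather than the handler's code.

```python
class RecordingJobType:
    """Implements the hooks and logs when each one is called."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.calls: list = []

    def prepare_values(self, values, user):
        self.calls.append("prepare_values")
        if "table_id" not in values:
            raise ValueError("table_id is required")
        return values

    def after_job_creation(self, job, values):
        self.calls.append("after_job_creation")

    def run(self, job, progress):
        self.calls.append("run")
        if self.fail:
            raise RuntimeError("disk full")

    def on_error(self, job, exc):
        self.calls.append("on_error")


def drive_job(job_type, values, user=None) -> dict:
    values = job_type.prepare_values(values, user)  # may reject bad input
    job = {"state": "pending", "error": "", **values}  # "create the row"
    job_type.after_job_creation(job, values)
    # Dispatch would happen here; the executor then calls run().
    job["state"] = "started"
    try:
        job_type.run(job, progress=None)
    except Exception as exc:
        job["state"] = "failed"
        job["error"] = str(exc)
        job_type.on_error(job, exc)  # called after the job is marked failed
    else:
        job["state"] = "finished"
    return job
```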

Progress API reference

A Progress object is passed to run(job, progress). It is rooted at 100: the root has 100 steps, so increments (made directly or through children) that add up to 100 bring it to 100%.

| Method / Property | Description |
| --- | --- |
| increment(by=1, state="") | Advance by the number of steps given in by (default 1), record state as the current step description, and fire the progress callback. Clamped: will not exceed 100%. |
| create_child(represents, total) | Return a child Progress with total steps. Each child step advances the parent by represents / total units, so a completed child has advanced the parent by represents units. |
| percentage | Current completion as an integer 0–100. |

Multiple children with unequal weights

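def run(self, job, progress):
    # 10%: fetch (advance the root directly)
    items = fetch_items(job)  # your code here
    progress.increment(10, state="Fetched items")

    # Two children with unequal weights, created up front
    process = progress.create_child(represents=70, total=len(items))
    upload = progress.create_child(represents=20, total=len(items))

    # 70%: process items (with 5 items, each step is worth 70 / 5 = 14% of the total)
    for item in items:
        process_item(item)  # your code here
        process.increment(state=f"Processing {item}...")

    # 20%: upload results
    for item in items:
        upload_item(item)  # your code here
        upload.increment(state=f"Uploading {item}...")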

Multiple children can be created up front and advanced independently — they do not overwrite each other's contribution.
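
The weighting can be checked with a small model. This is an illustrative re-implementation, not the package's Progress class; it assumes each child step is worth represents / total of its parent's units, that each child's contribution is tracked separately (so siblings never overwrite each other), and that every node is clamped at its total.

```python
from typing import Optional


class Progress:
    """Minimal model: a node with `total` steps worth `represents` parent units."""

    def __init__(self, total: float = 100, represents: float = 100,
                 parent: Optional["Progress"] = None):
        self.total = total
        self.represents = represents
        self.parent = parent
        self.done = 0.0  # own steps completed, clamped to total
        self.state = ""
        self.children: list = []

    def create_child(self, represents: float, total: float) -> "Progress":
        child = Progress(total=total, represents=represents, parent=self)
        self.children.append(child)
        return child

    def increment(self, by: float = 1, state: str = "") -> None:
        self.done = min(self.total, self.done + by)
        self.state = state

    def completed_units(self) -> float:
        """Own steps plus each child's weighted share, clamped to total."""
        units = self.done + sum(
            c.represents * c.completed_units() / c.total for c in self.children
        )
        return min(self.total, units)

    @property
    def percentage(self) -> int:
        return int(100 * self.completed_units() / self.total)
```

With the 10 / 70 / 20 split above, fetching plus three of five processing steps gives 10 + 3 × 14 = 52%.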


Periodic cleanup

Ended jobs older than JOB_EXPIRY_DAYS (default 7) can be deleted by calling:

from task_ferry.handler import JobHandler

deleted_count = JobHandler.cleanup_old_jobs()

Wire this up as a Celery beat task or a management command on a daily schedule.
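
Two conditions decide whether cleanup_old_jobs() removes a row: the job has ended (finished, failed or cancelled) and it is older than JOB_EXPIRY_DAYS. The sketch below assumes age is measured from updated_at, the last state change; the text does not say which timestamp the package uses. cleanup and JobRow are hypothetical, and the before_delete callback mirrors the hook of the same name.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional

ENDED_STATES = {"finished", "failed", "cancelled"}


@dataclass
class JobRow:
    id: int
    state: str
    updated_at: datetime


def cleanup(jobs: list, expiry_days: int = 7,
            before_delete: Callable[[JobRow], None] = lambda job: None,
            now: Optional[datetime] = None) -> tuple:
    """Return (remaining_jobs, deleted_count)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=expiry_days)
    keep, deleted = [], 0
    for job in jobs:
        if job.state in ENDED_STATES and job.updated_at < cutoff:
            before_delete(job)  # e.g. remove the job's output file
            deleted += 1
        else:
            keep.append(job)  # pending, running, or not old enough
    return keep, deleted
```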


Testing

Use ImmediateExecutor in your test settings to run jobs synchronously — no broker or worker needed:

# settings_test.py
TASK_FERRY = {
    "EXECUTOR": "task_ferry.executors.immediate.ImmediateExecutor",
}

Then test directly:

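from task_ferry.handler import JobHandler


# db is the pytest-django fixture; user is assumed to be a fixture in your conftest.py
def test_export_creates_file(db, user):
    job = JobHandler.create_and_start(user, "export_table", table_id=1)
    job.refresh_from_db()  # the returned instance may predate the run
    assert job.state == "finished"
    assert job.output_path != ""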

To test cancellation, patch enqueue so the job stays pending, then cancel before it runs:

def test_cancel_cleans_up(db, user, monkeypatch):
    from task_ferry.executors.immediate import ImmediateExecutor
    # Make enqueue a no-op so the job is never run and stays pending.
    monkeypatch.setattr(ImmediateExecutor, "enqueue", lambda self, job_id, queue=None: None)

    job = JobHandler.create_and_start(user, "export_table", table_id=1)
    JobHandler.cancel(user, job.id)
    job.refresh_from_db()  # cancel() works by id, so reload the row
    assert job.state == "cancelled"



Download files


Source Distribution

django_task_ferry-0.1.1.tar.gz (25.8 kB view details)

Uploaded Source

Built Distribution


django_task_ferry-0.1.1-py3-none-any.whl (27.4 kB view details)

Uploaded Python 3

File details

Details for the file django_task_ferry-0.1.1.tar.gz.

File metadata

  • Download URL: django_task_ferry-0.1.1.tar.gz
  • Upload date:
  • Size: 25.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for django_task_ferry-0.1.1.tar.gz
Algorithm Hash digest
SHA256 506d1f1ea622b11f7ea268cdd71213576381aa7b5f1c66719c7f4cbadc8ef062
MD5 3445274ca1f0b76e05597eb958715af6
BLAKE2b-256 5ea9575867bb239b2ca71f1caa9b1a49f0813c7b976746ce4c147bd95275dafb


Provenance

The following attestation bundles were made for django_task_ferry-0.1.1.tar.gz:

Publisher: release.yml on wmo-raf/django-task-ferry

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file django_task_ferry-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for django_task_ferry-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 dd666f346b0d4d276c689c551a1aaad6185df9a5d5937f164824623fd43bed25
MD5 69a47396090dffd53cebb59298e46f72
BLAKE2b-256 0ef2c0cda0e2e158fa4b2903d639cbd47f935d29087fa3c104b7e0494f2a30cd


Provenance

The following attestation bundles were made for django_task_ferry-0.1.1-py3-none-any.whl:

Publisher: release.yml on wmo-raf/django-task-ferry

