Skip to main content

Database-backed task queue backend for Django's django.tasks framework

Project description

dj_queue

dj_queue is a database-backed task queue backend for Django's django.tasks framework.

It keeps the queue, live execution state, runtime metadata, and task results in your database.

  • no Redis, RabbitMQ, or separate result store
  • PostgreSQL is the first-class production backend
  • MySQL 8+, MariaDB 10.6+, and SQLite are supported
  • immediate, scheduled, recurring, and concurrency-limited work
  • fork and async runtime modes
  • multi-database aware from day one

dj_queue is inspired by Rails solid_queue, but shaped to fit Django's task backend API and long-running process model.

Why dj_queue

The database is the queue.

That gives dj_queue a narrow, explicit shape:

  • application code uses Django's @task API
  • DjQueueBackend stores jobs and results in Django-managed tables
  • workers, dispatchers, and schedulers all share one operations layer
  • PostgreSQL can use LISTEN/NOTIFY and SKIP LOCKED as optimizations
  • polling remains the correctness path on every supported database

If your application already depends on the database being the durable system of record, dj_queue lets background work follow the same model.

Installation

dj_queue requires Python 3.12+ and Django 6.0+.

Install the package:

pip install dj-queue

Backend-specific extras are available when you want dj_queue to install a database adapter for you:

pip install "dj-queue[postgres]"

Notes:

  • postgres installs psycopg, which Django's PostgreSQL backend and dj_queue's optional LISTEN/NOTIFY wakeups use
  • for MySQL or MariaDB, install and configure a Django-compatible driver in your application following Django's database docs

Add dj_queue to INSTALLED_APPS, register the router, and point Django's task backend at DjQueueBackend:

# settings.py

INSTALLED_APPS = [
  # ...
  "dj_queue",
]

DATABASE_ROUTERS = ["dj_queue.routers.DjQueueRouter"]

TASKS = {
  "default": {
    "BACKEND": "dj_queue.backend.DjQueueBackend",
    "QUEUES": [],
    "OPTIONS": {},
  },
}

Run migrations:

python manage.py migrate

Quick Start

Define a task with Django's @task decorator:

# myapp/tasks.py

from django.tasks import task


@task
def add(a, b):
  return a + b

Start the dj_queue runtime in one terminal:

python manage.py dj_queue

Then enqueue work from another terminal or from your application code:

from myapp.tasks import add

task_result = add.enqueue(3, 7)
print(task_result.id)

Read the result back through Django's task backend API:

from myapp.tasks import add

fresh_result = add.get_backend().get_result(task_result.id)
print(fresh_result.status)
print(fresh_result.return_value)

When the worker has executed the job, fresh_result.return_value will be 10.

Data Contract

Job payloads and persisted return values are stored in JSON columns, so they must be JSON round-trippable.

  • enqueueing args or kwargs that cannot round-trip through JSON fails immediately
  • returning a non-JSON-serializable value marks the job failed instead of leaving it claimed forever

If you need to pass model instances, files, or custom objects, store them elsewhere and pass identifiers or serialized data instead.

How dj_queue runs

python manage.py dj_queue starts a supervisor for one backend alias.

Job lifecycle:

enqueue -> ready | scheduled | blocked -> claimed -> successful | failed

The runtime has four moving parts:

  • supervisor: boots and stops the runtime
  • workers: claim ready jobs and execute them
  • dispatchers: promote due scheduled jobs and run concurrency maintenance
  • scheduler: enqueue recurring tasks and finished-job cleanup when configured

Useful command variants:

python manage.py dj_queue
python manage.py dj_queue --mode async
python manage.py dj_queue --only-work
python manage.py dj_queue --only-dispatch
python manage.py dj_queue --skip-recurring

Mode and topology notes:

  • fork is the default standalone mode
  • async runs supervised actors in threads inside one process
  • --only-work starts workers without dispatchers or scheduler
  • --only-dispatch starts dispatchers without workers or scheduler
  • --skip-recurring starts without the scheduler

If you're familiar with Solid Queue, the same high-level tradeoff is described in its fork vs async mode section.

Choose a setup

Once migrations are in place, start processing jobs with python manage.py dj_queue on the machine that should do the work. With the default configuration, this starts the supervisor, workers, and dispatcher for the default backend alias and processes all queues.

For most deployments, start with a standalone dj_queue process. Reach for a dedicated queue database before you reach for embedded mode.

  • single database, standalone process: easiest way to start. Use the app database and run python manage.py dj_queue
  • dedicated queue database: recommended production default. Keep queue tables and runtime traffic on database_alias. See Multi-Database Setup
  • embedded server mode: run dj_queue inside ASGI or Gunicorn when you want queue execution colocated with the server process. See Embedded Server Mode

For small deployments, running dj_queue on the same machine as the web server is often enough. When you need more capacity, multiple machines can point at the same queue database. Full python manage.py dj_queue instances coordinate through database locking, so workers and dispatchers share load safely and recurring firing stays deduplicated across schedulers.

In practice, keep recurring settings identical on every full node and prefer one full instance plus additional python manage.py dj_queue --only-work nodes. Add --only-dispatch nodes only when you need more scheduled-job promotion or concurrency-maintenance throughput.

Common Patterns

Scheduled jobs

Use run_after to keep work out of the ready queue until a future time:

from datetime import timedelta

from django.utils import timezone

from myapp.tasks import send_digest

future = timezone.now() + timedelta(hours=1)
send_digest.using(run_after=future).enqueue("daily")

Priorities and named queues

Use priority and queue_name on the task call itself:

from myapp.tasks import deliver_email

deliver_email.using(queue_name="email", priority=10).enqueue("welcome")
deliver_email.using(queue_name="email", priority=-5).enqueue("digest")

Bulk enqueue

Use enqueue_all() when you need one backend call to submit many jobs:

from myapp.tasks import process_item

results = process_item.get_backend().enqueue_all(
  [(process_item, [item_id], {}) for item_id in range(5)]
)

Ordering and transactions

Queue ordering rules:

  • within one selected queue, higher numeric priority is claimed first
  • across multiple queue selectors, selector order wins
  • "*" matches all queues
  • selectors ending in * match queue prefixes such as email*

For example, a worker configured with queues: ["email", "default"] will prefer ready work from email before default, even if default contains higher-priority rows.

enqueue() writes immediately. If a task depends on rows that are still inside the current transaction, use enqueue_on_commit():

from django.db import transaction

from dj_queue.api import enqueue_on_commit
from myapp.tasks import send_receipt

with transaction.atomic():
  order = create_order()
  enqueue_on_commit(send_receipt, order.id)

Recurring Tasks

dj_queue supports both static recurring tasks from settings and dynamic recurring tasks managed at runtime.

Static recurring tasks

Define recurring tasks in TASKS[...]["OPTIONS"]["recurring"]:

TASKS = {
  "default": {
    "BACKEND": "dj_queue.backend.DjQueueBackend",
    "QUEUES": [],
    "OPTIONS": {
      "recurring": {
        "nightly_cleanup": {
          "task_path": "myapp.tasks.cleanup",
          "schedule": "0 3 * * *",
          "queue_name": "maintenance",
          "priority": -5,
          "description": "nightly cleanup",
        },
      },
    },
  },
}

Dynamic recurring tasks

Create, update, and remove recurring tasks at runtime:

from dj_queue.api import schedule_recurring_task, unschedule_recurring_task

schedule_recurring_task(
  key="tenant_42_report",
  task_path="myapp.tasks.send_report",
  schedule="0 * * * *",
  queue_name="reports",
  priority=5,
)

unschedule_recurring_task("tenant_42_report")

Dynamic recurring tasks require TASKS[backend_alias]["OPTIONS"]["scheduler"]["dynamic_tasks_enabled"] = True or the equivalent scheduler.dynamic_tasks_enabled: true in the optional YAML config.

The scheduler is part of the normal dj_queue runtime. You do not run a separate recurring service.

Concurrency Controls

Tasks can opt into database-backed concurrency limits by defining concurrency metadata on the wrapped function:

from django.tasks import task


@task
def sync_account(account_id, action):
  return f"{account_id}:{action}"


sync_account.func.concurrency_key = "account:{account_id}"
sync_account.func.concurrency_limit = 1
sync_account.func.concurrency_duration = 60
sync_account.func.on_conflict = "block"

With this configuration:

  • the first matching job can run immediately
  • later jobs for the same key can block until capacity is released
  • on_conflict = "discard" turns the same pattern into singleton-style work

Queue Operations

QueueInfo exposes operational queue controls without bypassing the queue tables:

from dj_queue.api import QueueInfo

orders = QueueInfo("orders")

print(orders.size)
print(orders.latency)
print(orders.paused)

orders.pause()
orders.resume()
orders.clear()

Operational commands:

python manage.py dj_queue_health
python manage.py dj_queue_health --max-age 120
python manage.py dj_queue_prune --older-than 86400
python manage.py dj_queue_prune --task-path myapp.tasks.cleanup

If Django admin is installed, dj_queue also registers the main operational models there, including jobs, failed executions, processes, recurring tasks, pauses, and semaphores.

Failed jobs

When a task raises, dj_queue keeps the job and its failed execution row in the queue database, including the exception class, message, and traceback.

You can retry or discard failed jobs through Django admin or the operations layer:

from dj_queue.operations.jobs import discard_failed_job, retry_failed_job

retry_failed_job(job_id)
discard_failed_job(job_id)

Failures stay inspectable until you act on them.

Multi-Database Setup

dj_queue can keep queue tables on a dedicated database alias.

Example configuration:

DATABASES = {
  "default": {
    "ENGINE": "django.db.backends.postgresql",
    "NAME": "app",
  },
  "queue": {
    "ENGINE": "django.db.backends.postgresql",
    "NAME": "queue",
  },
}

DATABASE_ROUTERS = ["dj_queue.routers.DjQueueRouter"]

TASKS = {
  "default": {
    "BACKEND": "dj_queue.backend.DjQueueBackend",
    "QUEUES": [],
    "OPTIONS": {
      "database_alias": "queue",
    },
  },
}

Run your normal application migrations on default, then migrate dj_queue onto the queue database:

python manage.py migrate
python manage.py migrate dj_queue --database queue

With this setup, dj_queue's ORM queries and raw SQL helpers stay on the queue database.

Embedded Server Mode

dj_queue can run inside an existing server process via embedded async supervision.

ASGI

Wrap your ASGI application with DjQueueLifespan:

from django.core.asgi import get_asgi_application

from dj_queue.contrib.asgi import DjQueueLifespan

django_application = get_asgi_application()
application = DjQueueLifespan(django_application)

Gunicorn

Import the provided hooks in your Gunicorn config:

# gunicorn.conf.py

from dj_queue.contrib.gunicorn import post_fork, worker_exit

Both embedded integrations use AsyncSupervisor(standalone=False) and leave signal handling to the host server.

Configuration

The main configuration lives in TASKS[backend_alias]["OPTIONS"].

Start with these options:

  • mode: "fork" or "async"
  • workers: queue selectors, thread counts, and process counts
  • dispatchers: scheduled promotion and concurrency maintenance settings
  • scheduler: dynamic recurring polling settings
  • database_alias: database alias for queue tables and runtime activity
  • preserve_finished_jobs and clear_finished_jobs_after: result retention and cleanup

Additional operational tuning is available when needed, including use_skip_locked, listen_notify, silence_polling, process_heartbeat_interval, process_alive_threshold, shutdown_timeout, and on_thread_error.

On PostgreSQL, listen_notify uses the same Django PostgreSQL driver configuration as the main database connection. Install a compatible driver in your project, or use dj-queue[postgres] to pull in psycopg.

Configuration precedence is explicit:

  • CLI overrides
  • environment variables
  • YAML file pointed to by DJ_QUEUE_CONFIG
  • Django TASKS settings

YAML file config

You can point dj_queue at a YAML file with either --config or DJ_QUEUE_CONFIG:

python manage.py dj_queue --config /etc/dj_queue.yml
DJ_QUEUE_CONFIG=/etc/dj_queue.yml python manage.py dj_queue

The YAML file should contain a single mapping of backend option values. It uses the same shape as TASKS[backend_alias]["OPTIONS"], not the full Django TASKS structure:

mode: async
database_alias: queue
preserve_finished_jobs: true
clear_finished_jobs_after: 86400
listen_notify: true
silence_polling: true

workers:
  - queues: ["default", "email*"]
    threads: 8
    processes: 1
    polling_interval: 0.1

dispatchers:
  - batch_size: 500
    polling_interval: 1
    concurrency_maintenance: true
    concurrency_maintenance_interval: 600

scheduler:
  dynamic_tasks_enabled: true
  polling_interval: 5

recurring:
  nightly_cleanup:
    task_path: myapp.tasks.cleanup
    schedule: "0 3 * * *"
    queue_name: maintenance
    priority: -5
    description: nightly cleanup

This file is merged on top of TASKS[backend_alias]["OPTIONS"], then any environment-variable and CLI overrides win after that.

Environment overrides currently supported by dj_queue itself:

  • DJ_QUEUE_CONFIG
  • DJ_QUEUE_MODE
  • DJ_QUEUE_SKIP_RECURRING

Database Support

Backend Support level Notes
PostgreSQL first-class polling, SKIP LOCKED, and optional LISTEN/NOTIFY
MySQL 8+ supported polling plus SKIP LOCKED
MariaDB 10.6+ supported polling plus SKIP LOCKED
SQLite supported with limits polling only, serialized writes, no SKIP LOCKED, no LISTEN/NOTIFY; practical for development, CI, and smaller deployments

Polling is the portability path everywhere. Backend-specific features improve latency and throughput but are not correctness requirements.

Examples

The repository ships real runnable examples in examples/.

Recommended entry points:

  • examples/ex01_basic_enqueue.py
  • examples/ex07_basic_enqueue_on_commit.py
  • examples/ex08_basic_recurring.py
  • examples/ex20_advanced_concurrency.py
  • examples/ex21_advanced_queue_control.py
  • examples/ex24_advanced_multi_db.py
  • examples/ex25_advanced_asgi.py

The examples index in examples/README.md lists the full progression.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dj_queue-0.1.0.tar.gz (32.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dj_queue-0.1.0-py3-none-any.whl (49.2 kB view details)

Uploaded Python 3

File details

Details for the file dj_queue-0.1.0.tar.gz.

File metadata

  • Download URL: dj_queue-0.1.0.tar.gz
  • Upload date:
  • Size: 32.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.4 {"installer":{"name":"uv","version":"0.11.4","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dj_queue-0.1.0.tar.gz
Algorithm Hash digest
SHA256 11715e7cfcc6568a676fef0b59162030454e183970ae7371197ad75723df8c45
MD5 20cb94562d835c28fa4e395767c0ec15
BLAKE2b-256 3e65437edb3a6a54b5df9f364a725d62e4537ba8f73db93c762fedb2f36ae65b

See more details on using hashes here.

File details

Details for the file dj_queue-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: dj_queue-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 49.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.4 {"installer":{"name":"uv","version":"0.11.4","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dj_queue-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0a06c12df12b560a87a9f8b312f5b09e11777c3be4cb36f5b3538ca4ca6fe350
MD5 9ec2f9b129b6bf71f784bd6a224a01b6
BLAKE2b-256 469a6db7e8444b8539a29d5023c287a62d4dae7083ca71e39a7fe19fcbbf7242

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page