django-tasks-redis

A Redis/Valkey-backed task queue backend for Django 6.0's built-in task framework.

Features

  • Full integration with Django 6.0's task framework (django.tasks)
  • Redis Streams for reliable task queuing with consumer groups
  • Support for both Redis and Valkey backends
  • Delayed task execution with scheduled times
  • Priority-based task processing
  • Crash recovery with automatic task reclaim
  • Django Admin integration for task monitoring and management
  • HTTP endpoints for external triggers (webhooks, Cloud Scheduler, etc.)

Architecture

sequenceDiagram
    participant App as Application
    participant Backend as RedisTaskBackend
    participant Redis as Redis/Valkey
    participant Worker as Worker Process

    Note over App,Worker: Task Enqueue
    App->>Backend: task.enqueue(args, kwargs)
    Backend->>Backend: Validate & serialize args
    Backend->>Redis: HSET task data (status=READY)
    Backend->>Redis: XADD to priority stream
    Redis-->>Backend: Message ID
    Backend-->>App: TaskResult (id, status=READY)

    Note over App,Worker: Task Execution
    Worker->>Redis: XREADGROUP (consumer group)<br/>(blocks waiting for messages)
    Redis-->>Worker: Message with task_id
    Worker->>Redis: HGET task data
    Redis-->>Worker: Task data
    Worker->>Redis: HSET status=RUNNING
    Worker->>Worker: Execute task function
    alt Success
        Worker->>Redis: HSET status=SUCCESSFUL,<br/>return_value, finished_at
    else Failure
        Worker->>Redis: HSET status=FAILED,<br/>errors, finished_at
    end
    Worker->>Redis: XACK (acknowledge message)

    Note over App,Worker: Crash Recovery
    Worker->>Redis: XAUTOCLAIM stale messages<br/>(claim_timeout exceeded)
    Redis-->>Worker: Reclaimed messages
    Worker->>Worker: Re-execute tasks

    Note over App,Worker: Result Retrieval (Optional)
    App->>Backend: backend.get_result(task_id)
    Backend->>Redis: HGETALL task data
    Redis-->>Backend: Task data
    Backend-->>App: TaskResult (status, return_value, errors)
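
The enqueue and execution phases of the diagram can be mimicked without a Redis server. The sketch below is a toy in-memory model, not the package's implementation: a dict stands in for the task hashes, a deque for a single stream, and the names `STORE`, `STREAM`, `enqueue` and `work_once` are hypothetical.

```python
from collections import deque
import uuid

STORE = {}        # stands in for the per-task Redis hashes (HSET / HGETALL)
STREAM = deque()  # stands in for one Redis stream (XADD / XREADGROUP)


def enqueue(func, *args, **kwargs):
    """Record the task as READY, then append its ID to the stream."""
    task_id = str(uuid.uuid4())
    STORE[task_id] = {"status": "READY", "func": func, "args": args, "kwargs": kwargs}
    STREAM.append(task_id)
    return task_id


def work_once():
    """Take one message, run the task, store the outcome, then acknowledge."""
    if not STREAM:
        return None
    task_id = STREAM[0]  # read without removing: the message is still pending
    task = STORE[task_id]
    task["status"] = "RUNNING"
    try:
        task["return_value"] = task["func"](*task["args"], **task["kwargs"])
        task["status"] = "SUCCESSFUL"
    except Exception as exc:  # any error raised by the task marks it FAILED
        task["errors"] = [repr(exc)]
        task["status"] = "FAILED"
    STREAM.popleft()  # acknowledge only after the status is stored (XACK)
    return task_id


ok = enqueue(lambda a, b: a + b, 2, 3)
bad = enqueue(lambda: 1 / 0)
work_once()
work_once()
print(STORE[ok]["status"], STORE[ok]["return_value"])  # SUCCESSFUL 5
print(STORE[bad]["status"])                            # FAILED
```

Acknowledging last mirrors the order in the diagram: a worker that dies before the acknowledgement leaves its message pending, which is what allows another worker to reclaim it later.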

Requirements

  • Python 3.12+
  • Django 6.0+
  • Redis 5.0+ or Valkey 7.2+

Installation

pip install django-tasks-redis

Quick Start

  1. Add django_tasks_redis to your INSTALLED_APPS:
INSTALLED_APPS = [
    # ...
    "django_tasks_redis",
]
  2. Configure the task backend in your Django settings:
TASKS = {
    "default": {
        "BACKEND": "django_tasks_redis.RedisTaskBackend",
        "QUEUES": [],  # Empty list = allow all queue names
        "OPTIONS": {
            "REDIS_URL": "redis://localhost:6379/0",
        },
    },
}

Note: QUEUES controls which queue names are allowed. If it is omitted, only the "default" queue is allowed. Set "QUEUES": [] (an empty list) to allow all queue names, or list explicit names such as ["default", "emails"].

  3. Define a task:
from django.tasks import task

@task
def send_email(to: str, subject: str, body: str):
    # Send email logic here
    pass
  4. Enqueue the task:
result = send_email.enqueue("user@example.com", "Hello", "World")
print(f"Task ID: {result.id}")
  5. Run the worker:
python manage.py run_redis_tasks
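
The QUEUES rule from the note in the Quick Start above can be summarised as a small predicate. This is only an illustration of the three documented cases; `is_queue_allowed` is a hypothetical helper, not a function the package exports.

```python
def is_queue_allowed(queue_name, backend_settings):
    """Apply the QUEUES rule to one entry of the TASKS setting."""
    if "QUEUES" not in backend_settings:
        return queue_name == "default"  # omitted: only "default" is allowed
    allowed = backend_settings["QUEUES"]
    if not allowed:
        return True                     # empty list: every queue name is allowed
    return queue_name in allowed        # explicit list: only the listed names


print(is_queue_allowed("emails", {}))                                   # False
print(is_queue_allowed("emails", {"QUEUES": []}))                       # True
print(is_queue_allowed("emails", {"QUEUES": ["default", "emails"]}))    # True
print(is_queue_allowed("reports", {"QUEUES": ["default", "emails"]}))   # False
```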

Configuration Options

TASKS = {
    "default": {
        "BACKEND": "django_tasks_redis.RedisTaskBackend",
        "QUEUES": [],  # Empty list = allow all queue names
        "OPTIONS": {
            # Connection settings (use URL or individual settings)
            "REDIS_URL": "redis://localhost:6379/0",
            # Or use individual settings:
            # "REDIS_HOST": "localhost",
            # "REDIS_PORT": 6379,
            # "REDIS_DB": 0,
            # "REDIS_PASSWORD": None,
            # "REDIS_SSL": False,

            # Behavior settings
            "REDIS_RESULT_TTL": 604800,  # Result retention period (seconds), default 7 days
            "REDIS_KEY_PREFIX": "django_tasks",  # Redis key prefix
            "REDIS_CONSUMER_GROUP": "django_tasks_workers",  # Consumer group name
            "REDIS_CLAIM_TIMEOUT": 300,  # Stale message claim timeout (seconds)
            "REDIS_BLOCK_TIMEOUT": 5000,  # XREADGROUP block timeout (milliseconds)
        },
    },
}
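
In a deployed project these values usually come from the environment rather than being hard-coded. The following is a minimal sketch under that assumption: the helper `redis_task_options` and the environment variable names are hypothetical, and the fallbacks simply repeat the values shown above. Note the units: the result TTL and claim timeout are in seconds, while the block timeout is in milliseconds.

```python
import os
from datetime import timedelta


def redis_task_options(env=os.environ):
    """Build the OPTIONS dict, falling back to the values shown in this README."""
    return {
        "REDIS_URL": env.get("REDIS_URL", "redis://localhost:6379/0"),
        # 7 days expressed in seconds (604800)
        "REDIS_RESULT_TTL": int(timedelta(days=7).total_seconds()),
        # seconds
        "REDIS_CLAIM_TIMEOUT": int(env.get("REDIS_CLAIM_TIMEOUT", "300")),
        # milliseconds
        "REDIS_BLOCK_TIMEOUT": int(env.get("REDIS_BLOCK_TIMEOUT", "5000")),
    }


TASKS = {
    "default": {
        "BACKEND": "django_tasks_redis.RedisTaskBackend",
        "QUEUES": [],
        "OPTIONS": redis_task_options(),
    },
}
```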

Management Commands

run_redis_tasks

Start a worker to process tasks:

python manage.py run_redis_tasks [options]

Options:
  --queue QUEUE_NAME      Process only tasks from specific queue
  --backend BACKEND_NAME  Backend name (default: default)
  --continuous            Continuous mode (don't exit)
  --interval SECONDS      Polling interval (default: 1)
  --max-tasks N           Maximum tasks to process (0=unlimited)
  --workers N             Number of worker threads (default: 1)
  --claim-interval SECS   Stale task claim interval (default: 60)
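
When the worker is started from a supervisor or deployment script, the command line can be assembled from the flags listed above. A small sketch, assuming it runs from the directory containing `manage.py`; `worker_command` is a hypothetical helper and uses only the documented options.

```python
import sys


def worker_command(queue=None, workers=1, continuous=True, max_tasks=0):
    """Return an argv list for `manage.py run_redis_tasks`."""
    cmd = [sys.executable, "manage.py", "run_redis_tasks"]
    if queue is not None:
        cmd += ["--queue", queue]
    if continuous:
        cmd.append("--continuous")
    cmd += ["--workers", str(workers), "--max-tasks", str(max_tasks)]
    return cmd


cmd = worker_command(queue="emails", workers=4)
print(" ".join(cmd[1:]))
# prints: manage.py run_redis_tasks --queue emails --continuous --workers 4 --max-tasks 0

# To actually start the worker:
#   import subprocess
#   subprocess.run(cmd, check=True)
```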

purge_completed_redis_tasks

Delete completed tasks:

python manage.py purge_completed_redis_tasks [options]

Options:
  --days N                Delete tasks completed N+ days ago
  --status STATUS         Target status (default: SUCCESSFUL,FAILED)
  --batch-size N          Batch delete size (default: 1000)
  --dry-run               Only show count, don't delete
  --backend BACKEND_NAME  Backend name (default: default)
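
One plausible reading of `--days` and `--status` is a cutoff comparison on each task's finish time. The sketch below illustrates that reading only; the package's actual selection logic is not shown in this README, and `is_purgeable` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def is_purgeable(status, finished_at, days, now=None, statuses=("SUCCESSFUL", "FAILED")):
    """True if the task has a targeted status and finished at least `days` days ago."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    return status in statuses and finished_at is not None and finished_at <= cutoff


now = datetime(2025, 1, 10, tzinfo=timezone.utc)
old = datetime(2025, 1, 1, tzinfo=timezone.utc)     # finished 9 days before `now`
recent = datetime(2025, 1, 9, tzinfo=timezone.utc)  # finished 1 day before `now`

print(is_purgeable("SUCCESSFUL", old, days=7, now=now))     # True
print(is_purgeable("SUCCESSFUL", recent, days=7, now=now))  # False
print(is_purgeable("RUNNING", old, days=7, now=now))        # False
```

Running the command with `--dry-run` first is a cheap way to check that the count matches what you expect before anything is deleted.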

Django Admin

The package provides Django Admin integration for viewing and managing tasks:

  • View task list with status, priority, queue
  • Filter by status, queue, backend
  • Run selected tasks
  • Retry failed tasks

HTTP Endpoints

Include the URLs in your project:

from django.urls import include, path

urlpatterns = [
    # ...
    path("tasks/", include("django_tasks_redis.urls")),
]

Available endpoints:

  • POST /tasks/run/ - Process multiple tasks
  • POST /tasks/run-one/ - Process a single task
  • POST /tasks/execute/<task_id>/ - Execute specific task by ID
  • GET /tasks/status/<task_id>/ - Get task status
  • POST /tasks/purge/ - Purge completed tasks
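
An external trigger such as a cron job can call these endpoints with the standard library alone. The sketch below only builds the requests and assumes the URLs are mounted under `tasks/` as shown above; `task_request` is a hypothetical helper, and authentication, CSRF handling and the response format are not documented here, so treat those as open questions for your deployment.

```python
from urllib.request import Request


def task_request(base_url, path, method="POST"):
    """Build a request for one of the task endpoints mounted under /tasks/."""
    url = base_url.rstrip("/") + "/tasks/" + path.lstrip("/")
    # Send an empty body with POST so a Content-Length header is included.
    data = b"" if method == "POST" else None
    return Request(url, data=data, method=method)


run_one = task_request("https://example.com", "run-one/")
status = task_request("https://example.com", "status/some-task-id/", method="GET")
print(run_one.get_method(), run_one.full_url)
print(status.get_method(), status.full_url)

# To send a request:
#   from urllib.request import urlopen
#   with urlopen(run_one, timeout=10) as resp:
#       print(resp.status, resp.read())
```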

Public API

The executor module provides functions for programmatic task management:

from django_tasks_redis import executor

# Process tasks
result = executor.process_one_task(queue_name="default")
results = executor.process_tasks(max_tasks=10)

# Execute specific task
result = executor.run_task_by_id(task_id, allow_retry=True)

# Get pending task count
count = executor.get_pending_task_count()

# Purge completed tasks
deleted = executor.purge_completed_tasks(days=7)
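
These functions combine naturally into a loop that drains the queue in batches, for example from a scheduled job. The sketch below takes the two calls as parameters so it runs without Redis; `drain` is a hypothetical helper, and it assumes `get_pending_task_count()` returns an integer, which this README does not state.

```python
def drain(process_batch, pending_count, max_rounds=100):
    """Call process_batch() until pending_count() reaches zero or max_rounds is hit."""
    rounds = 0
    while rounds < max_rounds and pending_count() > 0:
        process_batch()
        rounds += 1
    return rounds


# Stand-ins so the sketch runs on its own. In a project you would pass
# executor.get_pending_task_count and lambda: executor.process_tasks(max_tasks=10).
queue = list(range(25))


def fake_batch():
    del queue[:10]  # pretend to process up to 10 tasks


rounds = drain(fake_batch, lambda: len(queue))
print(rounds)  # 3
```

The `max_rounds` cap keeps the loop from spinning forever if tasks are enqueued faster than they are processed.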

License

MIT License
