A database-backed task queue backend for Django 6.0's built-in task framework
Project description
django-database-task
A database-backed task queue backend for Django 6.0's built-in task framework.
Features
- No external dependencies - Uses your existing database, no Redis or message broker required
- Priority support - Tasks can have priorities from -100 to 100
- Delayed execution - Schedule tasks to run at a specific time with
run_after - Exclusive locking - Prevents duplicate task execution with
SELECT FOR UPDATE SKIP LOCKED - Django Admin integration - View and manage tasks from the admin interface
- Async support - Supports async task functions
Architecture
sequenceDiagram
participant App as Application
participant Backend as DatabaseTaskBackend
participant DB as Database
participant Worker as Worker Process
Note over App,Worker: Task Enqueue
App->>Backend: task.enqueue(args, kwargs)
Backend->>Backend: Validate & serialize args
Backend->>DB: INSERT task (status=READY)
DB-->>Backend: Task ID
Backend-->>App: TaskResult (id, status=READY)
Note over App,Worker: Task Execution
Worker->>DB: SELECT FOR UPDATE SKIP LOCKED<br/>(status=READY, run_after <= now)
DB-->>Worker: Task record (with lock)
Worker->>DB: UPDATE status=RUNNING
Worker->>Worker: Execute task function
alt Success
Worker->>DB: UPDATE status=SUCCESSFUL,<br/>return_value, finished_at
else Failure
Worker->>DB: UPDATE status=FAILED,<br/>errors, finished_at
end
Note over App,Worker: Result Retrieval (Optional)
App->>Backend: backend.get_result(task_id)
Backend->>DB: SELECT task
DB-->>Backend: Task record
Backend-->>App: TaskResult (status, return_value, errors)
Requirements
- Python 3.12+
- Django 6.0+
Supported Databases
Django 6.0 officially supports the following database versions:
| Database | Minimum Version | Notes |
|---|---|---|
| PostgreSQL | 14+ | Recommended for production. Full SELECT FOR UPDATE SKIP LOCKED support. |
| MySQL | 8.0.11+ | Full SELECT FOR UPDATE SKIP LOCKED support. |
| MariaDB | 10.6+ | Full SELECT FOR UPDATE SKIP LOCKED support. |
| SQLite | 3.31.0+ | Works for development/testing, but no row-level locking. |
| Oracle | 19c+ | Supported but not tested with this package. |
Note: SELECT FOR UPDATE SKIP LOCKED is used to prevent duplicate task execution in multi-worker environments. SQLite does not support row-level locking, so it is only recommended for development or single-worker deployments.
Installation
pip install django-database-task
Quick Start
1. Add to INSTALLED_APPS
INSTALLED_APPS = [
# ...
'django_database_task',
]
2. Configure the task backend
TASKS = {
'default': {
'BACKEND': 'django_database_task.backends.DatabaseTaskBackend',
'QUEUES': [], # Empty list means all queues
'OPTIONS': {},
},
}
3. Run migrations
python manage.py migrate django_database_task
4. Define a task
from django.tasks import task
@task
def send_welcome_email(user_id):
user = User.objects.get(id=user_id)
# Send email...
return f"Email sent to {user.email}"
5. Enqueue the task
result = send_welcome_email.enqueue(user_id=123)
print(f"Task ID: {result.id}")
6. Run the worker
# Run once (exit when no tasks)
python manage.py run_database_tasks
# Run continuously (poll every 5 seconds)
python manage.py run_database_tasks --continuous --interval 5
Usage
Important: JSON-Serializable Parameters
Task arguments, keyword arguments, and return values must be JSON-serializable.
Supported types:
str,int,float,bool,Nonedict(with JSON-serializable keys and values)list,tuple(with JSON-serializable elements)bytes(UTF-8 decodable only)
Not supported (will raise TypeError):
datetime,date,time- convert to ISO string:dt.isoformat()UUID- convert to string:str(uuid)Decimal- convert to float or string- Custom objects - serialize manually
from django.tasks import task
# ❌ This will raise TypeError
@task
def bad_task(user_id, created_at):
pass
bad_task.enqueue(123, datetime.now()) # TypeError!
# ✅ Convert to JSON-serializable types
@task
def good_task(user_id, created_at_iso):
created_at = datetime.fromisoformat(created_at_iso)
# ...
good_task.enqueue(123, datetime.now().isoformat()) # OK
Task with priority
@task(priority=10) # Higher priority, runs first
def urgent_task():
pass
@task(priority=-10) # Lower priority
def background_task():
pass
Delayed execution
from datetime import timedelta
from django.utils import timezone
# Run 1 hour from now
delayed_task = my_task.using(run_after=timezone.now() + timedelta(hours=1))
result = delayed_task.enqueue()
Task with context
@task(takes_context=True)
def task_with_context(context, message):
task_id = context.task_result.id
attempt = context.attempt
return f"Task {task_id} (attempt {attempt}): {message}"
Async tasks
@task
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# Enqueue like normal tasks
result = fetch_data.enqueue("https://example.com/api")
Queue-specific tasks
@task(queue_name="emails")
def send_newsletter():
pass
# Run worker for specific queue
# python manage.py run_database_tasks --queue emails
Management Commands
run_database_tasks
Execute tasks queued in the database.
python manage.py run_database_tasks [options]
| Option | Description |
|---|---|
--queue |
Queue name to process (all queues if not specified) |
--backend |
Backend name (default: "default") |
--continuous |
Keep polling even when no tasks |
--interval |
Polling interval in seconds (default: 5) |
--max-tasks |
Maximum number of tasks to process (0=unlimited) |
purge_completed_database_tasks
Delete completed task records from the database.
python manage.py purge_completed_database_tasks [options]
| Option | Description |
|---|---|
--days |
Delete tasks completed more than N days ago (0=all) |
--status |
Target statuses, comma-separated (default: "SUCCESSFUL,FAILED") |
--batch-size |
Number of tasks to delete at once (default: 1000) |
--dry-run |
Show count only without deleting |
Programmatic API
You can also process tasks programmatically without management commands:
from django_database_task import process_one_task, process_tasks, get_pending_task_count
# Process a single task
result = process_one_task()
if result:
print(f"Processed: {result.id}, status: {result.status}")
# Process multiple tasks
results = process_tasks(max_tasks=10)
print(f"Processed {len(results)} tasks")
# Process tasks from a specific queue
results = process_tasks(queue_name="emails", max_tasks=5)
# Get pending task count
count = get_pending_task_count()
print(f"Pending tasks: {count}")
HTTP Endpoints (Optional)
For environments where cron or direct command execution is not available (e.g., serverless, PaaS), you can use HTTP endpoints to trigger task processing.
Setup
Include the URLs in your project:
# urls.py
from django.urls import path, include
urlpatterns = [
path("tasks/", include("django_database_task.urls")),
]
Available Endpoints
| Endpoint | Method | Description |
|---|---|---|
/tasks/run/ |
POST | Process multiple pending tasks |
/tasks/run-one/ |
POST | Process a single pending task |
/tasks/status/ |
GET | Get pending task count |
Request Parameters
POST /tasks/run/
| Parameter | Type | Default | Description |
|---|---|---|---|
max_tasks |
int | 10 | Maximum tasks to process (1-100) |
queue_name |
string | null | Filter by queue name |
backend_name |
string | "default" | Task backend name |
Response:
{
"processed": 3,
"results": [
{"id": "uuid", "status": "SUCCESSFUL", "task_path": "myapp.tasks.send_email"},
{"id": "uuid", "status": "FAILED", "task_path": "myapp.tasks.process_data"}
]
}
POST /tasks/run-one/
| Parameter | Type | Default | Description |
|---|---|---|---|
queue_name |
string | null | Filter by queue name |
backend_name |
string | "default" | Task backend name |
Response:
{"processed": true, "result": {"id": "uuid", "status": "SUCCESSFUL", "task_path": "..."}}
or
{"processed": false, "result": null}
GET /tasks/status/
| Parameter | Type | Default | Description |
|---|---|---|---|
queue_name |
string | null | Filter by queue name |
backend_name |
string | "default" | Task backend name |
Response:
{"pending_count": 5}
Example Usage
# Process up to 10 tasks
curl -X POST http://localhost:8000/tasks/run/ \
-H "Content-Type: application/json" \
-d '{"max_tasks": 10}'
# Process tasks from a specific queue
curl -X POST http://localhost:8000/tasks/run/ \
-H "Content-Type: application/json" \
-d '{"queue_name": "emails", "max_tasks": 5}'
# Get pending task count
curl http://localhost:8000/tasks/status/
Use Cases
Cloud Scheduler / Cron Job
Call the endpoint periodically to process tasks:
# Every minute via cron or Cloud Scheduler
curl -X POST https://your-app.com/tasks/run/ \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"max_tasks": 50}'
Webhook Trigger
Trigger task processing after an event:
# In your webhook handler
import requests
def handle_webhook(request):
# ... process webhook ...
# Trigger background task processing
requests.post(
"http://localhost:8000/tasks/run/",
json={"max_tasks": 10}
)
Health Check with Task Status
Monitor pending task count:
# Alert if too many pending tasks
count=$(curl -s http://localhost:8000/tasks/status/ | jq '.pending_count')
if [ "$count" -gt 100 ]; then
echo "Warning: $count pending tasks"
fi
Security
The endpoints are CSRF-exempt for API/webhook use. Always add authentication in production:
from django.contrib.admin.views.decorators import staff_member_required
from django_database_task.views import RunTasksView, RunOneTaskView, TaskStatusView
urlpatterns = [
path(
"tasks/run/",
staff_member_required(RunTasksView.as_view()),
name="run_tasks",
),
path(
"tasks/run-one/",
staff_member_required(RunOneTaskView.as_view()),
name="run_one_task",
),
path(
"tasks/status/",
staff_member_required(TaskStatusView.as_view()),
name="task_status",
),
]
Or use token-based authentication:
from django.http import HttpResponseForbidden
from django.conf import settings
def require_api_token(view_func):
def wrapper(request, *args, **kwargs):
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if token != settings.TASK_API_TOKEN:
return HttpResponseForbidden("Invalid token")
return view_func(request, *args, **kwargs)
return wrapper
urlpatterns = [
path("tasks/run/", require_api_token(RunTasksView.as_view())),
]
Django Admin
The package includes a Django Admin integration to view task status:
- Task list with status badges
- Filter by status, queue, backend
- Search by task ID or path
- View task arguments and results
License
MIT License - see LICENSE for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_database_task-0.1.0.tar.gz.
File metadata
- Download URL: django_database_task-0.1.0.tar.gz
- Upload date:
- Size: 29.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4f84c2faf67249a4c61fba7a405aa7710a91805b629757957d4cfde9750e476f
|
|
| MD5 |
ba0900cc6ce3bdcb7b77d7ce834593e7
|
|
| BLAKE2b-256 |
ec023001365dafb623e63a6a8fb26821ccf8b413d80269816d1710e381da75ee
|
File details
Details for the file django_database_task-0.1.0-py3-none-any.whl.
File metadata
- Download URL: django_database_task-0.1.0-py3-none-any.whl
- Upload date:
- Size: 24.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6d7aa18bfc8c0f3d9211fae922b3abf4a8c13c3bad42b2a5aa6ffe45c01c9af8
|
|
| MD5 |
8314f9663164c0f294fcae0761a8758d
|
|
| BLAKE2b-256 |
20747ea798528a393e0298cdcb8866c84638a2384ca7c06ca6b153a95e636f6e
|