Google Cloud Tasks support for Django
Project description
README.md
Django Cloud Tasks
A Django library for managing asynchronous tasks and complex workflows, built on top of Google Cloud Tasks. This library allows you to schedule tasks, chains, parallel execution (groups), set error callbacks, apply delays, and handle revocations cleanly and explicitly.
Key Features
- Task: Single asynchronous units of execution.
- Chain: Sequences of tasks that run one after another.
- Group: Multiple parallel tasks whose results are aggregated upon completion.
- Sub-Chains: Chains triggered by completion of a parent chain.
- Delayed Tasks: Schedule execution for a specific future time.
- Error Callbacks: Define handlers for unexpected errors.
- Result Injection: Automatically passes previous outputs into next tasks.
- Revocation: Dynamically cancel executing or future tasks.
- Debug Mode: Execute tasks synchronously for local development without cloud services.
Installation and Setup
Requirements
pip install django-gcp-cloudtasks
Environment variables required
In your project's .env or OS environment set these variables:
TASKAPP_GCP_PROJECT=my-project
TASKAPP_GCP_LOCATION=us-central1
TASKAPP_GCP_QUEUE=default
TASKAPP_CLOUD_RUN_URL=https://YOUR_CLOUD_RUN_ENDPOINT
TASKAPP_AUTH_TOKEN=YOUR_SECURE_AUTH_TOKEN
TASKAPP_DEBUG_MODE=False # Set to 'True' for local development/testing
Django settings
Add django_cloudtasks to your INSTALLED_APPS in your Django settings.py:
INSTALLED_APPS = [
...
'django_cloudtasks',
...
]
Apply migrations:
python manage.py migrate django_cloudtasks
URL Configuration
Add the following to your project's urls.py:
from django.urls import path, include
urlpatterns = [
...
path('cloudtasks/', include('django_cloudtasks.urls')),
...
]
This will register all the necessary endpoints for task execution, tracking, and revocation.
Create a Service Account
- Create a Service Account:
gcloud iam service-accounts create my-service-account \
--display-name="My Service Account"
- Grant Roles to the Service Account:
- Grant permissions to interact with Cloud Tasks and other necessary services.
gcloud projects add-iam-policy-binding test-project-455815 \
--member="serviceAccount:my-service-account@test-project-455815.iam.gserviceaccount.com" \
--role="roles/cloudtasks.enqueuer"
- You may need additional roles depending on your use case, e.g.,
roles/cloudtasks.viewer.
- Generate a Key for the Service Account:
gcloud iam service-accounts keys create ~/path/to/key.json \
--iam-account=my-service-account@test-project-455815.iam.gserviceaccount.com
Set Up Environment Variable
Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of your service account key file:
export GOOGLE_APPLICATION_CREDENTIALS="~/path/to/key.json"
Usage Examples
Single Task
from django_cloudtasks.decorators import register_task
@register_task
def sum(a, b):
return a + b
from django_cloudtasks.manager import CloudChainManager
chain_mgr = CloudChainManager()
chain_mgr.add_task("sum", {"a": 4, "b": 5})
chain_mgr.run()
Sequential Tasks (Chain)
from django_cloudtasks.manager import CloudChainManager
chain_mgr = CloudChainManager()
chain_mgr.add_task("sum", {"a": 4, "b": 5})
chain_mgr.add_task("sum", {"b": 10}) # auto-inject result from previous task into 'a'
chain_mgr.run()
Group Tasks (Parallel)
from django_cloudtasks.manager import CloudChainManager
chain_mgr = CloudChainManager()
chain_mgr.add_group([
{"endpoint_path": "sum", "payload": {"a": 3, "b": 3}},
{"endpoint_path": "sum", "payload": {"a": 7, "b": 2}}
])
chain_mgr.add_task("sum", {"a": 4, "b": 5}, delay_seconds=30)
chain_mgr.run()
Using Delays
from django_cloudtasks.manager import CloudChainManager
chain_mgr = CloudChainManager()
chain_mgr.add_task("sum", {"a": 2, "b": 1}, delay_seconds=300)
chain_mgr.run()
Delays execution for 5 minutes (300 seconds).
Error Callbacks
Automatically schedule another task if any error occurs:
from django_cloudtasks.manager import CloudChainManager
from django_cloudtasks.decorators import register_task
@register_task
def faulty_task(a, b):
# Always raise an exception to simulate failure:
raise ValueError(f"Test error: inputs {a}, {b}")
@register_task
def my_error_handler(original_task_id, original_task_name, error, payload):
print(f"Error callback triggered for task {original_task_name} id={original_task_id}")
print(f"Error was: {error}")
print(f"Payload that caused failure: {payload}")
chain_mgr = CloudChainManager()
chain_mgr.add_task(
"faulty_task", {"a": 1, "b": 2}, error_callback="my_error_handler"
)
chain_mgr.run()
Error Handler Parameters
Error handlers must accept the following parameters:
| Parameter | Type | Description |
|---|---|---|
original_task_id |
UUID | The unique identifier of the task that failed |
original_task_name |
str | The name/endpoint of the task that failed |
error |
str | String representation of the exception that occurred |
payload |
dict | The original payload sent to the task that failed |
CloudChainManager Structure
The CloudChainManager class provides properties and methods to access task and chain IDs for monitoring, debugging, or revocation:
from django_cloudtasks.manager import CloudChainManager
# Create a chain manager
chain_mgr = CloudChainManager()
chain_mgr.add_task("task1", {"a": 1})
chain_mgr.add_task("task2", {"b": 2})
# Access chain ID for revocation
chain_id = chain_mgr.chain.id # UUID of the entire chain
print(f"Chain ID: {chain_id}")
# Access the last added task's ID
last_task_id = chain_mgr.last_task.id # UUID of the most recently added task
print(f"Last Task ID: {last_task_id}")
# Get all tasks in the chain
all_tasks = chain_mgr.chain.tasks.all()
for task in all_tasks:
print(f"Task ID: {task.id}, Endpoint: {task.endpoint_path}")
# Revoke the chain
from django_cloudtasks.utils import revoke_chain
revoke_chain(chain_id)
# Revoke a specific task
from django_cloudtasks.utils import revoke_task
revoke_task(last_task_id)
Chaining and Grouping Chains
Automatically schedule another task if any error occurs:
from django_cloudtasks.manager import CloudChainManager
c1 = CloudChainManager()
c1.add_task("sum", {"a": 4, "b": 5})
c2 = CloudChainManager()
c2.add_task("sum", {"a": 2, "b": 5})
c3 = CloudChainManager()
c3.add_chain_group(c1, c2)
c3.run()
Revoking Tasks and Chains
Revoke actively scheduled or running tasks:
API Endpoint:
GET /cloudtasks/revoke/?task=<TASK_ID>
GET /cloudtasks/revoke/?chain=<CHAIN_ID>
Provided endpoints
| Endpoint | Usage | Description |
|---|---|---|
/cloudtasks/run/<task_name>/ |
POST | Execute a registered task (Cloud Tasks entrypoint) |
/cloudtasks/tracker/ |
POST | Receive results and control flow after tasks execution |
/cloudtasks/revoke/ |
GET | Revoke specific tasks or entire chains |
Debug Mode
The library provides a DEBUG mode for local development and testing without requiring actual Google Cloud Tasks infrastructure.
How to Enable Debug Mode
Set the environment variable:
TASKAPP_DEBUG_MODE=True
Debug Mode Behavior
When DEBUG mode is enabled:
- Tasks are executed synchronously, one after another
- No HTTP requests are made to Google Cloud Tasks
- Task results and errors are processed locally
- All chain, group, and task relationships work exactly as they would in production
- Perfect for unit testing and local development without internet connection
This allows you to test complex task chains without needing to set up Cloud Tasks queues or deal with authentication in your development environment.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_gcp_cloudtasks-1.9.0.tar.gz.
File metadata
- Download URL: django_gcp_cloudtasks-1.9.0.tar.gz
- Upload date:
- Size: 41.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
da1a122009bc6bc12963b0d42be00213a6f0296adb330a592c56dcde6fed6c7a
|
|
| MD5 |
07027292dd99533f3d2948f7873cbf19
|
|
| BLAKE2b-256 |
40096b49ceabf6e86f20eb3fb15cd4adce7b35d54a0c3aa52b7abc5f18f2c5c8
|
File details
Details for the file django_gcp_cloudtasks-1.9.0-py3-none-any.whl.
File metadata
- Download URL: django_gcp_cloudtasks-1.9.0-py3-none-any.whl
- Upload date:
- Size: 18.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7fde0d46ff670b6ec5816a4a05b37d6838ff0aa8a9d74dbe43236d7bc110b885
|
|
| MD5 |
c0e6f0cd25caac08296acd19be5b9630
|
|
| BLAKE2b-256 |
6a3de14f64574e91161a5ee7b24edf7f818b1debd43bb55705d37ef291732d95
|