Skip to main content

Google Cloud Tasks support for Django

Project description

README.md

Django Cloud Tasks

A Django library for managing asynchronous tasks and complex workflows, built on top of Google Cloud Tasks. This library allows you to schedule tasks, chains, parallel execution (groups), set error callbacks, apply delays, and handle revocations cleanly and explicitly.


Key Features

  • Task: Single asynchronous units of execution.
  • Chain: Sequences of tasks that run one after another.
  • Group: Multiple parallel tasks whose results are aggregated upon completion.
  • Sub-Chains: Chains triggered by completion of a parent chain.
  • Delayed Tasks: Schedule execution for a specific future time.
  • Error Callbacks: Define handlers for unexpected errors.
  • Result Injection: Automatically passes previous outputs into next tasks.
  • Revocation: Dynamically cancel executing or future tasks.
  • Debug Mode: Execute tasks synchronously for local development without cloud services.

Installation and Setup

Requirements

pip install django-gcp-cloudtasks

Environment variables required

In your project's .env or OS environment set these variables:

TASKAPP_GCP_PROJECT=my-project
TASKAPP_GCP_LOCATION=us-central1
TASKAPP_GCP_QUEUE=default
TASKAPP_CLOUD_RUN_URL=https://YOUR_CLOUD_RUN_ENDPOINT
TASKAPP_AUTH_TOKEN=YOUR_SECURE_AUTH_TOKEN
TASKAPP_DEBUG_MODE=False  # Set to 'True' for local development/testing

Django settings

Add django_cloudtasks to your INSTALLED_APPS in your Django settings.py:

INSTALLED_APPS = [
    ...
    'django_cloudtasks',
    ...
]

Apply migrations:

python manage.py migrate django_cloudtasks

URL Configuration

Add the following to your project's urls.py:

from django.urls import path, include

urlpatterns = [
    ...
    path('cloudtasks/', include('django_cloudtasks.urls')),
    ...
]

This will register all the necessary endpoints for task execution, tracking, and revocation.

Create a Service Account

  1. Create a Service Account:
gcloud iam service-accounts create my-service-account \
    --display-name="My Service Account"
  1. Grant Roles to the Service Account:
    • Grant permissions to interact with Cloud Tasks and other necessary services.
gcloud projects add-iam-policy-binding test-project-455815 \
    --member="serviceAccount:my-service-account@test-project-455815.iam.gserviceaccount.com" \
    --role="roles/cloudtasks.enqueuer"
  • You may need additional roles depending on your use case, e.g., roles/cloudtasks.viewer.
  1. Generate a Key for the Service Account:
gcloud iam service-accounts keys create ~/path/to/key.json \
    --iam-account=my-service-account@test-project-455815.iam.gserviceaccount.com

Set Up Environment Variable

Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of your service account key file:

export GOOGLE_APPLICATION_CREDENTIALS="~/path/to/key.json"

Usage Examples

Single Task

from django_cloudtasks.decorators import register_task

@register_task
def sum(a, b):
    return a + b

from django_cloudtasks.manager import CloudChainManager

chain_mgr = CloudChainManager()
chain_mgr.add_task("sum", {"a": 4, "b": 5})
chain_mgr.run()

Sequential Tasks (Chain)

from django_cloudtasks.manager import CloudChainManager

chain_mgr = CloudChainManager()
chain_mgr.add_task("sum", {"a": 4, "b": 5})
chain_mgr.add_task("sum", {"b": 10})  # auto-inject result from previous task into 'a'
chain_mgr.run()

Group Tasks (Parallel)

from django_cloudtasks.manager import CloudChainManager

chain_mgr = CloudChainManager()

chain_mgr.add_group([
    {"endpoint_path": "sum", "payload": {"a": 3, "b": 3}},
    {"endpoint_path": "sum", "payload": {"a": 7, "b": 2}}
])

chain_mgr.add_task("sum", {"a": 4, "b": 5}, delay_seconds=30)

chain_mgr.run()

Using Delays

from django_cloudtasks.manager import CloudChainManager

chain_mgr = CloudChainManager()
chain_mgr.add_task("sum", {"a": 2, "b": 1}, delay_seconds=300)
chain_mgr.run()

Delays execution for 5 minutes (300 seconds).


Error Callbacks

Automatically schedule another task if any error occurs:

from django_cloudtasks.manager import CloudChainManager
from django_cloudtasks.decorators import register_task

@register_task
def faulty_task(a, b):
    # Always raise an exception to simulate failure:
    raise ValueError(f"Test error: inputs {a}, {b}")

@register_task
def my_error_handler(original_task_id, original_task_name, error, payload):
    print(f"Error callback triggered for task {original_task_name} id={original_task_id}")
    print(f"Error was: {error}")
    print(f"Payload that caused failure: {payload}")


chain_mgr = CloudChainManager()
chain_mgr.add_task(
    "faulty_task", {"a": 1, "b": 2}, error_callback="my_error_handler"
)
chain_mgr.run()

Error Handler Parameters

Error handlers must accept the following parameters:

Parameter Type Description
original_task_id UUID The unique identifier of the task that failed
original_task_name str The name/endpoint of the task that failed
error str String representation of the exception that occurred
payload dict The original payload sent to the task that failed

CloudChainManager Structure

The CloudChainManager class provides properties and methods to access task and chain IDs for monitoring, debugging, or revocation:

from django_cloudtasks.manager import CloudChainManager

# Create a chain manager
chain_mgr = CloudChainManager()
chain_mgr.add_task("task1", {"a": 1})
chain_mgr.add_task("task2", {"b": 2})

# Access chain ID for revocation
chain_id = chain_mgr.chain.id  # UUID of the entire chain
print(f"Chain ID: {chain_id}")

# Access the last added task's ID
last_task_id = chain_mgr.last_task.id  # UUID of the most recently added task
print(f"Last Task ID: {last_task_id}")

# Get all tasks in the chain
all_tasks = chain_mgr.chain.tasks.all()
for task in all_tasks:
    print(f"Task ID: {task.id}, Endpoint: {task.endpoint_path}")

# Revoke the chain
from django_cloudtasks.utils import revoke_chain
revoke_chain(chain_id)

# Revoke a specific task
from django_cloudtasks.utils import revoke_task
revoke_task(last_task_id)

Chaining and Grouping Chains

Automatically schedule another task if any error occurs:

from django_cloudtasks.manager import CloudChainManager
c1 = CloudChainManager()
c1.add_task("sum", {"a": 4, "b": 5})

c2 = CloudChainManager()
c2.add_task("sum", {"a": 2, "b": 5})

c3 = CloudChainManager()
c3.add_chain_group(c1, c2)
c3.run()

Revoking Tasks and Chains

Revoke actively scheduled or running tasks:

API Endpoint:

GET /cloudtasks/revoke/?task=<TASK_ID>
GET /cloudtasks/revoke/?chain=<CHAIN_ID>

Provided endpoints

Endpoint Usage Description
/cloudtasks/run/<task_name>/ POST Execute a registered task (Cloud Tasks entrypoint)
/cloudtasks/tracker/ POST Receive results and control flow after tasks execution
/cloudtasks/revoke/ GET Revoke specific tasks or entire chains

Debug Mode

The library provides a DEBUG mode for local development and testing without requiring actual Google Cloud Tasks infrastructure.

How to Enable Debug Mode

Set the environment variable:

TASKAPP_DEBUG_MODE=True

Debug Mode Behavior

When DEBUG mode is enabled:

  1. Tasks are executed synchronously, one after another
  2. No HTTP requests are made to Google Cloud Tasks
  3. Task results and errors are processed locally
  4. All chain, group, and task relationships work exactly as they would in production
  5. Perfect for unit testing and local development without internet connection

This allows you to test complex task chains without needing to set up Cloud Tasks queues or deal with authentication in your development environment.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_gcp_cloudtasks-1.9.0.tar.gz (41.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_gcp_cloudtasks-1.9.0-py3-none-any.whl (18.3 kB view details)

Uploaded Python 3

File details

Details for the file django_gcp_cloudtasks-1.9.0.tar.gz.

File metadata

  • Download URL: django_gcp_cloudtasks-1.9.0.tar.gz
  • Upload date:
  • Size: 41.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for django_gcp_cloudtasks-1.9.0.tar.gz
Algorithm Hash digest
SHA256 da1a122009bc6bc12963b0d42be00213a6f0296adb330a592c56dcde6fed6c7a
MD5 07027292dd99533f3d2948f7873cbf19
BLAKE2b-256 40096b49ceabf6e86f20eb3fb15cd4adce7b35d54a0c3aa52b7abc5f18f2c5c8

See more details on using hashes here.

File details

Details for the file django_gcp_cloudtasks-1.9.0-py3-none-any.whl.

File metadata

File hashes

Hashes for django_gcp_cloudtasks-1.9.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7fde0d46ff670b6ec5816a4a05b37d6838ff0aa8a9d74dbe43236d7bc110b885
MD5 c0e6f0cd25caac08296acd19be5b9630
BLAKE2b-256 6a3de14f64574e91161a5ee7b24edf7f818b1debd43bb55705d37ef291732d95

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page