
Django app for managing async tasks via HTTP requests


django-i3tasks

Django app for managing async tasks via HTTP using Google Cloud Pub/Sub.

pip install django-i3tasks

Quick start

1. Add to INSTALLED_APPS

INSTALLED_APPS = [
    ...,
    "django_i3tasks",
]

2. Include the URL configuration

# urls.py
from django.urls import path, include

urlpatterns = [
    ...,
    path("i3/", include("django_i3tasks.urls")),
]

This registers two endpoints:

  • POST /i3/tasks-push/ — receives tasks pushed by Pub/Sub
  • POST /i3/tasks-beat/ — triggered by an external scheduler (e.g. Google Cloud Scheduler) to run scheduled tasks
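
To make the push endpoint's input concrete: Pub/Sub push subscriptions deliver each message as a JSON body whose message.data field is base64-encoded. The sketch below builds and decodes such an envelope with the standard library only. The inner payload keys (module_name, func_name, args, kwargs) are an assumption for illustration; the wire format django-i3tasks actually uses is not documented here and may differ.

```python
import base64
import json


def build_push_envelope(payload: dict, subscription: str) -> bytes:
    """Build a request body shaped like a Pub/Sub push delivery."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    envelope = {
        "message": {"data": data, "messageId": "1", "attributes": {}},
        "subscription": subscription,
    }
    return json.dumps(envelope).encode("utf-8")


def decode_push_envelope(body: bytes) -> dict:
    """Extract and decode the base64 `message.data` field of a push delivery."""
    envelope = json.loads(body)
    raw = base64.b64decode(envelope["message"]["data"])
    return json.loads(raw)


# Hypothetical task payload; the real serialization may look different.
payload = {
    "module_name": "myapp.tasks",
    "func_name": "send_email",
    "args": ["user@example.com", "Hello", "World"],
    "kwargs": {},
}
body = build_push_envelope(payload, "projects/my-project/subscriptions/default")
decoded = decode_push_envelope(body)
```

A helper like build_push_envelope can be handy in tests that POST a hand-made message to the push URL without running Pub/Sub.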

3. Run migrations

python manage.py migrate

This creates the tables for task executions, attempts, and results.

4. Configure settings

Local / emulator

from django_i3tasks.types import I3TasksSettings, Queue, Schedule

PUBSUB_CONFIG = {
    "EMULATOR": True,
    "HOST": "localhost:8085",       # or named host in Docker Compose
    "PROJECT_ID": "my-project",
    "CREDENTIALS": False,
}

I3TASKS = I3TasksSettings(
    namespace=f"tasks.{SHORT_PROJECT_NAME}",  # SHORT_PROJECT_NAME: a string defined earlier in your settings
    default_queue=Queue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="http://localhost:8000/i3/tasks-push/",
    ),
    other_queues=(),
    schedules=(
        Schedule(
            module_name="myapp.tasks",
            func_name="my_scheduled_task",
            cron="* * * * *",
            args=[],
            kwargs={},
        ),
    ),
)

Production (Google Cloud)

from django_i3tasks.types import I3TasksSettings, Queue, Schedule

PUBSUB_CONFIG = {
    "EMULATOR": False,
    "PROJECT_ID": "my-project",
    "CREDENTIALS": "/app/conf/credentials.json",  # path to service account JSON
}

I3TASKS = I3TasksSettings(
    namespace=f"tasks.{SHORT_PROJECT_NAME}",  # SHORT_PROJECT_NAME: a string defined earlier in your settings
    default_queue=Queue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="https://your-host.example.com/i3/tasks-push/",
    ),
    other_queues=(),
    schedules=(),
)

5. Ensure Pub/Sub topics and subscriptions exist

Run this once to create the required Pub/Sub resources:

python manage.py i3tasks_ensure_pubsub

The command also runs automatically on app startup when run_queue_create_command_on_startup=True (the default).


Defining tasks

Decorate any function with @TaskDecorator to make it an async task:

# myapp/tasks.py
from django_i3tasks.utils import TaskDecorator

@TaskDecorator
def send_email(recipient, subject, body):
    # your logic here
    pass

Running a task asynchronously

from myapp.tasks import send_email

send_email.delay("user@example.com", "Hello", "World")
# or equivalently:
send_email.async_run("user@example.com", "Hello", "World")

Running a task synchronously

send_email.sync_run("user@example.com", "Hello", "World")
# or call it directly:
send_email("user@example.com", "Hello", "World")
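
The difference between the asynchronous and synchronous call styles can be illustrated with a toy wrapper. This is a minimal sketch, not the library's TaskDecorator: SketchTask, QUEUE, and run_worker_once are hypothetical names, and an in-memory deque stands in for Pub/Sub.

```python
from collections import deque
from typing import Any, Callable

# In-memory stand-in for a Pub/Sub topic (assumption for this sketch).
QUEUE: deque = deque()


class SketchTask:
    """Toy task wrapper; not the library's TaskDecorator."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func
        self.__name__ = func.__name__

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Direct call: run in the current process.
        return self.func(*args, **kwargs)

    def sync_run(self, *args: Any, **kwargs: Any) -> Any:
        # Same behavior as a direct call.
        return self(*args, **kwargs)

    def delay(self, *args: Any, **kwargs: Any) -> None:
        # Enqueue a description of the call instead of running it.
        QUEUE.append((self.func, args, kwargs))

    # Alias, mirroring the two equivalent spellings shown above.
    async_run = delay


def run_worker_once() -> Any:
    """Pop one queued call and execute it, as a push endpoint would."""
    func, args, kwargs = QUEUE.popleft()
    return func(*args, **kwargs)


@SketchTask
def add(a: int, b: int) -> int:
    return a + b


direct = add(1, 2)            # runs immediately
add.delay(3, 4)               # only enqueued, nothing runs yet
queued_before = len(QUEUE)
deferred = run_worker_once()  # executed later by the "worker"
```

The point of the sketch is that delay returns before the function body runs, while a direct call or sync_run executes it in the caller's process.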

Accessing task metadata inside the function (bind)

With bind=True, the task object is passed to the function as the task_metadata keyword argument:

@TaskDecorator(bind=True)
def my_task(arg1, task_metadata=None):
    print(task_metadata)  # TaskObj instance
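
A decorator that works both bare (@TaskDecorator) and with arguments (@TaskDecorator(bind=True)) is usually written with an optional first parameter. The following is a sketch of that general pattern under stated assumptions: sketch_task is a hypothetical name, and the wrapper function itself stands in for the TaskObj instance the library passes.

```python
import functools
from typing import Any, Callable, Optional


def sketch_task(func: Optional[Callable[..., Any]] = None, *, bind: bool = False):
    """Toy decorator usable as @sketch_task or @sketch_task(bind=True)."""

    def decorate(f: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if bind:
                # Inject the wrapper under the keyword name used above.
                kwargs.setdefault("task_metadata", wrapper)
            return f(*args, **kwargs)

        return wrapper

    # Bare usage passes the function directly; parametrized usage passes None.
    return decorate(func) if func is not None else decorate


@sketch_task
def plain(x):
    return x * 2


@sketch_task(bind=True)
def bound(x, task_metadata=None):
    # The injected object exposes the task's name via functools.wraps.
    return x, task_metadata.__name__
```

Declaring task_metadata with a default of None, as in the example above, keeps the function callable in contexts where nothing is injected.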

Task chaining

.delay() returns a ChainHandle. Use .then() to schedule a follow-up task that runs after the current one succeeds:

from myapp.tasks import send_email, log_sent

send_email.delay("user@example.com", "Hello", "World").then(log_sent)

You can chain multiple steps:

send_email.delay(...).then(step_two).then(step_three)

Each step is persisted to the database. If the original task is executed by Pub/Sub, the next step in the chain is enqueued automatically on success.
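
The "run the next step only on success" behavior can be sketched in-process. SketchChain is a hypothetical class, not the library's ChainHandle: it keeps steps in a list and runs them in a loop, whereas the library persists each step and enqueues the next one through Pub/Sub. Whether results are passed between steps is not stated above, so the sketch passes nothing.

```python
from typing import Any, Callable, List


class SketchChain:
    """Toy chain handle: records follow-up steps and runs them in order."""

    def __init__(self, first: Callable[[], Any]) -> None:
        self.steps: List[Callable[[], Any]] = [first]

    def then(self, step: Callable[[], Any]) -> "SketchChain":
        self.steps.append(step)
        return self  # returning self lets .then() calls be chained

    def run(self) -> List[str]:
        completed: List[str] = []
        for step in self.steps:
            try:
                step()
            except Exception:
                break  # a failed step stops the chain; later steps never run
            completed.append(step.__name__)
        return completed


log: List[str] = []


def send() -> None:
    log.append("send")


def fail() -> None:
    raise RuntimeError("boom")


def audit() -> None:
    log.append("audit")


ok = SketchChain(send).then(audit).run()
stopped = SketchChain(send).then(fail).then(audit).run()
```

In the second chain, audit never runs because the step before it raised.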

on_success shorthand

For a single fixed follow-up, declare it on the decorator:

@TaskDecorator(on_success=log_sent)
def send_email(recipient, subject, body):
    ...

Every .delay() call will automatically chain log_sent after a successful execution.


Task groups (fan-out / join)

Use TaskGroup to fan out N parallel tasks and run a callback when all of them succeed.

Basic usage

from django_i3tasks.models import TaskGroup
from myapp.tasks import process_item, all_done

items = ["a", "b", "c"]  # example inputs; one member task per item

# 1. Create the group: declare the callback and the expected member count.
group = TaskGroup.create(callback=all_done, total_count=len(items))

# 2. Dispatch member tasks, passing the group via __i3group__.
for item in items:
    process_item.delay(item, __i3group__=group)

all_done is called automatically once all 3 members complete successfully. If any member exceeds its retry limit, the group is marked failed and the callback is never called.

Callback with a chain

Use build_chain() to attach a chain to the callback without dispatching it immediately:

from myapp.tasks import all_done, notify_admin

chain = all_done.build_chain().then(notify_admin)
group = TaskGroup.create(callback=chain, total_count=3)

When the join fires, all_done is called and notify_admin is chained after it.

TaskGroup states

Status    Meaning
pending   Waiting for members to complete
success   All members succeeded; callback dispatched
failed    At least one member exceeded retries
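
The join logic behind these states can be sketched as a counter with a one-shot callback. SketchGroup and its method names are hypothetical and only mirror the three states listed above. A real implementation would need an atomic counter update in the database, since members complete concurrently; this in-memory version ignores that.

```python
from typing import Callable, List


class SketchGroup:
    """Toy fan-out/join tracker with pending, success, and failed states."""

    def __init__(self, callback: Callable[[], None], total_count: int) -> None:
        self.callback = callback
        self.total_count = total_count
        self.done = 0
        self.status = "pending"

    def member_succeeded(self) -> None:
        if self.status != "pending":
            return  # a finished group ignores late reports
        self.done += 1
        if self.done == self.total_count:
            self.status = "success"
            self.callback()  # fires exactly once, on the last member

    def member_failed(self) -> None:
        # Called when a member has exhausted its retries.
        if self.status == "pending":
            self.status = "failed"


calls: List[str] = []

group = SketchGroup(callback=lambda: calls.append("all_done"), total_count=3)
group.member_succeeded()
group.member_succeeded()
status_midway = group.status
group.member_succeeded()

failed = SketchGroup(callback=lambda: calls.append("never"), total_count=2)
failed.member_succeeded()
failed.member_failed()
failed.member_succeeded()  # ignored: the group is already failed
```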

I3TasksSettings reference

Parameter                            Type             Default   Description
namespace                            str              required  Prefix for Pub/Sub topic/subscription names
default_queue                        Queue            required  Default queue configuration
other_queues                         tuple[Queue]     ()        Additional queues
schedules                            tuple[Schedule]  ()        Scheduled tasks (cron-based)
force_sync                           bool             False     If True, .delay() runs synchronously (useful for testing)
default_max_retries                  int              3         Maximum retry attempts on failure
run_queue_create_command_on_startup  bool             True      Auto-run i3tasks_ensure_pubsub on app startup

How it works

  1. .delay() serializes the task and publishes it to Google Cloud Pub/Sub.
  2. A TaskExecution and a TaskExecutionTry record are saved to the database.
  3. The Pub/Sub push subscription delivers the message to /i3/tasks-push/.
  4. The endpoint deserializes and executes the task, saving the result.
  5. On failure, the task is re-enqueued up to default_max_retries times.
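
Step 5 can be sketched as a bounded retry loop. This is an illustration only: run_with_retries is a hypothetical helper, the loop runs in-process rather than re-enqueuing through Pub/Sub, and it assumes max_retries counts retries after the first attempt (so at most 1 + max_retries attempts in total), which the description above does not spell out.

```python
from typing import Any, Callable, List, Tuple


def run_with_retries(
    func: Callable[[], Any], max_retries: int = 3
) -> Tuple[str, List[str]]:
    """Run func, retrying on exceptions up to max_retries times."""
    attempts: List[str] = []
    for _ in range(1 + max_retries):  # first attempt plus retries
        try:
            func()
        except Exception as exc:
            attempts.append(f"error: {exc}")  # record the failed attempt
            continue
        attempts.append("ok")
        return "success", attempts
    return "failed", attempts


state = {"calls": 0}


def flaky() -> None:
    # Fails on the first two calls, then succeeds.
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("temporary")


def broken() -> None:
    raise ValueError("nope")


recovered = run_with_retries(flaky, max_retries=3)
exhausted = run_with_retries(broken, max_retries=2)
```

Recording one entry per attempt loosely mirrors the TaskExecutionTry records mentioned in step 2.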

Scheduled tasks are triggered by hitting /i3/tasks-beat/. The app evaluates each configured Schedule's cron expression and runs matching tasks.
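
As a rough illustration of evaluating a cron expression against the current time, here is a small standard-library matcher for five-field expressions. It is a sketch, not the library's evaluator: field_matches and cron_matches are hypothetical names, only '*', '*/n', 'a-b', and comma lists are supported, and all five fields must match (standard cron treats restricted day-of-month and day-of-week fields as alternatives).

```python
from datetime import datetime


def field_matches(field: str, value: int, start: int = 0) -> bool:
    """Match one cron field supporting '*', '*/n', 'a-b', and comma lists."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps are counted from the field's first valid value.
            if (value - start) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = part.split("-")
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` falls on a minute selected by `expr`."""
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0; datetime.isoweekday() counts Sunday as 7.
    cron_weekday = when.isoweekday() % 7
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day, when.day, start=1)
        and field_matches(month, when.month, start=1)
        and field_matches(weekday, cron_weekday)
    )


sunday = datetime(2024, 1, 7, 10, 30)  # a Sunday
monday = datetime(2024, 1, 8, 10, 30)  # the following Monday
```

With a matcher like this, a beat request arriving once per minute would run every schedule whose expression matches that minute, for example cron_matches("* * * * *", datetime.now()).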
