Django app for managing async tasks via HTTP requests
django-i3tasks
Django app for managing async tasks via HTTP using Google Cloud Pub/Sub.
pip install django-i3tasks
Quick start
1. Add to INSTALLED_APPS
INSTALLED_APPS = [
    ...,
    "django_i3tasks",
]
2. Include the URL configuration
# urls.py
from django.urls import path, include
urlpatterns = [
    ...,
    path("i3/", include("django_i3tasks.urls")),
]
This registers two endpoints:
- POST /i3/tasks-push/: receives tasks pushed by Pub/Sub
- POST /i3/tasks-beat/: triggered by an external scheduler (e.g. Google Cloud Scheduler) to run scheduled tasks
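For orientation, a Pub/Sub push subscription delivers each message as a JSON envelope whose `message.data` field is base64-encoded. The sketch below decodes such an envelope using only the standard library. The task payload fields (`module_name`, `func_name`, `args`, `kwargs`) and both helper names are hypothetical illustrations, not the wire format or API of django-i3tasks.

```python
import base64
import json


def decode_push_envelope(body: bytes) -> dict:
    """Decode the JSON envelope that Pub/Sub POSTs to a push endpoint."""
    envelope = json.loads(body)
    message = envelope["message"]
    # Pub/Sub base64-encodes the message payload in the "data" field.
    payload = json.loads(base64.b64decode(message["data"]))
    return {
        "message_id": message.get("messageId"),
        "subscription": envelope.get("subscription"),
        "payload": payload,
    }


def build_example_envelope() -> bytes:
    """Build a sample envelope for local experiments."""
    # Hypothetical task payload; the real wire format may differ.
    task = {
        "module_name": "myapp.tasks",
        "func_name": "send_email",
        "args": ["user@example.com", "Hello", "World"],
        "kwargs": {},
    }
    data = base64.b64encode(json.dumps(task).encode()).decode()
    envelope = {
        "message": {"data": data, "messageId": "1"},
        "subscription": "projects/my-project/subscriptions/default",
    }
    return json.dumps(envelope).encode()
```

A body built this way can be POSTed to a local endpoint to see roughly what a push delivery looks like, although the library may expect different payload fields.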
3. Run migrations
python manage.py migrate
This creates the tables for task executions, attempts, and results.
4. Configure settings
Local / emulator
from django_i3tasks.types import I3TasksSettings, PushQueue, Schedule
PUBSUB_CONFIG = {
    "EMULATOR": True,
    "HOST": "localhost:8085",  # or named host in Docker Compose
    "PROJECT_ID": "my-project",
    "CREDENTIALS": False,
}
I3TASKS = I3TasksSettings(
    # SHORT_PROJECT_NAME must already be defined in your settings module
    namespace=f"tasks.{SHORT_PROJECT_NAME}",
    default_queue=PushQueue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="http://localhost:8000/i3/tasks-push/",
    ),
    other_queues=(),
    schedules=(
        Schedule(
            module_name="myapp.tasks",
            func_name="my_scheduled_task",
            cron="* * * * *",
            args=[],
            kwargs={},
        ),
    ),
)
Note:
Queue remains available as a backward-compatible alias for PushQueue. Existing configurations that use Queue(...) continue to work without changes.
Production (Google Cloud)
from django_i3tasks.types import I3TasksSettings, PushQueue, Schedule
PUBSUB_CONFIG = {
    "EMULATOR": False,
    "PROJECT_ID": "my-project",
    "CREDENTIALS": "/app/conf/credentials.json",  # path to service account JSON
}
I3TASKS = I3TasksSettings(
    # SHORT_PROJECT_NAME must already be defined in your settings module
    namespace=f"tasks.{SHORT_PROJECT_NAME}",
    default_queue=PushQueue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="https://your-host.example.com/i3/tasks-push/",
    ),
    other_queues=(),
    schedules=(),
)
5. Ensure Pub/Sub topics and subscriptions exist
Run this once to create the required Pub/Sub resources:
python manage.py i3tasks_ensure_pubsub
This is also called automatically on startup if run_queue_create_command_on_startup=True (the default).
Defining tasks
Decorate any function with @TaskDecorator to make it an async task:
# myapp/tasks.py
from django_i3tasks.utils import TaskDecorator
@TaskDecorator
def send_email(recipient, subject, body):
    # your logic here
    pass
Running a task asynchronously
from myapp.tasks import send_email
send_email.delay("user@example.com", "Hello", "World")
# or equivalently:
send_email.async_run("user@example.com", "Hello", "World")
Running a task synchronously
send_email.sync_run("user@example.com", "Hello", "World")
# or call it directly:
send_email("user@example.com", "Hello", "World")
Accessing task metadata inside the function (bind)
When bind=True, the task receives itself as task_metadata:
@TaskDecorator(bind=True)
def my_task(arg1, task_metadata=None):
    print(task_metadata)  # TaskObj instance
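To make the two decorator forms less mysterious, here is a minimal, self-contained sketch of how a decorator can support both `@decorator` and `@decorator(bind=True)` and inject the task object as `task_metadata`. `ToyTask` and `toy_task` are hypothetical stand-ins written for this illustration; they are not django-i3tasks' `TaskDecorator` or `TaskObj`, and they do no queuing at all.

```python
import functools


class ToyTask:
    """Illustrative stand-in for a task object; not the library's TaskObj."""

    def __init__(self, func, bind=False):
        self.func = func
        self.bind = bind
        # Copy __name__, __doc__, etc. from the wrapped function.
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        if self.bind:
            # Hand the task object to the function as task_metadata.
            kwargs.setdefault("task_metadata", self)
        return self.func(*args, **kwargs)

    # In this toy, sync_run is simply a direct call.
    sync_run = __call__


def toy_task(func=None, *, bind=False):
    """Usable both as @toy_task and as @toy_task(bind=True)."""
    if func is not None:
        return ToyTask(func)  # bare form: @toy_task
    return lambda f: ToyTask(f, bind=bind)  # called form: @toy_task(...)


@toy_task
def add(a, b):
    return a + b


@toy_task(bind=True)
def describe(arg1, task_metadata=None):
    return (arg1, task_metadata.__name__)
```

In this sketch, calling `describe("x")` passes the `ToyTask` instance in as `task_metadata`, which mirrors the behavior described above for `bind=True`.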
Task chaining
.delay() returns a ChainHandle. Use .then() to schedule a follow-up task that runs after the current one succeeds:
from myapp.tasks import send_email, log_sent
send_email.delay("user@example.com", "Hello", "World").then(log_sent)
You can chain multiple steps:
send_email.delay(...).then(step_two).then(step_three)
Each step is persisted to the database. If the original task is executed by Pub/Sub, the next step in the chain is enqueued automatically on success.
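The chaining semantics can be pictured with a small in-memory model: steps are stored in order, `.then()` returns the handle so calls can be strung together, and a step runs only if the previous one succeeded. `ToyChain` is a hypothetical illustration, not the library's `ChainHandle`; it runs everything in-process, does not persist anything, and assumes follow-up steps take no arguments.

```python
class ToyChain:
    """Ordered list of steps; each runs only if the previous one succeeded."""

    def __init__(self, first, *args, **kwargs):
        # Each step is stored as (callable, args, kwargs).
        self.steps = [(first, args, kwargs)]

    def then(self, func):
        self.steps.append((func, (), {}))
        return self  # returning self is what allows .then().then()

    def run(self):
        """Run steps in order and stop at the first one that raises."""
        completed = []
        for func, args, kwargs in self.steps:
            try:
                func(*args, **kwargs)
            except Exception:
                break  # a failed step prevents all later steps
            completed.append(func.__name__)
        return completed


def send_email(recipient):
    return f"sent to {recipient}"


def log_sent():
    return "logged"


def broken():
    raise RuntimeError("simulated failure")
```

For example, `ToyChain(send_email, "user@example.com").then(log_sent).run()` completes both steps, while a chain that starts with `broken` completes none.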
on_success shorthand
For a single fixed follow-up, declare it on the decorator:
@TaskDecorator(on_success=log_sent)
def send_email(recipient, subject, body):
    ...
Every .delay() call will automatically chain log_sent after a successful execution.
Task groups (fan-out / join)
Use TaskGroup to fan out N parallel tasks and run a callback when all of them succeed.
Basic usage
from django_i3tasks.models import TaskGroup
from myapp.tasks import process_item, all_done
# 1. Create the group — declare the callback and the expected member count.
group = TaskGroup.create(callback=all_done, total_count=3)
# 2. Dispatch member tasks, passing the group via __i3group__.
for item in items:
    process_item.delay(item, __i3group__=group)
all_done is called automatically once all 3 members complete successfully. If any member exceeds its retry limit, the group is marked failed and the callback is never called.
Callback with a chain
Use build_chain() to attach a chain to the callback without dispatching it immediately:
from myapp.tasks import all_done, notify_admin
chain = all_done.build_chain().then(notify_admin)
group = TaskGroup.create(callback=chain, total_count=3)
When the join fires, all_done is called and notify_admin is chained after it.
TaskGroup states
| Status | Meaning |
|---|---|
| pending | Waiting for members to complete |
| success | All members succeeded; callback dispatched |
| failed | At least one member exceeded retries |
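The three states above amount to a small state machine around a completion counter. The following sketch models it in memory; `ToyGroup` and its method names are hypothetical and only illustrate the join logic, not how `TaskGroup` is stored or updated in the database.

```python
class ToyGroup:
    """In-memory model of a fan-out / join group."""

    def __init__(self, callback, total_count):
        self.callback = callback
        self.total_count = total_count
        self.done = 0
        self.status = "pending"

    def member_succeeded(self):
        if self.status != "pending":
            return  # a failed or finished group ignores late members
        self.done += 1
        if self.done == self.total_count:
            self.status = "success"
            self.callback()  # the join fires exactly once

    def member_failed(self):
        # Called when a member exceeds its retry limit.
        if self.status == "pending":
            self.status = "failed"
```

A database-backed implementation would presumably need to make the counter update atomic so that two members finishing at the same time cannot both (or neither) trigger the callback; this toy sidesteps that by being single-threaded.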
Pull queues
By default, tasks are delivered via Pub/Sub push — Pub/Sub calls your /i3/tasks-push/ HTTP endpoint. For workers that cannot expose a public endpoint (local dev, private networks) or that need to control their own concurrency, you can use a pull queue instead.
Push and pull are mutually exclusive per queue. A queue is either one or the other.
Configuring a pull queue
Add a PullQueue to other_queues. Pull queues do not require a push endpoint.
from django_i3tasks.types import I3TasksSettings, PushQueue, PullQueue, Schedule
I3TASKS = I3TasksSettings(
    namespace="tasks.myproject",
    default_queue=PushQueue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="https://your-host.example.com/i3/tasks-push/",
    ),
    other_queues=(
        PullQueue(
            queue_name="heavy",
            subscription_name="heavy-pull",
        ),
    ),
)
Note:
default_queue must always be a PushQueue, because the /i3/tasks-push/ view requires it. Only other_queues entries can be PullQueue.
Dispatching a task to a pull queue
Set queue_name on the decorator, then call .delay() as usual:
@TaskDecorator(queue_name="heavy")
def heavy_task(data):
    ...

heavy_task.delay(data)
Running the pull worker
Start a worker process for each pull queue you want to consume:
python manage.py i3tasks_worker --queue=heavy
The worker polls the subscription in a loop, processing one message at a time. Press Ctrl+C to stop.
Ack / nack behavior:
- Task succeeds → message is acknowledged
- Task exceeds max retries → message is acknowledged (no further delivery)
- Malformed message (bad JSON, missing fields) → message is not acknowledged; Pub/Sub redelivers after the ack deadline
- Unexpected infrastructure error → message is not acknowledged; Pub/Sub redelivers
Retries are managed by the task itself via Pub/Sub: on failure with retries remaining, a new attempt is published back to the topic. The worker always acknowledges after run_from_async returns (success or exhausted retries).
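The ack / nack rules above can be summarized as a single decision function. This is a sketch under stated assumptions: `handle_message`, `Outcome`, and the `REQUIRED_FIELDS` names are hypothetical, and `run_task` stands in for whatever the worker calls to execute a task (it is assumed to handle task failures and retries itself and to raise only on infrastructure errors).

```python
import json
from enum import Enum


class Outcome(Enum):
    ACK = "ack"
    NO_ACK = "no_ack"  # left unacknowledged; Pub/Sub redelivers after the ack deadline


# Hypothetical payload fields, used only to illustrate validation.
REQUIRED_FIELDS = ("func_name", "args", "kwargs")


def handle_message(raw: bytes, run_task) -> Outcome:
    """Decide whether a pulled message should be acknowledged."""
    try:
        payload = json.loads(raw)
    except ValueError:
        return Outcome.NO_ACK  # malformed JSON
    if not isinstance(payload, dict) or any(f not in payload for f in REQUIRED_FIELDS):
        return Outcome.NO_ACK  # missing fields
    try:
        # Returns normally on success or once retries are exhausted.
        run_task(payload)
    except Exception:
        return Outcome.NO_ACK  # unexpected infrastructure error
    return Outcome.ACK
```

Keeping the decision separate from the polling loop makes it straightforward to unit test each branch without a Pub/Sub connection.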
Provisioning Pub/Sub resources
i3tasks_ensure_pubsub handles both push and pull queues. Pull subscriptions are created without a push endpoint:
python manage.py i3tasks_ensure_pubsub
I3TasksSettings reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| namespace | str | required | Prefix for Pub/Sub topic/subscription names |
| default_queue | PushQueue | required | Default push queue (must be PushQueue; required by the HTTP view) |
| other_queues | tuple[PushQueue \| PullQueue] | () | Additional queues; each can be a PushQueue or a PullQueue |
| schedules | tuple[Schedule] | () | Scheduled tasks (cron-based) |
| force_sync | bool | False | If True, .delay() runs synchronously (useful for testing) |
| default_max_retries | int | 3 | Maximum retry attempts on failure |
| run_queue_create_command_on_startup | bool | True | Auto-run i3tasks_ensure_pubsub on app startup |
Queue types
| Type | Fields | Delivery |
|---|---|---|
| PushQueue(queue_name, subscription_name, push_endpoint) | 3 fields | Pub/Sub pushes to your HTTP endpoint |
| PullQueue(queue_name, subscription_name) | 2 fields | Worker polls with i3tasks_worker --queue=<name> |
| Queue | alias for PushQueue | Backward-compatible; existing configs need no changes |
How it works
Push delivery (default):
1. .delay() serializes the task and publishes it to Google Cloud Pub/Sub.
2. A TaskExecution and a TaskExecutionTry record are saved to the database.
3. The Pub/Sub push subscription delivers the message to /i3/tasks-push/.
4. The endpoint deserializes and executes the task, saving the result.
5. On failure, the task is re-enqueued up to default_max_retries times.
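The retry behavior in the last step can be simulated with an in-memory queue. This is only a model of the flow, not the library's code: `run_with_retries` is a hypothetical helper, the deque stands in for the Pub/Sub topic, the `attempts` list stands in for the persisted attempt records, and it assumes "up to `default_max_retries` times" means one initial attempt plus at most that many retries.

```python
from collections import deque


def run_with_retries(func, max_retries=3):
    """Simulate publish -> deliver -> execute, re-enqueueing on failure."""
    queue = deque([{"attempt": 0}])  # in-memory stand-in for a Pub/Sub topic
    attempts = []                    # stand-in for persisted attempt records
    while queue:
        message = queue.popleft()
        try:
            result = func()
        except Exception:
            attempts.append((message["attempt"], "failed"))
            if message["attempt"] < max_retries:
                # Publish a new attempt back to the queue.
                queue.append({"attempt": message["attempt"] + 1})
        else:
            attempts.append((message["attempt"], "success"))
            return result, attempts
    return None, attempts  # retries exhausted
```

With a function that fails twice and then succeeds, the simulation records two failed attempts followed by one successful attempt.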
Pull delivery (PullQueue):
Steps 1–2 are identical. Instead of Pub/Sub pushing to an HTTP endpoint, the i3tasks_worker process polls the pull subscription and executes tasks in the same way.
Scheduled tasks are triggered by hitting /i3/tasks-beat/. The app evaluates each configured Schedule's cron expression and runs matching tasks.
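To illustrate what evaluating a cron expression involves, here is a simplified five-field matcher written with the standard library. It is a sketch, not the evaluator django-i3tasks uses: `cron_matches` is a hypothetical name, only `*`, single values, ranges, lists, and `/step` are supported, names such as `MON` and `7` for Sunday are not, and day-of-month and day-of-week are combined with AND rather than the OR rule of traditional cron.

```python
from datetime import datetime


def _field_matches(field: str, value: int, minimum: int, maximum: int) -> bool:
    """Check one cron field (e.g. "*/15" or "1-5,10") against a value."""
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step_n = int(step) if step else 1
        if rng == "*":
            low, high = minimum, maximum
        elif "-" in rng:
            low, high = (int(x) for x in rng.split("-"))
        else:
            low = high = int(rng)
        if low <= value <= high and (value - low) % step_n == 0:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if the five-field cron expression matches the given minute."""
    minute, hour, dom, month, dow = expr.split()
    # cron counts Sunday as 0; Python's weekday() counts Monday as 0.
    cron_dow = (when.weekday() + 1) % 7
    return (
        _field_matches(minute, when.minute, 0, 59)
        and _field_matches(hour, when.hour, 0, 23)
        and _field_matches(dom, when.day, 1, 31)
        and _field_matches(month, when.month, 1, 12)
        and _field_matches(dow, cron_dow, 0, 6)
    )
```

Under this model, a scheduler that hits the beat endpoint once per minute would run every schedule whose expression matches the current minute, so `"* * * * *"` matches on every call.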