Durable workflows for Django.
Project description
Django Hookflow
Durable workflows for Django, powered by Upstash QStash.
Build multi-step workflows that survive server restarts, network failures, and deployment updates. Each step is executed exactly once, and the workflow automatically resumes from where it left off.
Installation
pip install django-hookflow
or with uv:
uv add django-hookflow
Requirements:
- Python 3.10+
- Django 4.2+
Quick Start
1. Configure Settings
Add to your Django settings:
# settings.py
INSTALLED_APPS = [
# ...
"django_hookflow",
]
# Required: QStash credentials (get these from https://console.upstash.com/qstash)
QSTASH_TOKEN = "your-qstash-token"
QSTASH_CURRENT_SIGNING_KEY = "your-current-signing-key"
QSTASH_NEXT_SIGNING_KEY = "your-next-signing-key"
# Required: Your public domain where QStash can reach your webhooks
DJANGO_HOOKFLOW_DOMAIN = "https://your-app.com"
# Optional: Custom webhook path (default: /hookflow/)
DJANGO_HOOKFLOW_WEBHOOK_PATH = "/hookflow/"
# Optional: Enable database persistence for durability (recommended for production)
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED = True
2. Add URL Routes
# urls.py
from django.urls import include, path
urlpatterns = [
path("hookflow/", include("django_hookflow.urls")),
]
3. Run Migrations (if using persistence)
python manage.py migrate django_hookflow
4. Define a Workflow
# myapp/workflows.py
from django_hookflow import workflow
@workflow
def process_order(ctx):
"""Process an order with multiple durable steps."""
order_id = ctx.data.get("order_id")
# Step 1: Validate the order (executed exactly once)
validated = ctx.step.run("validate", validate_order, order_id)
# Step 2: Wait for payment confirmation (durable sleep)
ctx.step.sleep("wait-for-payment", seconds=60)
# Step 3: Charge the payment (durable HTTP call)
payment = ctx.step.call(
"charge",
url="https://api.stripe.com/v1/charges",
method="POST",
body={"amount": validated["total"], "order_id": order_id},
headers={"Authorization": "Bearer sk_..."},
)
# Step 4: Fulfill the order
result = ctx.step.run("fulfill", fulfill_order, order_id, payment)
return {"status": "completed", "result": result}
def validate_order(order_id):
"""Validate order exists and has items."""
# Your validation logic here
return {"order_id": order_id, "total": 9999}
def fulfill_order(order_id, payment):
"""Ship the order."""
# Your fulfillment logic here
return {"shipped": True}
5. Trigger the Workflow
# Trigger returns immediately, workflow runs asynchronously
run_id = process_order.trigger(data={"order_id": "12345"})
print(f"Started workflow with run_id: {run_id}")
How It Works
- Trigger: When you call
.trigger(), a message is published to QStash with your workflow payload - Webhook: QStash calls your webhook endpoint at
/hookflow/workflow/{workflow_id}/ - Execute: The webhook executes your workflow function with a
WorkflowContext - Checkpoint: Each
ctx.step.*call checks if the step already completed (returns cached result) or executes and raisesStepCompleted - Schedule Next:
StepCompletedhalts execution and schedules the next QStash callback with updated state - Resume: The workflow re-executes from the start on each callback, skipping completed steps via cached results
- Complete: When all steps finish without raising
StepCompleted, the workflow returns its final result
Configuration Reference
Required Settings
| Setting | Description | Example |
|---|---|---|
QSTASH_TOKEN |
Your QStash API token | "eyJ..." |
QSTASH_CURRENT_SIGNING_KEY |
Current webhook signing key | "sig_..." |
QSTASH_NEXT_SIGNING_KEY |
Next webhook signing key (for key rotation) | "sig_..." |
DJANGO_HOOKFLOW_DOMAIN |
Public URL where QStash can reach your app | "https://myapp.com" |
Optional Settings
| Setting | Default | Description |
|---|---|---|
DJANGO_HOOKFLOW_WEBHOOK_PATH |
"/hookflow/" |
Base path for webhook endpoints |
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED |
False |
Enable database persistence |
Environment Variables Example
# .env
QSTASH_TOKEN=eyJVc2VySUQiOiIxMjM0NTY3ODkwIiwiQXBpS2V5IjoiYWJjZGVmIn0=
QSTASH_CURRENT_SIGNING_KEY=sig_abc123...
QSTASH_NEXT_SIGNING_KEY=sig_def456...
DJANGO_HOOKFLOW_DOMAIN=https://myapp.example.com
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED=true
# settings.py
import os
QSTASH_TOKEN = os.environ.get("QSTASH_TOKEN")
QSTASH_CURRENT_SIGNING_KEY = os.environ.get("QSTASH_CURRENT_SIGNING_KEY")
QSTASH_NEXT_SIGNING_KEY = os.environ.get("QSTASH_NEXT_SIGNING_KEY")
DJANGO_HOOKFLOW_DOMAIN = os.environ.get("DJANGO_HOOKFLOW_DOMAIN")
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED = (
os.environ.get("DJANGO_HOOKFLOW_PERSISTENCE_ENABLED", "").lower() == "true"
)
Workflow Context API
Inside a workflow function, ctx provides:
| Property | Type | Description |
|---|---|---|
ctx.data |
dict |
Initial payload passed to .trigger() |
ctx.run_id |
str |
Unique identifier for this workflow run |
ctx.workflow_id |
str |
The workflow's identifier |
ctx.step |
StepManager |
Step manager for durable operations |
Step Manager Methods
ctx.step.run(step_id, fn, *args, **kwargs)
Execute a function as a durable step. The function is called with the provided arguments, and its result is cached. On retry, the cached result is returned without re-executing.
result = ctx.step.run("my-step", my_function, arg1, arg2, kwarg1="value")
ctx.step.sleep(step_id, seconds)
Sleep without consuming server resources. The workflow yields to QStash, which schedules the next callback after the delay.
ctx.step.sleep("wait", seconds=300) # Wait 5 minutes
ctx.step.call(step_id, url, method="GET", body=None, headers=None)
Make a durable HTTP request. The response is cached, so retries don't re-execute the request.
response = ctx.step.call(
"api-call",
url="https://api.example.com/endpoint",
method="POST",
body={"key": "value"},
headers={"Authorization": "Bearer token"},
)
# response = {"status_code": 200, "data": {...}}
Workflow Patterns
Sequential Steps
@workflow
def sequential_workflow(ctx):
step1_result = ctx.step.run("step-1", do_step_1)
step2_result = ctx.step.run("step-2", do_step_2, step1_result)
step3_result = ctx.step.run("step-3", do_step_3, step2_result)
return step3_result
Conditional Logic
@workflow
def conditional_workflow(ctx):
data = ctx.step.run("fetch-data", fetch_data)
if data.get("needs_approval"):
ctx.step.run("request-approval", send_approval_request, data)
ctx.step.sleep("wait-for-approval", seconds=3600) # Wait 1 hour
approved = ctx.step.run("check-approval", check_approval_status, data["id"])
if not approved:
return {"status": "rejected"}
result = ctx.step.run("process", process_data, data)
return {"status": "completed", "result": result}
External API Integration
@workflow
def api_integration_workflow(ctx):
# Create resource in external system
created = ctx.step.call(
"create-resource",
url="https://api.external.com/resources",
method="POST",
body=ctx.data,
headers={"Authorization": f"Bearer {settings.API_KEY}"},
)
resource_id = created["data"]["id"]
# Poll for completion
ctx.step.sleep("wait-for-processing", seconds=30)
status = ctx.step.call(
"check-status",
url=f"https://api.external.com/resources/{resource_id}",
method="GET",
headers={"Authorization": f"Bearer {settings.API_KEY}"},
)
return {"resource_id": resource_id, "status": status["data"]["status"]}
Custom Workflow ID
@workflow(workflow_id="order-processor-v2")
def process_order(ctx):
pass # Your workflow logic
Custom Run ID
# Use a deterministic run ID for idempotency
run_id = process_order.trigger(
data={"order_id": "12345"},
run_id=f"order-12345-{timestamp}",
)
Database Persistence
When DJANGO_HOOKFLOW_PERSISTENCE_ENABLED=True, workflow state is persisted to the database. This enables:
- Recovery: Workflows can recover from QStash message failures
- Monitoring: View workflow status via Django Admin
- Debugging: Inspect step results and error messages
Models
WorkflowRun
Tracks workflow executions.
| Field | Description |
|---|---|
run_id |
Unique identifier for this run |
workflow_id |
The workflow definition ID |
status |
pending, running, completed, or failed |
data |
Initial payload |
result |
Final result (if completed) |
error_message |
Error message (if failed) |
created_at |
When the run started |
completed_at |
When the run finished |
StepExecution
Records individual step results.
| Field | Description |
|---|---|
workflow_run |
Foreign key to WorkflowRun |
step_id |
Step identifier |
result |
Step result (JSON) |
executed_at |
When the step executed |
DeadLetterEntry
Failed workflow entries for manual recovery.
| Field | Description |
|---|---|
workflow_id |
The workflow definition ID |
run_id |
The failed run ID |
payload |
Full workflow payload at failure |
error_message |
Error description |
is_replayed |
Whether this entry has been replayed |
Django Admin
Django Hookflow includes admin interfaces for all models. Access them at /admin/django_hookflow/.
Features:
- View workflow runs with status filtering
- Inspect step execution results
- Replay failed workflows from the DLQ
Building Your Own API
Django Hookflow doesn't include pre-built REST endpoints. Instead, use the provided models to build your own API that fits your application's needs.
Example: Django REST Framework Views
# myapp/api/views.py
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django_hookflow.models import WorkflowRun, StepExecution
@api_view(["GET"])
def workflow_run_detail(request, run_id):
"""Get details of a specific workflow run."""
try:
run = WorkflowRun.objects.prefetch_related("step_executions").get(run_id=run_id)
except WorkflowRun.DoesNotExist:
return Response(
{"error": "Workflow run not found"},
status=status.HTTP_404_NOT_FOUND,
)
steps = [
{
"step_id": step.step_id,
"result": step.result,
"executed_at": step.executed_at.isoformat(),
}
for step in run.step_executions.all().order_by("executed_at")
]
return Response(
{
"run_id": run.run_id,
"workflow_id": run.workflow_id,
"status": run.status,
"data": run.data,
"result": run.result,
"error_message": run.error_message or None,
"created_at": run.created_at.isoformat(),
"completed_at": run.completed_at.isoformat() if run.completed_at else None,
"steps": steps,
}
)
@api_view(["GET"])
def workflow_run_list(request):
"""List workflow runs with optional filtering."""
queryset = WorkflowRun.objects.all()
# Filter by workflow_id
workflow_id = request.query_params.get("workflow_id")
if workflow_id:
queryset = queryset.filter(workflow_id=workflow_id)
# Filter by status
status_filter = request.query_params.get("status")
if status_filter:
queryset = queryset.filter(status=status_filter)
# Pagination
limit = int(request.query_params.get("limit", 50))
offset = int(request.query_params.get("offset", 0))
total = queryset.count()
runs = queryset.order_by("-created_at")[offset : offset + limit]
return Response(
{
"total": total,
"runs": [
{
"run_id": run.run_id,
"workflow_id": run.workflow_id,
"status": run.status,
"created_at": run.created_at.isoformat(),
"completed_at": run.completed_at.isoformat()
if run.completed_at
else None,
}
for run in runs
],
}
)
Example: URLs
# myapp/api/urls.py
from django.urls import path
from . import views
urlpatterns = [
path("workflows/", views.workflow_run_list, name="workflow-list"),
path("workflows/<str:run_id>/", views.workflow_run_detail, name="workflow-detail"),
]
Example: Plain Django Views
# myapp/views.py
import json
from django.http import JsonResponse
from django.views.decorators.http import require_GET
from django_hookflow.models import WorkflowRun
@require_GET
def workflow_status(request, run_id):
"""Simple status endpoint."""
try:
run = WorkflowRun.objects.get(run_id=run_id)
except WorkflowRun.DoesNotExist:
return JsonResponse({"error": "Not found"}, status=404)
return JsonResponse(
{
"run_id": run.run_id,
"status": run.status,
"completed_at": run.completed_at.isoformat() if run.completed_at else None,
}
)
Maintenance
Cleanup Command
Clean up old workflow data to prevent unbounded database growth:
# Delete completed/failed workflows older than 30 days
python manage.py cleanup_workflows
# Custom age threshold
python manage.py cleanup_workflows --days-old 7
# Dry run (show what would be deleted)
python manage.py cleanup_workflows --dry-run
# Only clean up DLQ entries
python manage.py cleanup_workflows --dlq-only
# Only clean up workflow runs
python manage.py cleanup_workflows --workflows-only
Recommended: Scheduled Cleanup
Add to your cron or scheduled tasks:
# Daily cleanup of workflows older than 30 days
0 2 * * * cd /path/to/app && python manage.py cleanup_workflows --days-old 30
Error Handling
Workflow Errors
Errors in step functions are wrapped in WorkflowError:
from django_hookflow.exceptions import WorkflowError
@workflow
def my_workflow(ctx):
try:
result = ctx.step.run("risky-step", risky_function)
except WorkflowError as e:
# Log the error, the workflow will be marked as failed
raise
Retry Behavior
By default, QStash retries failed deliveries. Django Hookflow includes logic to determine if errors are retryable:
- Retryable: Network errors, timeouts, 5xx responses
- Non-retryable:
ValueError,TypeError,KeyError, "not found" errors
Failed workflows that exhaust retries are added to the Dead Letter Queue for manual review.
Dead Letter Queue
View and replay failed workflows via Django Admin or programmatically:
from django_hookflow.dlq import DeadLetterEntry
# Get unreplayed failures
failures = DeadLetterEntry.objects.filter(is_replayed=False)
for entry in failures:
print(f"Failed: {entry.workflow_id} - {entry.error_message}")
# Replay the workflow
new_run_id = entry.replay()
print(f"Replayed as: {new_run_id}")
Troubleshooting
"QSTASH_TOKEN is not configured"
Ensure you've set the QSTASH_TOKEN in your Django settings. Get your token from the Upstash Console.
"DJANGO_HOOKFLOW_DOMAIN is not configured"
Set DJANGO_HOOKFLOW_DOMAIN to your public URL. For local development, use a tunnel like ngrok:
ngrok http 8000
# Use the ngrok URL as DJANGO_HOOKFLOW_DOMAIN
Webhooks not being received
- Verify your domain is publicly accessible
- Check that the webhook path matches your URL configuration
- Verify QStash signing keys are correct
- Check Django logs for signature verification errors
Workflow stuck in "running" status
This can happen if:
- QStash message delivery failed
- Your server crashed during execution
- A step raised an unexpected exception
Solutions:
- Check the Dead Letter Queue for failures
- Manually trigger a new run with the same data
- If persistence is enabled, the workflow can self-recover on the next callback
Steps executing multiple times
Ensure:
- Each step has a unique
step_idwithin the workflow - Persistence is enabled (
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED=True) - You're using the same
run_idfor retries
Security
Webhook Verification
All incoming webhooks are verified using QStash's JWT signatures. Ensure you've configured:
QSTASH_CURRENT_SIGNING_KEYQSTASH_NEXT_SIGNING_KEY
Network Security
- Use HTTPS for your
DJANGO_HOOKFLOW_DOMAIN - Consider IP allowlisting for QStash IPs
- Don't expose sensitive data in workflow payloads (use references/IDs instead)
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_hookflow-0.0.4.tar.gz.
File metadata
- Download URL: django_hookflow-0.0.4.tar.gz
- Upload date:
- Size: 40.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
89f71474e692c41e0b5020fd1745c6efa07ad8a1d8341cb6e1885a6c022b724c
|
|
| MD5 |
4dd4d9df102764ff99a243ea91ebf0f4
|
|
| BLAKE2b-256 |
c55a02b762db81e7e675bb4b509cbfe982d86cd729d3a3e84eed653ea5b7e6a6
|
Provenance
The following attestation bundles were made for django_hookflow-0.0.4.tar.gz:
Publisher:
main.yaml on jmitchel3/django-hookflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
django_hookflow-0.0.4.tar.gz -
Subject digest:
89f71474e692c41e0b5020fd1745c6efa07ad8a1d8341cb6e1885a6c022b724c - Sigstore transparency entry: 835603113
- Sigstore integration time:
-
Permalink:
jmitchel3/django-hookflow@1bbb66a831cc0a026c07623e9058d7d031afaf3f -
Branch / Tag:
refs/tags/v0.0.4 - Owner: https://github.com/jmitchel3
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
main.yaml@1bbb66a831cc0a026c07623e9058d7d031afaf3f -
Trigger Event:
push
-
Statement type:
File details
Details for the file django_hookflow-0.0.4-py3-none-any.whl.
File metadata
- Download URL: django_hookflow-0.0.4-py3-none-any.whl
- Upload date:
- Size: 33.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1f49367e1529374180427943d633e453b1f0893057bd6f8ed139d826dc80d3b7
|
|
| MD5 |
1dcf5a335a1b9c20542aec7e821d0761
|
|
| BLAKE2b-256 |
bb0c8de1c14923f50dfbeb2cfaf46599b7336153eb412ee7c0aa02ea1516168a
|
Provenance
The following attestation bundles were made for django_hookflow-0.0.4-py3-none-any.whl:
Publisher:
main.yaml on jmitchel3/django-hookflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
django_hookflow-0.0.4-py3-none-any.whl -
Subject digest:
1f49367e1529374180427943d633e453b1f0893057bd6f8ed139d826dc80d3b7 - Sigstore transparency entry: 835603114
- Sigstore integration time:
-
Permalink:
jmitchel3/django-hookflow@1bbb66a831cc0a026c07623e9058d7d031afaf3f -
Branch / Tag:
refs/tags/v0.0.4 - Owner: https://github.com/jmitchel3
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
main.yaml@1bbb66a831cc0a026c07623e9058d7d031afaf3f -
Trigger Event:
push
-
Statement type: