Django Hookflow

Durable workflows for Django, powered by Upstash QStash.

Build multi-step workflows that survive server restarts, network failures, and deployment updates. Each step is executed exactly once, and the workflow automatically resumes from where it left off.

Installation

pip install django-hookflow

or with uv:

uv add django-hookflow

Requirements:

  • Python 3.10+
  • Django 4.2+

Quick Start

1. Configure Settings

Add to your Django settings:

# settings.py

INSTALLED_APPS = [
    # ...
    "django_hookflow",
]

# Required: QStash credentials (get these from https://console.upstash.com/qstash)
QSTASH_TOKEN = "your-qstash-token"
QSTASH_CURRENT_SIGNING_KEY = "your-current-signing-key"
QSTASH_NEXT_SIGNING_KEY = "your-next-signing-key"

# Required: Your public domain where QStash can reach your webhooks
DJANGO_HOOKFLOW_DOMAIN = "https://your-app.com"

# Optional: Custom webhook path (default: /hookflow/)
DJANGO_HOOKFLOW_WEBHOOK_PATH = "/hookflow/"

# Optional: Enable database persistence for durability (recommended for production)
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED = True

2. Add URL Routes

# urls.py
from django.urls import include, path

urlpatterns = [
    path("hookflow/", include("django_hookflow.urls")),
]

3. Run Migrations (if using persistence)

python manage.py migrate django_hookflow

4. Define a Workflow

# myapp/workflows.py
from django_hookflow import workflow


@workflow
def process_order(ctx):
    """Process an order with multiple durable steps."""
    order_id = ctx.data.get("order_id")

    # Step 1: Validate the order (executed exactly once)
    validated = ctx.step.run("validate", validate_order, order_id)

    # Step 2: Wait for payment confirmation (durable sleep)
    ctx.step.sleep("wait-for-payment", seconds=60)

    # Step 3: Charge the payment (durable HTTP call)
    payment = ctx.step.call(
        "charge",
        url="https://api.stripe.com/v1/charges",
        method="POST",
        body={"amount": validated["total"], "order_id": order_id},
        headers={"Authorization": "Bearer sk_..."},
    )

    # Step 4: Fulfill the order
    result = ctx.step.run("fulfill", fulfill_order, order_id, payment)

    return {"status": "completed", "result": result}


def validate_order(order_id):
    """Validate order exists and has items."""
    # Your validation logic here
    return {"order_id": order_id, "total": 9999}


def fulfill_order(order_id, payment):
    """Ship the order."""
    # Your fulfillment logic here
    return {"shipped": True}

5. Trigger the Workflow

# Trigger returns immediately, workflow runs asynchronously
run_id = process_order.trigger(data={"order_id": "12345"})
print(f"Started workflow with run_id: {run_id}")

How It Works

  1. Trigger: When you call .trigger(), a message is published to QStash with your workflow payload
  2. Webhook: QStash calls your webhook endpoint at /hookflow/workflow/{workflow_id}/
  3. Execute: The webhook executes your workflow function with a WorkflowContext
  4. Checkpoint: Each ctx.step.* call checks if the step already completed (returns cached result) or executes and raises StepCompleted
  5. Schedule Next: StepCompleted halts execution and schedules the next QStash callback with updated state
  6. Resume: The workflow re-executes from the start on each callback, skipping completed steps via cached results
  7. Complete: When all steps finish without raising StepCompleted, the workflow returns its final result
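The checkpoint-and-replay loop in steps 4 through 6 can be modeled in a few lines of plain Python (a simplified sketch, not the library's actual implementation: the dict stands in for persisted state, and the `while` loop stands in for successive QStash callbacks):

```python
class StepCompleted(Exception):
    """Halts the current pass after a step executes for the first time."""

class Step:
    def __init__(self, cache):
        self.cache = cache  # persisted step results, keyed by step_id

    def run(self, step_id, fn, *args):
        if step_id in self.cache:
            return self.cache[step_id]   # completed on an earlier pass: skip
        self.cache[step_id] = fn(*args)  # execute exactly once
        raise StepCompleted(step_id)     # halt; real impl schedules a callback

def drive(workflow_fn, cache):
    """Re-run the workflow from the top until no step raises StepCompleted."""
    while True:
        try:
            return workflow_fn(Step(cache))
        except StepCompleted:
            continue  # in production this resume arrives as a fresh webhook

def example(step):
    a = step.run("one", lambda: 1)
    b = step.run("two", lambda x: x + 1, a)
    return {"result": b}

print(drive(example, {}))
```

Each lambda body runs exactly once even though `example` itself is invoked three times, which is why step functions must be keyed by a stable `step_id`.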

Configuration Reference

Required Settings

  • QSTASH_TOKEN: your QStash API token (e.g. "eyJ...")
  • QSTASH_CURRENT_SIGNING_KEY: current webhook signing key (e.g. "sig_...")
  • QSTASH_NEXT_SIGNING_KEY: next webhook signing key, used for key rotation (e.g. "sig_...")
  • DJANGO_HOOKFLOW_DOMAIN: public URL where QStash can reach your app (e.g. "https://myapp.com")

Optional Settings

  • DJANGO_HOOKFLOW_WEBHOOK_PATH (default: "/hookflow/"): base path for webhook endpoints
  • DJANGO_HOOKFLOW_PERSISTENCE_ENABLED (default: False): enable database persistence

Environment Variables Example

# .env
QSTASH_TOKEN=eyJVc2VySUQiOiIxMjM0NTY3ODkwIiwiQXBpS2V5IjoiYWJjZGVmIn0=
QSTASH_CURRENT_SIGNING_KEY=sig_abc123...
QSTASH_NEXT_SIGNING_KEY=sig_def456...
DJANGO_HOOKFLOW_DOMAIN=https://myapp.example.com
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED=true
# settings.py
import os

QSTASH_TOKEN = os.environ.get("QSTASH_TOKEN")
QSTASH_CURRENT_SIGNING_KEY = os.environ.get("QSTASH_CURRENT_SIGNING_KEY")
QSTASH_NEXT_SIGNING_KEY = os.environ.get("QSTASH_NEXT_SIGNING_KEY")
DJANGO_HOOKFLOW_DOMAIN = os.environ.get("DJANGO_HOOKFLOW_DOMAIN")
DJANGO_HOOKFLOW_PERSISTENCE_ENABLED = (
    os.environ.get("DJANGO_HOOKFLOW_PERSISTENCE_ENABLED", "").lower() == "true"
)

Workflow Context API

Inside a workflow function, ctx provides:

  • ctx.data (dict): initial payload passed to .trigger()
  • ctx.run_id (str): unique identifier for this workflow run
  • ctx.workflow_id (str): the workflow's identifier
  • ctx.step (StepManager): step manager for durable operations

Step Manager Methods

ctx.step.run(step_id, fn, *args, **kwargs)

Execute a function as a durable step. The function is called with the provided arguments, and its result is cached. On retry, the cached result is returned without re-executing.

result = ctx.step.run("my-step", my_function, arg1, arg2, kwarg1="value")

ctx.step.sleep(step_id, seconds)

Sleep without consuming server resources. The workflow yields to QStash, which schedules the next callback after the delay.

ctx.step.sleep("wait", seconds=300)  # Wait 5 minutes

ctx.step.call(step_id, url, method="GET", body=None, headers=None)

Make a durable HTTP request. The response is cached, so retries don't re-execute the request.

response = ctx.step.call(
    "api-call",
    url="https://api.example.com/endpoint",
    method="POST",
    body={"key": "value"},
    headers={"Authorization": "Bearer token"},
)
# response = {"status_code": 200, "data": {...}}

Workflow Patterns

Sequential Steps

@workflow
def sequential_workflow(ctx):
    step1_result = ctx.step.run("step-1", do_step_1)
    step2_result = ctx.step.run("step-2", do_step_2, step1_result)
    step3_result = ctx.step.run("step-3", do_step_3, step2_result)
    return step3_result

Conditional Logic

@workflow
def conditional_workflow(ctx):
    data = ctx.step.run("fetch-data", fetch_data)

    if data.get("needs_approval"):
        ctx.step.run("request-approval", send_approval_request, data)
        ctx.step.sleep("wait-for-approval", seconds=3600)  # Wait 1 hour
        approved = ctx.step.run("check-approval", check_approval_status, data["id"])
        if not approved:
            return {"status": "rejected"}

    result = ctx.step.run("process", process_data, data)
    return {"status": "completed", "result": result}

External API Integration

@workflow
def api_integration_workflow(ctx):
    # Create resource in external system
    created = ctx.step.call(
        "create-resource",
        url="https://api.external.com/resources",
        method="POST",
        body=ctx.data,
        headers={"Authorization": f"Bearer {settings.API_KEY}"},
    )

    resource_id = created["data"]["id"]

    # Poll for completion
    ctx.step.sleep("wait-for-processing", seconds=30)

    status = ctx.step.call(
        "check-status",
        url=f"https://api.external.com/resources/{resource_id}",
        method="GET",
        headers={"Authorization": f"Bearer {settings.API_KEY}"},
    )

    return {"resource_id": resource_id, "status": status["data"]["status"]}

Custom Workflow ID

@workflow(workflow_id="order-processor-v2")
def process_order(ctx):
    pass  # Your workflow logic

Custom Run ID

# Use a deterministic run ID for idempotency: derive it from the order
# (not from the wall clock), so re-triggering the same order reuses one run
run_id = process_order.trigger(
    data={"order_id": "12345"},
    run_id="order-12345",
)

Database Persistence

When DJANGO_HOOKFLOW_PERSISTENCE_ENABLED=True, workflow state is persisted to the database. This enables:

  • Recovery: Workflows can recover from QStash message failures
  • Monitoring: View workflow status via Django Admin
  • Debugging: Inspect step results and error messages
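For example, the monitoring case can be served with a small ORM query from a periodic task (a sketch; it assumes the WorkflowRun model and status values documented below):

```python
from datetime import timedelta

from django.utils import timezone

from django_hookflow.models import WorkflowRun

# Flag runs stuck in "running" for over an hour, e.g. to drive an alert
stuck = WorkflowRun.objects.filter(
    status="running",
    created_at__lt=timezone.now() - timedelta(hours=1),
)
for run in stuck:
    print(run.run_id, run.workflow_id, run.created_at)
```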

Models

WorkflowRun

Tracks workflow executions.

  • run_id: unique identifier for this run
  • workflow_id: the workflow definition ID
  • status: pending, running, completed, or failed
  • data: initial payload
  • result: final result (if completed)
  • error_message: error message (if failed)
  • created_at: when the run started
  • completed_at: when the run finished

StepExecution

Records individual step results.

  • workflow_run: foreign key to WorkflowRun
  • step_id: step identifier
  • result: step result (JSON)
  • executed_at: when the step executed

DeadLetterEntry

Failed workflow entries for manual recovery.

  • workflow_id: the workflow definition ID
  • run_id: the failed run ID
  • payload: full workflow payload at failure
  • error_message: error description
  • is_replayed: whether this entry has been replayed

Django Admin

Django Hookflow includes admin interfaces for all models. Access them at /admin/django_hookflow/.

Features:

  • View workflow runs with status filtering
  • Inspect step execution results
  • Replay failed workflows from the DLQ

Building Your Own API

Django Hookflow doesn't include pre-built REST endpoints. Instead, use the provided models to build your own API that fits your application's needs.

Example: Django REST Framework Views

# myapp/api/views.py
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response

from django_hookflow.models import WorkflowRun, StepExecution


@api_view(["GET"])
def workflow_run_detail(request, run_id):
    """Get details of a specific workflow run."""
    try:
        run = WorkflowRun.objects.prefetch_related("step_executions").get(run_id=run_id)
    except WorkflowRun.DoesNotExist:
        return Response(
            {"error": "Workflow run not found"},
            status=status.HTTP_404_NOT_FOUND,
        )

    steps = [
        {
            "step_id": step.step_id,
            "result": step.result,
            "executed_at": step.executed_at.isoformat(),
        }
        for step in run.step_executions.all().order_by("executed_at")
    ]

    return Response(
        {
            "run_id": run.run_id,
            "workflow_id": run.workflow_id,
            "status": run.status,
            "data": run.data,
            "result": run.result,
            "error_message": run.error_message or None,
            "created_at": run.created_at.isoformat(),
            "completed_at": run.completed_at.isoformat() if run.completed_at else None,
            "steps": steps,
        }
    )


@api_view(["GET"])
def workflow_run_list(request):
    """List workflow runs with optional filtering."""
    queryset = WorkflowRun.objects.all()

    # Filter by workflow_id
    workflow_id = request.query_params.get("workflow_id")
    if workflow_id:
        queryset = queryset.filter(workflow_id=workflow_id)

    # Filter by status
    status_filter = request.query_params.get("status")
    if status_filter:
        queryset = queryset.filter(status=status_filter)

    # Pagination
    limit = int(request.query_params.get("limit", 50))
    offset = int(request.query_params.get("offset", 0))

    total = queryset.count()
    runs = queryset.order_by("-created_at")[offset : offset + limit]

    return Response(
        {
            "total": total,
            "runs": [
                {
                    "run_id": run.run_id,
                    "workflow_id": run.workflow_id,
                    "status": run.status,
                    "created_at": run.created_at.isoformat(),
                    "completed_at": run.completed_at.isoformat()
                    if run.completed_at
                    else None,
                }
                for run in runs
            ],
        }
    )

Example: URLs

# myapp/api/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path("workflows/", views.workflow_run_list, name="workflow-list"),
    path("workflows/<str:run_id>/", views.workflow_run_detail, name="workflow-detail"),
]

Example: Plain Django Views

# myapp/views.py
from django.http import JsonResponse
from django.views.decorators.http import require_GET

from django_hookflow.models import WorkflowRun


@require_GET
def workflow_status(request, run_id):
    """Simple status endpoint."""
    try:
        run = WorkflowRun.objects.get(run_id=run_id)
    except WorkflowRun.DoesNotExist:
        return JsonResponse({"error": "Not found"}, status=404)

    return JsonResponse(
        {
            "run_id": run.run_id,
            "status": run.status,
            "completed_at": run.completed_at.isoformat() if run.completed_at else None,
        }
    )

Maintenance

Cleanup Command

Clean up old workflow data to prevent unbounded database growth:

# Delete completed/failed workflows older than 30 days
python manage.py cleanup_workflows

# Custom age threshold
python manage.py cleanup_workflows --days-old 7

# Dry run (show what would be deleted)
python manage.py cleanup_workflows --dry-run

# Only clean up DLQ entries
python manage.py cleanup_workflows --dlq-only

# Only clean up workflow runs
python manage.py cleanup_workflows --workflows-only

Recommended: Scheduled Cleanup

Add to your cron or scheduled tasks:

# Daily cleanup of workflows older than 30 days
0 2 * * * cd /path/to/app && python manage.py cleanup_workflows --days-old 30

Error Handling

Workflow Errors

Errors in step functions are wrapped in WorkflowError:

from django_hookflow import workflow
from django_hookflow.exceptions import WorkflowError


@workflow
def my_workflow(ctx):
    try:
        result = ctx.step.run("risky-step", risky_function)
    except WorkflowError:
        # Log the error here; the workflow run will be marked as failed
        raise
    return {"result": result}

Retry Behavior

By default, QStash retries failed deliveries. Django Hookflow includes logic to determine if errors are retryable:

  • Retryable: Network errors, timeouts, 5xx responses
  • Non-retryable: ValueError, TypeError, KeyError, "not found" errors

Failed workflows that exhaust retries are added to the Dead Letter Queue for manual review.
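The classification above can be sketched as a small predicate (an illustration of the rule of thumb, not the library's internal function; the name `is_retryable` is made up here):

```python
def is_retryable(exc):
    """Heuristic: transient failures retry; programming errors go to the DLQ."""
    if isinstance(exc, (ValueError, TypeError, KeyError)):
        return False                 # bad input won't fix itself on retry
    if "not found" in str(exc).lower():
        return False                 # missing resources stay missing
    return True                      # network errors, timeouts, 5xx, etc.
```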

Dead Letter Queue

View and replay failed workflows via Django Admin or programmatically:

from django_hookflow.dlq import DeadLetterEntry

# Get unreplayed failures
failures = DeadLetterEntry.objects.filter(is_replayed=False)

for entry in failures:
    print(f"Failed: {entry.workflow_id} - {entry.error_message}")

    # Replay the workflow
    new_run_id = entry.replay()
    print(f"Replayed as: {new_run_id}")

Troubleshooting

"QSTASH_TOKEN is not configured"

Ensure you've set the QSTASH_TOKEN in your Django settings. Get your token from the Upstash Console.

"DJANGO_HOOKFLOW_DOMAIN is not configured"

Set DJANGO_HOOKFLOW_DOMAIN to your public URL. For local development, use a tunnel like ngrok:

ngrok http 8000
# Use the ngrok URL as DJANGO_HOOKFLOW_DOMAIN

Webhooks not being received

  1. Verify your domain is publicly accessible
  2. Check that the webhook path matches your URL configuration
  3. Verify QStash signing keys are correct
  4. Check Django logs for signature verification errors

Workflow stuck in "running" status

This can happen if:

  1. QStash message delivery failed
  2. Your server crashed during execution
  3. A step raised an unexpected exception

Solutions:

  • Check the Dead Letter Queue for failures
  • Manually trigger a new run with the same data
  • If persistence is enabled, the workflow can self-recover on the next callback

Steps executing multiple times

Ensure:

  1. Each step has a unique step_id within the workflow
  2. Persistence is enabled (DJANGO_HOOKFLOW_PERSISTENCE_ENABLED=True)
  3. You're using the same run_id for retries
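Point 1 matters especially for steps created in a loop: include the loop index in the step_id so each iteration checkpoints independently (a usage sketch; `send_email` and the `recipients` payload key are illustrative names):

```python
from django_hookflow import workflow


@workflow
def notify_all(ctx):
    recipients = ctx.data.get("recipients", [])
    for i, addr in enumerate(recipients):
        # A fixed id like "send" would make every later iteration return the
        # first iteration's cached result; f"send-{i}" keeps them distinct.
        ctx.step.run(f"send-{i}", send_email, addr)
    return {"sent": len(recipients)}
```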

Security

Webhook Verification

All incoming webhooks are verified using QStash's JWT signatures. Ensure you've configured:

  • QSTASH_CURRENT_SIGNING_KEY
  • QSTASH_NEXT_SIGNING_KEY

Network Security

  • Use HTTPS for your DJANGO_HOOKFLOW_DOMAIN
  • Consider IP allowlisting for QStash IPs
  • Don't expose sensitive data in workflow payloads (use references/IDs instead)

License

MIT
