Skip to main content

Durable workflow engine SDK for Python + Postgres

Project description

pgflows

PyPI Python License

Durable workflow engine SDK for Python + Postgres

pgflows lets you write long-running, fault-tolerant workflows as plain async Python functions — backed entirely by your existing Postgres database. No extra infrastructure, no separate orchestration service, no new runtime to operate.

[!WARNING] Early development (alpha). The core API is stabilizing but not yet 1.0. Expect breaking changes before the first stable release.

How it works

Each workflow step is persisted to Postgres before execution. If the process crashes mid-run, the worker replays from the last checkpoint — re-executing only the steps that haven't completed. All state, retries, and scheduling live in the database.

@workflow fn  →  WorkflowApp.start()
                     ↓  enqueues to pgmq
              Python async worker
                     ↓  executes steps
              PgStateBackend  ←→  Postgres

Quick start

docker compose up -d   # start Postgres with pgmq
uv add pgflows
import asyncio
from pydantic import BaseModel
from pgflows import PgflowsConfig, RetryConfig, StepContext, WorkflowApp, WorkflowContext

config = PgflowsConfig(
    dsn="postgresql://pgflows:pgflows@127.0.0.1:5433/pgflows_test",
    otel_enabled=False,
    db_ssl=False,
)
app = WorkflowApp(config=config)


class OrderInput(BaseModel):
    order_id: str
    amount: float


class OrderResult(BaseModel):
    charged: bool
    confirmation: str


@app.step(retry=RetryConfig(max_retries=3, initial_delay_seconds=1.0))
async def charge_payment(ctx: StepContext, input: OrderInput) -> OrderResult:
    # call your payment API here
    return OrderResult(charged=True, confirmation=f"CHG-{input.order_id}")


@app.workflow()
async def process_order(ctx: WorkflowContext, input: OrderInput) -> OrderResult:
    return await ctx.step(charge_payment, input)


async def main() -> None:
    await app.initialize()
    instance_id = await app.start(process_order, OrderInput(order_id="ORD-1", amount=99.0))
    await app.process_once()
    status = await app.get_status(instance_id)
    print(status.state, status.output)
    await app.close()

asyncio.run(main())

Features

  • Checkpoint replay — workflows survive crashes; completed steps are never re-executed
  • Typed end-to-end — step inputs and outputs are Pydantic models; no dict[str, Any] at the boundary
  • Configurable retries — per-step RetryConfig with exponential or linear backoff and jitter
  • Plugin hooksbefore_workflow, after_workflow, on_workflow_error, before_step, after_step, on_step_error
  • Automatic migrationsawait app.initialize() applies schema migrations; no manual SQL required
  • Two execution modes — a Python pull worker, or push mode where pg_durable orchestrates and calls Python steps via df.http() or a pgmq+NOTIFY StepWorker
  • Cron scheduler — trigger recurring workflows via PgCronBackend (backed by pg_durable df.wait_for_schedule)
  • Dead-letter queue — failed workflows are archived to pgmq.a_{queue} instead of being re-queued indefinitely
  • Worker coordination — atomic pending→running claim prevents duplicate processing when multiple workers race on the same instance
  • Swappable backends — orchestrator, queue, and scheduler implement ABCs; swap without touching workflow code
  • Execution history — typed access to pg_durable's per-run trail: instance_info, instance_nodes, instance_executions, metrics
  • OpenTelemetry — built-in span management for workflows and steps

Plugin system

from pgflows import LoggingPlugin, PgflowsPlugin, StepEvent, WorkflowEvent

# Built-in: log all lifecycle events
app.register_plugin(LoggingPlugin())

# Custom: implement any subset of hooks
class MetricsPlugin(PgflowsPlugin):
    async def after_step(self, event: StepEvent, result: object) -> None:
        metrics.record("step.completed", tags={"step": event.step_name})

    async def on_workflow_error(self, event: WorkflowEvent, error: Exception) -> None:
        metrics.record("workflow.failed", tags={"workflow": event.workflow_name})

app.register_plugin(MetricsPlugin())

Plugins are called in registration order. A plugin that raises never affects other plugins or the workflow itself.

Retry configuration

from pgflows import RetryConfig

# Per-step retry (backoff can be "exponential" or "linear")
@app.step(retry=RetryConfig(max_retries=5, initial_delay_seconds=2.0, max_delay_seconds=60.0, backoff="exponential"))
async def my_step(ctx, input: MyInput) -> MyOutput: ...

# Workflow-level defaults (applied to all steps unless overridden)
@app.workflow(step_defaults=RetryConfig(max_retries=2))
async def my_workflow(ctx, input: MyInput) -> MyOutput: ...

SQL export and runtime workflows

pgflows can export any registered workflow to a pg_durable SQL DSL. Use this to:

  • Transfer workflow definitions from dev → prod without code deployment
  • Create workflows at runtime from config, API payloads, or external systems
  • Inspect the step sequence of any workflow before executing it

Export a Python workflow to SQL

from pgflows import SqlExporter

exporter = SqlExporter(registry=app.registry, base_url="http://my-app:8000")

# Full SQL ready to run against a Postgres database with pg_durable
sql = exporter.export_workflow("process_order")

# Dry-run: inspect steps without producing runnable SQL
result = exporter.dry_run("process_order")
print(result.steps)   # [StepSql(step_name='charge_payment', ...)]
print(result.sql)     # pg_durable DSL

Compose a workflow at runtime from step names

When you want to define a workflow without writing a Python function — from a config file, an API request, or a database record — use compose(). Each step name must already be registered with the app.

# No Python workflow function needed — compose step sequences dynamically
sql = exporter.compose(
    workflow_name="on_call_response",
    steps=["check_service_health", "diagnose_incident", "apply_remediation"],
)

# Execute sql against Postgres with pg_durable to start the workflow

The compose() call validates that every step name is registered, so typos raise a KeyError immediately rather than failing silently at runtime.

Export all workflows

# All registered workflows in one SQL file (dev → prod migration)
sql = exporter.export_all()

Push-mode step bindings (pg_durable orchestrates, Python runs the steps)

In push mode pg_durable is the orchestrator — it durably drives the graph (~> sequence, & parallel join, | race, ?>/!> branch, loops) and calls out to your Python steps. There are two selectable bindings for that call-out:

SqlExporter(mode=...) How pg_durable invokes a step
"http" (default) df.http() → your FastAPI step endpoint (X-DF-Instance-ID header + {input} body)
"pgmq" pgmq.send + pg_notify → a StepWorker runs the step and writes the result to a poll table pg_durable reads
# Pull both bindings off the app (registry + queue config wired for you)
http_sql = app.exporter(base_url="https://api.example.com/pgflows", mode="http").export_workflow("process_order")
worker_sql = app.exporter(mode="worker").export_workflow("process_order")

# The pgmq binding needs a step worker draining the queue + signalling results back:
await app.run_step_worker()          # blocking; use asyncio.create_task for background

Compose graphs directly with the Python DSL builders. app.worker_step() is a native pgmq.send → pg_notify → poll-result → read unit that composes with the operators; prefer it over the bare worker_step() builder because it binds the app's configured step_queue/notify_channel (the bare builder hardcodes pgflows_steps, so a renamed queue silently hangs). Here it runs double_it, then add_ten consuming its output via a result capture:

node = (
    app.worker_step("double_it", capture="d")
    >> app.worker_step("add_ten", input_expr="$d::jsonb", capture="r")
)
instance_id = await app.pg_durable.start(node, label="pipeline")

Gotchas worth knowing (all about using pg_durable correctly):

  • Thread data with captures (|=> / capture=), not many df.setvars. With more than one durable var set, pg_durable serializes the vars snapshot with non-deterministic key order and a parallel-join replay then fails. Keep a single config var and pass step data through captures.
  • The pgmq binding polls a result table instead of df.wait_for_signal, because a NOTIFY-woken worker can signal before pg_durable registers the waiter (that signal would be dropped). The poll table is race-free.
  • A captured wait_for_signal is the full {signal_name, timed_out, data} envelope — read your payload under ->'data' (e.g. $decision::jsonb->'data'->>'approved').
  • pg_durable composition limits (bundled build): join worker_step branches, not trivial bare-SQL ones (SELECT 1 & SELECT 2 can hang); never put a loop and a parallel node in the same instance (deadlocks); | (race) is reliable only as a terminal node and does not cancel the loser. Cancel stale running instances if the executor wedges.

Running push mode for real (pg_durable + pgmq)

The bundled compose DB ships only pgmq. To exercise push mode end to end you need a Postgres with both extensions — build the combined image and run the live e2e (real df.start / df.http / pgmq.send, no mocks):

docker build -t pgflows-e2e-dfpgmq:latest tests/e2e/docker
docker compose -f tests/e2e/docker/docker-compose.yml up -d --wait
uv run pytest tests/e2e/test_live_dfpgmq.py -v

docker-compose.full.yml brings up the full two-container stack — that Postgres image plus the example app server (Dockerfile.app + examples/server.py).

Observability — execution history

pg_durable records the full per-run history in the database; PgDurableClient exposes it as typed models (no raw df.* SQL needed):

client = app.pg_durable

info  = await client.instance_info(iid)        # InstanceInfo: label, function, status, output
nodes = await client.instance_nodes(iid)        # list[InstanceNode]: per-node trail (type, result, status)
execs = await client.instance_executions(iid)   # list[ExecutionRecord]: status, event_count, duration_ms
m     = await client.metrics()                   # Metrics: cluster-wide counters

# Need your own tables around a durable run? Use the pooled connection accessor:
async with app.acquire() as conn:
    rows = await conn.fetch("SELECT * FROM my_audit WHERE run = $1", run_id)

instance_nodes expands the graph into structural THEN/JOIN/IF rows, so it returns more rows than the nodes you wrote — handy for seeing exactly where a run is.

Scheduling

PgCronBackend schedules recurring workflows using pg_durable's df.wait_for_schedule() — no pg_cron extension required, only the df extension from pg_durable.

from pgflows import PgCronBackend

scheduler = PgCronBackend(dsn=config.dsn)
await scheduler.initialize()

# Schedule a workflow to run every hour (job_id is a pg_durable instance ID string)
job_id = await scheduler.schedule(
    job_name="hourly_health_check",
    cron="0 * * * *",
    command="SELECT pgflows.enqueue_workflow('health_check', '{}')",
)

jobs = await scheduler.list_jobs()
await scheduler.unschedule(job_id)

Check whether pg_durable is installed at runtime:

await app.initialize()
if app.pg_durable_available:
    # scheduler and push-mode SQL export are usable
    ...

Backend abstraction

Every infrastructure concern is behind an ABC in backends/base.py. Swap backends without touching workflow code:

Component Default Interface
State + checkpoints PgStateBackend OrchestratorBackend
Step queue PgmqBackend QueueBackend
Cron scheduling PgCronBackend SchedulerBackend
# Bring your own queue backend
class RedisQueueBackend(QueueBackend):
    ...

app = WorkflowApp(config=config)
# Use custom backend by injecting into WorkflowWorker directly

Requirements

Python: 3.13+

Postgres extensions (15+):

Extension Purpose Required
pgmq Step queue Yes
pg_durable (df) Cron scheduling, push-mode SQL export Optional

The bundled docker-compose.yml starts a Postgres instance with pgmq pre-installed:

docker compose up -d

Installation

pip install pgflows
# or
uv add pgflows

Docker

# GitHub Container Registry
docker pull ghcr.io/niradler/pgflows:latest

# Docker Hub
docker pull niradler/pgflows:latest

Extend the base image with your workflow code:

FROM ghcr.io/niradler/pgflows:latest
WORKDIR /app
COPY . .
CMD ["python", "worker.py"]

Development

uv sync                          # install deps
uv run pytest tests/unit/        # unit tests (no DB needed)
docker compose up -d             # start Postgres
uv run pytest tests/e2e/         # E2E tests
uv run ruff check src/ tests/    # lint

AI SRE example

See examples/ai_sre/workflow.py for a full incident response workflow: health check → AI diagnosis → auto-remediation, with retries, plugin hooks, and typed I/O.

Roadmap

  • M1 — Project scaffold: backend ABCs, Pydantic types, exception hierarchy
  • M2 — Core SDK: WorkflowApp, @step, @workflow, WorkflowContext, replay engine
  • M3 — SqlExporter: Python workflow → pg_durable DSL (AST-based)
  • M4 — E2E test suite: basic, retry, monitor/cancel (Docker-based)
  • M6 — Plugin system: PgflowsPlugin ABC, LoggingPlugin, lifecycle hooks
  • M7 — Migrations + scheduler: versioned schema migrations, PgCronBackend via pg_durable
  • M8 — AI SRE example, README, production hardening: DLQ, worker coordination, linear backoff, pg_durable detection
  • M9 — PyPI release (pgflows on PyPI, Docker images on GHCR + Docker Hub)
  • M5 — FastAPI integration: push endpoint (deferred; pull mode works without it)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgflows-0.1.1.tar.gz (32.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pgflows-0.1.1-py3-none-any.whl (43.1 kB view details)

Uploaded Python 3

File details

Details for the file pgflows-0.1.1.tar.gz.

File metadata

  • Download URL: pgflows-0.1.1.tar.gz
  • Upload date:
  • Size: 32.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.0 {"installer":{"name":"uv","version":"0.10.0","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for pgflows-0.1.1.tar.gz
Algorithm Hash digest
SHA256 216719e4da653fc2d9e0242e24e30ba321881d3586e723019fa2facba54558ac
MD5 bf7398edd15ce7acc132936ea6d21e9b
BLAKE2b-256 61c922fc6d40efb1c8c286d7c64c7d0a8b0b0693c45c99185926e8dd9021e1ed

See more details on using hashes here.

File details

Details for the file pgflows-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: pgflows-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 43.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.0 {"installer":{"name":"uv","version":"0.10.0","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for pgflows-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 db220bb4f56eae294c71ec18eeed5b1d90fc63b3c95ec1036211fa41d5ae0106
MD5 df40054c820fa0536bb014a3b7c16d2b
BLAKE2b-256 6ad96d01eb2666c34270b4183c727affac216d771f00ad02c41280e851f1ef66

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page