Durable workflow engine SDK for Python + Postgres
pyflows
pyflows lets you write long-running, fault-tolerant workflows as plain async Python functions — backed entirely by your existing Postgres database. No extra infrastructure, no separate orchestration service, no new runtime to operate.
[!WARNING] Early development (alpha). The core API is stabilizing but not yet 1.0. Expect breaking changes before the first stable release.
How it works
Each workflow step is persisted to Postgres before execution. If the process crashes mid-run, the worker replays from the last checkpoint — re-executing only the steps that haven't completed. All state, retries, and scheduling live in the database.
@workflow fn → WorkflowApp.start()
↓ enqueues to pgmq
Python async worker
↓ executes steps
PgStateBackend ←→ Postgres
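The replay idea can be sketched in plain Python. This is an illustrative model only, not pyflows internals: the `checkpoints` dict stands in for the Postgres checkpoint table, and `run_step`, `reserve`, `charge`, and `workflow` are hypothetical names. For brevity it records a step's result after the step returns, which may differ from how pyflows orders its writes.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for the Postgres checkpoint table:
# (instance_id, step_name) -> saved result.
checkpoints: dict[tuple[str, str], Any] = {}
calls: list[str] = []  # records which step bodies actually ran


async def run_step(instance_id: str, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
    """Return the saved result if the step already completed, else run and save it."""
    key = (instance_id, name)
    if key in checkpoints:
        return checkpoints[key]
    result = await fn()
    checkpoints[key] = result
    return result


async def reserve() -> str:
    calls.append("reserve")
    return "reserved"


async def charge(fail: bool) -> str:
    calls.append("charge")
    if fail:
        raise RuntimeError("simulated crash")
    return "charged"


async def workflow(instance_id: str, fail: bool) -> list[str]:
    first = await run_step(instance_id, "reserve", reserve)
    second = await run_step(instance_id, "charge", lambda: charge(fail))
    return [first, second]


async def demo() -> list[str]:
    try:
        await workflow("wf-1", fail=True)  # first attempt crashes in the second step
    except RuntimeError:
        pass
    # Replay: "reserve" is served from the checkpoint, only "charge" runs again.
    return await workflow("wf-1", fail=False)


result = asyncio.run(demo())
print(result, calls)
```

In this sketch `reserve` runs once across both attempts, while `charge` runs twice because its first attempt never completed.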
Quick start
docker compose up -d # start Postgres with pgmq
uv add pyflows
import asyncio

from pydantic import BaseModel

from pyflows import PyflowsConfig, RetryConfig, StepContext, WorkflowApp, WorkflowContext

config = PyflowsConfig(
    dsn="postgresql://pyflows:pyflows@127.0.0.1:5433/pyflows_test",
    otel_enabled=False,
    db_ssl=False,
)
app = WorkflowApp(config=config)


class OrderInput(BaseModel):
    order_id: str
    amount: float


class OrderResult(BaseModel):
    charged: bool
    confirmation: str


@app.step(retry=RetryConfig(max_retries=3, initial_delay_seconds=1.0))
async def charge_payment(ctx: StepContext, input: OrderInput) -> OrderResult:
    # call your payment API here
    return OrderResult(charged=True, confirmation=f"CHG-{input.order_id}")


@app.workflow()
async def process_order(ctx: WorkflowContext, input: OrderInput) -> OrderResult:
    return await ctx.step(charge_payment, input)


async def main() -> None:
    await app.initialize()
    instance_id = await app.start(process_order, OrderInput(order_id="ORD-1", amount=99.0))
    await app.process_once()
    status = await app.get_status(instance_id)
    print(status.state, status.output)
    await app.close()


asyncio.run(main())
Features
- Checkpoint replay: workflows survive crashes; completed steps are never re-executed
- Typed end-to-end: step inputs and outputs are Pydantic models; no dict[str, Any] at the boundary
- Configurable retries: per-step RetryConfig with exponential or linear backoff and jitter
- Plugin hooks: before_workflow, after_workflow, on_workflow_error, before_step, after_step, on_step_error
- Automatic migrations: await app.initialize() applies schema migrations; no manual SQL required
- Cron scheduler: trigger recurring workflows via PgCronBackend (backed by pg_durable's df.wait_for_schedule)
- Dead-letter queue: failed workflows are archived to pgmq.a_{queue} instead of being re-queued indefinitely
- Worker coordination: atomic pending→running claim prevents duplicate processing when multiple workers race on the same instance
- Swappable backends: orchestrator, queue, and scheduler implement ABCs; swap without touching workflow code
- OpenTelemetry: built-in span management for workflows and steps
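The atomic pending→running claim mentioned above is commonly implemented as a single conditional UPDATE. The sketch below shows that general technique using the standard-library `sqlite3` module as a stand-in for Postgres; the `instances` table, its columns, and `try_claim` are hypothetical and are not the pyflows schema or API.

```python
import sqlite3

# In-memory database standing in for Postgres (assumption for the demo).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE instances (id TEXT PRIMARY KEY, state TEXT NOT NULL)")
conn.execute("INSERT INTO instances VALUES ('wf-1', 'pending')")
conn.commit()


def try_claim(db: sqlite3.Connection, instance_id: str) -> bool:
    """Flip pending -> running in one statement; only one caller can match the WHERE clause."""
    cur = db.execute(
        "UPDATE instances SET state = 'running' WHERE id = ? AND state = 'pending'",
        (instance_id,),
    )
    db.commit()
    # rowcount is 1 for the caller that won the claim and 0 for everyone else.
    return cur.rowcount == 1


first = try_claim(conn, "wf-1")
second = try_claim(conn, "wf-1")
print(first, second)  # True False
```

Because the state check and the state change happen in the same statement, a second worker that arrives later finds no pending row to update and backs off.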
Plugin system
from pyflows import LoggingPlugin, PyflowsPlugin, StepEvent, WorkflowEvent

# Built-in: log all lifecycle events
app.register_plugin(LoggingPlugin())


# Custom: implement any subset of hooks
class MetricsPlugin(PyflowsPlugin):
    async def after_step(self, event: StepEvent, result: object) -> None:
        metrics.record("step.completed", tags={"step": event.step_name})

    async def on_workflow_error(self, event: WorkflowEvent, error: Exception) -> None:
        metrics.record("workflow.failed", tags={"workflow": event.workflow_name})


app.register_plugin(MetricsPlugin())
Plugins are called in registration order. A plugin that raises never affects other plugins or the workflow itself.
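One way to get this isolation is to wrap each hook call in its own try/except. The following is a minimal sketch of that pattern, not the pyflows dispatcher: `Plugin`, `Broken`, `Recorder`, and `dispatch_after_step` are hypothetical names, and the hook signature is simplified.

```python
import asyncio
import logging

logger = logging.getLogger("plugin-sketch")


class Plugin:
    """Hypothetical base class: hooks default to no-ops."""

    async def after_step(self, step_name: str, result: object) -> None:
        return None


class Broken(Plugin):
    async def after_step(self, step_name: str, result: object) -> None:
        raise ValueError("plugin bug")


class Recorder(Plugin):
    def __init__(self) -> None:
        self.seen: list[str] = []

    async def after_step(self, step_name: str, result: object) -> None:
        self.seen.append(step_name)


async def dispatch_after_step(plugins: list[Plugin], step_name: str, result: object) -> None:
    """Call each plugin in registration order; log and swallow plugin errors."""
    for plugin in plugins:
        try:
            await plugin.after_step(step_name, result)
        except Exception:
            logger.exception("plugin %s failed", type(plugin).__name__)


recorder = Recorder()
# Broken is registered first and raises, yet Recorder still receives the event.
asyncio.run(dispatch_after_step([Broken(), recorder], "charge_payment", "ok"))
print(recorder.seen)  # ['charge_payment']
```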
Retry configuration
from pyflows import RetryConfig
# Per-step retry (backoff can be "exponential" or "linear")
@app.step(retry=RetryConfig(max_retries=5, initial_delay_seconds=2.0, max_delay_seconds=60.0, backoff="exponential"))
async def my_step(ctx, input: MyInput) -> MyOutput: ...
# Workflow-level defaults (applied to all steps unless overridden)
@app.workflow(step_defaults=RetryConfig(max_retries=2))
async def my_workflow(ctx, input: MyInput) -> MyOutput: ...
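To make the two backoff modes concrete, here is a rough sketch of how retry delays could be derived from these settings. The doubling factor, the jitter formula, and the `retry_delay` helper itself are assumptions for illustration; the formula pyflows actually uses may differ.

```python
import random


def retry_delay(
    attempt: int,
    initial_delay_seconds: float,
    max_delay_seconds: float,
    backoff: str = "exponential",
    jitter: float = 0.0,
) -> float:
    """Delay before retry number `attempt` (1-based); the base delay is capped at max_delay_seconds."""
    if backoff == "exponential":
        base = initial_delay_seconds * 2 ** (attempt - 1)  # assumed doubling factor
    elif backoff == "linear":
        base = initial_delay_seconds * attempt
    else:
        raise ValueError(f"unknown backoff: {backoff}")
    capped = min(base, max_delay_seconds)
    # Jitter adds up to `jitter * capped` seconds of random extra delay
    # so that workers do not retry in lockstep.
    return capped + random.uniform(0.0, jitter * capped)


exponential = [retry_delay(n, 2.0, 60.0, "exponential") for n in range(1, 7)]
linear = [retry_delay(n, 2.0, 60.0, "linear") for n in range(1, 7)]
print(exponential)  # [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
print(linear)       # [2.0, 4.0, 6.0, 8.0, 10.0, 12.0]
```

With jitter left at zero the delays are deterministic: exponential backoff reaches the 60-second cap on the sixth retry, while linear backoff grows by the initial delay each time.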
SQL export and runtime workflows
pyflows can export any registered workflow to a pg_durable SQL DSL. Use this to:
- Transfer workflow definitions from dev → prod without code deployment
- Create workflows at runtime from config, API payloads, or external systems
- Inspect the step sequence of any workflow before executing it
Export a Python workflow to SQL
from pyflows import SqlExporter
exporter = SqlExporter(registry=app.registry, base_url="http://my-app:8000")
# Full SQL ready to run against a Postgres database with pg_durable
sql = exporter.export_workflow("process_order")
# Dry-run: inspect steps without producing runnable SQL
result = exporter.dry_run("process_order")
print(result.steps) # [StepSql(step_name='charge_payment', ...)]
print(result.sql) # pg_durable DSL
Compose a workflow at runtime from step names
When you want to define a workflow without writing a Python function — from a config file, an API request, or a database record — use compose(). Each step name must already be registered with the app.
# No Python workflow function needed — compose step sequences dynamically
sql = exporter.compose(
    workflow_name="on_call_response",
    steps=["check_service_health", "diagnose_incident", "apply_remediation"],
)
# Execute sql against Postgres with pg_durable to start the workflow
The compose() call validates that every step name is registered, so typos raise a KeyError immediately rather than failing silently at runtime.
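The fail-fast check described above can be pictured as a lookup against the set of registered step names. This is only a sketch of the idea: `validate_steps` and `registered_steps` are hypothetical and do not reflect how SqlExporter is implemented.

```python
# Hypothetical registry contents for the demo.
registered_steps = {"check_service_health", "diagnose_incident", "apply_remediation"}


def validate_steps(steps: list[str], registry: set[str]) -> list[str]:
    """Return steps unchanged if all are registered; raise KeyError on the first unknown name."""
    for name in steps:
        if name not in registry:
            raise KeyError(f"step not registered: {name!r}")
    return steps


ok = validate_steps(["check_service_health", "apply_remediation"], registered_steps)

try:
    # "diagnoze_incident" is a deliberate typo.
    validate_steps(["check_service_health", "diagnoze_incident"], registered_steps)
    error = None
except KeyError as exc:
    error = exc.args[0]

print(ok)
print(error)
```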
Export all workflows
# All registered workflows in one SQL file (dev → prod migration)
sql = exporter.export_all()
Scheduling
PgCronBackend schedules recurring workflows using pg_durable's df.wait_for_schedule() — no pg_cron extension required, only the df extension from pg_durable.
from pyflows import PgCronBackend
scheduler = PgCronBackend(dsn=config.dsn)
await scheduler.initialize()
# Schedule a workflow to run every hour (job_id is a pg_durable instance ID string)
job_id = await scheduler.schedule(
    job_name="hourly_health_check",
    cron="0 * * * *",
    command="SELECT pyflows.enqueue_workflow('health_check', '{}')",
)
jobs = await scheduler.list_jobs()
await scheduler.unschedule(job_id)
Check whether pg_durable is installed at runtime:
await app.initialize()
if app.pg_durable_available:
    # scheduler and push-mode SQL export are usable
    ...
Backend abstraction
Every infrastructure concern is behind an ABC in backends/base.py. Swap backends without touching workflow code:
| Component | Default | Interface |
|---|---|---|
| State + checkpoints | PgStateBackend | OrchestratorBackend |
| Step queue | PgmqBackend | QueueBackend |
| Cron scheduling | PgCronBackend | SchedulerBackend |
# Bring your own queue backend
class RedisQueueBackend(QueueBackend):
    ...
app = WorkflowApp(config=config)
# Use custom backend by injecting into WorkflowWorker directly
Requirements
Python: 3.13+
Postgres: 15+, with the following extensions:
| Extension | Purpose | Required |
|---|---|---|
| pgmq | Step queue | Yes |
| pg_durable (df) | Cron scheduling, push-mode SQL export | Optional |
The bundled docker-compose.yml starts a Postgres instance with pgmq pre-installed:
docker compose up -d
Installation
pip install pyflows
# or
uv add pyflows
Development
uv sync # install deps
uv run pytest tests/unit/ # unit tests (no DB needed)
docker compose up -d # start Postgres
uv run pytest tests/e2e/ # E2E tests
uv run ruff check src/ tests/ # lint
AI SRE example
See examples/ai_sre/workflow.py for a full incident response workflow: health check → AI diagnosis → auto-remediation, with retries, plugin hooks, and typed I/O.
Roadmap
- M1 - Project scaffold: backend ABCs, Pydantic types, exception hierarchy
- M2 - Core SDK: WorkflowApp, @step, @workflow, WorkflowContext, replay engine
- M3 - SqlExporter: Python workflow → pg_durable DSL (AST-based)
- M4 - E2E test suite: basic, retry, monitor/cancel (Docker-based)
- M6 - Plugin system: PyflowsPlugin ABC, LoggingPlugin, lifecycle hooks
- M7 - Migrations + scheduler: versioned schema migrations, PgCronBackend via pg_durable
- M8 - AI SRE example, README, production hardening: DLQ, worker coordination, linear backoff, pg_durable detection
- M5 - FastAPI integration: push endpoint (deferred; pull mode works without it)
- M9 - PyPI release + full documentation
License
MIT