# redflow

Durable workflow engine backed by Redis — Python client.
## Install

```bash
pip install redflow

# or with hiredis for better performance (quoted so the shell
# doesn't interpret the brackets):
pip install "redflow[fast]"
```
## Define a workflow

```python
from redflow import define_workflow, WorkflowHandlerContext

async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

send_welcome_email = define_workflow("send-welcome-email", handler=handler)
```
Or with the decorator:

```python
from redflow import workflow, WorkflowHandlerContext

@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}
```
The handler context gives you:

- `input` — workflow input data
- `run` — run metadata (`id`, `workflow`, `queue`, `attempt`, `max_attempts`)
- `signal` — an `asyncio.Event`, set when cancellation is requested
- `step` — the durable step API
## Step API (inside workflow handlers)

### step.run

Durable, cached units of work. On crash recovery, completed steps return their cached result instead of re-executing.

```python
payment = await ctx.step.run("capture-payment", capture_payment)
```
With a timeout:

```python
payment = await ctx.step.run("capture-payment", capture_payment, timeout_ms=4000)
```
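To make the caching behavior concrete, here is a minimal in-memory sketch of how a durable step cache could work. redflow itself persists step results in Redis; `run_step`, `_step_results`, and `charge` below are illustrative names, not part of the library:

```python
import asyncio
import json

# In-memory stand-in for the step-result store (redflow keeps this in Redis).
_step_results: dict[str, str] = {}

async def run_step(run_id: str, name: str, fn, *args):
    """Execute a step once; replay its cached result on subsequent calls."""
    key = f"{run_id}:{name}"
    if key in _step_results:
        return json.loads(_step_results[key])  # cached: skip re-execution
    result = await fn(*args)
    _step_results[key] = json.dumps(result)  # persist before returning
    return result

calls = 0

async def charge(amount):
    global calls
    calls += 1  # count real executions so replay is observable
    return {"charged": amount}

async def demo():
    first = await run_step("run_1", "capture-payment", charge, 42)
    # Simulate crash recovery: the same step is replayed from the cache.
    second = await run_step("run_1", "capture-payment", charge, 42)
    return first, second, calls

first, second, calls_made = asyncio.run(demo())
```

The key property is that the side effect (`charge`) runs only once per `(run_id, step_name)` pair, which is what makes re-running a crashed workflow safe.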
### step.run_workflow

Trigger a child workflow and wait for its result.

```python
receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",
    {"order_id": order_id, "email": email},
    timeout_ms=20_000,
    idempotency_key=f"receipt:{order_id}",
)
```
### step.emit_workflow

Trigger a child workflow without waiting — returns the child run ID.

```python
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": order_id},
    idempotency_key=f"analytics:{order_id}",
)
```
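The difference between the two child-workflow calls can be sketched with plain asyncio: one awaits the child's output, the other schedules it and returns an ID immediately. `run_workflow_style` and `emit_workflow_style` are stand-in functions illustrating the pattern, not the library's API:

```python
import asyncio
import uuid

async def child(order_id: str) -> dict:
    await asyncio.sleep(0.01)  # pretend work
    return {"order_id": order_id, "done": True}

async def run_workflow_style(order_id: str) -> dict:
    # step.run_workflow analogue: block until the child completes.
    return await child(order_id)

async def emit_workflow_style(order_id: str) -> str:
    # step.emit_workflow analogue: schedule the child, return its run id now.
    run_id = f"run_{uuid.uuid4().hex[:8]}"
    asyncio.get_running_loop().create_task(child(order_id))
    return run_id

async def demo():
    receipt = await run_workflow_style("ord_1")        # waits for the result
    child_run_id = await emit_workflow_style("ord_2")  # returns right away
    await asyncio.sleep(0.05)  # let the detached child finish before exiting
    return receipt, child_run_id

receipt, child_run_id = asyncio.run(demo())
```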
## Run workflows

```python
from redflow import create_client

client = create_client(url="redis://localhost:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)
```
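One way an idempotency key can be honored is to index previously created runs by key, so a replayed emit returns the existing run instead of creating a duplicate. This is an illustrative in-memory sketch, not redflow's actual storage scheme:

```python
import uuid

# In-memory stand-in for the idempotency index (redflow would keep this in Redis).
_idempotency_index: dict[str, str] = {}

def emit_workflow(workflow: str, payload: dict, idempotency_key=None) -> str:
    """Return the existing run id when the same idempotency key is replayed."""
    if idempotency_key is not None and idempotency_key in _idempotency_index:
        return _idempotency_index[idempotency_key]
    run_id = f"run_{uuid.uuid4().hex[:8]}"
    if idempotency_key is not None:
        _idempotency_index[idempotency_key] = run_id
    # ... enqueue (workflow, payload) for a worker here ...
    return run_id

a = emit_workflow("send-welcome-email", {"user_id": "user_123"},
                  idempotency_key="welcome:user_123")
b = emit_workflow("send-welcome-email", {"user_id": "user_123"},
                  idempotency_key="welcome:user_123")
```

Both calls return the same run ID, so retrying a producer after a network error cannot double-send the email.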
Delayed run:

```python
from datetime import datetime, timedelta

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
```
## Start a worker

```python
import asyncio

from redflow import start_worker, StartWorkerOptions

# import your workflow modules so they register
import workflows

async def main():
    worker = await start_worker(StartWorkerOptions(
        app="billing-worker",
        url="redis://localhost:6379",
        concurrency=4,
    ))
    # graceful shutdown
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())
```
Explicit queues and runtime tuning:

```python
worker = await start_worker(StartWorkerOptions(
    app="billing-worker",
    url="redis://localhost:6379",
    queues=["critical", "io", "analytics"],
    concurrency=8,
    lease_ms=5000,
    blmove_timeout_sec=1,
    reaper_interval_ms=500,
))
```
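The `lease_ms` and `reaper_interval_ms` options suggest lease-based delivery: a worker holds a time-limited lease while processing a run, and a reaper periodically re-queues runs whose lease expired (for example, because the worker crashed). Here is a hedged in-memory sketch of that pattern; it is an illustration of the general technique, not redflow's implementation:

```python
LEASE_MS = 5000

queue = ["run_1", "run_2"]
leases: dict[str, float] = {}  # run_id -> lease expiry (epoch ms)

def claim(now_ms: float):
    """Worker side: take the next run and start a lease on it."""
    if not queue:
        return None
    run_id = queue.pop(0)
    leases[run_id] = now_ms + LEASE_MS
    return run_id

def reap(now_ms: float) -> list[str]:
    """Reaper side: return expired runs to the queue for redelivery."""
    expired = [r for r, exp in leases.items() if exp <= now_ms]
    for r in expired:
        del leases[r]
        queue.append(r)
    return expired

t0 = 0.0
claimed = claim(t0)               # worker takes run_1, lease until t0 + 5000
reaped = reap(t0 + LEASE_MS + 1)  # worker never acked: reaper re-queues it
```

In a real deployment the worker would renew or release the lease on completion; shortening `lease_ms` speeds up crash recovery at the cost of more redeliveries of slow runs.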
## Workflow options

### max_concurrency

Limits the number of concurrently running runs per workflow. The default is 1.

```python
define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)
```
### Cron

```python
from redflow import CronTrigger

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)
```
Cron respects `max_concurrency`: if the limit is reached, that tick is skipped.
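The skip-on-limit behavior amounts to a counter check on every tick. A small sketch with illustrative names (`running`, `on_cron_tick`), not the engine's internals:

```python
# Currently running count and configured limit, per workflow.
running: dict[str, int] = {"heavy-sync": 1}
max_concurrency: dict[str, int] = {"heavy-sync": 1}

def on_cron_tick(workflow: str) -> bool:
    """Start a run only when the workflow is under its concurrency limit."""
    if running.get(workflow, 0) >= max_concurrency.get(workflow, 1):
        return False  # limit reached: this tick is skipped, not queued
    running[workflow] = running.get(workflow, 0) + 1
    return True

started = on_cron_tick("heavy-sync")  # a run is already active, so this tick is dropped
```

Note that a skipped tick is dropped rather than deferred, so a long-running instance can shadow several cron firings.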
### on_failure

```python
from redflow import OnFailureContext

async def on_fail(ctx: OnFailureContext) -> None:
    print(f"workflow failed: {ctx.run.id} {ctx.run.workflow} {ctx.error}")

define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)
```
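A rough sketch of the retry-until-`max_attempts` pattern with a failure callback once attempts are exhausted. This illustrates the control flow only; `execute_with_retries` is a hypothetical helper, not redflow's code:

```python
import asyncio

async def execute_with_retries(handler, max_attempts: int, on_failure=None):
    """Retry the handler up to max_attempts; invoke on_failure when exhausted."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler()
        except Exception as exc:
            last_error = exc  # retry on the next loop iteration
    if on_failure is not None:
        await on_failure(last_error)
    raise last_error

attempts = 0
failures = []

async def flaky():
    global attempts
    attempts += 1
    raise RuntimeError("boom")

async def on_fail(error):
    failures.append(str(error))

try:
    asyncio.run(execute_with_retries(flaky, max_attempts=4, on_failure=on_fail))
except RuntimeError:
    pass  # the run still fails; on_fail has already observed it
```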
## Client API

### Inspect and control runs

```python
from redflow import ListRunsParams

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

recent = await client.list_runs(ListRunsParams(limit=50))
failed = await client.list_runs(ListRunsParams(
    workflow="checkout",
    status="failed",
    limit=20,
))

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")
```
### RunHandle

```python
handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)
```
## Testing

`run_inline` executes a workflow handler in-process without Redis — useful for unit tests.

```python
from redflow import run_inline

result = await run_inline(my_workflow_def, input={"user_id": "test"})
assert result.succeeded
assert result.output == {"sent": True}
```
Override external steps:

```python
result = await run_inline(
    my_workflow_def,
    input={"user_id": "test"},
    step_overrides={"fetch-user": {"email": "mock@test.com"}},
)
```
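The override mechanism can be approximated in a few lines: a step runner that consults the overrides dict before calling the real function. Everything here (`run_inline_sketch`, `Ctx`, `Step`) is hypothetical scaffolding illustrating the idea, not the library's implementation:

```python
import asyncio

async def run_inline_sketch(handler, input, step_overrides=None):
    """Run a handler with a step.run that consults overrides before executing."""
    overrides = step_overrides or {}

    class Step:
        async def run(self, name, fn, *args):
            if name in overrides:
                return overrides[name]  # short-circuit external calls
            return await fn(*args)

    class Ctx:
        pass

    ctx = Ctx()
    ctx.input = input
    ctx.step = Step()
    return await handler(ctx)

async def fetch_user(user_id):
    raise RuntimeError("would hit the network")  # never reached under an override

async def handler(ctx):
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    return {"email": user["email"]}

out = asyncio.run(run_inline_sketch(
    handler,
    input={"user_id": "test"},
    step_overrides={"fetch-user": {"email": "mock@test.com"}},
))
```

Because the override is matched by step name, the handler under test never touches the network, yet runs its real control flow.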
## Errors

```python
from redflow import (
    RedflowError,          # base class
    CanceledError,         # run was canceled
    TimeoutError,          # operation timed out
    NonRetriableError,     # permanent failure, no retries
    InputValidationError,  # input schema mismatch
    UnknownWorkflowError,  # workflow not registered
)
```

Raise `NonRetriableError` from a handler to fail the run immediately without exhausting retry attempts.
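The retry-abort behavior can be sketched as follows, with a local stand-in class for `NonRetriableError` and a hypothetical `run_with_retries` loop:

```python
class NonRetriableError(Exception):
    """Stand-in for redflow's NonRetriableError: fail immediately, no retries."""

def run_with_retries(fn, max_attempts: int):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except NonRetriableError:
            raise  # permanent failure: do not burn the remaining attempts
        except Exception:
            if attempt == max_attempts:
                raise  # retryable error, but attempts are exhausted

calls = 0

def bad_input():
    global calls
    calls += 1
    raise NonRetriableError("invalid payload")  # e.g. malformed order data

try:
    run_with_retries(bad_input, max_attempts=4)
except NonRetriableError:
    pass
```

Despite `max_attempts=4`, the handler runs exactly once: a `NonRetriableError` short-circuits the retry loop.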