redflow (Python)
Durable workflow engine backed by Redis.
redflow lets you define async workflows with durable, cached steps, run them from a client, and process them with a Redis-backed worker. It is the Python implementation of the same runtime model used by the TypeScript @redflow/client.
Status
Early-stage project. APIs are usable, but expect iteration.
Requirements
- Python 3.11+
- Redis 6+ recommended (older versions work, but some operations use slower fallbacks)
Install
Using uv (recommended):
uv add redflow
Optional extras:
# faster Redis parser/transport path (hiredis)
uv add "redflow[fast]"
# Pydantic input validation support for workflows
uv add "redflow[pydantic]"
Using pip:
pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"
Quick Start
1. Define a workflow
Functional API:
from redflow import WorkflowHandlerContext, define_workflow
async def fetch_user(user_id: str) -> dict:
return {"id": user_id, "email": "user@example.com"}
async def send_welcome(email: str) -> dict:
return {"ok": True, "email": email}
async def handler(ctx: WorkflowHandlerContext) -> dict:
user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
await ctx.step.run("send-email", send_welcome, user["email"])
return {"sent": True}
send_welcome_email = define_workflow(
"send-welcome-email",
handler=handler,
queue="default",
max_attempts=3,
)
Decorator API:
from redflow import WorkflowHandlerContext, workflow
@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
await ctx.step.run("send-email", send_welcome, user["email"])
return {"sent": True}
2. Start a worker
Import workflow modules before starting the worker so definitions are registered.
import asyncio
from redflow import StartWorkerOptions, start_worker
import myapp.workflows # noqa: F401
async def main() -> None:
worker = await start_worker(
StartWorkerOptions(
app="billing-worker",
url="redis://127.0.0.1:6379",
concurrency=4,
)
)
try:
await asyncio.Event().wait()
finally:
await worker.stop()
asyncio.run(main())
3. Trigger a workflow
Most users will call .run(...) on the returned WorkflowDefinition:
handle = await send_welcome_email.run(
{"user_id": "user_123"},
idempotency_key="welcome:user_123",
)
output = await handle.result(timeout_ms=15_000)
You can also trigger by name via RedflowClient:
from redflow import create_client
client = create_client(url="redis://127.0.0.1:6379")
handle = await client.emit_workflow(
"send-welcome-email",
{"user_id": "user_123"},
idempotency_key="welcome:user_123",
)
output = await handle.result(timeout_ms=15_000)
Delayed run:
from datetime import datetime, timedelta
handle = await send_welcome_email.run(
{"user_id": "user_789"},
run_at=datetime.now() + timedelta(minutes=1),
idempotency_key="welcome:user_789:delayed",
)
Core Concepts
Workflow handler context
Inside a workflow handler, ctx contains:
- ctx.input: workflow input (validated if input_schema is configured)
- ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
- ctx.signal: asyncio.Event set when cancellation is requested
- ctx.step: durable step API
Step names must be unique per run
Within a single workflow execution, step names are unique. Reusing a step name in the same run raises an error.
Step results are durable
If a step succeeds, its result is cached in Redis and reused after worker restarts or retry attempts (same run + same step name).
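Conceptually, this durability rule behaves like a cache keyed by run ID and step name. Below is a minimal in-memory sketch of that model (the class and names are illustrative, not the library's internals; the real engine persists results in Redis):

```python
import asyncio

# In-memory sketch of the step-result cache (hypothetical class; the real
# engine persists results in Redis keyed by run ID + step name).
class StepCache:
    def __init__(self) -> None:
        self._cache: dict[tuple[str, str], object] = {}

    async def run(self, run_id: str, step_name: str, fn):
        key = (run_id, step_name)
        if key in self._cache:
            # Replay after a restart or retry: return the cached result
            # without re-executing the step function.
            return self._cache[key]
        result = await fn()
        self._cache[key] = result
        return result

calls = 0

async def fetch_user():
    global calls
    calls += 1
    return {"id": "user_1"}

async def main() -> None:
    cache = StepCache()
    first = await cache.run("run_1", "fetch-user", fetch_user)
    second = await cache.run("run_1", "fetch-user", fetch_user)  # served from cache
    assert first == second
    assert calls == 1  # the step function ran only once

asyncio.run(main())
```

This is also why step names must be unique per run: the name is the cache key, so reusing it would silently replay the first step's result.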
Step API (inside workflow handlers)
The Python API is intentionally Pythonic: the step name is a positional argument instead of the TS { name } options object.
ctx.step.run(...)
Signature (conceptually):
await ctx.step.run(
step_name: str,
fn: Callable[..., Awaitable[T]],
*args,
timeout_ms: int | None = None,
**kwargs,
) -> T
Example:
import asyncio
async def fetch_remote_user(user_id: str, *, signal: asyncio.Event | None = None) -> dict:
# redflow injects `signal=` automatically when the function accepts it
return {"id": user_id, "email": "demo@example.com"}
user = await ctx.step.run(
"fetch-user",
fetch_remote_user,
ctx.input["user_id"],
timeout_ms=4_000,
)
Notes:
- If fn accepts a signal parameter (or **kwargs), redflow injects signal=<asyncio.Event>.
- timeout_ms fails the step with TimeoutError and records failed step state.
- Step output must be JSON-serializable.
ctx.step.run_workflow(...)
Trigger a child workflow and wait for the child result.
receipt = await ctx.step.run_workflow(
"send-receipt",
"receipt-workflow", # or a WorkflowDefinition object
{"order_id": "ord_1", "email": "user@example.com"},
timeout_ms=20_000,
idempotency_key="receipt:ord_1",
)
You can pass a workflow object (returned by define_workflow / @workflow) instead of a string:
receipt = await ctx.step.run_workflow(
"send-receipt",
send_receipt_workflow,
{"order_id": "ord_1"},
)
ctx.step.emit_workflow(...)
Trigger a child workflow without waiting. Returns the child run ID.
child_run_id = await ctx.step.emit_workflow(
"emit-analytics",
"analytics-workflow", # or a WorkflowDefinition object
{"order_id": "ord_1"},
timeout_ms=5_000,
idempotency_key="analytics:ord_1",
)
Child workflow step options
Both run_workflow and emit_workflow support:
- timeout_ms
- run_at
- queue_override
- idempotency_key
- idempotency_ttl
Workflow Options
define_workflow(...) / @workflow(...) support these main options:
- queue: str = "default"
- max_concurrency: int = 1
- max_attempts: int | None = None (falls back to engine default)
- cron: list[CronTrigger] | None = None
- on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None
- input_schema (typically a Pydantic v2 model class)
Input validation with Pydantic (optional)
Install the extra first:
uv add "redflow[pydantic]"
Then:
from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow
class SendWelcomeInput(BaseModel):
user_id: str
async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
# ctx.input is the validated Pydantic model instance
return {"user_id": ctx.input.user_id}
send_welcome = define_workflow(
"send-welcome",
handler=handler,
input_schema=SendWelcomeInput,
)
max_concurrency
Limits the number of concurrently executing runs of that workflow. Default is 1.
define_workflow(
"heavy-sync",
handler=handler,
queue="ops",
max_concurrency=1,
)
max_attempts
Total attempts including the first try.
Retries use exponential backoff with jitter. Raise NonRetriableError to fail immediately without retrying.
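The retry policy can be sketched as follows. The backoff base and cap values here are assumptions (the engine's actual parameters may differ), and NonRetriableError is stubbed locally; in real code, import it from redflow:

```python
import random

# Local stub for illustration; in real code, import NonRetriableError from redflow.
class NonRetriableError(Exception):
    pass

# base_ms and cap_ms are assumed values, not the engine's actual defaults.
def backoff_ms(attempt: int, base_ms: int = 500, cap_ms: int = 30_000) -> int:
    # Exponential backoff with full jitter.
    ceiling = min(cap_ms, base_ms * 2 ** (attempt - 1))
    return random.randint(0, ceiling)

def should_retry(attempt: int, max_attempts: int, error: Exception) -> bool:
    if isinstance(error, NonRetriableError):
        return False  # fail immediately, no further attempts
    return attempt < max_attempts  # max_attempts counts the first try

assert should_retry(1, 3, ValueError()) is True
assert should_retry(3, 3, ValueError()) is False
assert should_retry(1, 3, NonRetriableError()) is False
```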
Cron
Use CronTrigger for scheduled runs (cron parsing is provided by cronsim).
from redflow import CronTrigger, define_workflow
define_workflow(
"digest-cron",
handler=handler,
queue="ops",
cron=[
CronTrigger(id="digest-hourly", expression="0 * * * *"),
CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
],
)
Cron scheduling respects max_concurrency: if the workflow is already at the limit, that cron tick is skipped.
on_failure
Called when a run reaches terminal failure (retries exhausted or non-retriable error). Not called on cancellation.
from redflow import OnFailureContext, define_workflow
async def on_fail(ctx: OnFailureContext) -> None:
print("workflow failed", ctx.run.id, ctx.run.workflow, ctx.error)
define_workflow(
"invoice-sync",
handler=handler,
queue="billing",
max_attempts=4,
on_failure=on_fail,
)
Client API
Create a client
from redflow import create_client
client = create_client(
url="redis://127.0.0.1:6379",
prefix="redflow:v1",
app="my-service", # optional, used for queue scoping/metadata
)
Environment defaults:
- REDIS_URL is used if url is omitted
- REDFLOW_PREFIX is used by default_prefix()
- REDFLOW_APP is used by create_client() when app is omitted
Triggering runs
- await client.emit_workflow(name, input, ...)
- await client.run_by_name(name, input, ...) (advanced; supports queue_override)
WorkflowDefinition.run(...) is usually preferred because it keeps workflow defaults (queue/max attempts) close to the definition.
Inspect and control runs
from redflow import ListRunsParams
run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")
recent = await client.list_runs(ListRunsParams(limit=50))
failed_checkout = await client.list_runs(
ListRunsParams(workflow="checkout", status="failed", limit=20)
)
workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
all_meta = await client.list_workflows_meta()
stats = await client.get_stats()
canceled = await client.cancel_run("run_123", reason="requested by user")
RunHandle
Run-triggering methods return a RunHandle-like object with:
- .id
- await .get_state()
- await .result(timeout_ms=...)
handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})
state = await handle.get_state()
print(state["status"])
output = await handle.result(timeout_ms=30_000)
Default client helpers
The package keeps a process-global default client (used by WorkflowDefinition.run()):
from redflow import get_default_client, set_default_client
client = get_default_client()
set_default_client(client)
Worker API
start_worker(...) launches:
- worker pollers (claim + execute runs)
- scheduled promoter
- reaper (lease recovery)
- cron scheduler
- poller watchdog
StartWorkerOptions
Common fields:
- app (required): stable worker/service identifier
- url or redis
- prefix
- registry
- queues
- concurrency
Runtime tuning (flat fields in Python, not nested like TS):
- lease_ms
- blmove_timeout_sec
- reaper_interval_ms
- reaper_batch_size
- watchdog_enabled
- watchdog_interval_ms
- watchdog_stalled_for_ms
Example with explicit queues and tuning:
worker = await start_worker(
StartWorkerOptions(
app="billing-worker",
url="redis://127.0.0.1:6379",
prefix="redflow:prod",
queues=["critical", "io", "analytics"],
concurrency=8,
lease_ms=5000,
blmove_timeout_sec=1,
reaper_interval_ms=500,
reaper_batch_size=100,
watchdog_enabled=True,
)
)
Important startup note
start_worker(...) automatically syncs the workflow registry to Redis metadata. Make sure all workflow modules are imported first.
Testing
Unit tests with run_inline(...)
run_inline executes a workflow handler in-process without Redis.
from redflow import run_inline
result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}
assert [step.name for step in result.steps] == ["fetch-user", "send-email"]
Override step outputs by step name:
result = await run_inline(
send_welcome_email,
input={"user_id": "test"},
step_overrides={
"fetch-user": {"id": "test", "email": "mock@example.com"},
"send-email": {"ok": True},
},
)
Notes:
run_inlineis best for handler logic and step sequencing.step.run_workflow(...)requiresstep_overridesinrun_inline(no Redis worker exists to execute child runs).- Use real Redis +
start_worker(...)for end-to-end lifecycle tests.
Errors
Common error classes:
from redflow import (
CanceledError,
InputValidationError,
NonRetriableError,
OutputSerializationError,
RedflowError,
TimeoutError,
UnknownWorkflowError,
)
Typical semantics:
- CanceledError: run/step canceled
- TimeoutError: wait or step timeout
- InputValidationError: workflow input failed schema validation
- UnknownWorkflowError: workflow metadata/definition not found
- OutputSerializationError: step/workflow output was not JSON-serializable
- NonRetriableError: fail immediately without retrying
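Callers can branch on these classes when awaiting a result. The sketch below stubs the exception classes locally so it is self-contained; in real code, import them from redflow, and note that treating them as RedflowError subclasses is an assumption on our part:

```python
# Exception classes stubbed locally for illustration; import the real ones
# from redflow. Treating them as RedflowError subclasses is an assumption.
class RedflowError(Exception): ...
class TimeoutError(RedflowError): ...   # note: shadows builtins.TimeoutError
class CanceledError(RedflowError): ...

def describe(err: Exception) -> str:
    # Branch from most specific to least specific.
    if isinstance(err, TimeoutError):
        return "timed out"
    if isinstance(err, CanceledError):
        return "canceled"
    if isinstance(err, RedflowError):
        return "failed"
    raise err

assert describe(TimeoutError()) == "timed out"
assert describe(CanceledError()) == "canceled"
assert describe(RedflowError()) == "failed"
```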
Python vs TypeScript Notes
- Python uses snake_case names (emit_workflow, max_attempts, run_at)
- TypeScript uses camelCase (emitWorkflow, maxAttempts, runAt)
- Runtime semantics are intentionally aligned across implementations
Development (package)
From redflow/packages/python:
uv sync --extra dev
uv run ruff check .
uv run pytest
uv run pytest -m e2e # requires Redis