Durable workflow engine backed by Redis — Python client
redflow (Python)
Durable Redis-backed workflows for Python.
redflow lets you run async workflows with durable steps, retries, cancellation, cron triggers, event fan-out, and wait_for_event(...) pauses that survive worker restarts. The Python runtime is intentionally aligned with the TypeScript @redflow/client model while keeping a Pythonic API.
Status
Beta. The package is usable today, but the API is still evolving.
Highlights
- Durable `step.run(...)` with cached step output across retries and restarts
- Delayed runs and cron-triggered workflows
- Event-driven workflows via `EventTrigger(name=...)`
- Durable `step.wait_for_event(...)` with exact `event + correlation_key` matching
- Durable event history via `list_events(...)` and `get_event(...)`
- Cancellation propagation across parent and child runs
- Inline testing via `run_inline(...)`
- Python 3.11+ with optional Pydantic validation
Documentation shape
This README follows a Diataxis-style structure:
- Tutorial: first working workflow
- How-to: practical production recipes
- Explanation: execution model and guarantees
- Reference: key APIs and environment variables
Requirements
- Python 3.11+
- Redis 6.2+
Install
Using uv:
```shell
uv add redflow
```
Optional extras:
```shell
# Faster Redis parser / transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"
```
Using pip:
```shell
pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"
```
Tutorial
1. Define a workflow
```python
from redflow import WorkflowHandlerContext, workflow

async def fetch_user(user_id: str, *, signal=None) -> dict:
    return {"id": user_id, "email": "user@example.com"}

async def send_email(email: str, *, signal=None) -> dict:
    return {"ok": True, "email": email}

@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}
```
2. Start a worker
Import workflow modules before starting the worker so definitions are registered.
```python
import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401

async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())
```
3. Trigger a run and wait for the result
```python
from redflow import create_client, set_default_client

client = create_client(url="redis://127.0.0.1:6379")
set_default_client(client)

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}
```
You can also emit by workflow name:
```python
handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
```
How-to
Pick the right primitive
Use this rule of thumb:
| Need | Primitive |
|---|---|
| Durable unit of work | `ctx.step.run(...)` |
| Sleep until a duration or datetime | `ctx.step.wait_for(...)` |
| Wait for a specific external event | `ctx.step.wait_for_event(...)` |
| Start a child workflow and wait for output | `ctx.step.run_workflow(...)` |
| Start a child workflow fire-and-forget | `ctx.step.emit_workflow(...)` |
| Fan out to workflows subscribed to an event | `ctx.step.emit_event(...)` or `client.emit_event(...)` |
Python step APIs are intentionally positional and Pythonic: the step name is the first argument, not an options object.
Use durable steps
```python
user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)
```
Notes:
- Step names must be unique within a run
- Successful step output is cached for that run and reused on retry / recovery
- If the function accepts `signal` or `**kwargs`, redflow injects `signal=<asyncio.Event>`
- Step output must be JSON-serializable
Sleep durably
```python
from datetime import datetime, timedelta

await ctx.step.wait_for("pause-before-retry", "5m")
await ctx.step.wait_for("pause-until-cutoff", datetime.now() + timedelta(minutes=15))
```
`wait_for(...)` accepts:
- milliseconds as `int` / `float`
- duration strings like `"30s"`, `"5m"`, `"2h"`
- a `datetime`
Subscribe workflows to events
```python
from redflow import EventTrigger, workflow

async def record_metric(order_id: str, *, signal=None) -> dict:
    return {"tracked": order_id}

@workflow(
    "analytics-order-created",
    queue="analytics",
    event=[EventTrigger(name="order.created")],
)
async def analytics_order_created(ctx):
    order_id = ctx.input["order_id"]
    await ctx.step.run("track", record_metric, order_id)
    return {"tracked": order_id}
```
Emitting the event fans out to every subscribed workflow:
```python
handles = await client.emit_event(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)
outputs = await asyncio.gather(*(handle.result(timeout_ms=10_000) for handle in handles))
```
Wait for a specific event
Use step.wait_for_event(...) when a run needs to pause until a matching callback or domain event arrives.
```python
from redflow import workflow

@workflow("await-approval", queue="default")
async def await_approval(ctx):
    received = await ctx.step.wait_for_event(
        "wait-approval",
        "approval.received",
        ctx.input["order_id"],
        "30m",
    )
    if received is None:
        return {"status": "timed_out"}
    return {
        "status": "approved",
        "event_id": received["id"],
        "approved_at": received["ts"],
    }
```
Then emit the matching event:
```python
await client.emit_event(
    "approval.received",
    {"approved": True, "note": "ok"},
    correlation_key="ord_123",
    event_id="evt_approval_ord_123",
)
```
Semantics:
- matching is exact on `event_name + correlation_key`
- the step returns `ReceivedEvent` when resumed by an event
- it returns `None` when the timeout wins
- the wait is durable across worker restarts
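The exact-match rule can be pictured as a lookup keyed by the pair `(event_name, correlation_key)`: an emit resumes only waiters parked on that exact pair. A conceptual sketch in plain Python (not the actual Redis-backed implementation):

```python
# Conceptual model of wait_for_event matching: waiters are indexed by the
# exact (event_name, correlation_key) pair; an emit resumes only that pair.
waiters: dict[tuple[str, str], list[str]] = {}

def register_waiter(run_id: str, event_name: str, correlation_key: str) -> None:
    waiters.setdefault((event_name, correlation_key), []).append(run_id)

def emit(event_name: str, correlation_key: str) -> list[str]:
    # Pop every run waiting on this exact pair; near-miss keys stay parked.
    return waiters.pop((event_name, correlation_key), [])

register_waiter("run_1", "approval.received", "ord_123")
register_waiter("run_2", "approval.received", "ord_999")

resumed = emit("approval.received", "ord_123")
print(resumed)  # ['run_1']; run_2 keeps waiting because its key differs
```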
Inspect durable event history
If you provide an explicit event_id, you can fetch the event record later:
```python
result = await client.emit_event_detailed(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)
print(result["handles"])
print(result["resumed_run_ids"])

event = await client.get_event("evt_order_created_ord_123")
print(event["fanout_status"])
print(event["deliveries"])
```
You can also list recent events:
```python
from redflow import ListEventsParams

events = await client.list_events(
    ListEventsParams(
        name="order.created",
        limit=20,
        sort="desc",
    )
)
```
Durable event records include:
- the event envelope (`id`, `name`, `ts`, `data`)
- a fan-out summary (`subscriber_count`, `delivery_count`, `failed_delivery_count`, `fanout_status`)
- a resumed waiter summary (`resumed_run_count`, `resumed_run_ids`)
- per-subscriber delivery records
Run child workflows
Wait for the child output:
```python
child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)
```
Fire-and-forget child run:
```python
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": "ord_1"},
)
```
These APIs also support:
- `timeout_ms`
- `run_at`
- `queue_override`
- `idempotency_key`
- `idempotency_ttl`
Trigger a delayed run
```python
from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
```
Add input validation with Pydantic
Install the extra first:
```shell
uv add "redflow[pydantic]"
```
Then:
```python
from pydantic import BaseModel

from redflow import WorkflowHandlerContext, define_workflow

class SendWelcomeInput(BaseModel):
    user_id: str

async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}

send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)
```
Configure retries and failure handling
```python
from redflow import NonRetriableError, OnFailureContext, define_workflow
from redflow.types import WorkflowRetryPolicy

async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)

define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    retries=WorkflowRetryPolicy(
        max_attempts=4,
        delay=lambda ctx: "5s" if ctx.run.attempt < 3 else "30s",
        should_retry=lambda ctx: "validation" not in str(ctx.error).lower(),
    ),
    on_failure=on_fail,
)
```
Notes:
- `max_attempts` includes the initial attempt
- `retries.max_attempts` takes precedence over top-level `max_attempts`
- raise `NonRetriableError` to fail immediately
- `on_failure` runs only after terminal failure, not on cancellation
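Because `delay` and `should_retry` are plain callables of the retry context, their behavior can be checked in isolation before wiring them into a workflow. A sketch using stand-in context objects (`FakeRun` / `FakeCtx` are hypothetical test doubles, only assumed to expose the `run.attempt` and `error` fields the callbacks read):

```python
from dataclasses import dataclass

@dataclass
class FakeRun:
    attempt: int

@dataclass
class FakeCtx:
    run: FakeRun
    error: Exception

# The same callbacks used in the retry-policy example above.
delay = lambda ctx: "5s" if ctx.run.attempt < 3 else "30s"
should_retry = lambda ctx: "validation" not in str(ctx.error).lower()

early = FakeCtx(run=FakeRun(attempt=1), error=RuntimeError("timeout"))
late = FakeCtx(run=FakeRun(attempt=3), error=RuntimeError("timeout"))
bad_input = FakeCtx(run=FakeRun(attempt=1), error=ValueError("Validation failed"))

print(delay(early))             # 5s: early attempts back off briefly
print(delay(late))              # 30s: later attempts back off longer
print(should_retry(bad_input))  # False: validation errors are not retried
```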
Control admission with debounce, mutex, throttle, and rate limits
```python
from redflow import define_workflow
from redflow.types import WorkflowDebounceOptions, WorkflowRateLimitOptions

define_workflow(
    "invoice-sync",
    handler=handler,
    debounce=WorkflowDebounceOptions(
        key=lambda input: f"invoice:{input['invoice_id']}",
        period="30s",
        timeout="5m",
    ),
    mutex=lambda input: f"customer:{input['customer_id']}",
    throttle=WorkflowRateLimitOptions(
        key=lambda input: f"customer:{input['customer_id']}",
        limit=1,
        period="2s",
    ),
    rate_limit=WorkflowRateLimitOptions(
        key=lambda input: f"tenant:{input['tenant_id']}",
        limit=100,
        period="1m",
    ),
)
```
Semantics:
- `debounce` coalesces duplicate enqueue attempts by key
- `mutex` allows only one running run per key
- `throttle` delays execution when the budget is exceeded
- `rate_limit` fails fast with retry metadata
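To make the debounce semantics concrete, here is a toy in-memory coalescer: within a period, repeated enqueue attempts for the same key collapse into one pending run. This is a sketch of the observable behavior only, not how the distributed implementation works internally:

```python
class ToyDebouncer:
    """Coalesce enqueue attempts per key within a fixed period (seconds)."""

    def __init__(self, period: float) -> None:
        self.period = period
        self.pending: dict[str, float] = {}  # key -> window expiry time

    def enqueue(self, key: str, now: float) -> bool:
        """Return True if this attempt starts a new run, False if coalesced."""
        expiry = self.pending.get(key)
        if expiry is not None and now < expiry:
            return False  # still inside the window: coalesced
        self.pending[key] = now + self.period
        return True

d = ToyDebouncer(period=30.0)
print(d.enqueue("invoice:inv_1", now=0.0))   # True: first attempt wins
print(d.enqueue("invoice:inv_1", now=5.0))   # False: coalesced into the pending run
print(d.enqueue("invoice:inv_1", now=31.0))  # True: window expired, new run
```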
Schedule workflows with cron
```python
from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(
            expression="*/5 * * * *",
            timezone="UTC",
            input={"source": "cron"},
        ),
    ],
)
```
Cron ticks respect max_concurrency: if a workflow is already at its concurrency limit, the tick is skipped.
Test handlers with run_inline(...)
run_inline(...) executes workflow logic in-process without Redis:
```python
from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}
```
Step overrides are keyed by step name:
```python
result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)
```
You can also override `wait_for_event(...)` steps with either a `ReceivedEvent`-like dict or `None`.
For full lifecycle behavior such as queues, retries, cron, cancellation, and event fan-out, use a real Redis instance and start_worker(...).
Explanation
Handler context
Inside a workflow handler:
- `ctx.input`: validated workflow input, or the raw input if no schema is configured
- `ctx.run`: run metadata (`id`, `workflow`, `queue`, `attempt`, `max_attempts`)
- `ctx.signal`: cancellation event
- `ctx.step`: durable step API
Run lifecycle
Each run moves through:
```
scheduled -> queued -> running -> succeeded | failed | canceled
```
Step state is stored separately from top-level run state. If a step with the same name already succeeded for the current run, its cached output is reused.
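The cached-output behavior can be modeled as a per-run memo keyed by step name: on retry the handler re-executes from the top, but steps that already succeeded return their stored output instead of running again. A conceptual sketch (not the real storage layer):

```python
import asyncio

async def run_step(cache: dict[str, object], name: str, fn, *args):
    """Return the cached output for `name`, or execute fn and cache it."""
    if name in cache:
        return cache[name]
    output = await fn(*args)
    cache[name] = output
    return output

calls = []

async def fetch_user(user_id: str) -> dict:
    calls.append(user_id)  # track real executions
    return {"id": user_id}

async def main() -> None:
    cache: dict[str, object] = {}  # per-run step state
    await run_step(cache, "fetch-user", fetch_user, "user_123")
    # Simulate a retry of the same run: the handler re-enters, the step does not.
    await run_step(cache, "fetch-user", fetch_user, "user_123")

asyncio.run(main())
print(len(calls))  # 1: the second pass reused the cached output
```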
Durability and idempotency
- `step.run(...)` persists successful step output and reuses it after retry / recovery
- duplicate step names in the same run are rejected
- `idempotency_key` deduplicates run creation
- an explicit `event_id` deduplicates external event emits
- event records and run history are retained for the configured retention window
Events model
redflow has two event-oriented patterns:

1. Event fan-out
   - workflows subscribe with `event=[EventTrigger(name=...)]`
   - `emit_event(...)` creates runs for matching subscribers
2. Event resume
   - a running workflow pauses with `step.wait_for_event(...)`
   - a later `emit_event(...)` with the same `event_name + correlation_key` resumes the waiting run
Durable event records let you inspect what happened after the fact:
- which runs were created
- which waiters were resumed
- whether fan-out completed, partially failed, or had no subscribers
Cancellation model
- `ctx.signal` is set when workflow cancellation is requested
- `step.run(...)` injects `signal=` into the step function when supported
- parent cancellation propagates to descendant runs spawned via step APIs
- lease-loss recovery does not itself imply child-cancel cascades
Python / TypeScript parity
Behavior is intentionally aligned with @redflow/client. Naming follows language conventions:
- Python: `emit_workflow`, `emit_event`, `run_at`, `max_attempts`
- TypeScript: `emitWorkflow`, `emitEvent`, `runAt`, `maxAttempts`
Reference
Public workflow APIs
- `define_workflow(...)`
- `@workflow(...)`
- `WorkflowDefinition.run(...)`
- `start_worker(StartWorkerOptions(...))`
- `run_inline(...)`
Step APIs
- `ctx.step.run(name, fn, *args, timeout_ms=None, **kwargs)`
- `ctx.step.wait_for(name, target, timeout_ms=None)`
- `ctx.step.wait_for_event(name, event_name, correlation_key, timeout)`
- `ctx.step.run_workflow(name, workflow, input, ...)`
- `ctx.step.emit_workflow(name, workflow, input, ...)`
- `ctx.step.emit_event(name, event_name, input, ...)`
Client APIs
Create a client:
```python
from redflow import create_client

client = create_client(
    app="my-service",              # optional, falls back to REDFLOW_APP
    url="redis://127.0.0.1:6379",  # optional, falls back to REDIS_URL
    prefix="redflow:v1",           # optional, falls back to REDFLOW_PREFIX
)
```
Common methods:
- `emit_workflow(...)`
- `emit_event(...)`
- `emit_event_detailed(...)`
- `run_by_name(...)`
- `get_run(run_id)`
- `get_run_steps(run_id)`
- `get_run_attempts(run_id)`
- `list_runs(ListRunsParams(...))`
- `search_runs(SearchRunsParams(...))`
- `list_workflows()`
- `get_workflow_meta(name)`
- `list_workflows_meta()`
- `list_event_definitions()`
- `list_events(ListEventsParams(...))`
- `get_event(event_id)`
- `get_stats()`
- `cancel_run(run_id, reason=...)`
- `cleanup_expired_runs_now(...)`
- `cleanup_expired_events_now(...)`
- `reset_runs()` (dev/testing)
- `reset_state()` (dev/testing)
Run handle
Run creation APIs return a `RunHandle` with:
- `.id`
- `await .get_state()`
- `await .result(timeout_ms=...)`
Default client helpers
`WorkflowDefinition.run(...)` uses the process-global default client:
- `get_default_client()`
- `set_default_client(client)`

If you never call `set_default_client(...)`, redflow lazily creates one from environment defaults.
Environment variables
- `REDIS_URL`: Redis connection string used by `create_client()` and `start_worker(...)` when no explicit URL or Redis instance is provided
- `REDFLOW_PREFIX`: Redis key prefix (default: `redflow:v1`)
- `REDFLOW_APP`: default app name used by `create_client(...)`
- `REDFLOW_RUN_RETENTION_DAYS`: retention window for terminal runs and durable event records (default: `30`)
Worker options
Important `StartWorkerOptions` fields:
- `app`
- `redis` or `url`
- `prefix`
- `registry`
- `queues`
- `concurrency`
- `lease_ms`
- `blmove_timeout_sec`
- `reaper_interval_ms`
- `reaper_batch_size`
- `watchdog_enabled`
- `watchdog_interval_ms`
- `watchdog_stalled_for_ms`
- `run_history_cleanup_interval_ms`
- `registry_recovery_interval_ms`
Python keeps runtime tuning flat on StartWorkerOptions; there is no nested runtime object.
Source layout
- `src/redflow/`: public modules and stable entry points
- `src/redflow/_client/`: command, query, retention, and registry internals
- `src/redflow/_worker/`: worker loops, executor, and step runtime internals
- `src/redflow/_core/`: shared private primitives such as payload storage, retry logic, and Redis helpers
Project details
File details
Details for the file `redflow-0.0.26.tar.gz`.

File metadata
- Download URL: redflow-0.0.26.tar.gz
- Size: 143.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `3da0f25f1fad82582c900220592c89074306e61909950c784c1bcea7cf230888` |
| MD5 | `476fdc6cae87f868b2981e7057ce7dcf` |
| BLAKE2b-256 | `d8b3af020a2db3e02bf0b64f328bd1a9e6dca5720714f86243cdad31e0a0765e` |
Provenance
The following attestation bundles were made for `redflow-0.0.26.tar.gz`:

Publisher: `ci.yml` on getrelocapp/redflow

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: redflow-0.0.26.tar.gz
- Subject digest: 3da0f25f1fad82582c900220592c89074306e61909950c784c1bcea7cf230888
- Sigstore transparency entry: 1108041415
- Permalink: getrelocapp/redflow@61d31d62fba455e527161eaf523218f0483384e1
- Branch / Tag: refs/heads/master
- Owner: https://github.com/getrelocapp
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: self-hosted
- Publication workflow: ci.yml@61d31d62fba455e527161eaf523218f0483384e1
- Trigger Event: push
File details
Details for the file `redflow-0.0.26-py3-none-any.whl`.

File metadata
- Download URL: redflow-0.0.26-py3-none-any.whl
- Size: 100.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `cf2d6f1c43921f76889bc3638652f767e9b8a75a823f2e94f55d75906ee46da5` |
| MD5 | `cd869eb1b83322884144e14efd77df61` |
| BLAKE2b-256 | `97d7866e81d8c44562eb91bb236f133c7e8bae63ba9dc568eb39268e59682082` |
Provenance
The following attestation bundles were made for `redflow-0.0.26-py3-none-any.whl`:

Publisher: `ci.yml` on getrelocapp/redflow

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: redflow-0.0.26-py3-none-any.whl
- Subject digest: cf2d6f1c43921f76889bc3638652f767e9b8a75a823f2e94f55d75906ee46da5
- Sigstore transparency entry: 1108041420
- Permalink: getrelocapp/redflow@61d31d62fba455e527161eaf523218f0483384e1
- Branch / Tag: refs/heads/master
- Owner: https://github.com/getrelocapp
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: self-hosted
- Publication workflow: ci.yml@61d31d62fba455e527161eaf523218f0483384e1
- Trigger Event: push