redflow (Python)
Durable workflow engine backed by Redis.
redflow helps you run async workflows with durable steps, retries, cancellation, and scheduling. The Python runtime is intentionally aligned with the TypeScript @redflow/client runtime model.
Status
Early stage. APIs are usable, but expect iteration.
Documentation Map
This README is grouped by Diátaxis-style intent:
- Tutorial: first working workflow in ~5 minutes
- How-to: focused recipes for common production tasks
- Explanation: execution model and guarantees
- Reference: options, environment variables, and API surface
Source Layout
The package follows a compact src/ layout with a stable public API and grouped internals:
- src/redflow/ — public modules and stable entry points
- src/redflow/_client/ — client command/query/registry internals
- src/redflow/_worker/ — worker loops/executor/step runtime internals
- src/redflow/_core/ — shared private primitives (Redis decoding, payload storage, retry/idempotency)
This keeps top-level imports stable while allowing large modules to be decomposed without excessive folder nesting.
Requirements
- Python 3.11+
- Redis 6+ recommended
Install
Using uv (recommended):
uv add redflow
Optional extras:
# Faster Redis parser/transport path
uv add "redflow[fast]"
# Pydantic input validation support
uv add "redflow[pydantic]"
Using pip:
pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"
Tutorial
1) Define a workflow
from redflow import WorkflowHandlerContext, workflow

async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}

async def send_email(email: str) -> dict:
    return {"ok": True, "email": email}

@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}
2) Start a worker
Import workflow modules before starting the worker so definitions are registered.
import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401

async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())
3) Trigger a run and wait for output
handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
result = await handle.result(timeout_ms=15_000)
print(result) # {"sent": True}
You can also trigger by workflow name:
from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")
handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
How-to
Choose the right step primitive
The Python step API is intentionally Pythonic: the step name is a positional argument ("step-name"), not part of an options object as in TypeScript.
Use ctx.step.run(...) for durable units of work:
user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)
Notes:
- Step names are unique within a run; duplicate names fail the run.
- If the function accepts signal (or **kwargs), redflow injects signal=<asyncio.Event>.
- timeout_ms raises TimeoutError and records failed step state.
- Step output must be JSON-serializable.
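For illustration, here is a minimal sketch of a step function that accepts the injected cancellation event; the step name, polling loop, and sleep interval are invented for the example:

import asyncio

async def poll_export(export_id: str, signal: asyncio.Event) -> dict:
    # Because the parameter is declared, redflow injects signal=<asyncio.Event>.
    for _ in range(10):
        if signal.is_set():
            # Cancellation requested: stop early with a JSON-serializable result.
            return {"export_id": export_id, "status": "canceled"}
        await asyncio.sleep(1)
    return {"export_id": export_id, "status": "done"}

# Inside a handler:
# status = await ctx.step.run("poll-export", poll_export, "exp_1", timeout_ms=30_000)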
Use ctx.step.run_workflow(...) when parent must wait for child output:
child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)
Use ctx.step.emit_workflow(...) for fire-and-forget child runs:
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
)
Use ctx.step.emit_event(...) to fan out runs to all workflows subscribed to an event:
child_run_ids = await ctx.step.emit_event(
    "emit-analytics-event",
    "analytics.record",
    {"order_id": "ord_1"},
)
run_workflow, emit_workflow, and emit_event support:
- timeout_ms
- run_at
- queue_override
- idempotency_key
- idempotency_ttl
- parent_run_id (advanced; set automatically for step-spawned children)
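For example, inside a handler a child run can be deferred and routed to another queue. A sketch combining the options above; the queue name and delay are illustrative:

from datetime import datetime, timedelta

receipt_run_id = await ctx.step.emit_workflow(
    "emit-receipt-later",
    "receipt-workflow",
    {"order_id": "ord_1"},
    run_at=datetime.now() + timedelta(minutes=10),
    queue_override="low-priority",  # hypothetical queue name
    idempotency_key="receipt:ord_1",
)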
Trigger a one-off delayed run
from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
Add input validation with Pydantic
Install the optional extra first:
uv add "redflow[pydantic]"
Then:
from pydantic import BaseModel

from redflow import WorkflowHandlerContext, define_workflow

class SendWelcomeInput(BaseModel):
    user_id: str

async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}

send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)
Configure retries and failure handling
- Retries use exponential backoff with jitter.
- max_attempts includes the initial attempt.
- Raise NonRetriableError to fail immediately without retry.
- Use on_failure for final failure hooks (not called on cancel).
from redflow import NonRetriableError, OnFailureContext, define_workflow

async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)

define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    on_failure=on_fail,
)
Schedule workflows with cron
from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)
Cron scheduling respects max_concurrency: ticks are skipped while the workflow is at its concurrency limit.
Test handlers quickly with run_inline(...)
run_inline executes workflow logic in-process without Redis:
from redflow import run_inline
result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}
Step overrides by step name:
result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)
For full lifecycle behavior (queues, retries, cron, cancel), run e2e tests against real Redis and start_worker(...).
Important run_inline limitation:
- step.run_workflow(...) in run_inline requires matching step_overrides, because no Redis worker executes child runs.
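A sketch of that pattern, using the imports from the tutorial and a hypothetical parent workflow create-order whose child step is replaced by an override:

@workflow("create-order")
async def create_order(ctx: WorkflowHandlerContext) -> dict:
    receipt = await ctx.step.run_workflow(
        "generate-receipt",
        "receipt-workflow",
        {"order_id": ctx.input["order_id"]},
    )
    return {"receipt": receipt}

result = await run_inline(
    create_order,
    input={"order_id": "ord_1"},
    step_overrides={
        # Stands in for the child run's output, since no worker executes it.
        "generate-receipt": {"receipt_id": "rcpt_1"},
    },
)
assert result.output == {"receipt": {"receipt_id": "rcpt_1"}}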
Explanation
Handler context
Inside a workflow handler:
- ctx.input: validated workflow input (or raw input if no schema)
- ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
- ctx.signal: cancellation event
- ctx.step: durable step API
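A small sketch of a handler that reads these fields; the workflow name and echo step are invented for the example:

from redflow import WorkflowHandlerContext, workflow

async def echo(value: dict) -> dict:
    return value

@workflow("context-demo")
async def context_demo(ctx: WorkflowHandlerContext) -> dict:
    # ctx.run carries run metadata such as id, attempt, and max_attempts.
    print(f"run {ctx.run.id}: attempt {ctx.run.attempt}/{ctx.run.max_attempts}")
    if ctx.signal.is_set():
        # Cancellation was requested; return early instead of starting new work.
        return {"canceled": True}
    value = await ctx.step.run("echo-input", echo, ctx.input)
    return {"canceled": False, "value": value}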
Execution model
Each workflow run lives in Redis and moves through:
scheduled -> queued -> running -> succeeded | failed | canceled
Step state is stored per run. If a step with the same name already succeeded in that run, its cached output is reused.
Durability and idempotency guarantees
- Step durability: successful step output is persisted and reused across retries/restarts for the same run + step name.
- Step uniqueness: duplicate step names in one run are rejected.
- Run idempotency: idempotency_key deduplicates run creation.
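For example, triggering the same workflow twice with one idempotency_key should create only one run. A sketch, assuming the second call returns a handle to the already-created run:

first = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
second = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
# Run creation was deduplicated, so both handles refer to the same run.
assert first.id == second.id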
Cancellation model
- ctx.signal is set when workflow cancellation is requested.
- ctx.step.run(...) injects signal= into the function when supported.
- Parent cancellation propagates to descendant runs spawned from step APIs (run_workflow, emit_workflow, emit_event).
- Lease-loss abort paths do not trigger child-cancel cascades by themselves.
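A sketch of requesting cancellation from the client side, assuming a canceled run surfaces as CanceledError when its result is awaited:

from redflow import CanceledError, create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await send_welcome_email.run({"user_id": "user_123"})
await client.cancel_run(handle.id, reason="user deleted account")

try:
    await handle.result(timeout_ms=5_000)
except CanceledError:
    print("run was canceled")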
Runtime parity with TypeScript
Behavior is intentionally aligned with @redflow/client. API naming differs by language convention:
- Python: snake_case (emit_workflow, max_attempts, run_at)
- TypeScript: camelCase (emitWorkflow, maxAttempts, runAt)
Reference
Workflow definition options
define_workflow(...) / @workflow(...):
- queue: str = "default"
- max_concurrency: int = 1
- max_attempts: int | None = None (falls back to engine default)
- cron: list[CronTrigger] | None = None
- event: list[EventTrigger] | None = None
- on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None
- input_schema: Any | None = None
- registry: WorkflowRegistry | None = None
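Putting several of these options together (reusing the handler and on_fail callables from the earlier examples; the workflow name and values are illustrative):

from redflow import define_workflow

nightly_sync = define_workflow(
    "nightly-sync",
    handler=handler,
    queue="ops",
    max_concurrency=2,
    max_attempts=5,
    on_failure=on_fail,
)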
Client API
Create client:
from redflow import create_client
client = create_client(
    app="my-service",  # optional
    url="redis://127.0.0.1:6379",  # optional if REDIS_URL is set
    prefix="redflow:v1",  # optional
)
Useful methods:
- emit_workflow(...)
- emit_event(...)
- run_by_name(...) (advanced)
- get_run(run_id)
- get_run_steps(run_id)
- list_runs(ListRunsParams(...))
- search_runs(SearchRunsParams(...))
- list_workflows()
- get_workflow_meta(name)
- list_workflows_meta()
- list_event_definitions()
- get_stats()
- cancel_run(run_id, reason=...)
- cleanup_expired_runs_now()
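For example, given a client and a run handle from the earlier examples, a run can be inspected after the fact (the shape of the returned objects is not documented here, so the prints are illustrative):

run = await client.get_run(handle.id)
steps = await client.get_run_steps(handle.id)
stats = await client.get_stats()

print(run)    # current state of this run
print(steps)  # per-step records for the run
print(stats)  # engine-level counters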
WorkflowDefinition.run(...) is the common path for application code.
Run calls return a RunHandle with:
- .id
- await .get_state()
- await .result(timeout_ms=...)
Default client helpers used by WorkflowDefinition.run(...):
- get_default_client()
- set_default_client(client)
Run history retention
Terminal runs are retained for a configurable window, then purged.
- Default: 30 days
- Override: REDFLOW_RUN_RETENTION_DAYS (fractional values supported)
Manual cleanup hook:
removed = await client.cleanup_expired_runs_now()
Worker API
start_worker(StartWorkerOptions(...)) starts:
- pollers (claim + execute runs)
- scheduled promoter
- reaper
- cron scheduler
- run history cleanup loop
- registry recovery loop
- watchdog
StartWorkerOptions main fields:
- app (required)
- redis or url
- prefix
- registry
- queues
- concurrency
- lease_ms
- blmove_timeout_sec
- reaper_interval_ms
- reaper_batch_size
- watchdog_enabled
- watchdog_interval_ms
- watchdog_stalled_for_ms
- run_history_cleanup_interval_ms
- registry_recovery_interval_ms
Runtime tuning fields in Python are flat (not nested under a runtime object as in TypeScript).
registry_recovery_interval_ms lets a running worker recover workflow metadata after Redis state resets.
Example with explicit queues + runtime tuning:
worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
        run_history_cleanup_interval_ms=60_000,
        registry_recovery_interval_ms=5_000,
    )
)
Important startup note:
start_worker(...) syncs the in-process workflow registry to Redis metadata on startup. Import workflow modules before worker start.
Environment variables
- REDIS_URL
- REDFLOW_PREFIX
- REDFLOW_APP
- REDFLOW_RUN_RETENTION_DAYS
Resolution rules:
- create_client(url=...) uses url, otherwise REDIS_URL, otherwise redis://localhost:6379
- prefix defaults to REDFLOW_PREFIX (or the built-in default)
- create_client(app=...) uses the explicit app, otherwise REDFLOW_APP
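A sketch of relying on environment-based resolution instead of explicit arguments:

import os

from redflow import create_client

os.environ.setdefault("REDIS_URL", "redis://127.0.0.1:6379")
os.environ.setdefault("REDFLOW_APP", "billing-api")

# With no arguments, url falls back to REDIS_URL, app to REDFLOW_APP,
# and prefix to REDFLOW_PREFIX (or the built-in default).
client = create_client()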
Common errors
from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)
Semantics:
- CanceledError: run/step canceled
- TimeoutError: wait or step timeout
- InputValidationError: workflow input failed schema validation
- UnknownWorkflowError: workflow definition/metadata not found
- OutputSerializationError: output is not JSON-serializable
- NonRetriableError: fail without retry
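A sketch of handling the most common cases when awaiting a result from a run handle (handle as in the tutorial):

from redflow import CanceledError, TimeoutError

try:
    output = await handle.result(timeout_ms=10_000)
except TimeoutError:
    # The wait window elapsed; the run may still finish later.
    output = None
except CanceledError:
    # The run was canceled before producing output.
    output = None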
Support
- Open issues in the main repository: https://github.com/getrelocapp/redflow/issues
- Include:
- Python package version
- Redis version
- minimal repro (workflow + input)
Development
From redflow/packages/python:
uv sync --extra dev
uv run ruff check .
uv run mypy src
uv run pytest
uv run pytest -m e2e # requires Redis
If e2e tests are skipped, set REDIS_URL or run local redis-server.