
redflow (Python)

Durable workflow engine backed by Redis.

redflow lets you define async workflows with durable, cached steps, run them from a client, and process them with a Redis-backed worker. It is the Python implementation of the same runtime model used by the TypeScript @redflow/client.

Status

Early-stage project. APIs are usable, but expect iteration.

Requirements

  • Python 3.11+
  • Redis 6+ recommended (older versions work, but some operations use slower fallbacks)

Install

Using uv (recommended):

uv add redflow

Optional extras:

# faster Redis parser/transport path (hiredis)
uv add "redflow[fast]"

# Pydantic input validation support for workflows
uv add "redflow[pydantic]"

Using pip:

pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"

Quick Start

1. Define a workflow

Functional API:

from redflow import WorkflowHandlerContext, define_workflow


async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_welcome(email: str) -> dict:
    return {"ok": True, "email": email}


async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}


send_welcome_email = define_workflow(
    "send-welcome-email",
    handler=handler,
    queue="default",
    max_attempts=3,
)

Decorator API:

from redflow import WorkflowHandlerContext, workflow


@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

2. Start a worker

Import workflow modules before starting the worker so definitions are registered.

import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )

    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())

3. Trigger a workflow

Most users will call .run(...) on the returned WorkflowDefinition:

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)

You can also trigger by name via RedflowClient:

from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)

Delayed run:

from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Core Concepts

Workflow handler context

Inside a workflow handler, ctx contains:

  • ctx.input: workflow input (validated if input_schema is configured)
  • ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
  • ctx.signal: asyncio.Event set when cancellation is requested
  • ctx.step: durable step API

Step names must be unique per run

Within a single workflow execution, step names are unique. Reusing a step name in the same run raises an error.
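
The uniqueness check can be pictured as a per-run set of seen step names. This is a self-contained sketch of the idea, not redflow's actual internals; `DuplicateStepError` and `StepNameRegistry` are hypothetical stand-ins:

```python
class DuplicateStepError(Exception):
    """Hypothetical stand-in for the error raised on a reused step name."""


class StepNameRegistry:
    """Tracks step names seen during a single workflow run."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def register(self, step_name: str) -> None:
        # Registering the same name twice within one run is an error.
        if step_name in self._seen:
            raise DuplicateStepError(f"step name reused in run: {step_name!r}")
        self._seen.add(step_name)


registry = StepNameRegistry()
registry.register("fetch-user")
registry.register("send-email")
# registry.register("fetch-user")  # would raise DuplicateStepError
```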

Step results are durable

If a step succeeds, its result is cached in Redis and reused after worker restarts or retry attempts (same run + same step name).

Step API (inside workflow handlers)

The Python API is intentionally Pythonic: it takes the step name as a positional argument instead of the TS { name } options object.

ctx.step.run(...)

Signature (conceptually):

await ctx.step.run(
    step_name: str,
    fn: Callable[..., Awaitable[T]],
    *args,
    timeout_ms: int | None = None,
    **kwargs,
) -> T

Example:

import asyncio


async def fetch_remote_user(user_id: str, *, signal: asyncio.Event | None = None) -> dict:
    # redflow injects `signal=` automatically when the function accepts it
    return {"id": user_id, "email": "demo@example.com"}


user = await ctx.step.run(
    "fetch-user",
    fetch_remote_user,
    ctx.input["user_id"],
    timeout_ms=4_000,
)

Notes:

  • If fn accepts a signal parameter (or **kwargs), redflow injects signal=<asyncio.Event>.
  • timeout_ms fails the step with TimeoutError and records failed step state.
  • Step output must be JSON-serializable.

ctx.step.run_workflow(...)

Trigger a child workflow and wait for the child result.

receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",  # or a WorkflowDefinition object
    {"order_id": "ord_1", "email": "user@example.com"},
    timeout_ms=20_000,
    idempotency_key="receipt:ord_1",
)

You can pass a workflow object (returned by define_workflow / @workflow) instead of a string:

receipt = await ctx.step.run_workflow(
    "send-receipt",
    send_receipt_workflow,
    {"order_id": "ord_1"},
)

ctx.step.emit_workflow(...)

Trigger a child workflow without waiting. Returns the child run ID.

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or a WorkflowDefinition object
    {"order_id": "ord_1"},
    timeout_ms=5_000,
    idempotency_key="analytics:ord_1",
)

Child workflow step options

Both run_workflow and emit_workflow support:

  • timeout_ms
  • run_at
  • queue_override
  • idempotency_key
  • idempotency_ttl

Workflow Options

define_workflow(...) / @workflow(...) support these main options:

  • queue: str = "default"
  • max_concurrency: int = 1
  • max_attempts: int | None = None (falls back to engine default)
  • cron: list[CronTrigger] | None = None
  • on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None
  • input_schema (typically a Pydantic v2 model class)

Input validation with Pydantic (optional)

Install the extra first:

uv add "redflow[pydantic]"

Then:

from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    # ctx.input is the validated Pydantic model instance
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)

max_concurrency

Limits the number of concurrently running runs for that workflow. The default is 1.

define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)

max_attempts

Total attempts including the first try.

Retries use exponential backoff with jitter. Raise NonRetriableError to fail immediately without retrying.

Cron

Use CronTrigger for scheduled runs (cron parsing is provided by cronsim).

from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron scheduling respects max_concurrency: if the workflow is already at the limit, that cron tick is skipped.
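
The skip behavior can be simulated: a tick that arrives while the workflow is at its concurrency limit simply does not start a run. A minimal sketch (fixed run duration, max_concurrency=1) of that gating logic:

```python
def plan_cron_ticks(tick_times: list[int], run_duration: int, max_concurrency: int) -> list[int]:
    """Return the tick times that actually start a run; ticks at the limit are skipped."""
    active: list[int] = []  # end times of in-flight runs
    started: list[int] = []
    for t in tick_times:
        active = [end for end in active if end > t]  # drop runs that have finished
        if len(active) < max_concurrency:
            active.append(t + run_duration)
            started.append(t)
    return started


# Hourly ticks (minutes 0, 60, 120, 180) with a 90-minute run and max_concurrency=1:
print(plan_cron_ticks([0, 60, 120, 180], run_duration=90, max_concurrency=1))  # → [0, 120]
```

The tick at minute 60 is skipped because the run started at minute 0 is still in flight; it is not queued for later.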

on_failure

Called when a run reaches terminal failure (retries exhausted or non-retriable error). Not called on cancellation.

from redflow import OnFailureContext, define_workflow


async def on_fail(ctx: OnFailureContext) -> None:
    print("workflow failed", ctx.run.id, ctx.run.workflow, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)

Client API

Create a client

from redflow import create_client

client = create_client(
    url="redis://127.0.0.1:6379",
    prefix="redflow:v1",
    app="my-service",  # optional, used for queue scoping/metadata
)

Environment defaults:

  • REDIS_URL is used if url is omitted
  • REDFLOW_PREFIX is used by default_prefix()
  • REDFLOW_APP is used by create_client() when app is omitted
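
The fallback order above (explicit argument first, then environment variable) can be sketched as follows. The "redflow" default prefix is an assumption for illustration, not a documented value:

```python
import os


def resolve_client_settings(url=None, prefix=None, app=None, env=None) -> dict:
    """Explicit arguments win; otherwise fall back to the documented env vars."""
    env = os.environ if env is None else env
    return {
        "url": url or env.get("REDIS_URL"),
        "prefix": prefix or env.get("REDFLOW_PREFIX", "redflow"),  # default is an assumption
        "app": app or env.get("REDFLOW_APP"),
    }


fake_env = {"REDIS_URL": "redis://localhost:6379", "REDFLOW_APP": "my-service"}
print(resolve_client_settings(prefix="redflow:v1", env=fake_env))
# → {'url': 'redis://localhost:6379', 'prefix': 'redflow:v1', 'app': 'my-service'}
```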

Triggering runs

  • await client.emit_workflow(name, input, ...)
  • await client.run_by_name(name, input, ...) (advanced; supports queue_override)

WorkflowDefinition.run(...) is usually preferred because it keeps workflow defaults (queue/max attempts) close to the definition.

Inspect and control runs

from redflow import ListRunsParams

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

recent = await client.list_runs(ListRunsParams(limit=50))
failed_checkout = await client.list_runs(
    ListRunsParams(workflow="checkout", status="failed", limit=20)
)

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
all_meta = await client.list_workflows_meta()
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")

RunHandle

Run-triggering methods return a RunHandle-like object with:

  • .id
  • await .get_state()
  • await .result(timeout_ms=...)

handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)

Default client helpers

The package keeps a process-global default client (used by WorkflowDefinition.run()):

from redflow import get_default_client, set_default_client

client = get_default_client()
set_default_client(client)

Worker API

start_worker(...) launches:

  • worker pollers (claim + execute runs)
  • scheduled promoter
  • reaper (lease recovery)
  • cron scheduler
  • poller watchdog
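
The reaper's lease-recovery idea: a worker claims a run with a lease (lease_ms); if the lease expires without renewal (e.g. the worker died), the reaper requeues the run in batches (reaper_batch_size). A conceptual sketch, not redflow's actual code:

```python
def reap_expired_leases(leases: dict[str, float], now: float, batch_size: int) -> list[str]:
    """Return up to batch_size run IDs whose lease expired, removing them for requeue."""
    expired = sorted(run_id for run_id, deadline in leases.items() if deadline <= now)
    to_requeue = expired[:batch_size]
    for run_id in to_requeue:
        del leases[run_id]  # the real reaper would push these back onto their queue
    return to_requeue


leases = {"run_1": 100.0, "run_2": 250.0, "run_3": 90.0}
print(reap_expired_leases(leases, now=120.0, batch_size=10))  # → ['run_1', 'run_3']
print(leases)  # → {'run_2': 250.0}
```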

StartWorkerOptions

Common fields:

  • app (required): stable worker/service identifier
  • url or redis
  • prefix
  • registry
  • queues
  • concurrency

Runtime tuning (flat fields in Python, not nested like TS):

  • lease_ms
  • blmove_timeout_sec
  • reaper_interval_ms
  • reaper_batch_size
  • watchdog_enabled
  • watchdog_interval_ms
  • watchdog_stalled_for_ms

Example with explicit queues and tuning:

worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
    )
)

Important startup note

start_worker(...) automatically syncs the workflow registry to Redis metadata. Make sure all workflow modules are imported first.

Testing

Unit tests with run_inline(...)

run_inline executes a workflow handler in-process without Redis.

from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})

assert result.succeeded is True
assert result.output == {"sent": True}
assert [step.name for step in result.steps] == ["fetch-user", "send-email"]

Override step outputs by step name:

result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)

Notes:

  • run_inline is best for handler logic and step sequencing.
  • step.run_workflow(...) requires step_overrides in run_inline (no Redis worker exists to execute child runs).
  • Use real Redis + start_worker(...) for end-to-end lifecycle tests.

Errors

Common error classes:

from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)

Typical semantics:

  • CanceledError: run/step canceled
  • TimeoutError: wait or step timeout
  • InputValidationError: workflow input failed schema validation
  • UnknownWorkflowError: workflow metadata/definition not found
  • OutputSerializationError: step/workflow output was not JSON-serializable
  • NonRetriableError: fail immediately without retrying

Python vs TypeScript Notes

  • Python uses snake_case names (emit_workflow, max_attempts, run_at)
  • TypeScript uses camelCase (emitWorkflow, maxAttempts, runAt)
  • Runtime semantics are intentionally aligned across implementations

Development (package)

From redflow/packages/python:

uv sync --extra dev
uv run ruff check .
uv run pytest
uv run pytest -m e2e  # requires Redis
