redflow (Python)

Durable workflow engine backed by Redis.

redflow helps you run async workflows with durable steps, retries, cancellation, and scheduling. The Python runtime is intentionally aligned with the TypeScript @redflow/client runtime model.

Status

Early stage. APIs are usable, but expect iteration.

Documentation Map

This README is grouped by Diátaxis-style intent:

  • Tutorial: first working workflow in ~5 minutes
  • How-to: focused recipes for common production tasks
  • Explanation: execution model and guarantees
  • Reference: options, environment variables, and API surface

Requirements

  • Python 3.11+
  • Redis 6+ recommended

Install

Using uv (recommended):

uv add redflow

Optional extras:

# Faster Redis parser/transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"

Using pip:

pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"

Tutorial

1) Define a workflow

from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}

2) Start a worker

Import workflow modules before starting the worker so definitions are registered.

import asyncio
from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())

3) Trigger a run and wait for output

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}

You can also trigger by workflow name:

from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

How-to

Choose the right step primitive

The Python step API is intentionally Pythonic: the step name is passed positionally ("step-name") rather than in an options object as in TypeScript.

Use ctx.step.run(...) for durable units of work:

user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)

Notes:

  • Step names are unique within a run; duplicate names fail the run.
  • If the step function accepts a signal parameter (or **kwargs), redflow injects signal=<asyncio.Event> (see the sketch after these notes).
  • timeout_ms raises TimeoutError and records failed step state.
  • Step output must be JSON-serializable.
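
For the signal injection noted above, a minimal sketch of a cancellation-aware step function (check_job is a hypothetical helper):

import asyncio


async def poll_until_ready(job_id: str, signal: asyncio.Event) -> dict:
    # redflow sets this event when cancellation of the run is requested.
    while not signal.is_set():
        status = await check_job(job_id)  # hypothetical status probe
        if status == "ready":
            return {"job_id": job_id, "status": status}
        await asyncio.sleep(1.0)
    raise RuntimeError("canceled while polling")


# Inside a handler:
# result = await ctx.step.run("poll-job", poll_until_ready, job_id)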

Use ctx.step.run_workflow(...) when the parent must wait for the child's output:

child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)

Use ctx.step.emit_workflow(...) for fire-and-forget child runs:

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
)

run_workflow and emit_workflow support (combined in the sketch after this list):

  • timeout_ms
  • run_at
  • queue_override
  • idempotency_key
  • idempotency_ttl
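
For example, several of these options combined on run_workflow inside a handler (a sketch; values are illustrative):

from datetime import datetime, timedelta

receipt = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",
    {"order_id": "ord_1"},
    run_at=datetime.now() + timedelta(seconds=30),  # delay the child's start
    queue_override="billing",                       # route the child to another queue
    idempotency_key="receipt:ord_1",                # dedupe child run creation
    timeout_ms=60_000,                              # how long the parent waits for output
)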

Trigger a one-off delayed run

from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Add input validation with Pydantic

Install the optional extra first:

uv add "redflow[pydantic]"

Then:

from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)

Configure retries and failure handling

  • Retries use exponential backoff with jitter.
  • max_attempts includes the initial attempt.
  • Raise NonRetriableError to fail immediately without retry.
  • Use on_failure for final failure hooks (not called on cancel).

from redflow import (
    NonRetriableError,
    OnFailureContext,
    WorkflowHandlerContext,
    define_workflow,
)


async def handler(ctx: WorkflowHandlerContext) -> dict:
    if not ctx.input.get("account_id"):
        # Permanent input problem: fail immediately instead of retrying.
        raise NonRetriableError("missing account_id")
    return {"synced": True}


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    on_failure=on_fail,
)

Schedule workflows with cron

from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron scheduling respects max_concurrency: ticks are skipped while the workflow is at its concurrency limit.
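
For example, a sketch pairing cron with max_concurrency (reusing the imports and handler above) so overlapping ticks are skipped rather than piling up:

define_workflow(
    "digest-throttled",
    handler=handler,
    max_concurrency=1,  # at most one run at a time; extra cron ticks are skipped
    cron=[CronTrigger(id="digest-hourly", expression="0 * * * *")],
)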

Test handlers quickly with run_inline(...)

run_inline executes workflow logic in-process without Redis:

from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}

You can override step outputs by step name:

result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)

For full lifecycle behavior (queues, retries, cron, cancel), run e2e tests against real Redis and start_worker(...).

Important run_inline limitation:

  • step.run_workflow(...) under run_inline requires a matching step_overrides entry, because no Redis worker is available to execute child runs.
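
For example, a sketch stubbing out a child-run step (order_workflow and its "generate-receipt" step are hypothetical):

result = await run_inline(
    order_workflow,
    input={"order_id": "ord_1"},
    step_overrides={
        # Stands in for the child run's output, since no worker executes it.
        "generate-receipt": {"receipt_id": "rcpt_1"},
    },
)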

Explanation

Handler context

Inside a workflow handler:

  • ctx.input: validated workflow input (or raw input if no schema)
  • ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
  • ctx.signal: cancellation event
  • ctx.step: durable step API
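
A minimal sketch touching each field (logging only; attempt counters help with retry-aware logs):

from redflow import WorkflowHandlerContext, workflow


@workflow("context-demo")
async def context_demo(ctx: WorkflowHandlerContext) -> dict:
    print(f"run={ctx.run.id} attempt={ctx.run.attempt}/{ctx.run.max_attempts}")
    if ctx.signal.is_set():
        return {"canceled": True}
    return {"queue": ctx.run.queue}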

Execution model

Each workflow run lives in Redis and moves through:

scheduled -> queued -> running -> succeeded | failed | canceled

Step state is stored per run. If a step with the same name already succeeded in that run, its cached output is reused.
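
For example, in the sketch below a failure in the second step retries the run, but the first step is replayed from its cached output rather than re-executed (create_charge and record_ledger are hypothetical helpers):

from redflow import WorkflowHandlerContext, workflow


@workflow("charge-once", max_attempts=3)
async def charge_once(ctx: WorkflowHandlerContext) -> dict:
    # Executed at most once per run; later attempts reuse the stored output.
    charge = await ctx.step.run("create-charge", create_charge, ctx.input["order_id"])

    # If this step fails, the run retries, but "create-charge" is not re-run.
    await ctx.step.run("record-ledger", record_ledger, charge["id"])
    return {"charge_id": charge["id"]}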

Durability and idempotency guarantees

  • Step durability: successful step output is persisted and reused across retries/restarts for the same run + step name.
  • Step uniqueness: duplicate step names in one run are rejected.
  • Run idempotency: idempotency_key deduplicates run creation.

Cancellation model

  • ctx.signal is set when workflow cancellation is requested.
  • ctx.step.run(...) injects signal= into the function when supported.
  • Parent cancellation propagates to run_workflow(...) child runs.
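
A sketch of cooperative cancellation at the handler level (process_batch is a hypothetical helper; returning early versus raising is an application choice):

from redflow import WorkflowHandlerContext, workflow


@workflow("long-task")
async def long_task(ctx: WorkflowHandlerContext) -> dict:
    for batch in range(100):
        if ctx.signal.is_set():
            # Cancellation requested: stop between batches.
            return {"done": False, "completed_batches": batch}
        await ctx.step.run(f"batch-{batch}", process_batch, batch)
    return {"done": True}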

Runtime parity with TypeScript

Behavior is intentionally aligned with @redflow/client. API naming differs by language convention:

  • Python: snake_case (emit_workflow, max_attempts, run_at)
  • TypeScript: camelCase (emitWorkflow, maxAttempts, runAt)

Reference

Workflow definition options

define_workflow(...) / @workflow(...):

  • queue: str = "default"
  • max_concurrency: int = 1
  • max_attempts: int | None = None (falls back to engine default)
  • cron: list[CronTrigger] | None = None
  • on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None
  • input_schema: Any | None = None
  • registry: WorkflowRegistry | None = None

Client API

Create client:

from redflow import create_client

client = create_client(
    app="my-service",                 # optional
    url="redis://127.0.0.1:6379",     # optional if REDIS_URL is set
    prefix="redflow:v1",              # optional
)

Useful methods:

  • emit_workflow(...)
  • run_by_name(...) (advanced)
  • get_run(run_id)
  • get_run_steps(run_id)
  • list_runs(ListRunsParams(...))
  • list_workflows()
  • get_workflow_meta(name)
  • list_workflows_meta()
  • get_stats()
  • cancel_run(run_id, reason=...)
  • cleanup_expired_runs_now()

WorkflowDefinition.run(...) is the common path for application code.

Run calls return a RunHandle with:

  • .id
  • await .get_state()
  • await .result(timeout_ms=...)
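
A short usage sketch (the exact shape returned by get_state is not documented here):

handle = await send_welcome_email.run({"user_id": "user_123"})
print(handle.id)

state = await handle.get_state()                 # current run state/metadata
output = await handle.result(timeout_ms=15_000)  # waits for terminal output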

Default client helpers used by WorkflowDefinition.run(...):

  • get_default_client()
  • set_default_client(client)
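
For example, wiring up a shared client once at startup (a sketch):

from redflow import create_client, set_default_client

set_default_client(create_client(url="redis://127.0.0.1:6379"))

# Subsequent definition.run(...) calls pick up this client implicitly.
handle = await send_welcome_email.run({"user_id": "user_123"})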

Run history retention

Terminal runs are retained for a configurable window, then purged.

  • Default: 30 days
  • Override: REDFLOW_RUN_RETENTION_DAYS (fractional values supported)

Manual cleanup hook:

removed = await client.cleanup_expired_runs_now()

Worker API

start_worker(StartWorkerOptions(...)) starts:

  • pollers (claim + execute runs)
  • scheduled promoter
  • reaper
  • cron scheduler
  • run history cleanup loop
  • registry recovery loop
  • watchdog

StartWorkerOptions main fields:

  • app (required)
  • redis or url
  • prefix
  • registry
  • queues
  • concurrency
  • lease_ms
  • blmove_timeout_sec
  • reaper_interval_ms
  • reaper_batch_size
  • watchdog_enabled
  • watchdog_interval_ms
  • watchdog_stalled_for_ms
  • run_history_cleanup_interval_ms
  • registry_recovery_interval_ms

Runtime tuning fields in Python are flat (not nested under a runtime object as in TypeScript).

registry_recovery_interval_ms lets a running worker recover workflow metadata after Redis state resets.

Example with explicit queues + runtime tuning:

worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
        run_history_cleanup_interval_ms=60_000,
        registry_recovery_interval_ms=5_000,
    )
)

Important startup note:

start_worker(...) syncs the in-process workflow registry to Redis metadata on startup. Import workflow modules before worker start.

Environment variables

  • REDIS_URL
  • REDFLOW_PREFIX
  • REDFLOW_APP
  • REDFLOW_RUN_RETENTION_DAYS

Resolution rules:

  • create_client(url=...) uses url, otherwise REDIS_URL, otherwise redis://localhost:6379
  • prefix defaults to REDFLOW_PREFIX (or built-in default)
  • create_client(app=...) uses explicit app, otherwise REDFLOW_APP
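
For example, a sketch relying entirely on environment resolution (the variables are set in-process here only for illustration):

import os

os.environ["REDIS_URL"] = "redis://127.0.0.1:6379"
os.environ["REDFLOW_APP"] = "my-service"

from redflow import create_client

client = create_client()  # url and app resolve from REDIS_URL / REDFLOW_APP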

Common errors

from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)

Semantics:

  • CanceledError: run/step canceled
  • TimeoutError: wait or step timeout
  • InputValidationError: workflow input failed schema validation
  • UnknownWorkflowError: workflow definition/metadata not found
  • OutputSerializationError: output is not JSON-serializable
  • NonRetriableError: fail without retry
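
For example, a hedged sketch distinguishing timeout from cancellation while waiting on a result (whether result(...) raises CanceledError for canceled runs is an assumption):

from redflow import CanceledError, TimeoutError

try:
    output = await handle.result(timeout_ms=5_000)
except TimeoutError:
    # The wait window elapsed; the run may still finish later.
    output = None
except CanceledError:
    # The run was canceled before producing output.
    output = None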

Development

From redflow/packages/python:

uv sync --extra dev
uv run ruff check .
uv run mypy src
uv run pytest
uv run pytest -m e2e  # requires Redis

If e2e tests are skipped, set REDIS_URL or run a local redis-server.
