Skip to main content

Durable workflow engine backed by Redis — Python client

Project description

redflow (Python)

Durable workflow engine backed by Redis.

redflow helps you run async workflows with durable steps, retries, cancellation, and scheduling. The Python runtime is intentionally aligned with the TypeScript @redflow/client runtime model.

Status

Early stage. APIs are usable, but expect iteration.

Documentation Map

This README is grouped by Diátaxis-style intent:

  • Tutorial: first working workflow in ~5 minutes
  • How-to: focused recipes for common production tasks
  • Explanation: execution model and guarantees
  • Reference: options, environment variables, and API surface

Source Layout

The package follows a compact src/ layout with a stable public API and grouped internals:

  • src/redflow/ — public modules and stable entry points
  • src/redflow/_client/ — client command/query/registry internals
  • src/redflow/_worker/ — worker loops/executor/step runtime internals
  • src/redflow/_core/ — shared private primitives (Redis decoding, payload storage, retry/idempotency)

This keeps top-level imports stable while allowing large modules to be decomposed without excessive folder nesting.

Requirements

  • Python 3.11+
  • Redis 6+ recommended

Install

Using uv (recommended):

uv add redflow

Optional extras:

# Faster Redis parser/transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"

Using pip:

pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"

Tutorial

1) Define a workflow

from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}

2) Start a worker

Import workflow modules before starting the worker so definitions are registered.

import asyncio
from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())

3) Trigger a run and wait for output

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}

You can also trigger by workflow name:

from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

How-to

Choose the right step primitive

Python step API is intentionally Pythonic: step name is positional ("step-name"), not an options object like in TS.

Use ctx.step.run(...) for durable units of work:

user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)

Notes:

  • Step names are unique within a run; duplicate names fail the run.
  • If the function accepts signal (or **kwargs), redflow injects signal=<asyncio.Event>.
  • timeout_ms raises TimeoutError and records failed step state.
  • Step output must be JSON-serializable.

Use ctx.step.run_workflow(...) when parent must wait for child output:

child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)

Use ctx.step.emit_workflow(...) for fire-and-forget child runs:

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
)

Use ctx.step.emit_event(...) to fan out runs to all workflows subscribed to an event:

child_run_ids = await ctx.step.emit_event(
    "emit-analytics-event",
    "analytics.record",
    {"order_id": "ord_1"},
)

run_workflow, emit_workflow, and emit_event support:

  • timeout_ms
  • run_at
  • queue_override
  • idempotency_key
  • idempotency_ttl
  • parent_run_id (advanced; set automatically for step-spawned children)

Trigger a one-off delayed run

from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Add input validation with Pydantic

Install the optional extra first:

uv add "redflow[pydantic]"

Then:

from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)

Configure retries and failure handling

  • Retries use exponential backoff with jitter.
  • max_attempts includes the initial attempt.
  • Raise NonRetriableError to fail immediately without retry.
  • Use on_failure for final failure hooks (not called on cancel).
from redflow import NonRetriableError, OnFailureContext, define_workflow


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    on_failure=on_fail,
)

Schedule workflows with cron

from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron scheduling respects max_concurrency: ticks are skipped while the workflow is at its concurrency limit.

Test handlers quickly with run_inline(...)

run_inline executes workflow logic in-process without Redis:

from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}

Step overrides by step name:

result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)

For full lifecycle behavior (queues, retries, cron, cancel), run e2e tests against real Redis and start_worker(...).

Important run_inline limitation:

  • step.run_workflow(...) in run_inline requires matching step_overrides because no Redis worker executes child runs.

Explanation

Handler context

Inside a workflow handler:

  • ctx.input: validated workflow input (or raw input if no schema)
  • ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
  • ctx.signal: cancellation event
  • ctx.step: durable step API

Execution model

Each workflow run lives in Redis and moves through:

scheduled -> queued -> running -> succeeded | failed | canceled

Step state is stored per run. If a step with the same name already succeeded in that run, its cached output is reused.

Durability and idempotency guarantees

  • Step durability: successful step output is persisted and reused across retries/restarts for the same run + step name.
  • Step uniqueness: duplicate step names in one run are rejected.
  • Run idempotency: idempotency_key deduplicates run creation.

Cancellation model

  • ctx.signal is set when workflow cancellation is requested.
  • ctx.step.run(...) injects signal= into the function when supported.
  • Parent cancellation propagates to descendant runs spawned from step APIs (run_workflow, emit_workflow, emit_event).
  • Lease-loss abort paths do not trigger child-cancel cascades by themselves.

Runtime parity with TypeScript

Behavior is intentionally aligned with @redflow/client. API naming differs by language convention:

  • Python: snake_case (emit_workflow, max_attempts, run_at)
  • TypeScript: camelCase (emitWorkflow, maxAttempts, runAt)

Reference

Workflow definition options

define_workflow(...) / @workflow(...):

  • queue: str = "default"
  • max_concurrency: int = 1
  • max_attempts: int | None = None (falls back to engine default)
  • cron: list[CronTrigger] | None = None
  • event: list[EventTrigger] | None = None
  • on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None
  • input_schema: Any | None = None
  • registry: WorkflowRegistry | None = None

Client API

Create client:

from redflow import create_client

client = create_client(
    app="my-service",                 # optional
    url="redis://127.0.0.1:6379",     # optional if REDIS_URL is set
    prefix="redflow:v1",              # optional
)

Useful methods:

  • emit_workflow(...)
  • emit_event(...)
  • run_by_name(...) (advanced)
  • get_run(run_id)
  • get_run_steps(run_id)
  • list_runs(ListRunsParams(...))
  • search_runs(SearchRunsParams(...))
  • list_workflows()
  • get_workflow_meta(name)
  • list_workflows_meta()
  • list_event_definitions()
  • get_stats()
  • cancel_run(run_id, reason=...)
  • cleanup_expired_runs_now()

WorkflowDefinition.run(...) is the common path for application code.

Run calls return a RunHandle with:

  • .id
  • await .get_state()
  • await .result(timeout_ms=...)

Default client helpers used by WorkflowDefinition.run(...):

  • get_default_client()
  • set_default_client(client)

Run history retention

Terminal runs are retained for a configurable window, then purged.

  • Default: 30 days
  • Override: REDFLOW_RUN_RETENTION_DAYS (fractional values supported)

Manual cleanup hook:

removed = await client.cleanup_expired_runs_now()

Worker API

start_worker(StartWorkerOptions(...)) starts:

  • pollers (claim + execute runs)
  • scheduled promoter
  • reaper
  • cron scheduler
  • run history cleanup loop
  • registry recovery loop
  • watchdog

StartWorkerOptions main fields:

  • app (required)
  • redis or url
  • prefix
  • registry
  • queues
  • concurrency
  • lease_ms
  • blmove_timeout_sec
  • reaper_interval_ms
  • reaper_batch_size
  • watchdog_enabled
  • watchdog_interval_ms
  • watchdog_stalled_for_ms
  • run_history_cleanup_interval_ms
  • registry_recovery_interval_ms

Runtime tuning fields in Python are flat (not nested under runtime like TS).

registry_recovery_interval_ms lets a running worker recover workflow metadata after Redis state resets.

Example with explicit queues + runtime tuning:

worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
        run_history_cleanup_interval_ms=60_000,
        registry_recovery_interval_ms=5_000,
    )
)

Important startup note:

start_worker(...) syncs the in-process workflow registry to Redis metadata on startup. Import workflow modules before worker start.

Environment variables

  • REDIS_URL
  • REDFLOW_PREFIX
  • REDFLOW_APP
  • REDFLOW_RUN_RETENTION_DAYS

Resolution rules:

  • create_client(url=...) uses url, otherwise REDIS_URL, otherwise redis://localhost:6379
  • prefix defaults to REDFLOW_PREFIX (or built-in default)
  • create_client(app=...) uses explicit app, otherwise REDFLOW_APP

Common errors

from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)

Semantics:

  • CanceledError: run/step canceled
  • TimeoutError: wait or step timeout
  • InputValidationError: workflow input failed schema validation
  • UnknownWorkflowError: workflow definition/metadata not found
  • OutputSerializationError: output is not JSON-serializable
  • NonRetriableError: fail without retry

Support

Development

From redflow/packages/python:

uv sync --extra dev
uv run ruff check .
uv run mypy src
uv run pytest
uv run pytest -m e2e  # requires Redis

If e2e tests are skipped, set REDIS_URL or run local redis-server.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

redflow-0.0.14.tar.gz (113.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

redflow-0.0.14-py3-none-any.whl (73.2 kB view details)

Uploaded Python 3

File details

Details for the file redflow-0.0.14.tar.gz.

File metadata

  • Download URL: redflow-0.0.14.tar.gz
  • Upload date:
  • Size: 113.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.0.14.tar.gz
Algorithm Hash digest
SHA256 0d2f592ebeb0e008f15ed1a78d510f49c0f6aaac9f3446d744cb0dbd4ea19403
MD5 80329212c2ac504afb37e58a5f1eac78
BLAKE2b-256 835ac8ba592503de3e33764c96007b7c135e914f7b4ad2dad200ac45c5264b9c

See more details on using hashes here.

Provenance

The following attestation bundles were made for redflow-0.0.14.tar.gz:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file redflow-0.0.14-py3-none-any.whl.

File metadata

  • Download URL: redflow-0.0.14-py3-none-any.whl
  • Upload date:
  • Size: 73.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.0.14-py3-none-any.whl
Algorithm Hash digest
SHA256 dd41b499ed6558dd36225b35278a32d6cd48b41d222e03d136c9e190fa30ea05
MD5 89f258d707f7f3b2115100271317ded5
BLAKE2b-256 10e0bc013a6b3ff27a4b17fa1340d1e775752af9f6b003d67fa167bd1144218a

See more details on using hashes here.

Provenance

The following attestation bundles were made for redflow-0.0.14-py3-none-any.whl:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page