
redflow (Python)

Durable workflow engine backed by Redis.

redflow helps you run async workflows with durable steps, retries, cancellation, and scheduling. The Python runtime is intentionally aligned with the TypeScript @redflow/client runtime model.

Status

Early stage. APIs are usable, but expect iteration.

Documentation Map

This README is grouped by Diátaxis-style intent:

  • Tutorial: first working workflow in ~5 minutes
  • How-to: focused recipes for common production tasks
  • Explanation: execution model and guarantees
  • Reference: options, environment variables, and API surface

Source Layout

The package follows a compact src/ layout with a stable public API and grouped internals:

  • src/redflow/ — public modules and stable entry points
  • src/redflow/_client/ — client command/query/registry internals
  • src/redflow/_worker/ — worker loops/executor/step runtime internals
  • src/redflow/_core/ — shared private primitives (Redis decoding, payload storage, retry/idempotency)

This keeps top-level imports stable while allowing large modules to be decomposed without excessive folder nesting.

Requirements

  • Python 3.11+
  • Redis 6+ recommended

Install

Using uv (recommended):

uv add redflow

Optional extras:

# Faster Redis parser/transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"

Using pip:

pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"

Tutorial

1) Define a workflow

from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}

2) Start a worker

Import workflow modules before starting the worker so definitions are registered.

import asyncio
from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())

3) Trigger a run and wait for output

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}

You can also trigger by workflow name:

from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

How-to

Choose the right step primitive

The Python step API is intentionally Pythonic: the step name is passed positionally ("step-name") rather than inside an options object as in TypeScript.

Use ctx.step.run(...) for durable units of work:

user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)

Notes:

  • Step names are unique within a run; duplicate names fail the run.
  • If the function accepts signal (or **kwargs), redflow injects signal=<asyncio.Event>.
  • timeout_ms raises TimeoutError and records failed step state.
  • Step output must be JSON-serializable.

Use ctx.step.wait_for(...) to pause a run durably until a relative/absolute time:

from datetime import datetime, timedelta

await ctx.step.wait_for("pause-before-retry", "5m")
await ctx.step.wait_for("pause-until-cutoff", datetime.now() + timedelta(minutes=15))

wait_for accepts milliseconds (int/float), duration strings (for example "30s", "5m"), or datetime.

Use ctx.step.run_workflow(...) when the parent must wait for the child's output:

child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)

Use ctx.step.emit_workflow(...) for fire-and-forget child runs:

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
)

Use ctx.step.emit_event(...) to fan out runs to all workflows subscribed to an event:

child_run_ids = await ctx.step.emit_event(
    "emit-analytics-event",
    "analytics.record",
    {"order_id": "ord_1"},
)

run_workflow, emit_workflow, and emit_event support:

  • timeout_ms
  • run_at
  • queue_override
  • idempotency_key
  • idempotency_ttl
  • parent_run_id (advanced; set automatically for step-spawned children)

Trigger a one-off delayed run

from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Add input validation with Pydantic

Install the optional extra first:

uv add "redflow[pydantic]"

Then:

from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)

Configure retries and failure handling

  • Retries use exponential backoff with jitter.
  • max_attempts includes the initial attempt.
  • Raise NonRetriableError to fail immediately without retry.
  • Use on_failure for final failure hooks (not called on cancel).
  • Use retries=WorkflowRetryPolicy(...) for custom delay/should-retry behavior.

from redflow import NonRetriableError, OnFailureContext, define_workflow
from redflow.types import WorkflowRetryPolicy


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    retries=WorkflowRetryPolicy(
        max_attempts=4,
        delay=lambda ctx: "5s" if ctx.run.attempt < 3 else "30s",
        should_retry=lambda ctx: "validation" not in str(ctx.error).lower(),
    ),
    on_failure=on_fail,
)

When both are set, retries.max_attempts takes precedence over top-level max_attempts.

Control admission with debounce/mutex/rate limits

from redflow import define_workflow
from redflow.types import WorkflowDebounceOptions, WorkflowRateLimitOptions

define_workflow(
    "invoice-sync",
    handler=handler,
    debounce=WorkflowDebounceOptions(
        key=lambda input: f"invoice:{input['invoice_id']}",
        period="30s",
        timeout="5m",
    ),
    mutex=lambda input: f"customer:{input['customer_id']}",
    throttle=WorkflowRateLimitOptions(
        key=lambda input: f"customer:{input['customer_id']}",
        limit=1,
        period="2s",
    ),
    rate_limit=WorkflowRateLimitOptions(
        key=lambda input: f"tenant:{input['tenant_id']}",
        limit=100,
        period="1m",
    ),
)

  • debounce coalesces duplicate enqueue attempts by a dynamic key.
  • mutex allows only one run with the same key in running state at a time.
  • throttle reschedules queued runs to a later time when the key budget is exceeded.
  • rate_limit fails fast with retry metadata when the key budget is exceeded.
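The difference between throttle and rate_limit can be sketched as two decisions over the same per-key budget (illustrative only, not redflow's implementation):

```python
# Sliding-window admission budget: `limit` admissions per key per `period_s`.
class Budget:
    def __init__(self, limit: int, period_s: float):
        self.limit, self.period_s = limit, period_s
        self.hits: dict[str, list[float]] = {}

    def admit(self, key: str, now: float) -> bool:
        window = [t for t in self.hits.get(key, []) if now - t < self.period_s]
        if len(window) >= self.limit:
            self.hits[key] = window
            return False
        window.append(now)
        self.hits[key] = window
        return True

def throttle_decision(budget: Budget, key: str, now: float) -> str:
    # throttle: over budget -> reschedule the queued run to a later time
    return "enqueue" if budget.admit(key, now) else "reschedule"

def rate_limit_decision(budget: Budget, key: str, now: float) -> str:
    # rate_limit: over budget -> fail fast with retry metadata
    return "enqueue" if budget.admit(key, now) else "fail_fast"
```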

Schedule workflows with cron

from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron scheduling respects max_concurrency: ticks are skipped while the workflow is at its concurrency limit.

Test handlers quickly with run_inline(...)

run_inline executes workflow logic in-process without Redis:

from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}

Step overrides by step name:

result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)

For full lifecycle behavior (queues, retries, cron, cancel), run e2e tests against real Redis and start_worker(...).

Important run_inline limitation:

  • step.run_workflow(...) in run_inline requires matching step_overrides because no Redis worker executes child runs.

Explanation

Handler context

Inside a workflow handler:

  • ctx.input: validated workflow input (or raw input if no schema)
  • ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
  • ctx.signal: cancellation event
  • ctx.step: durable step API

Execution model

Each workflow run lives in Redis and moves through:

scheduled -> queued -> running -> succeeded | failed | canceled

Step state is stored per run. If a step with the same name already succeeded in that run, its cached output is reused.
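That replay rule can be sketched in a few lines (illustrative, not redflow internals):

```python
import asyncio

# Step outputs are keyed by (run_id, step_name); a replayed attempt reuses
# them instead of re-executing the step function.
_step_outputs: dict[tuple[str, str], object] = {}

async def run_step(run_id: str, name: str, fn, *args):
    key = (run_id, name)
    if key in _step_outputs:         # step already succeeded in this run
        return _step_outputs[key]    # replay: return the cached output
    output = await fn(*args)         # first execution for this run + name
    _step_outputs[key] = output
    return output
```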

Durability and idempotency guarantees

  • Step durability: successful step output is persisted and reused across retries/restarts for the same run + step name.
  • Step uniqueness: duplicate step names in one run are rejected.
  • Run idempotency: idempotency_key deduplicates run creation.

Cancellation model

  • ctx.signal is set when workflow cancellation is requested.
  • ctx.step.run(...) injects signal= into the function when supported.
  • Parent cancellation propagates to descendant runs spawned from step APIs (run_workflow, emit_workflow, emit_event).
  • Lease-loss abort paths do not trigger child-cancel cascades by themselves.

Runtime parity with TypeScript

Behavior is intentionally aligned with @redflow/client. API naming differs by language convention:

  • Python: snake_case (emit_workflow, max_attempts, run_at)
  • TypeScript: camelCase (emitWorkflow, maxAttempts, runAt)

Reference

Workflow definition options

define_workflow(...) / @workflow(...):

  • queue: str = "default"
  • max_concurrency: int = 1
  • max_attempts: int | None = None (falls back to engine default)
  • cron: list[CronTrigger] | None = None
  • event: list[EventTrigger] | None = None
  • debounce: WorkflowDebounceOptions | None = None
  • mutex: Callable[[Any], str] | None = None
  • rate_limit: WorkflowRateLimitOptions | None = None
  • retries: WorkflowRetryPolicy | None = None
  • throttle: WorkflowRateLimitOptions | None = None
  • on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None
  • input_schema: Any | None = None
  • registry: WorkflowRegistry | None = None

Client API

Create client:

from redflow import create_client

client = create_client(
    app="my-service",                 # optional
    url="redis://127.0.0.1:6379",     # optional if REDIS_URL is set
    prefix="redflow:v1",              # optional
)

Useful methods:

  • emit_workflow(...)
  • emit_event(...)
  • run_by_name(...) (advanced)
  • get_run(run_id)
  • get_run_steps(run_id)
  • list_runs(ListRunsParams(...))
  • search_runs(SearchRunsParams(...))
  • list_workflows()
  • get_workflow_meta(name)
  • list_workflows_meta()
  • list_event_definitions()
  • get_stats()
  • cancel_run(run_id, reason=...)
  • reset_runs() (clear run state, keep workflow/cron/event metadata + custom idempotency keys)
  • reset_state() (clear all keys under prefix; testing/dev)
  • cleanup_expired_runs_now()

WorkflowDefinition.run(...) is the common path for application code.

Run calls return a RunHandle with:

  • .id
  • await .get_state()
  • await .result(timeout_ms=...)

Default client helpers used by WorkflowDefinition.run(...):

  • get_default_client()
  • set_default_client(client)

Run history retention

Terminal runs are retained for a configurable window, then purged.

  • Default: 30 days
  • Override: REDFLOW_RUN_RETENTION_DAYS (fractional values supported)
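A sketch of the resulting cutoff calculation (illustrative; redflow's internal cleanup logic may differ):

```python
import os
from datetime import datetime, timedelta

# Terminal runs older than this cutoff are eligible for purging.
# Fractional day values (e.g. "0.5") are supported per the docs above.
def retention_cutoff(now: datetime) -> datetime:
    days = float(os.environ.get("REDFLOW_RUN_RETENTION_DAYS", "30"))
    return now - timedelta(days=days)
```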

Manual cleanup hook:

removed = await client.cleanup_expired_runs_now()

Worker API

start_worker(StartWorkerOptions(...)) starts:

  • pollers (claim + execute runs)
  • scheduled promoter
  • reaper
  • cron scheduler
  • run history cleanup loop
  • registry recovery loop
  • watchdog

StartWorkerOptions main fields:

  • app (required)
  • redis or url
  • prefix
  • registry
  • queues
  • concurrency
  • lease_ms
  • blmove_timeout_sec
  • reaper_interval_ms
  • reaper_batch_size
  • watchdog_enabled
  • watchdog_interval_ms
  • watchdog_stalled_for_ms
  • run_history_cleanup_interval_ms
  • registry_recovery_interval_ms

Runtime tuning fields are flat in Python (not nested under a runtime object as in TypeScript).

registry_recovery_interval_ms lets a running worker recover workflow metadata after Redis state resets.

Example with explicit queues + runtime tuning:

worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
        run_history_cleanup_interval_ms=60_000,
        registry_recovery_interval_ms=5_000,
    )
)

Important startup note:

start_worker(...) syncs the in-process workflow registry to Redis metadata on startup. Import workflow modules before worker start.

Environment variables

  • REDIS_URL
  • REDFLOW_PREFIX
  • REDFLOW_APP
  • REDFLOW_RUN_RETENTION_DAYS

Resolution rules:

  • create_client(url=...) uses url, otherwise REDIS_URL, otherwise redis://localhost:6379
  • prefix defaults to REDFLOW_PREFIX (or built-in default)
  • create_client(app=...) uses explicit app, otherwise REDFLOW_APP

Common errors

from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)

Semantics:

  • CanceledError: run/step canceled
  • TimeoutError: wait or step timeout
  • InputValidationError: workflow input failed schema validation
  • UnknownWorkflowError: workflow definition/metadata not found
  • OutputSerializationError: output is not JSON-serializable
  • NonRetriableError: fail without retry

Development

From redflow/packages/python:

uv sync --extra dev
uv run ruff check .
uv run mypy src
uv run pytest
uv run pytest -m e2e  # requires Redis

If e2e tests are skipped, set REDIS_URL or run a local redis-server.
