Durable workflow engine backed by Redis — Python client

redflow (Python)

Durable Redis-backed workflows for Python.

redflow lets you run async workflows with durable steps, retries, cancellation, cron triggers, event fan-out, and wait_for_event(...) pauses that survive worker restarts. The Python runtime is intentionally aligned with the TypeScript @redflow/client model while keeping a Pythonic API.

Status

Beta. The package is usable today, but the API is still evolving.

Highlights

  • Durable step.run(...) with cached step output across retries and restarts
  • Delayed runs and cron-triggered workflows
  • Event-driven workflows via EventTrigger(name=...)
  • Durable step.wait_for_event(...) with exact event + correlation_key matching
  • Durable event history via list_events(...) and get_event(...)
  • Cancellation propagation across parent and child runs
  • Inline testing via run_inline(...)
  • Python 3.11+ with optional Pydantic validation

Documentation shape

This README follows a Diataxis-style structure:

  • Tutorial: first working workflow
  • How-to: practical production recipes
  • Explanation: execution model and guarantees
  • Reference: key APIs and environment variables

Requirements

  • Python 3.11+
  • Redis 6.2+

Install

Using uv:

uv add redflow

Optional extras:

# Faster Redis parser / transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"

Using pip:

pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"

Tutorial

1. Define a workflow

from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str, *, signal=None) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str, *, signal=None) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}

2. Start a worker

Import workflow modules before starting the worker so definitions are registered.

import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())

3. Trigger a run and wait for the result

from redflow import create_client, set_default_client

client = create_client(url="redis://127.0.0.1:6379")
set_default_client(client)

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}

You can also emit by workflow name:

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

How-to

Pick the right primitive

Use this rule of thumb:

  • Durable unit of work: ctx.step.run(...)
  • Sleep until a duration or datetime: ctx.step.wait_for(...)
  • Wait for a specific external event: ctx.step.wait_for_event(...)
  • Start a child workflow and wait for its output: ctx.step.run_workflow(...)
  • Start a child workflow fire-and-forget: ctx.step.emit_workflow(...)
  • Fan out to workflows subscribed to an event: ctx.step.emit_event(...) or client.emit_event(...)

Python step APIs are intentionally positional and Pythonic: the step name is the first argument, not an options object.

Use durable steps

user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)

Notes:

  • Step names must be unique within a run
  • Successful step output is cached for that run and reused on retry / recovery
  • If the function accepts signal or **kwargs, redflow injects signal=<asyncio.Event>
  • Step output must be JSON-serializable

Sleep durably

from datetime import datetime, timedelta

await ctx.step.wait_for("pause-before-retry", "5m")
await ctx.step.wait_for("pause-until-cutoff", datetime.now() + timedelta(minutes=15))

wait_for(...) accepts:

  • milliseconds as int / float
  • duration strings like "30s", "5m", "2h"
  • datetime

Subscribe workflows to events

from redflow import EventTrigger, workflow


async def record_metric(order_id: str, *, signal=None) -> dict:
    return {"tracked": order_id}


@workflow(
    "analytics-order-created",
    queue="analytics",
    event=[EventTrigger(name="order.created")],
)
async def analytics_order_created(ctx):
    order_id = ctx.input["order_id"]
    await ctx.step.run("track", record_metric, order_id)
    return {"tracked": order_id}

Emitting the event fans out to every subscribed workflow:

import asyncio

handles = await client.emit_event(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)

outputs = await asyncio.gather(*(handle.result(timeout_ms=10_000) for handle in handles))

Wait for a specific event

Use step.wait_for_event(...) when a run needs to pause until a matching callback or domain event arrives.

from redflow import workflow


@workflow("await-approval", queue="default")
async def await_approval(ctx):
    received = await ctx.step.wait_for_event(
        "wait-approval",
        "approval.received",
        ctx.input["order_id"],
        "30m",
    )

    if received is None:
        return {"status": "timed_out"}

    return {
        "status": "approved",
        "event_id": received["id"],
        "approved_at": received["ts"],
    }

Then emit the matching event:

await client.emit_event(
    "approval.received",
    {"approved": True, "note": "ok"},
    correlation_key="ord_123",
    event_id="evt_approval_ord_123",
)

Semantics:

  • matching is exact on event_name + correlation_key
  • the step returns ReceivedEvent when resumed by an event
  • it returns None when the timeout wins
  • the wait is durable across worker restarts

Inspect durable event history

If you provide an explicit event_id, you can fetch the event record later:

result = await client.emit_event_detailed(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)

print(result["handles"])
print(result["resumed_run_ids"])

event = await client.get_event("evt_order_created_ord_123")
print(event["fanout_status"])
print(event["deliveries"])

You can also list recent events:

from redflow import ListEventsParams

events = await client.list_events(
    ListEventsParams(
        name="order.created",
        limit=20,
        sort="desc",
    )
)

Durable event records include:

  • event envelope (id, name, ts, data)
  • fan-out summary (subscriber_count, delivery_count, failed_delivery_count, fanout_status)
  • resumed waiter summary (resumed_run_count, resumed_run_ids)
  • per-subscriber delivery records

Run child workflows

Wait for the child output:

child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)

Fire-and-forget child run:

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": "ord_1"},
)

These APIs also support:

  • timeout_ms
  • run_at
  • queue_override
  • idempotency_key
  • idempotency_ttl

Trigger a delayed run

from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Add input validation with Pydantic

Install the extra first:

uv add "redflow[pydantic]"

Then:

from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)

Configure retries and failure handling

from redflow import NonRetriableError, OnFailureContext, define_workflow
from redflow.types import WorkflowRetryPolicy


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    retries=WorkflowRetryPolicy(
        max_attempts=4,
        delay=lambda ctx: "5s" if ctx.run.attempt < 3 else "30s",
        should_retry=lambda ctx: "validation" not in str(ctx.error).lower(),
    ),
    on_failure=on_fail,
)

Notes:

  • max_attempts includes the initial attempt
  • retries.max_attempts takes precedence over top-level max_attempts
  • raise NonRetriableError to fail immediately
  • on_failure runs only after terminal failure, not on cancellation

Control admission with debounce, mutex, throttle, and rate limits

from redflow import define_workflow
from redflow.types import WorkflowDebounceOptions, WorkflowRateLimitOptions


define_workflow(
    "invoice-sync",
    handler=handler,
    debounce=WorkflowDebounceOptions(
        key=lambda input: f"invoice:{input['invoice_id']}",
        period="30s",
        timeout="5m",
    ),
    mutex=lambda input: f"customer:{input['customer_id']}",
    throttle=WorkflowRateLimitOptions(
        key=lambda input: f"customer:{input['customer_id']}",
        limit=1,
        period="2s",
    ),
    rate_limit=WorkflowRateLimitOptions(
        key=lambda input: f"tenant:{input['tenant_id']}",
        limit=100,
        period="1m",
    ),
)

Semantics:

  • debounce coalesces duplicate enqueue attempts by key
  • mutex allows only one running run per key
  • throttle delays execution when the budget is exceeded
  • rate_limit fails fast with retry metadata

Schedule workflows with cron

from redflow import CronTrigger, define_workflow


define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(
            expression="*/5 * * * *",
            timezone="UTC",
            input={"source": "cron"},
        ),
    ],
)

Cron ticks respect max_concurrency: if a workflow is already at its concurrency limit, the tick is skipped.

Test handlers with run_inline(...)

run_inline(...) executes workflow logic in-process without Redis:

from redflow import run_inline


result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}

Step overrides are keyed by step name:

result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)

You can also override wait_for_event(...) steps with either a ReceivedEvent-like dict or None.

For full lifecycle behavior such as queues, retries, cron, cancellation, and event fan-out, use a real Redis instance and start_worker(...).

Explanation

Handler context

Inside a workflow handler:

  • ctx.input: validated workflow input, or raw input if no schema is configured
  • ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
  • ctx.signal: cancellation event
  • ctx.step: durable step API

Run lifecycle

Each run moves through:

scheduled -> queued -> running -> succeeded | failed | canceled

Step state is stored separately from top-level run state. If a step with the same name already succeeded for the current run, its cached output is reused.
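The lifecycle above can be encoded as a small transition table. This is illustrative only and mirrors exactly the diagram shown; retry re-queueing and any internal states are omitted:

```python
# Transition table for the run lifecycle shown above
# (scheduled -> queued -> running -> terminal).
TRANSITIONS: dict[str, set[str]] = {
    "scheduled": {"queued"},
    "queued": {"running"},
    "running": {"succeeded", "failed", "canceled"},
    "succeeded": set(),  # terminal
    "failed": set(),     # terminal
    "canceled": set(),   # terminal
}


def can_transition(src: str, dst: str) -> bool:
    return dst in TRANSITIONS.get(src, set())
```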

Durability and idempotency

  • step.run(...) persists successful step output and reuses it after retry / recovery
  • duplicate step names in the same run are rejected
  • idempotency_key deduplicates run creation
  • explicit event_id deduplicates external event emits
  • event records and run history are retained for the configured retention window

Events model

redflow has two event-oriented patterns:

  1. Event fan-out

    • workflows subscribe with event=[EventTrigger(name=...)]
    • emit_event(...) creates runs for matching subscribers
  2. Event resume

    • a running workflow pauses with step.wait_for_event(...)
    • a later emit_event(...) with the same event_name + correlation_key resumes the waiting run

Durable event records let you inspect what happened after the fact:

  • which runs were created
  • which waiters were resumed
  • whether fan-out completed, partially failed, or had no subscribers

Cancellation model

  • ctx.signal is set when workflow cancellation is requested
  • step.run(...) injects signal= into the step function when supported
  • parent cancellation propagates to descendant runs spawned via step APIs
  • lease-loss recovery does not itself imply child-cancel cascades

Python / TypeScript parity

Behavior is intentionally aligned with @redflow/client. Naming follows language conventions:

  • Python: emit_workflow, emit_event, run_at, max_attempts
  • TypeScript: emitWorkflow, emitEvent, runAt, maxAttempts

Reference

Public workflow APIs

  • define_workflow(...)
  • @workflow(...)
  • WorkflowDefinition.run(...)
  • start_worker(StartWorkerOptions(...))
  • run_inline(...)

Step APIs

  • ctx.step.run(name, fn, *args, timeout_ms=None, **kwargs)
  • ctx.step.wait_for(name, target, timeout_ms=None)
  • ctx.step.wait_for_event(name, event_name, correlation_key, timeout)
  • ctx.step.run_workflow(name, workflow, input, ...)
  • ctx.step.emit_workflow(name, workflow, input, ...)
  • ctx.step.emit_event(name, event_name, input, ...)

Client APIs

Create a client:

from redflow import create_client

client = create_client(
    app="my-service",                 # optional, falls back to REDFLOW_APP
    url="redis://127.0.0.1:6379",     # optional, falls back to REDIS_URL
    prefix="redflow:v1",              # optional, falls back to REDFLOW_PREFIX
)

Common methods:

  • emit_workflow(...)
  • emit_event(...)
  • emit_event_detailed(...)
  • run_by_name(...)
  • get_run(run_id)
  • get_run_steps(run_id)
  • get_run_attempts(run_id)
  • list_runs(ListRunsParams(...))
  • search_runs(SearchRunsParams(...))
  • list_workflows()
  • get_workflow_meta(name)
  • list_workflows_meta()
  • list_event_definitions()
  • list_events(ListEventsParams(...))
  • get_event(event_id)
  • get_stats()
  • cancel_run(run_id, reason=...)
  • cleanup_expired_runs_now(...)
  • cleanup_expired_events_now(...)
  • reset_runs() for dev/testing
  • reset_state() for dev/testing

Run handle

Run creation APIs return a RunHandle with:

  • .id
  • await .get_state()
  • await .result(timeout_ms=...)

Default client helpers

WorkflowDefinition.run(...) uses the process-global default client:

  • get_default_client()
  • set_default_client(client)

If you never call set_default_client(...), redflow lazily creates one from environment defaults.

Environment variables

  • REDIS_URL Redis connection string used by create_client() and start_worker(...) when no explicit URL or Redis instance is provided
  • REDFLOW_PREFIX Redis key prefix, default: redflow:v1
  • REDFLOW_APP Default app name used by create_client(...)
  • REDFLOW_RUN_RETENTION_DAYS Retention window for terminal runs and durable event records, default: 30

Worker options

Important StartWorkerOptions fields:

  • app
  • redis or url
  • prefix
  • registry
  • queues
  • concurrency
  • lease_ms
  • blmove_timeout_sec
  • reaper_interval_ms
  • reaper_batch_size
  • watchdog_enabled
  • watchdog_interval_ms
  • watchdog_stalled_for_ms
  • run_history_cleanup_interval_ms
  • registry_recovery_interval_ms

Python keeps runtime tuning flat on StartWorkerOptions; there is no nested runtime object.

Source layout

  • src/redflow/ - public modules and stable entry points
  • src/redflow/_client/ - command, query, retention, and registry internals
  • src/redflow/_worker/ - worker loops, executor, and step runtime internals
  • src/redflow/_core/ - shared private primitives such as payload storage, retry logic, and Redis helpers
