Skip to main content

Durable workflow engine backed by Redis — Python client

Project description

redflow (Python)

Durable Redis-backed workflows and event orchestration for Python.

redflow lets you run async workflows with durable steps, retries, delayed runs, cron triggers, event fan-out, child workflows, and durable event history. The Python runtime is intentionally aligned with the TypeScript @redflow/client model while keeping a Pythonic API.

Status

Beta. The package is usable today, but the API is still evolving.

Highlights

  • Durable step.run(...) with cached step output across retries and restarts
  • Delayed runs and cron-triggered workflows
  • Event fan-out via EventTrigger(name=...) and emit_event(...)
  • Durable event history via list_events(...) and get_event(...)
  • Child workflows via run_workflow(...) and emit_workflow(...)
  • Cancellation propagation across parent and child runs
  • Inline testing via run_inline(...)
  • Python 3.11+ with optional Pydantic validation

Documentation shape

This README follows a Diataxis-style structure:

  • Tutorial: first working workflow
  • Cookbook: practical production recipes
  • Explanation: execution model and guarantees
  • Reference: key APIs and environment variables

Requirements

  • Python 3.11+
  • Redis 6.2+

Install

Using uv:

uv add redflow

Optional extras:

# Faster Redis parser / transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"

Using pip:

pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"

Tutorial

1. Define a workflow

from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str, *, signal=None) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str, *, signal=None) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}

2. Start a worker

Import workflow modules before starting the worker so definitions are registered.

import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())

3. Trigger a run and wait for the result

from redflow import create_client, set_default_client

client = create_client(url="redis://127.0.0.1:6379")
set_default_client(client)

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}

You can also emit by workflow name:

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

Cookbook

Pick the right primitive

Use this rule of thumb:

Need Primitive
Durable unit of work ctx.step.run(...)
Sleep until a duration or datetime ctx.step.wait_for(...)
Wait for a specific external event ctx.step.wait_for_event(...)
Start a child workflow and wait for output ctx.step.run_workflow(...)
Start a child workflow fire-and-forget ctx.step.emit_workflow(...)
Fan out to workflows subscribed to an event ctx.step.emit_event(...) or client.emit_event(...)

Python step APIs are intentionally positional and Pythonic: the step name is the first argument, not an options object.

Use durable steps

user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)

Notes:

  • Step names must be unique within a run
  • Successful step output is cached for that run and reused on retry / recovery
  • If the function accepts signal or **kwargs, redflow injects signal=<asyncio.Event>
  • Step output must be JSON-serializable

Sleep durably

from datetime import datetime, timedelta

await ctx.step.wait_for("pause-before-retry", "5m")
await ctx.step.wait_for("pause-until-cutoff", datetime.now() + timedelta(minutes=15))

wait_for(...) accepts:

  • milliseconds as int / float
  • duration strings like "30s", "5m", "2h"
  • datetime

Emit recurring events with cron workflows

If the schedule is known up front, model it as a cron workflow that emits a domain event. This keeps scheduling in workflows and delivery in event subscribers.

from redflow import CronTrigger, EventTrigger, workflow


async def send_email(email: str, *, signal=None) -> dict:
    return {"ok": True, "email": email}


@workflow(
    "emit-daily-digest-events",
    queue="system",
    cron=[
        CronTrigger(
            id="daily-digest",
            expression="0 9 * * *",
            timezone="UTC",
            input={"source": "cron"},
        )
    ],
)
async def emit_daily_digest(ctx):
    subscriber_run_ids = await ctx.step.emit_event(
        "emit-digest-due",
        "digest.due",
        {"source": ctx.input["source"]},
        event_id=f"digest.due:{ctx.run.id}",
    )
    return {"subscriber_run_ids": subscriber_run_ids}


@workflow(
    "send-digest-email",
    queue="notifications",
    event=[EventTrigger(name="digest.due")],
)
async def send_digest_email(ctx):
    await ctx.step.run("deliver-email", send_email, "team@example.com")
    return {"delivered": True}

This pattern is usually a better fit than keeping a workflow open and waiting when you already know the time the event should happen.

Schedule a one-off event for later

For a single future dispatch, schedule a workflow run with run_at=... and emit the event from inside that workflow.

from datetime import datetime, timedelta
from redflow import workflow


@workflow("dispatch-invoice-due", queue="system")
async def dispatch_invoice_due(ctx):
    subscriber_run_ids = await ctx.step.emit_event(
        "emit-invoice-due",
        "invoice.due",
        {"invoice_id": ctx.input["invoice_id"]},
        event_id=f"invoice.due:{ctx.input['invoice_id']}",
    )
    return {"subscriber_run_ids": subscriber_run_ids}


due_at = datetime.now() + timedelta(minutes=30)

await dispatch_invoice_due.run(
    {"invoice_id": "inv_123"},
    run_at=due_at,
    idempotency_key="invoice-due:inv_123",
)

Use this when the event should happen at a known future time but you still want event fan-out at dispatch time.

Subscribe workflows to events

from redflow import EventTrigger, workflow


async def record_metric(order_id: str, *, signal=None) -> dict:
    return {"tracked": order_id}


@workflow(
    "analytics-order-created",
    queue="analytics",
    event=[EventTrigger(name="order.created")],
)
async def analytics_order_created(ctx):
    order_id = ctx.input["order_id"]
    await ctx.step.run("track", record_metric, order_id)
    return {"tracked": order_id}

Emitting the event fans out to every subscribed workflow:

import asyncio


handles = await client.emit_event(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)

outputs = await asyncio.gather(*(handle.result(timeout_ms=10_000) for handle in handles))

Need the event later? Schedule a workflow run with run_at and emit the event from that workflow instead.

This is the right pattern for domain events such as:

  • order.created
  • invoice.due
  • notification.due
  • document.parsed

Inspect durable event history

If you provide an explicit event_id, you can fetch the event record later:

result = await client.emit_event_detailed(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)

print(result["handles"])
print(result["resumed_run_ids"])

event = await client.get_event("evt_order_created_ord_123")
print(event["fanout_status"])
print(event["deliveries"])

You can also list recent events:

from redflow import ListEventsParams

events = await client.list_events(
    ListEventsParams(
        name="order.created",
        limit=20,
        sort="desc",
    )
)

Durable event records include:

  • event envelope (id, name, ts, data)
  • fan-out summary (subscriber_count, delivery_count, failed_delivery_count, fanout_status)
  • resumed waiter summary (resumed_run_count, resumed_run_ids)
  • per-subscriber delivery records

Run child workflows

Wait for the child output:

child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)

Fire-and-forget child run:

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": "ord_1"},
)

These APIs also support:

  • timeout_ms
  • run_at for workflow-running APIs such as run_workflow(...) and emit_workflow(...)
  • queue_override
  • idempotency_key
  • idempotency_ttl

For delayed event dispatch, schedule a workflow run with run_at=... and emit the event from that workflow.

Trigger a delayed run

from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Wait for an external callback when you really need a live pause

Use step.wait_for_event(...) when a running workflow must pause until a later callback arrives for that specific run.

from redflow import workflow


@workflow("await-approval", queue="default")
async def await_approval(ctx):
    received = await ctx.step.wait_for_event(
        "wait-approval",
        "approval.received",
        ctx.input["order_id"],
        "30m",
    )

    if received is None:
        return {"status": "timed_out"}

    return {
        "status": "approved",
        "event_id": received["id"],
        "approved_at": received["ts"],
    }

Then emit the matching event:

await client.emit_event(
    "approval.received",
    {"approved": True, "note": "ok"},
    correlation_key="ord_123",
    event_id="evt_approval_ord_123",
)

Semantics:

  • matching is exact on event_name + correlation_key
  • the step returns ReceivedEvent when resumed by an event
  • it returns None when the timeout wins
  • the wait is durable across worker restarts

Use this for:

  • human approval flows
  • webhook callbacks
  • payment provider confirmations
  • chat / bot interactions where a later action resumes a waiting run

If the time is known up front, prefer run_at or cron-triggered workflows that emit an event instead of keeping a workflow open waiting.

Add input validation with Pydantic

Install the extra first:

uv add "redflow[pydantic]"

Then:

from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)

Configure retries and failure handling

from redflow import NonRetriableError, OnFailureContext, define_workflow
from redflow.types import WorkflowRetryPolicy


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    retries=WorkflowRetryPolicy(
        max_attempts=4,
        delay=lambda ctx: "5s" if ctx.run.attempt < 3 else "30s",
        should_retry=lambda ctx: "validation" not in str(ctx.error).lower(),
    ),
    on_failure=on_fail,
)

Notes:

  • max_attempts includes the initial attempt
  • retries.max_attempts takes precedence over top-level max_attempts
  • raise NonRetriableError to fail immediately
  • on_failure runs only after terminal failure, not on cancellation

Control admission with debounce, mutex, throttle, and rate limits

from redflow import define_workflow
from redflow.types import WorkflowDebounceOptions, WorkflowRateLimitOptions


define_workflow(
    "invoice-sync",
    handler=handler,
    debounce=WorkflowDebounceOptions(
        key=lambda input: f"invoice:{input['invoice_id']}",
        period="30s",
        timeout="5m",
    ),
    mutex=lambda input: f"customer:{input['customer_id']}",
    throttle=WorkflowRateLimitOptions(
        key=lambda input: f"customer:{input['customer_id']}",
        limit=1,
        period="2s",
    ),
    rate_limit=WorkflowRateLimitOptions(
        key=lambda input: f"tenant:{input['tenant_id']}",
        limit=100,
        period="1m",
    ),
)

Semantics:

  • debounce coalesces duplicate enqueue attempts by key
  • mutex allows only one running run per key
  • throttle delays execution when the budget is exceeded
  • rate_limit fails fast with retry metadata

Schedule workflows with cron

from redflow import CronTrigger, define_workflow


define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(
            expression="*/5 * * * *",
            timezone="UTC",
            input={"source": "cron"},
        ),
    ],
)

Cron ticks respect max_concurrency: if a workflow is already at its concurrency limit, the tick is skipped.

Test handlers with run_inline(...)

run_inline(...) executes workflow logic in-process without Redis:

from redflow import run_inline


result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}

Step overrides are keyed by step name:

result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)

You can also override wait_for_event(...) steps with either a ReceivedEvent-like dict or None.

For full lifecycle behavior such as queues, retries, cron, cancellation, and event fan-out, use a real Redis instance and start_worker(...).

Explanation

Handler context

Inside a workflow handler:

  • ctx.input: validated workflow input, or raw input if no schema is configured
  • ctx.run: run metadata (id, workflow, queue, attempt, max_attempts)
  • ctx.signal: cancellation event
  • ctx.step: durable step API

Run lifecycle

Each run moves through:

scheduled -> queued -> running -> succeeded | failed | canceled

Step state is stored separately from top-level run state. If a step with the same name already succeeded for the current run, its cached output is reused.

Durability and idempotency

  • step.run(...) persists successful step output and reuses it after retry / recovery
  • duplicate step names in the same run are rejected
  • idempotency_key deduplicates run creation
  • explicit event_id deduplicates external event emits
  • event records and run history are retained for the configured retention window

Events model

redflow has two event-oriented patterns, but the primary one is fan-out:

  1. Event fan-out

    • workflows subscribe with event=[EventTrigger(name=...)]
    • emit_event(...) creates runs for matching subscribers
    • cron or delayed workflows can emit these events on a schedule
  2. Event resume

    • a running workflow pauses with step.wait_for_event(...)
    • a later emit_event(...) with the same event_name + correlation_key resumes the waiting run
    • this is a narrower pattern for callbacks, approvals, and external confirmations

Durable event records let you inspect what happened after the fact:

  • which runs were created
  • which waiters were resumed
  • whether fan-out completed, partially failed, or had no subscribers

Rule of thumb:

  • If you want to broadcast work to subscribers, emit an event
  • If the time is known ahead of time, schedule a workflow and emit the event from there
  • If you need to resume one waiting run from a later callback, use wait_for_event(...)

Cancellation model

  • ctx.signal is set when workflow cancellation is requested
  • step.run(...) injects signal= into the step function when supported
  • parent cancellation propagates to descendant runs spawned via step APIs
  • lease-loss recovery does not itself imply child-cancel cascades

Python / TypeScript parity

Behavior is intentionally aligned with @redflow/client. Naming follows language conventions:

  • Python: emit_workflow, emit_event, run_at, max_attempts
  • TypeScript: emitWorkflow, emitEvent, runAt, maxAttempts

Reference

Public workflow APIs

  • define_workflow(...)
  • @workflow(...)
  • WorkflowDefinition.run(...)
  • start_worker(StartWorkerOptions(...))
  • run_inline(...)

Step APIs

  • ctx.step.run(name, fn, *args, timeout_ms=None, **kwargs)
  • ctx.step.wait_for(name, target, timeout_ms=None)
  • ctx.step.wait_for_event(name, event_name, correlation_key, timeout)
  • ctx.step.run_workflow(name, workflow, input, ...)
  • ctx.step.emit_workflow(name, workflow, input, ...)
  • ctx.step.emit_event(name, event_name, input, ...)

Client APIs

Create a client:

from redflow import create_client

client = create_client(
    app="my-service",                 # optional, falls back to REDFLOW_APP
    url="redis://127.0.0.1:6379",     # optional, falls back to REDIS_URL
    prefix="redflow:v1",              # optional, falls back to REDFLOW_PREFIX
)

Common methods:

  • emit_workflow(...)
  • emit_event(...)
  • emit_event_detailed(...)
  • run_by_name(...)
  • get_run(run_id)
  • get_run_steps(run_id)
  • get_run_attempts(run_id)
  • list_runs(ListRunsParams(...))
  • search_runs(SearchRunsParams(...))
  • list_workflows()
  • get_workflow_meta(name)
  • list_workflows_meta()
  • list_event_definitions()
  • list_events(ListEventsParams(...))
  • get_event(event_id)
  • get_stats()
  • cancel_run(run_id, reason=...)
  • cleanup_expired_runs_now(...)
  • cleanup_expired_events_now(...)
  • reset_runs() for dev/testing
  • reset_state() for dev/testing

Run handle

Run creation APIs return a RunHandle with:

  • .id
  • await .get_state()
  • await .result(timeout_ms=...)

Default client helpers

WorkflowDefinition.run(...) uses the process-global default client:

  • get_default_client()
  • set_default_client(client)

If you never call set_default_client(...), redflow lazily creates one from environment defaults.

Environment variables

  • REDIS_URL Redis connection string used by create_client() and start_worker(...) when no explicit URL or Redis instance is provided
  • REDFLOW_PREFIX Redis key prefix, default: redflow:v1
  • REDFLOW_APP Default app name used by create_client(...)
  • REDFLOW_RUN_RETENTION_DAYS Retention window for terminal runs and durable event records, default: 30

Worker options

Important StartWorkerOptions fields:

  • app
  • redis or url
  • prefix
  • registry
  • queues
  • concurrency
  • lease_ms
  • blmove_timeout_sec
  • reaper_interval_ms
  • reaper_batch_size
  • watchdog_enabled
  • watchdog_interval_ms
  • watchdog_stalled_for_ms
  • run_history_cleanup_interval_ms
  • registry_recovery_interval_ms

Python keeps runtime tuning flat on StartWorkerOptions; there is no nested runtime object.

Source layout

  • src/redflow/ - public modules and stable entry points
  • src/redflow/_client/ - command, query, retention, and registry internals
  • src/redflow/_worker/ - worker loops, executor, and step runtime internals
  • src/redflow/_core/ - shared private primitives such as payload storage, retry logic, and Redis helpers

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

redflow-0.1.2.tar.gz (204.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

redflow-0.1.2-py3-none-any.whl (122.3 kB view details)

Uploaded Python 3

File details

Details for the file redflow-0.1.2.tar.gz.

File metadata

  • Download URL: redflow-0.1.2.tar.gz
  • Upload date:
  • Size: 204.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.1.2.tar.gz
Algorithm Hash digest
SHA256 d136087ce886d11c3b7b2495323b6e24631f58b7abe5319b8e057e87466ac872
MD5 4cd22f55da7e5e314e66c88e82c3110c
BLAKE2b-256 f52e7c9e9c6c3b437ea05adc15d65559c15c0ad00f8212d423ca4f2edb5f2f67

See more details on using hashes here.

Provenance

The following attestation bundles were made for redflow-0.1.2.tar.gz:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file redflow-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: redflow-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 122.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 b55538a5b0b9621b20afa028c75fb3197b905b4b6ace6e9d3110366227d0b7fe
MD5 e295a7f29e335b4c5ac45d67f4f7cacf
BLAKE2b-256 14262b6b66b1a612531269ae02fe789da9d3d37311936b96ab07259bf71b7649

See more details on using hashes here.

Provenance

The following attestation bundles were made for redflow-0.1.2-py3-none-any.whl:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page