
Durable workflow engine backed by Redis — Python client

Project description

redflow

Durable workflow engine backed by Redis.

Install

pip install redflow
# or with hiredis for better performance:
pip install "redflow[fast]"

Define a workflow

from redflow import define_workflow, WorkflowHandlerContext

async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

send_welcome_email = define_workflow("send-welcome-email", handler=handler)

Or with the decorator:

from redflow import workflow, WorkflowHandlerContext

@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

Handler context gives you:

  • input — workflow input data
  • run — run metadata (id, workflow, queue, attempt, max_attempts)
  • signal — asyncio.Event, set when cancellation is requested
  • step — durable step API
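Because cancellation is delivered through the signal event, a long-running handler should check it between units of work. A minimal self-contained sketch of that pattern using plain asyncio (FakeCtx is a stand-in for WorkflowHandlerContext, not part of redflow):

```python
import asyncio

class FakeCtx:
    """Stand-in context exposing only the cancellation event."""
    def __init__(self) -> None:
        self.signal = asyncio.Event()

async def cancelable_handler(ctx) -> dict:
    for i in range(100):
        if ctx.signal.is_set():          # cooperative cancellation check
            return {"processed": i, "canceled": True}
        await asyncio.sleep(0.001)       # one unit of work
    return {"processed": 100, "canceled": False}

async def main() -> dict:
    ctx = FakeCtx()
    task = asyncio.create_task(cancelable_handler(ctx))
    await asyncio.sleep(0.01)
    ctx.signal.set()                     # cancellation requested mid-run
    return await task

result = asyncio.run(main())
print(result["canceled"])
```

Checking the event between steps keeps cancellation prompt without interrupting a step mid-flight.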

Step API (inside workflow handlers)

step.run

Durable, cached units of work. On crash recovery, completed steps return their cached result instead of re-executing.

payment = await ctx.step.run("capture-payment", capture_payment)

With timeout:

payment = await ctx.step.run("capture-payment", capture_payment, timeout_ms=4000)
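The caching behavior can be pictured as a result store keyed by step id. A toy in-memory illustration of the replay semantics (not redflow's actual Redis-backed persistence):

```python
import asyncio

class StepCache:
    """Toy step runner: completed steps replay their cached result."""
    def __init__(self) -> None:
        self._results: dict = {}
        self.executions = 0

    async def run(self, step_id: str, fn, *args):
        if step_id in self._results:      # already completed: no re-execution
            return self._results[step_id]
        self.executions += 1
        result = await fn(*args)
        self._results[step_id] = result   # persist before moving on
        return result

async def capture_payment(amount: int) -> dict:
    return {"charged": amount}

async def main():
    cache = StepCache()
    first = await cache.run("capture-payment", capture_payment, 42)
    # simulate the handler being retried after a crash: the step is replayed
    second = await cache.run("capture-payment", capture_payment, 42)
    return first, second, cache.executions

first, second, executions = asyncio.run(main())
print(executions)  # the payment was captured exactly once
```

This is why side-effecting work belongs inside step.run: on retry, the effect is not repeated.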

step.run_workflow

Trigger a child workflow and wait for its result.

receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",
    {"order_id": order_id, "email": email},
    timeout_ms=20_000,
    idempotency_key=f"receipt:{order_id}",
)

step.emit_workflow

Trigger a child workflow without waiting — returns the child run ID.

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": order_id},
    idempotency_key=f"analytics:{order_id}",
)
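The idempotency_key is what makes emits safe to retry: the same key maps to the same run. A toy sketch of that deduplication (an in-memory dict stands in for redflow's Redis-side bookkeeping):

```python
# In-memory stand-in for idempotency-key bookkeeping.
_runs_by_key: dict = {}

def emit(workflow: str, payload: dict, idempotency_key: str) -> str:
    """Return the existing run id when the key was already seen."""
    if idempotency_key in _runs_by_key:
        return _runs_by_key[idempotency_key]   # duplicate emit: same run
    run_id = f"run_{len(_runs_by_key) + 1}"
    _runs_by_key[idempotency_key] = run_id
    return run_id

first = emit("analytics-workflow", {"order_id": "ord_1"}, "analytics:ord_1")
second = emit("analytics-workflow", {"order_id": "ord_1"}, "analytics:ord_1")
print(first == second)  # True: the retry did not start a second run
```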

Run workflows

from redflow import create_client

client = create_client(url="redis://localhost:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)

Delayed run:

from datetime import datetime, timedelta

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Start a worker

import asyncio
from redflow import start_worker, StartWorkerOptions

# import your workflow modules so they register
import workflows

async def main():
    worker = await start_worker(StartWorkerOptions(
        app="billing-worker",
        url="redis://localhost:6379",
        concurrency=4,
    ))

    # graceful shutdown
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())
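In production you would typically stop on SIGTERM rather than wait forever. A sketch of that wiring with plain asyncio signal handling (Unix only; a stop event stands in for the worker, and the real `await worker.stop()` call is elided):

```python
import asyncio
import signal

async def run_until_shutdown(stop: asyncio.Event) -> str:
    # in a real worker process: `await stop.wait()` then `await worker.stop()`
    await stop.wait()
    return "stopped"

def install_signal_handlers(loop: asyncio.AbstractEventLoop, stop: asyncio.Event) -> None:
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)   # set the event, don't kill the process

async def main() -> str:
    stop = asyncio.Event()
    install_signal_handlers(asyncio.get_running_loop(), stop)
    # simulate an external `kill -TERM` shortly after startup
    asyncio.get_running_loop().call_later(0.01, signal.raise_signal, signal.SIGTERM)
    return await run_until_shutdown(stop)

outcome = asyncio.run(main())
print(outcome)
```

Routing the signal through an event lets in-flight runs finish their current step before the worker stops.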

Explicit queues and runtime tuning:

worker = await start_worker(StartWorkerOptions(
    app="billing-worker",
    url="redis://localhost:6379",
    queues=["critical", "io", "analytics"],
    concurrency=8,
    lease_ms=5000,
    blmove_timeout_sec=1,
    reaper_interval_ms=500,
))

Workflow options

max_concurrency

Limits how many runs of a workflow may execute concurrently. Default is 1.

define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)
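The effect of max_concurrency can be demonstrated with a semaphore (an asyncio.Semaphore stands in for redflow's Redis-side limiting; this is not the engine's implementation):

```python
import asyncio

async def limited_run(sem: asyncio.Semaphore, running: list, peak: list) -> None:
    async with sem:                      # acquire one of the concurrency slots
        running[0] += 1
        peak[0] = max(peak[0], running[0])
        await asyncio.sleep(0.01)        # the "work"
        running[0] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(1)           # max_concurrency=1
    running, peak = [0], [0]
    await asyncio.gather(*(limited_run(sem, running, peak) for _ in range(5)))
    return peak[0]

peak = asyncio.run(main())
print(peak)  # never more than one run in flight
```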

Cron

from redflow import CronTrigger

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron respects max_concurrency: if the limit is reached, that tick is skipped.

on_failure

from redflow import OnFailureContext

async def on_fail(ctx: OnFailureContext) -> None:
    print(f"workflow failed: {ctx.run.id} {ctx.run.workflow} {ctx.error}")

define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)

Client API

Inspect and control runs

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

from redflow import ListRunsParams

recent = await client.list_runs(ListRunsParams(limit=50))
failed = await client.list_runs(ListRunsParams(
    workflow="checkout",
    status="failed",
    limit=20,
))

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")

RunHandle

handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)

Testing

run_inline executes a workflow handler in-process without Redis — useful for unit tests.

from redflow import run_inline

result = await run_inline(my_workflow_def, input={"user_id": "test"})
assert result.succeeded
assert result.output == {"sent": True}

Override external steps:

result = await run_inline(
    my_workflow_def,
    input={"user_id": "test"},
    step_overrides={"fetch-user": {"email": "mock@test.com"}},
)
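The override mechanism amounts to short-circuiting a step before its function runs. A toy sketch of that behavior (not run_inline itself):

```python
import asyncio
from typing import Optional

async def run_step(step_id: str, fn, *args, overrides: Optional[dict] = None):
    """Return the override for step_id if present; otherwise execute fn."""
    if overrides and step_id in overrides:
        return overrides[step_id]        # mocked result, fn never runs
    return await fn(*args)

async def fetch_user(user_id: str) -> dict:
    raise RuntimeError("would hit the network in a real handler")

async def main() -> dict:
    return await run_step(
        "fetch-user", fetch_user, "test",
        overrides={"fetch-user": {"email": "mock@test.com"}},
    )

user = asyncio.run(main())
print(user)
```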

Errors

from redflow import (
    RedflowError,         # base class
    CanceledError,        # run was canceled
    TimeoutError,         # operation timed out
    NonRetriableError,    # permanent failure, no retries
    InputValidationError, # input schema mismatch
    UnknownWorkflowError, # workflow not registered
)

Raise NonRetriableError from a handler to fail the run immediately, without consuming the remaining retry attempts.
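The retry semantics can be pictured with a small retry loop (stand-in classes; this is not redflow's own dispatcher):

```python
class NonRetriableError(Exception):
    """Stand-in for redflow's NonRetriableError."""

def run_with_retries(handler, max_attempts: int = 4):
    for attempt in range(1, max_attempts + 1):
        try:
            return ("succeeded", handler(attempt))
        except NonRetriableError as exc:
            return ("failed", f"attempt {attempt}: {exc}")  # stop immediately
        except Exception:
            continue                                        # transient: retry
    return ("failed", "attempts exhausted")

def flaky(attempt: int) -> str:
    if attempt < 3:
        raise RuntimeError("transient")
    return "ok"

def permanent(attempt: int) -> str:
    raise NonRetriableError("bad input")

print(run_with_retries(flaky))      # ('succeeded', 'ok')
print(run_with_retries(permanent))  # ('failed', 'attempt 1: bad input')
```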



Download files

Download the file for your platform.

Source Distribution

redflow-0.0.5.tar.gz (74.3 kB)

Uploaded Source

Built Distribution


redflow-0.0.5-py3-none-any.whl (36.1 kB)

Uploaded Python 3

File details

Details for the file redflow-0.0.5.tar.gz.

File metadata

  • Download URL: redflow-0.0.5.tar.gz
  • Size: 74.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.0.5.tar.gz
Algorithm Hash digest
SHA256 67087604c1476bde28ec5b0739d0eaeaab1e6895af1ad1203f01cde19bb8d12e
MD5 7892c5fba4f4e5ed177b627b42e5e120
BLAKE2b-256 f048c4f26ff05d661988f8b75f05c8b1eaf909717b54d24c7d486e7f81e8157d


Provenance

The following attestation bundles were made for redflow-0.0.5.tar.gz:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file redflow-0.0.5-py3-none-any.whl.

File metadata

  • Download URL: redflow-0.0.5-py3-none-any.whl
  • Size: 36.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 cb880acbb65005f3121198e035817c3bbbc6fb7351ce75f56d44331080257445
MD5 aeca7aa55b9d290d7f7549bd5fb3dec9
BLAKE2b-256 023097459dabff597fc53a23c4b8aa3859bf8556ea6840aad10fc6e054073d51


Provenance

The following attestation bundles were made for redflow-0.0.5-py3-none-any.whl:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
