
redflow

Durable workflow engine backed by Redis.

Install

pip install redflow
# or with hiredis for better performance:
pip install "redflow[fast]"

Define a workflow

from redflow import define_workflow, WorkflowHandlerContext

async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

send_welcome_email = define_workflow("send-welcome-email", handler=handler)

Or with the decorator:

from redflow import workflow, WorkflowHandlerContext

@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}
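Both variants assume `fetch_user` and `send_welcome` helpers defined elsewhere in your application. Hypothetical stand-ins might look like this (the bodies here are placeholders, not part of redflow):

```python
import asyncio

# Hypothetical helpers assumed by the examples above; replace with
# real user-store and email-provider calls in your application.
async def fetch_user(user_id: str) -> dict:
    # e.g. a database or HTTP lookup
    return {"id": user_id, "email": f"{user_id}@example.com"}

async def send_welcome(email: str) -> None:
    # e.g. an email-provider API call
    print(f"sending welcome email to {email}")

user = asyncio.run(fetch_user("user_123"))
```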

Handler context gives you:

  • input — workflow input data
  • run — run metadata (id, workflow, queue, attempt, max_attempts)
  • signal — asyncio.Event, set when cancellation is requested
  • step — durable step API
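Because the cancellation signal is a plain asyncio.Event, long-running handlers can poll it cooperatively between units of work. A minimal sketch of the pattern, with a bare Event standing in for ctx.signal:

```python
import asyncio

async def process_batches(cancel: asyncio.Event) -> int:
    # Check the cancellation event between units of work,
    # the same way a handler would check ctx.signal.
    done = 0
    for _ in range(100):
        if cancel.is_set():
            break  # stop cleanly when cancellation is requested
        await asyncio.sleep(0)  # one "batch" of work
        done += 1
    return done

async def demo() -> int:
    cancel = asyncio.Event()
    cancel.set()  # request cancellation before any work runs
    return await process_batches(cancel)

batches = asyncio.run(demo())
```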

Step API (inside workflow handlers)

step.run

Durable, cached units of work. On crash recovery, completed steps return their cached result instead of re-executing.

payment = await ctx.step.run("capture-payment", capture_payment)

With timeout:

payment = await ctx.step.run("capture-payment", capture_payment, timeout_ms=4000)
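Conceptually, a durable step behaves like a cache keyed by step name: the first execution persists its result, and replays after a crash return the stored value instead of re-running the function. A rough in-memory sketch of that contract (a toy model, not redflow's actual storage):

```python
import asyncio

class StepCache:
    """Toy stand-in for durable step storage keyed by step name."""

    def __init__(self) -> None:
        self._results: dict[str, object] = {}
        self.executions = 0  # counts real executions, not cache hits

    async def run(self, name: str, fn, *args):
        if name in self._results:          # replay after a "crash"
            return self._results[name]     # cached result, fn not re-run
        self.executions += 1
        result = await fn(*args)
        self._results[name] = result       # persist before returning
        return result

async def charge(amount: int) -> dict:
    return {"charged": amount}

async def demo():
    cache = StepCache()
    first = await cache.run("capture-payment", charge, 42)
    replay = await cache.run("capture-payment", charge, 42)  # cache hit
    return first, replay, cache.executions

first, replay, executions = asyncio.run(demo())
```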

step.run_workflow

Trigger a child workflow and wait for its result.

receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",
    {"order_id": order_id, "email": email},
    timeout_ms=20_000,
    idempotency_key=f"receipt:{order_id}",
)

step.emit_workflow

Trigger a child workflow without waiting — returns the child run ID.

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": order_id},
    idempotency_key=f"analytics:{order_id}",
)

Run workflows

from redflow import create_client

client = create_client(url="redis://localhost:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)

Delayed run:

from datetime import datetime, timedelta

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
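The idempotency_key in the calls above makes emits safe to retry: a repeated key resolves to the existing run rather than starting a duplicate. Conceptually (a toy sketch, not the real Redis-backed implementation):

```python
import itertools

_counter = itertools.count(1)
_runs_by_key: dict[str, str] = {}

def emit(workflow: str, key: str) -> str:
    # A repeated key returns the existing run ID instead of
    # creating a duplicate run, so emits become safe to retry.
    if key in _runs_by_key:
        return _runs_by_key[key]
    run_id = f"run_{next(_counter)}"
    _runs_by_key[key] = run_id
    return run_id

first = emit("send-welcome-email", "welcome:user_123")
retry = emit("send-welcome-email", "welcome:user_123")  # deduplicated
```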

Start a worker

import asyncio
from redflow import start_worker, StartWorkerOptions

# import your workflow modules so they register
import workflows

async def main():
    worker = await start_worker(StartWorkerOptions(
        app="billing-worker",
        url="redis://localhost:6379",
        concurrency=4,
    ))

    # graceful shutdown
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())

Explicit queues and runtime tuning:

worker = await start_worker(StartWorkerOptions(
    app="billing-worker",
    url="redis://localhost:6379",
    queues=["critical", "io", "analytics"],
    concurrency=8,
    lease_ms=5000,
    blmove_timeout_sec=1,
    reaper_interval_ms=500,
))
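lease_ms and reaper_interval_ms suggest the common lease/reaper design: a worker holds a time-limited lease on each run, and a reaper periodically requeues runs whose lease expired (for example, because the worker died mid-run). A simplified sketch of that recovery step, assuming this mechanism (redflow's exact semantics may differ):

```python
def reap_expired(leases: dict[str, float], queue: list, now: float) -> None:
    """Requeue runs whose lease deadline has passed, so another
    worker can pick them up (a common lease/reaper design)."""
    for run_id, deadline in list(leases.items()):
        if deadline <= now:
            del leases[run_id]
            queue.append(run_id)

queue: list = []
leases = {"run_a": 100.0, "run_b": 200.0}  # run_id -> lease deadline
reap_expired(leases, queue, now=150.0)     # run_a's lease has expired
```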

Workflow options

max_concurrency

Limits the number of concurrently running runs per workflow. Default is 1.

define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)

Cron

from redflow import CronTrigger

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron respects max_concurrency: if the limit is reached, that tick is skipped.
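That skip rule can be sketched as a simple guard at each tick (a conceptual model, not redflow's scheduler code):

```python
def tick(running: int, max_concurrency: int, fired: list) -> int:
    # Fire this cron tick only if the workflow is below its
    # concurrency limit; otherwise the tick is skipped entirely.
    if running >= max_concurrency:
        return running  # skipped: no run queued for this tick
    fired.append("run")
    return running + 1

fired: list = []
running = tick(0, 1, fired)        # fires: nothing is running yet
running = tick(running, 1, fired)  # skipped: limit of 1 reached
```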

on_failure

from redflow import NonRetriableError, OnFailureContext

async def on_fail(ctx: OnFailureContext) -> None:
    print(f"workflow failed: {ctx.run.id} {ctx.run.workflow} {ctx.error}")

define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)

Client API

Inspect and control runs

from redflow import ListRunsParams

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

recent = await client.list_runs(ListRunsParams(limit=50))
failed = await client.list_runs(ListRunsParams(
    workflow="checkout",
    status="failed",
    limit=20,
))

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")

RunHandle

handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)

Testing

run_inline executes a workflow handler in-process without Redis — useful for unit tests.

from redflow import run_inline

result = await run_inline(my_workflow_def, input={"user_id": "test"})
assert result.succeeded
assert result.output == {"sent": True}

Override external steps:

result = await run_inline(
    my_workflow_def,
    input={"user_id": "test"},
    step_overrides={"fetch-user": {"email": "mock@test.com"}},
)

Errors

from redflow import (
    RedflowError,         # base class
    CanceledError,        # run was canceled
    TimeoutError,         # operation timed out
    NonRetriableError,    # permanent failure, no retries
    InputValidationError, # input schema mismatch
    UnknownWorkflowError, # workflow not registered
)

Raise NonRetriableError from a handler to fail the run immediately without exhausting retry attempts.
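The retry contract can be sketched as a loop that re-attempts ordinary exceptions up to max_attempts but stops at the first non-retriable one. This toy model defines a local NonRetriableError class standing in for redflow's:

```python
class NonRetriableError(Exception):
    """Local stand-in for redflow's NonRetriableError."""

def run_with_retries(handler, max_attempts: int) -> str:
    # Ordinary exceptions are retried up to max_attempts;
    # NonRetriableError fails the run on the first occurrence.
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(attempt)
        except NonRetriableError:
            return "failed"          # permanent: no further attempts
        except Exception:
            if attempt == max_attempts:
                return "failed"      # retries exhausted
    return "failed"

attempts_seen: list = []

def flaky(attempt: int):
    attempts_seen.append(attempt)
    raise NonRetriableError("bad input")

outcome = run_with_retries(flaky, max_attempts=4)
```

Here the handler fails permanently on its first attempt, so the remaining three attempts are never used.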
