Durable workflow engine backed by Redis — Python client

Project description

redflow

Durable workflow engine backed by Redis.

Install

pip install redflow
# or with hiredis for better performance:
pip install "redflow[fast]"

Define a workflow

from redflow import define_workflow, WorkflowHandlerContext

async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

send_welcome_email = define_workflow("send-welcome-email", handler=handler)
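The example assumes `fetch_user` and `send_welcome` exist; redflow does not provide them. A minimal sketch of such helpers, with stand-in logic in place of a real database lookup and email-provider call, might look like:

```python
import asyncio

# Hypothetical helpers for the example above -- redflow does not
# provide these; a real app would query a database and call an
# email provider.
async def fetch_user(user_id: str) -> dict:
    # stand-in for a database lookup
    return {"id": user_id, "email": f"{user_id}@example.com"}

async def send_welcome(email: str) -> None:
    # stand-in for an email-provider call
    print(f"sending welcome email to {email}")

async def demo() -> None:
    user = await fetch_user("user_123")
    await send_welcome(user["email"])

asyncio.run(demo())
```

Any awaitable callables with this shape work as step functions.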

Or with the decorator:

from redflow import workflow, WorkflowHandlerContext

@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

Handler context gives you:

  • input — workflow input data
  • run — run metadata (id, workflow, queue, attempt, max_attempts)
  • signal — asyncio.Event, set when cancellation is requested
  • step — durable step API

Step API (inside workflow handlers)

step.run

Durable, cached units of work. On crash recovery, completed steps return their cached result instead of re-executing.

payment = await ctx.step.run("capture-payment", capture_payment)

With timeout:

payment = await ctx.step.run("capture-payment", capture_payment, timeout_ms=4000)
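The crash-recovery behaviour can be pictured with a toy step cache. This only illustrates the replay semantics, it is not redflow's implementation:

```python
import asyncio

# Toy illustration of step.run's caching semantics -- NOT redflow's
# implementation. A completed step stores its result; a replayed run
# returns the cached value instead of calling the function again.
class ToyStepCache:
    def __init__(self) -> None:
        self.results: dict[str, object] = {}
        self.calls = 0

    async def run(self, step_id: str, fn, *args):
        if step_id in self.results:
            return self.results[step_id]  # replay: cached, fn not re-run
        self.calls += 1
        result = await fn(*args)
        self.results[step_id] = result
        return result

async def charge(amount: int) -> str:
    return f"charged {amount}"

async def demo():
    cache = ToyStepCache()
    first = await cache.run("capture-payment", charge, 42)
    replay = await cache.run("capture-payment", charge, 42)  # cached
    return first, replay, cache.calls

print(asyncio.run(demo()))  # ('charged 42', 'charged 42', 1)
```

This is why step functions should be deterministic given their inputs: on replay, only uncompleted steps execute again.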

step.run_workflow

Trigger a child workflow and wait for its result.

receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",
    {"order_id": order_id, "email": email},
    timeout_ms=20_000,
    idempotency_key=f"receipt:{order_id}",
)

step.emit_workflow

Trigger a child workflow without waiting — returns the child run ID.

child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": order_id},
    idempotency_key=f"analytics:{order_id}",
)

Run workflows

from redflow import create_client

client = create_client(url="redis://localhost:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)

Delayed run:

from datetime import datetime, timedelta

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)

Start a worker

import asyncio
from redflow import start_worker, StartWorkerOptions

# import your workflow modules so they register
import workflows

async def main():
    worker = await start_worker(StartWorkerOptions(
        app="billing-worker",
        url="redis://localhost:6379",
        concurrency=4,
    ))

    # graceful shutdown
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())
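Instead of waiting on an event that is never set, a worker process typically ties shutdown to OS signals. A standard-library sketch (the start_worker calls are elided as comments; loop.add_signal_handler is Unix-only):

```python
import asyncio
import os
import signal

# Graceful-shutdown sketch: set an event on SIGINT/SIGTERM and let
# the main coroutine fall through to cleanup. The worker calls from
# the example above would go where the comments indicate.
async def run_until_signal(signals=(signal.SIGINT, signal.SIGTERM)) -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in signals:
        loop.add_signal_handler(sig, stop.set)
    # worker = await start_worker(...)   # as in the example above
    try:
        await stop.wait()
    finally:
        # await worker.stop()
        for sig in signals:
            loop.remove_signal_handler(sig)

# demo: deliver SIGUSR1 to ourselves so the sketch exits on its own
async def demo() -> None:
    asyncio.get_running_loop().call_later(
        0.01, os.kill, os.getpid(), signal.SIGUSR1
    )
    await run_until_signal(signals=(signal.SIGUSR1,))

asyncio.run(demo())
print("shut down cleanly")
```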

Explicit queues and runtime tuning:

worker = await start_worker(StartWorkerOptions(
    app="billing-worker",
    url="redis://localhost:6379",
    queues=["critical", "io", "analytics"],
    concurrency=8,
    lease_ms=5000,
    blmove_timeout_sec=1,
    reaper_interval_ms=500,
))

Workflow options

max_concurrency

Limits how many runs of a workflow may execute concurrently. Default is 1.

define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)

Cron

from redflow import CronTrigger

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)

Cron respects max_concurrency: if the limit is reached, that tick is skipped.

on_failure

from redflow import OnFailureContext

async def on_fail(ctx: OnFailureContext) -> None:
    print(f"workflow failed: {ctx.run.id} {ctx.run.workflow} {ctx.error}")

define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)

Client API

Inspect and control runs

from redflow import ListRunsParams

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

recent = await client.list_runs(ListRunsParams(limit=50))
failed = await client.list_runs(ListRunsParams(
    workflow="checkout",
    status="failed",
    limit=20,
))

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")

RunHandle

handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)

Testing

run_inline executes a workflow handler in-process without Redis — useful for unit tests.

from redflow import run_inline

result = await run_inline(my_workflow_def, input={"user_id": "test"})
assert result.succeeded
assert result.output == {"sent": True}

Override external steps:

result = await run_inline(
    my_workflow_def,
    input={"user_id": "test"},
    step_overrides={"fetch-user": {"email": "mock@test.com"}},
)

Errors

from redflow import (
    RedflowError,         # base class
    CanceledError,        # run was canceled
    TimeoutError,         # operation timed out
    NonRetriableError,    # permanent failure, no retries
    InputValidationError, # input schema mismatch
    UnknownWorkflowError, # workflow not registered
)

Raise NonRetriableError from a handler to fail the run immediately without consuming the remaining retry attempts.
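For example, a handler might treat a missing user as a permanent failure. The lookup below is illustrative, and the try/except fallback only lets the sketch run where redflow is not installed:

```python
try:
    from redflow import NonRetriableError
except ImportError:  # fallback so this sketch runs without redflow installed
    class NonRetriableError(Exception):
        pass

def lookup_user(user_id: str, users: dict) -> dict:
    """Illustrative lookup: a missing user is a permanent condition."""
    user = users.get(user_id)
    if user is None:
        # retrying will never succeed, so fail the run immediately
        raise NonRetriableError(f"unknown user {user_id}")
    return user

print(lookup_user("u1", {"u1": {"email": "a@b.c"}}))
```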

Download files

Download the file for your platform.

Source Distribution

redflow-0.0.7.tar.gz (74.6 kB)


Built Distribution


redflow-0.0.7-py3-none-any.whl (36.5 kB)


File details

Details for the file redflow-0.0.7.tar.gz.

File metadata

  • Download URL: redflow-0.0.7.tar.gz
  • Size: 74.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.0.7.tar.gz
  • SHA256: e0e9d9440400da079e6e60c21299e3a40335b62cd2703f6dfcd86432373cfb32
  • MD5: e33357252c2ae36e0984f1994400769f
  • BLAKE2b-256: e7a78e703af9260d7d2b5912e909b5ce7f39818aed610010ac2bf178be7b6295


Provenance

The following attestation bundles were made for redflow-0.0.7.tar.gz:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file redflow-0.0.7-py3-none-any.whl.

File metadata

  • Download URL: redflow-0.0.7-py3-none-any.whl
  • Size: 36.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for redflow-0.0.7-py3-none-any.whl
  • SHA256: 6283310f0a18dba6a803f3bc2df2de825754b3a8dd437184f4d189dfa882a4cf
  • MD5: 4223047077079f10d665004bc034eb80
  • BLAKE2b-256: 64ccbb5360807bf3022db88226557ab7a32fe2304c3263cfc473f0dd78c7ece0


Provenance

The following attestation bundles were made for redflow-0.0.7-py3-none-any.whl:

Publisher: ci.yml on getrelocapp/redflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
