flowrun

flowrun is a lightweight async DAG orchestrator for small-to-medium ETL pipelines. It is designed for low operational overhead and low-complexity workloads (for example Polars pipelines, API ingest + transform + load jobs, or periodic data sync tasks).

Core ideas:

  • Keep orchestration simple: declare tasks + dependencies, run a DAG.
  • Keep runtime dependency-free: stdlib-based implementation.
  • Keep behavior explicit: retries, timeouts, skip semantics, run reports.

Installation

pip install flowrun-dag

The package installs as flowrun-dag; the import name is flowrun:

import flowrun

For development:

git clone https://github.com/Mg30/flowrun.git
cd flowrun
uv sync --group dev
uv run pytest -q

Quick Start

import asyncio
from dataclasses import dataclass

from flowrun import RunContext, build_default_engine

engine = build_default_engine(max_workers=4, max_parallel=3)


@dataclass(frozen=True)
class Deps:
    source_path: str


@engine.task(name="extract", dag="daily_etl")
def extract(context: RunContext[Deps]) -> list[dict]:
    # In real jobs, read from file/API/db
    return [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]


@engine.task(name="transform", dag="daily_etl", deps=[extract])
def transform(extract: list[dict]) -> dict[str, int]:
    total = sum(row["amount"] for row in extract)
    return {"rows": len(extract), "total": total}


@engine.task(name="load", dag="daily_etl", deps=[transform])
def load(transform: dict[str, int]) -> str:
    # Persist results
    return f"loaded rows={transform['rows']} total={transform['total']}"


async def main() -> None:
    ctx = RunContext(Deps(source_path="/tmp/data.json"))
    async with engine:
        engine.validate("daily_etl")
        run_id = await engine.run_once("daily_etl", context=ctx)
        report = engine.get_run_report(run_id)
        print(report["status"])  # SUCCESS | FAILED | RUNNING


asyncio.run(main())

Concepts

  • Task: Python callable registered with @engine.task(...) or @task(...).
  • DAG: namespace (dag="name") plus dependency edges between tasks.
  • Run: one execution instance of a DAG (run_id).
  • State store: tracks run/task status, timing, errors, and results.

Task status lifecycle:

  • PENDING -> RUNNING -> SUCCESS
  • PENDING -> RUNNING -> FAILED -> PENDING (retry path)
  • PENDING -> SKIPPED (blocked by failed upstream)

API Guide

build_default_engine(...)

engine = build_default_engine(
    executor=None,
    max_workers=8,
    max_parallel=4,
    logger=None,
    hooks=None,
    state_store=None,
)

Parameters:

  • executor: optional concurrent.futures.Executor for sync tasks.
  • max_workers: thread pool size if executor is not provided.
  • max_parallel: max concurrent scheduled tasks, must be >= 1.
  • logger: optional logging.Logger used across components.
  • hooks: optional list of RunHook handlers.
  • state_store: optional custom state store (StateStoreProtocol).

Returns: configured Engine.
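
For example, a minimal sketch passing a custom executor and logger. The engine closes the thread pool it creates itself (see Resource lifecycle below); whether it also shuts down an externally supplied executor is not documented here, so the comment treats that as your responsibility.

import logging
from concurrent.futures import ThreadPoolExecutor

from flowrun import build_default_engine

executor = ThreadPoolExecutor(max_workers=16)
engine = build_default_engine(
    executor=executor,  # reused for sync tasks; shut it down yourself
    max_parallel=2,     # at most two tasks scheduled concurrently
    logger=logging.getLogger("flowrun"),
)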

Engine methods

Run control:

  • await engine.run_once(dag_name, context=None) -> str
  • await engine.resume(run_id, from_tasks=None, context=None) -> str
  • await engine.run_subgraph(dag_name, targets, context=None) -> str

Validation and discovery:

  • engine.validate(dag_name) -> None
  • engine.list_dags() -> list[str]
  • engine.list_tasks(dag_name) -> list[str]
  • engine.display_dag(dag_name) -> str

Reporting:

  • engine.get_run_report(run_id) -> dict

Resource lifecycle:

  • engine.close() -> None
  • async with engine: closes the owned thread pool on exit.
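
A small discovery and lifecycle sketch using the methods above (the exact text rendering produced by display_dag is the library's own and is not shown here):

from flowrun import build_default_engine

engine = build_default_engine()

# ... register tasks with @engine.task(...) ...

try:
    for dag_name in engine.list_dags():
        engine.validate(dag_name)            # raises on bad topology
        print(dag_name, engine.list_tasks(dag_name))
        print(engine.display_dag(dag_name))  # text rendering of the DAG
finally:
    engine.close()  # or use `async with engine:` in async code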

Task registration

Preferred style (bound to engine registry):

@engine.task(name="task_a", dag="etl", deps=[...], timeout_s=30.0, retries=1, retain_result=True)
def task_a(...):
    ...

Arguments:

  • name: optional, defaults to function name.
  • dag: DAG namespace for selection via run_once(dag_name).
  • deps: list of task names or decorated task callables.
  • timeout_s: per-attempt timeout (None disables timeout).
  • retries: retry count after failures.
  • retain_result: if False, clear result from state when safe.

To avoid repeating dag=..., use a DAG-scoped container:

etl = engine.dag("daily_etl")

@etl.task(name="extract")
def extract() -> list[int]:
    return [1, 2, 3]

@etl.task(name="sum_values", deps=[extract])
def sum_values(extract: list[int]) -> int:
    return sum(extract)

run_id = await etl.run_once()

Available on the scope:

  • etl.task(...)
  • etl.task_template(...)
  • await etl.run_once(context=None)
  • await etl.run_subgraph(targets, context=None)
  • etl.validate(), etl.display(), etl.list_tasks()

Also available as global decorator:

from flowrun import task, TaskRegistry

registry = TaskRegistry()
token = registry.activate()

@task
def my_task():
    return 1

TaskRegistry.deactivate(token)

Notes:

  • @task(...), @task, and @task("name", ...) are supported.
  • If using global @task, provide registry=... or activate one.

Dependency result injection

Named dependency injection:

@engine.task(name="extract", dag="etl")
def extract() -> list[int]:
    return [1, 2, 3]

@engine.task(name="sum_values", dag="etl", deps=[extract])
def sum_values(extract: list[int]) -> int:
    return sum(extract)

Generic upstream injection:

@engine.task(name="combine", dag="etl", deps=["a", "b"])
def combine(upstream: dict[str, object]) -> object:
    return (upstream["a"], upstream["b"])

If a parameter named upstream is declared, named dependency injection is disabled for that task.

Context injection

Tasks can accept a typed RunContext[...] as a positional parameter.

@dataclass(frozen=True)
class Deps:
    api_base: str

@engine.task(name="pull", dag="etl")
def pull(context: RunContext[Deps]) -> dict:
    return {"base": context.api_base}

Task templates

Register parameterized task variants.

def fetch_table(*, table: str) -> str:
    return f"select * from {table}"

tpl = engine.task_template(fetch_table, dag="etl")
tpl.bind("fetch_users", table="users")
tpl.bind("fetch_orders", table="orders")

Execution Semantics

DAG scoping and unknown DAG behavior

  • If tasks use explicit dag=... namespaces, unknown DAG names raise ValueError.
  • Error messages include available DAG names and a close-match suggestion.
  • Legacy behavior remains for unscoped registries (single implicit DAG).

Dependency validation

Build-time validation catches:

  • Missing dependencies.
  • Cross-DAG dependencies.
  • Cycles.

Missing dependency errors include close-match suggestions when available.
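
A sketch of how a missing dependency surfaces, assuming validation errors are raised from engine.validate (the exact exception type and message wording are the library's):

@engine.task(name="report", dag="metrics", deps=["aggregate"])  # "aggregate" was never registered
def report(upstream: dict[str, object]) -> None:
    ...

try:
    engine.validate("metrics")
except Exception as exc:
    # Expect a message naming "aggregate", with a close-match suggestion if available
    print(exc)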

Retries

  • Retries are per task and per run attempt.
  • Downstream tasks are skipped only after upstream retries are exhausted.

Timeouts

  • Applied per attempt.
  • Async tasks use asyncio.wait_for.
  • Sync tasks run in executor and are awaited with timeout.
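
Because timeouts apply per attempt, a timed-out attempt presumably counts as a failed attempt that the retry budget can absorb. A sketch on a flaky IO task (the URL is illustrative; urllib is stdlib):

import urllib.request

@engine.task(name="fetch_page", dag="etl", timeout_s=10.0, retries=2)
def fetch_page() -> bytes:
    # Up to 3 attempts total (1 initial + 2 retries), each capped at 10 seconds
    with urllib.request.urlopen("https://example.com/data") as resp:
        return resp.read()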

Result retention

  • retain_result=True (default): keep result in state.
  • retain_result=False: clear the result once all downstream consumers have been launched or completed.
  • Useful to reduce memory when passing larger intermediate objects.
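
A sketch that drops a large intermediate from state after its sole consumer finishes (the row count is illustrative):

@engine.task(name="extract_rows", dag="etl", retain_result=False)  # large intermediate
def extract_rows() -> list[dict]:
    return [{"id": i, "amount": i * 10} for i in range(100_000)]

@engine.task(name="total_amount", dag="etl", deps=[extract_rows])
def total_amount(extract_rows: list[dict]) -> int:
    # Once this (sole) consumer is done, extract_rows's result is cleared from state
    return sum(row["amount"] for row in extract_rows)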

Run Report Format

engine.get_run_report(run_id) returns:

{
  "run_id": "...",
  "dag_name": "...",
  "created_at": 0.0,
  "finished_at": 0.0,
  "status": "SUCCESS",  # SUCCESS | FAILED | RUNNING
  "tasks": {
    "task_name": {
      "status": "SUCCESS",
      "attempt": 1,
      "started_at": 0.0,
      "finished_at": 0.0,
      "error": None,
      "result": {...}
    }
  }
}

Run-level status rules:

  • FAILED if any task is FAILED or SKIPPED.
  • SUCCESS if all tasks are SUCCESS.
  • RUNNING otherwise.
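
Given the report shape above, failures can be summarized with plain dict access:

report = engine.get_run_report(run_id)
if report["status"] == "FAILED":
    for name, task in report["tasks"].items():
        if task["status"] in ("FAILED", "SKIPPED"):
            print(f"{name}: {task['status']} (attempt {task['attempt']}): {task['error']}")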

Hooks

Use hooks to emit metrics, alerts, or tracing signals.

from flowrun import fn_hook, build_default_engine

hook = fn_hook(
    on_task_failure=lambda e: print(f"FAIL {e.task_name}: {e.error}"),
    on_dag_end=lambda e: print(f"DAG done: {e.dag_name}"),
)

engine = build_default_engine(hooks=[hook])

Hook API:

  • RunHook class with overridable methods.
  • fn_hook(...) for function-based handlers.
  • Hook errors are caught and logged (they do not crash runs).

Events:

  • DagStartEvent, DagEndEvent
  • TaskStartEvent, TaskSuccessEvent, TaskFailureEvent
  • TaskRetryEvent, TaskSkipEvent
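
For stateful handlers, subclass RunHook. A minimal sketch, assuming the overridable method names mirror the fn_hook keyword arguments (on_task_failure, on_dag_end, ...) and that each receives the corresponding event object:

from flowrun import RunHook, build_default_engine

class FailureCounter(RunHook):
    def __init__(self) -> None:
        self.failures = 0

    def on_task_failure(self, event) -> None:  # TaskFailureEvent
        self.failures += 1
        print(f"FAIL {event.task_name}: {event.error}")

engine = build_default_engine(hooks=[FailureCounter()])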

State Stores

In-memory (default):

  • StateStore / InMemoryStateStore
  • Fast, process-local, ephemeral.

SQLite persistent backend:

  • SqliteStateStore(db_path, serializer=..., cache_ttl_s=None, recover=False)
  • Persists run and task state.
  • Optional crash recovery marks orphaned RUNNING tasks as failed.

Serialization options for persisted results:

  • JsonSerializer (default for SQLite backend)
  • PickleSerializer
  • custom ResultSerializer implementation
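
A sketch wiring the SQLite backend into the default engine (PickleSerializer is assumed to construct without arguments; recover=True enables the orphaned-RUNNING cleanup described above):

from flowrun import PickleSerializer, SqliteStateStore, build_default_engine

store = SqliteStateStore(
    "runs.db",
    serializer=PickleSerializer(),  # default is JsonSerializer
    recover=True,                   # mark orphaned RUNNING tasks as failed
)
engine = build_default_engine(state_store=store)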

Practical ETL Patterns

Small Polars pipeline pattern

  • Keep each task focused (extract, transform, load).
  • Set retain_result=False on large intermediate transforms.
  • Use retries on flaky IO tasks, not pure transforms.
  • Keep max_parallel modest for predictable resource use.
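
A minimal sketch of this pattern, assuming the DAG-scoped task(...) accepts the same options as engine.task(...) (polars is your project's dependency, not flowrun's; file paths are illustrative):

import polars as pl

etl = engine.dag("polars_etl")

@etl.task(name="extract", retries=2, timeout_s=30.0)  # flaky IO: retry it
def extract() -> pl.DataFrame:
    return pl.read_csv("data/input.csv")

@etl.task(name="transform", deps=[extract], retain_result=False)  # large intermediate
def transform(extract: pl.DataFrame) -> pl.DataFrame:
    return extract.group_by("customer_id").agg(pl.col("amount").sum())

@etl.task(name="load", deps=[transform])
def load(transform: pl.DataFrame) -> int:
    transform.write_parquet("data/output.parquet")
    return transform.height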

Re-run from a checkpoint task

new_run_id = await engine.resume(old_run_id, from_tasks=["transform"], context=ctx)

This re-executes transform and all downstream tasks, while preserving unaffected successful upstream tasks.

Run only a target branch

run_id = await engine.run_subgraph("daily_etl", targets=["load"], context=ctx)

This executes load plus all transitive dependencies required for load.

Logging

Pass a logger to build_default_engine(logger=...).

Typical levels:

  • INFO: DAG start/finish, task success, retries, skips.
  • WARNING: task failures, timeouts.
  • DEBUG: task launch details, tracebacks, shutdown details.
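
A standard logging setup is enough; the logger name is arbitrary:

import logging

logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("flowrun")
logger.setLevel(logging.INFO)  # switch to DEBUG for launch details and tracebacks

engine = build_default_engine(logger=logger)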

Testing Your DAGs

Recommended test layers:

  • Unit test each task function directly.
  • Integration test DAG execution with build_default_engine().
  • Validate topology with engine.validate(...) and engine.list_tasks(...).
  • Assert on get_run_report(...) for end-to-end behavior.
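
A sketch of the first two layers with pytest, reusing the Quick Start tasks; asyncio.run keeps the integration test free of async test plugins:

import asyncio

def test_transform_totals() -> None:
    # Unit: call the task function directly
    rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]
    assert transform(rows) == {"rows": 2, "total": 25}

def test_daily_etl_end_to_end() -> None:
    # Integration: run the whole DAG and assert on the report
    async def run() -> dict:
        async with engine:
            engine.validate("daily_etl")
            run_id = await engine.run_once("daily_etl")
            return engine.get_run_report(run_id)

    report = asyncio.run(run())
    assert report["status"] == "SUCCESS"
    assert report["tasks"]["load"]["status"] == "SUCCESS"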

Public API Surface

Top-level exports in flowrun:

  • Engine, build_default_engine
  • RunContext
  • task, task_template, TaskSpec, TaskRegistry
  • SchedulerConfig
  • RunHook, fn_hook
  • StateStore, InMemoryStateStore, StateStoreProtocol
  • SqliteStateStore
  • JsonSerializer, PickleSerializer, ResultSerializer

License

MIT
