flowrun
flowrun is a lightweight async DAG orchestrator for small to medium ETL pipelines.
It is designed for low operational overhead and low-complexity workloads (for example,
Polars pipelines, API ingest + transform + load jobs, or periodic data sync tasks).
Core ideas:
- Keep orchestration simple: declare tasks + dependencies, run a DAG.
- Keep runtime dependency-free: stdlib-based implementation.
- Keep behavior explicit: retries, timeouts, skip semantics, run reports.
Installation
```shell
pip install flowrun-dag
```
The import name remains `flowrun`:

```python
import flowrun
```
For development:
```shell
git clone https://github.com/Mg30/flowrun.git
cd flowrun
uv sync --group dev
uv run pytest -q
```
Quick Start
```python
import asyncio
from dataclasses import dataclass

from flowrun import RunContext, build_default_engine

engine = build_default_engine(max_workers=4, max_parallel=3)

@dataclass(frozen=True)
class Deps:
    source_path: str

@engine.task(name="extract", dag="daily_etl")
def extract(context: RunContext[Deps]) -> list[dict]:
    # In real jobs, read from file/API/db
    return [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]

@engine.task(name="transform", dag="daily_etl", deps=[extract])
def transform(extract: list[dict]) -> dict[str, int]:
    total = sum(row["amount"] for row in extract)
    return {"rows": len(extract), "total": total}

@engine.task(name="load", dag="daily_etl", deps=[transform])
def load(transform: dict[str, int]) -> str:
    # Persist results
    return f"loaded rows={transform['rows']} total={transform['total']}"

async def main() -> None:
    ctx = RunContext(Deps(source_path="/tmp/data.json"))
    async with engine:
        engine.validate("daily_etl")
        run_id = await engine.run_once("daily_etl", context=ctx)
        report = engine.get_run_report(run_id)
        print(report["status"])  # SUCCESS | FAILED | RUNNING

asyncio.run(main())
```
Concepts
- Task: a Python callable registered with `@engine.task(...)` or `@task(...)`.
- DAG: a namespace (`dag="name"`) plus dependency edges between tasks.
- Run: one execution instance of a DAG, identified by a `run_id`.
- State store: tracks run/task status, timing, errors, and results.
Task status lifecycle:
- `PENDING -> RUNNING -> SUCCESS`
- `PENDING -> RUNNING -> FAILED -> PENDING` (retry path)
- `PENDING -> SKIPPED` (blocked by a failed upstream task)
API Guide
build_default_engine(...)
```python
engine = build_default_engine(
    executor=None,
    max_workers=8,
    max_parallel=4,
    logger=None,
    hooks=None,
    state_store=None,
)
```
Parameters:
- `executor`: optional `concurrent.futures.Executor` for sync tasks.
- `max_workers`: thread pool size if `executor` is not provided.
- `max_parallel`: max concurrent scheduled tasks; must be `>= 1`.
- `logger`: optional `logging.Logger` used across components.
- `hooks`: optional list of `RunHook` handlers.
- `state_store`: optional custom state store (`StateStoreProtocol`).
Returns a configured `Engine`.
Engine methods
Run control:
- `await engine.run_once(dag_name, context=None) -> str`
- `await engine.resume(run_id, from_tasks=None, context=None) -> str`
- `await engine.run_subgraph(dag_name, targets, context=None) -> str`
Validation and discovery:
- `engine.validate(dag_name) -> None`
- `engine.list_dags() -> list[str]`
- `engine.list_tasks(dag_name) -> list[str]`
- `engine.display_dag(dag_name) -> str`
Reporting:
- `engine.get_run_report(run_id) -> dict`
Resource lifecycle:
- `engine.close() -> None`
- `async with engine:` closes the owned thread pool on exit.
Task registration
Preferred style (bound to engine registry):
```python
@engine.task(name="task_a", dag="etl", deps=[...], timeout_s=30.0, retries=1, retain_result=True)
def task_a(...):
    ...
```
Arguments:
- `name`: optional; defaults to the function name.
- `dag`: DAG namespace for selection via `run_once(dag_name)`.
- `deps`: list of task names or decorated task callables.
- `timeout_s`: per-attempt timeout (`None` disables the timeout).
- `retries`: retry count after failures.
- `retain_result`: if `False`, clear the result from state when safe.
Avoid repeating `dag=...` by using a DAG-scoped container:
```python
etl = engine.dag("daily_etl")

@etl.task(name="extract")
def extract() -> list[int]:
    return [1, 2, 3]

@etl.task(name="sum_values", deps=[extract])
def sum_values(extract: list[int]) -> int:
    return sum(extract)

run_id = await etl.run_once()  # inside an async function
```
Available on the scope:
- `etl.task(...)`
- `etl.task_template(...)`
- `await etl.run_once(context=None)`
- `await etl.run_subgraph(targets, context=None)`
- `etl.validate()`, `etl.display()`, `etl.list_tasks()`
Also available as global decorator:
```python
from flowrun import task, TaskRegistry

registry = TaskRegistry()
token = registry.activate()

@task
def my_task():
    return 1

TaskRegistry.deactivate(token)
```
Notes:
- `@task(...)`, `@task`, and `@task("name", ...)` are all supported.
- If using the global `@task`, provide `registry=...` or activate one.
Dependency result injection
Named dependency injection:
```python
@engine.task(name="extract", dag="etl")
def extract() -> list[int]:
    return [1, 2, 3]

@engine.task(name="sum_values", dag="etl", deps=[extract])
def sum_values(extract: list[int]) -> int:
    return sum(extract)
```
Generic upstream injection:
```python
@engine.task(name="combine", dag="etl", deps=["a", "b"])
def combine(upstream: dict[str, object]) -> object:
    return (upstream["a"], upstream["b"])
```
If an `upstream` parameter is declared, named dependency injection is disabled.
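The switch between the two injection styles can be sketched with `inspect.signature`. This is a hypothetical model of the mechanism, not flowrun's internals:

```python
import inspect
from typing import Any, Callable

def build_kwargs(fn: Callable[..., Any], dep_results: dict[str, Any]) -> dict[str, Any]:
    """Choose injection style from the task's parameter names.

    If the task declares an `upstream` parameter, it receives the whole
    results dict; otherwise each parameter matching a dependency name
    receives that dependency's result.
    """
    params = inspect.signature(fn).parameters
    if "upstream" in params:
        return {"upstream": dict(dep_results)}
    return {name: dep_results[name] for name in params if name in dep_results}

def combine(upstream: dict[str, object]) -> tuple:
    return (upstream["a"], upstream["b"])

def add(a: int, b: int) -> int:
    return a + b

results = {"a": 1, "b": 2}
print(combine(**build_kwargs(combine, results)))  # (1, 2)
print(add(**build_kwargs(add, results)))          # 3
```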
Context injection
Tasks can accept a typed RunContext[...] as a positional parameter.
```python
@dataclass(frozen=True)
class Deps:
    api_base: str

@engine.task(name="pull", dag="etl")
def pull(context: RunContext[Deps]) -> dict:
    return {"base": context.api_base}
```
Task templates
Register parameterized task variants.
```python
def fetch_table(*, table: str) -> str:
    return f"select * from {table}"

tpl = engine.task_template(fetch_table, dag="etl")
tpl.bind("fetch_users", table="users")
tpl.bind("fetch_orders", table="orders")
```
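Conceptually, each `bind` registers a named variant with fixed keyword arguments, much like `functools.partial`. A sketch under that assumption (the `variants` dict is a hypothetical stand-in for the engine's registry):

```python
from functools import partial

def fetch_table(*, table: str) -> str:
    return f"select * from {table}"

# Hypothetical stand-in for engine.task_template(...).bind(...): each bind
# produces a named, zero-argument callable with the parameter baked in.
variants = {
    "fetch_users": partial(fetch_table, table="users"),
    "fetch_orders": partial(fetch_table, table="orders"),
}

print(variants["fetch_users"]())   # select * from users
print(variants["fetch_orders"]())  # select * from orders
```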
Execution Semantics
DAG scoping and unknown DAG behavior
- If tasks use explicit `dag=...` namespaces, unknown DAG names raise `ValueError`.
- Error messages include available DAG names and a close-match suggestion.
- Legacy behavior remains for unscoped registries (single implicit DAG).
Dependency validation
Build-time validation catches:
- Missing dependencies.
- Cross-DAG dependencies.
- Cycles.
Missing dependency errors include close-match suggestions when available.
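These checks map onto stdlib tools: `graphlib.TopologicalSorter` raises `CycleError` on cycles, and `difflib.get_close_matches` yields close-match suggestions. A sketch of such validation (not flowrun's actual code):

```python
import difflib
import graphlib

def validate(dag: dict[str, list[str]]) -> None:
    # Missing dependencies, with a close-match suggestion when available.
    for task, deps in dag.items():
        for dep in deps:
            if dep not in dag:
                hint = difflib.get_close_matches(dep, dag, n=1)
                suffix = f" (did you mean {hint[0]!r}?)" if hint else ""
                raise ValueError(f"{task!r} depends on unknown task {dep!r}{suffix}")
    # Cycles: preparing a topological order raises CycleError on a cycle.
    graphlib.TopologicalSorter(dag).prepare()

validate({"extract": [], "transform": ["extract"], "load": ["transform"]})  # ok

try:
    validate({"extract": [], "transform": ["extrct"]})
except ValueError as exc:
    print(exc)  # 'transform' depends on unknown task 'extrct' (did you mean 'extract'?)
```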
Retries
- Retries are per task and per run attempt.
- Downstream tasks are skipped only after upstream retries are exhausted.
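The per-task retry behavior reduces to a bounded attempt loop. A simplified synchronous model (flowrun runs attempts through its scheduler, so this is only a sketch):

```python
from typing import Any, Callable

def run_with_retries(fn: Callable[[], Any], retries: int) -> Any:
    """Run fn, retrying up to `retries` times after a failure.

    Total attempts = 1 + retries; the final error propagates, which is
    the point where downstream tasks would be marked SKIPPED.
    """
    attempts = 1 + retries
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise

calls = []
def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise IOError("transient")
    return "ok"

print(run_with_retries(flaky, retries=2))  # ok (succeeds on the third attempt)
```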
Timeouts
- Applied per attempt.
- Async tasks use
asyncio.wait_for. - Sync tasks run in executor and are awaited with timeout.
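Both cases map onto `asyncio.wait_for`; for sync tasks the same pattern wraps an executor future. A minimal sketch of the two paths:

```python
import asyncio
import concurrent.futures

async def run_async_with_timeout(coro, timeout_s: float):
    # Async tasks: wait_for cancels the coroutine on timeout.
    return await asyncio.wait_for(coro, timeout=timeout_s)

async def run_sync_with_timeout(fn, timeout_s: float, executor):
    # Sync tasks: run in the executor, await the wrapped future with a timeout.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(loop.run_in_executor(executor, fn), timeout=timeout_s)

async def main() -> None:
    async def quick() -> str:
        await asyncio.sleep(0.01)
        return "async ok"

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        print(await run_async_with_timeout(quick(), timeout_s=1.0))       # async ok
        print(await run_sync_with_timeout(lambda: "sync ok", 1.0, pool))  # sync ok
        try:
            await run_async_with_timeout(asyncio.sleep(10), timeout_s=0.05)
        except asyncio.TimeoutError:
            print("timed out")

asyncio.run(main())
```

Note that a timed-out sync task's thread keeps running to completion; only the await is abandoned, which is a general limitation of thread-pool execution.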
Result retention
- `retain_result=True` (default): keep the result in state.
- `retain_result=False`: clear the result once all downstream consumers are launched/done.
- Useful for reducing memory when passing larger intermediate objects.
Run Report Format
`engine.get_run_report(run_id)` returns:

```python
{
    "run_id": "...",
    "dag_name": "...",
    "created_at": 0.0,
    "finished_at": 0.0,
    "status": "SUCCESS",  # SUCCESS | FAILED | RUNNING
    "tasks": {
        "task_name": {
            "status": "SUCCESS",
            "attempt": 1,
            "started_at": 0.0,
            "finished_at": 0.0,
            "error": None,
            "result": {...},
        },
    },
}
```
Run-level status rules:
- `FAILED` if any task is `FAILED` or `SKIPPED`.
- `SUCCESS` if all tasks are `SUCCESS`.
- `RUNNING` otherwise.
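These rules reduce to a small pure function over the task statuses (a sketch, useful when asserting on reports in tests):

```python
def run_status(task_statuses: list[str]) -> str:
    # FAILED wins if any task failed or was skipped; SUCCESS only when
    # every task succeeded; anything else means the run is still RUNNING.
    if any(s in ("FAILED", "SKIPPED") for s in task_statuses):
        return "FAILED"
    if all(s == "SUCCESS" for s in task_statuses):
        return "SUCCESS"
    return "RUNNING"

print(run_status(["SUCCESS", "SUCCESS"]))             # SUCCESS
print(run_status(["SUCCESS", "FAILED", "SKIPPED"]))   # FAILED
print(run_status(["SUCCESS", "RUNNING", "PENDING"]))  # RUNNING
```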
Hooks
Use hooks to emit metrics, alerts, or tracing signals.
```python
from flowrun import fn_hook, build_default_engine

hook = fn_hook(
    on_task_failure=lambda e: print(f"FAIL {e.task_name}: {e.error}"),
    on_dag_end=lambda e: print(f"DAG done: {e.dag_name}"),
)
engine = build_default_engine(hooks=[hook])
```
Hook API:
- `RunHook` class with overridable methods.
- `fn_hook(...)` for function-based handlers.
- Hook errors are caught and logged (they do not crash runs).
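The "caught and logged" guarantee is typically a try/except around each handler. A sketch of that isolation pattern (not flowrun's actual dispatch code):

```python
import logging

def emit(handlers, event, logger: logging.Logger) -> None:
    # Each handler is isolated: a failing hook is logged, never re-raised,
    # so observability code cannot take down a run.
    for handler in handlers:
        try:
            handler(event)
        except Exception:
            logger.exception("hook %r failed", handler)

seen = []
def good(event): seen.append(event)
def bad(event): raise RuntimeError("boom")

emit([bad, good], {"task_name": "extract"}, logging.getLogger("flowrun"))
print(seen)  # [{'task_name': 'extract'}]
```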
Events:
- `DagStartEvent`, `DagEndEvent`
- `TaskStartEvent`, `TaskSuccessEvent`, `TaskFailureEvent`
- `TaskRetryEvent`, `TaskSkipEvent`
State Stores
In-memory (default):

- `StateStore` / `InMemoryStateStore`
- Fast, process-local, ephemeral.

SQLite persistent backend:

- `SqliteStateStore(db_path, serializer=..., cache_ttl_s=None, recover=False)`
- Persists run and task state.
- Optional crash recovery marks orphaned `RUNNING` tasks as failed.

Serialization options for persisted results:

- `JsonSerializer` (default for the SQLite backend)
- `PickleSerializer`
- Custom `ResultSerializer` implementation
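A custom serializer only needs a symmetric dumps/loads pair. A minimal JSON-based sketch of the idea (the exact `ResultSerializer` method names and bytes-vs-str contract are assumptions, so check the protocol before implementing):

```python
import json
from typing import Any

class JsonResultSerializer:
    # Hypothetical shape of a result serializer: object to bytes and back.
    def dumps(self, result: Any) -> bytes:
        return json.dumps(result).encode("utf-8")

    def loads(self, payload: bytes) -> Any:
        return json.loads(payload.decode("utf-8"))

ser = JsonResultSerializer()
payload = ser.dumps({"rows": 2, "total": 25})
print(ser.loads(payload))  # {'rows': 2, 'total': 25}
```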
Practical ETL Patterns
Small Polars pipeline pattern
- Keep each task focused (`extract`, `transform`, `load`).
- Set `retain_result=False` on large intermediate transforms.
- Use `retries` on flaky IO tasks, not on pure transforms.
- Keep `max_parallel` modest for predictable resource use.
Re-run from a checkpoint task
```python
new_run_id = await engine.resume(old_run_id, from_tasks=["transform"], context=ctx)
```
This re-executes `transform` and all downstream tasks while preserving unaffected, successful upstream tasks.
Run only a target branch
```python
run_id = await engine.run_subgraph("daily_etl", targets=["load"], context=ctx)
```
This executes `load` plus all transitive dependencies required for it.
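Selecting a target plus its transitive dependencies is a reverse reachability walk over the dependency edges. A sketch of that closure computation:

```python
def subgraph(dag: dict[str, list[str]], targets: list[str]) -> set[str]:
    """Return the targets plus every transitive dependency they need."""
    needed: set[str] = set()
    stack = list(targets)
    while stack:
        task = stack.pop()
        if task in needed:
            continue
        needed.add(task)
        stack.extend(dag[task])  # walk upstream through declared deps
    return needed

dag = {"extract": [], "transform": ["extract"], "load": ["transform"], "report": ["extract"]}
print(sorted(subgraph(dag, ["load"])))  # ['extract', 'load', 'transform']
```

Note that `report` is excluded: only what `load` actually needs is scheduled.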
Logging
Pass a logger to `build_default_engine(logger=...)`.
Typical levels:
- INFO: DAG start/finish, task success, retries, skips.
- WARNING: task failures, timeouts.
- DEBUG: task launch details, tracebacks, shutdown details.
Testing Your DAGs
Recommended test layers:
- Unit test each task function directly.
- Integration test DAG execution with `build_default_engine()`.
- Validate topology with `engine.validate(...)` and `engine.list_tasks(...)`.
- Assert on `get_run_report(...)` for end-to-end behavior.
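Because tasks are plain callables, the first layer needs no engine at all. For example, exercising the `transform` logic from the Quick Start as a bare function:

```python
def transform(extract: list[dict]) -> dict[str, int]:
    total = sum(row["amount"] for row in extract)
    return {"rows": len(extract), "total": total}

def test_transform_totals() -> None:
    rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]
    assert transform(rows) == {"rows": 2, "total": 25}

test_transform_totals()
print("ok")
```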
Public API Surface
Top-level exports in flowrun:
- `Engine`, `build_default_engine`
- `RunContext`
- `task`, `task_template`, `TaskSpec`, `TaskRegistry`
- `SchedulerConfig`
- `RunHook`, `fn_hook`
- `StateStore`, `InMemoryStateStore`, `StateStoreProtocol`
- `SqliteStateStore`
- `JsonSerializer`, `PickleSerializer`, `ResultSerializer`
License
MIT