Skip to main content

Backend-agnostic workflow execution engine

Project description

muFlow

Pronounced "microflow" (μFlow).

Backend-agnostic task execution engine.

Overview

muFlow provides abstractions for defining and executing tasks that can run on multiple backends (Celery, AWS Lambda, AWS Step Functions) without modification.

Tasks are registered as pure computational units via @register_task. DAG topology is declared separately in a Pipeline definition, keeping task logic decoupled from orchestration.

Installation

pip install muflow

# With S3 support (for Lambda/Step Functions)
pip install muflow[s3]

# For development
pip install muflow[dev]

Architecture

┌─────────────────────────────────────────────────┐
│                    User Code                    │
│  @register_task()    Pipeline(steps={...})  │
└────────────────────────┬────────────────────────┘
                         │
                         ▼
              ┌──────────────────────┐
              │  Pipeline.build_plan │
              │   (compiles steps    │
              │    into static DAG)  │
              └──────────┬───────────┘
                         │
                         ▼
              ┌──────────────────────┐
              │   TaskPlan (DAG) │
              │   nodes, root_key    │
              └──────────┬───────────┘
                         │
            ┌────────────┼────────────┐
            ▼            ▼            ▼
      ┌────────┐  ┌─────────┐  ┌──────────────┐
      │ Local  │  │ Celery  │  │ Step         │
      │Backend │  │ Backend │  │ Functions    │
      └────────┘  └─────────┘  └──────────────┘

Core Concepts

Registering Tasks

Tasks are registered as plain functions:

from muflow import register_task

@register_task(name="ml.compute_features")
def compute_features(context):
    ds = context.kwargs["dataset_name"]
    # ... compute features ...
    context.save_json("features.json", {"dataset": ds, "features": [...]})

TaskContext

The TaskContext wraps a StorageBackend and provides file I/O, dependency access, and progress reporting:

from muflow import create_local_context

ctx = create_local_context(
    path="/tmp/task-output",
    kwargs={"param1": "value1"},
)

# Write outputs
ctx.save_json("result.json", {"accuracy": 0.95})
ctx.save_xarray("model.nc", xr.Dataset({"weights": [1, 2, 3]}))

# Read back
result = ctx.read_json("result.json")

# Access upstream dependencies
for key in ctx.dependency_keys():
    dep = ctx.dependency(key)
    data = dep.read_json("features.json")

Pipelines (Recommended for Multi-Step Tasks)

The Pipeline abstraction lets you declare the full DAG in one place. Individual tasks remain pure — they have no knowledge of the DAG topology.

from muflow import Pipeline, Step, ForEach

ml_pipeline = Pipeline(
    name="ml.full_pipeline",
    display_name="ML Training Pipeline",
    steps={
        "features": ForEach(
            task="ml.compute_features",
            over=lambda subject_key, kwargs: [
                {"dataset_name": ds} for ds in kwargs["datasets"]
            ],
        ),
        "train": Step(
            task="ml.train_model",
            after=["features"],
        ),
        "loo_cv": ForEach(
            task="ml.loo_cv",
            after=["features"],
            over=lambda subject_key, kwargs: [
                {"leave_out_index": i, "datasets": kwargs["datasets"]}
                for i in range(len(kwargs["datasets"]))
            ],
        ),
        "reports": ForEach(
            task="ml.generate_report",
            after=["train", "loo_cv"],
            over=lambda subject_key, kwargs: [
                {"format": fmt} for fmt in ("pdf", "xlsx", "csv")
            ],
        ),
    },
)

# Build the execution plan
plan = ml_pipeline.build_plan(
    subject_key="experiment:1",
    kwargs={"datasets": ["dataset_a", "dataset_b", "dataset_c"]},
)

# Execute on any backend
backend.submit_plan(plan)

This produces the following DAG (for N=3 datasets, 11 nodes total):

[features:0]  [features:1]  [features:2]     ← 3 parallel leaf nodes
      │              │              │
[train_model] [loo_cv:0] [loo_cv:1] [loo_cv:2]  ← 1 + 3 parallel
      │           │          │          │
[report_pdf] [report_xlsx] [report_csv]  ← 3 parallel
      │            │             │
           [sentinel/root]               ← auto-created

Step Types

  • Step(task, after) — a single job. Use kwargs_map to compute step-specific kwargs.
  • ForEach(task, over, after) — fan-out: over(subject_key, kwargs) returns a list of per-job kwargs dicts. One node is created per item.

Dependency Access Keys

When a downstream step references an upstream ForEach step, the access keys use colon-indexed notation:

# upstream "features" has 3 jobs → access keys are "features:0", "features:1", "features:2"
# upstream "train" has 1 job → access key is just "train"

@register_task(name="ml.generate_report")
def generate_report(context):
    model = context.dependency("train").read_json("model.json")
    for key in context.dependency_keys():
        if key.startswith("loo_cv:"):
            cv = context.dependency(key).read_json("cv_result.json")

TaskPlan

A static DAG representing the complete execution plan. Plans are compiled from a Pipeline definition once upfront and can be serialized as JSON.

from muflow import TaskPlan

# Build from a pipeline
plan = my_pipeline.build_plan("tag:1", {"param": "value"})

# Inspect the plan
print(f"Total nodes: {len(plan.nodes)}")
print(f"Leaf nodes: {[n.function for n in plan.leaf_nodes()]}")

# Walk through execution order
completed = set()
while not plan.is_complete(completed):
    ready = plan.ready_nodes(completed)
    for node in ready:
        execute(node)
        completed.add(node.key)

# Serialize to JSON
json_str = plan.to_json()

Content-Addressed Storage

muFlow uses deterministic, content-addressed storage prefixes. Same inputs always produce the same prefix, enabling automatic caching:

from muflow import compute_prefix

prefix = compute_prefix(
    {"task": "my.task", "subject": "data:123", "param": "value"},
)
# Returns: "muflow/my.task/a1b2c3d4..."

Caching

Caching is automatic and requires no configuration. When execute_task() runs a node, it first checks whether manifest.json already exists at the node's storage prefix. If it does, the task is skipped and marked cached — the node counts as completed and downstream tasks proceed normally.

# Re-running the same plan reuses any already-complete nodes automatically.
plan = ml_pipeline.build_plan(
    subject_key="experiment:1",
    kwargs={"datasets": ["a", "b", "c"]},
)
backend.submit_plan(plan)
# feature nodes with existing manifest.json are skipped instantly

Identity Keys

Use IdentityKey annotations to control which parameters affect caching. Only identity-keyed fields are included in the content-addressed hash:

from typing import Annotated
import pydantic
from muflow import IdentityKey, register_task

class TrainParams(pydantic.BaseModel):
    dataset_id: Annotated[int, IdentityKey()]  # affects hash
    display_name: str                           # does not affect hash

@register_task(name="ml.train", parameters=TrainParams)
def train(context):
    # context.kwargs is a validated TrainParams instance
    print(context.kwargs.dataset_id)

Execution Backends

  • LocalBackend — synchronous in-process execution (for testing)
  • CeleryBackend — Celery chord/group for parallel execution
  • StepFunctionsBackend — AWS Step Functions with Lambda

submit_plan() returns a PlanHandle — a serialisable reference to the submitted execution:

from muflow.backends import LocalBackend

backend = LocalBackend(base_path="/tmp/results")
handle = backend.submit_plan(plan)

# Query state (no S3 queries; uses Redis for Celery, sfn.describe_execution for SFN)
print(handle.get_state())   # "success" | "failure" | "running" | "pending"

# Check per-node progress (checks manifest.json at each node's storage prefix)
progress = handle.get_progress()
print(f"{progress.completed}/{progress.total} nodes complete")

# Persist across processes (e.g. store in a Django model field)
stored = handle.to_json()
handle = PlanHandle.from_json(stored)

Testing

pip install muflow[dev]
pytest

Testing Utilities

muFlow provides utilities for testing pipelines locally:

from muflow import run_plan_locally

result = run_plan_locally(
    pipeline=my_pipeline,
    subject_key="dataset:test",
    kwargs={"param": "value"},
    output_dir="/tmp/test_output",
)

assert result.success
data = result.read_json("result.json")
files = result.list_files()

Resource Management

Utilities for transparent resource fetching from local files or URLs:

from muflow import ResourceManager, is_url, resolve_uri

# Resolve a URI (downloads URLs to temp files)
local_path = resolve_uri("https://example.com/data.nc")

# Automatic cleanup with context manager
with ResourceManager() as rm:
    path1 = rm.resolve("/local/file.nc")
    path2 = rm.resolve("https://example.com/remote.nc")

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

muflow-0.2.0.tar.gz (73.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

muflow-0.2.0-py3-none-any.whl (54.8 kB view details)

Uploaded Python 3

File details

Details for the file muflow-0.2.0.tar.gz.

File metadata

  • Download URL: muflow-0.2.0.tar.gz
  • Upload date:
  • Size: 73.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for muflow-0.2.0.tar.gz
Algorithm Hash digest
SHA256 216a75f4e0b858ebfbcfe9a125261e62c4b14eb17497253eb79ac096e2447f79
MD5 08ac3c1d8fa9768c12ffa2ab1e56e0ce
BLAKE2b-256 2bd17714ca2d70b6def658d73a0945af9672b842172a9a3892744edab2bdd2cc

See more details on using hashes here.

Provenance

The following attestation bundles were made for muflow-0.2.0.tar.gz:

Publisher: publish.yml on ContactEngineering/muFlow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file muflow-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: muflow-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 54.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for muflow-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1a8c5a227e0ff43894d8b87a2328d2bf8d7cfcff96028d5586960fe49e767e03
MD5 570f55a5bd840d628b1c2c9176b84136
BLAKE2b-256 715e61d74b80ec6823a8f882233af2f4fae20a4085be3bee84ff14c45909208a

See more details on using hashes here.

Provenance

The following attestation bundles were made for muflow-0.2.0-py3-none-any.whl:

Publisher: publish.yml on ContactEngineering/muFlow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page