Skip to main content

Backend-agnostic workflow execution engine

Project description

muFlow

Pronounced "microflow" (μFlow).

Backend-agnostic workflow execution engine.

Overview

muFlow provides abstractions for defining and executing workflows that can run on multiple backends (Celery, AWS Lambda, AWS Step Functions) without modification.

Workflows are registered as pure computational units via @register_workflow. DAG topology is declared separately in a Pipeline definition, keeping workflow logic decoupled from orchestration.

Installation

pip install muflow

# With S3 support (for Lambda/Step Functions)
pip install muflow[s3]

# For development
pip install muflow[dev]

Architecture

┌─────────────────────────────────────────────────┐
│                    User Code                    │
│  @register_workflow()    Pipeline(steps={...})  │
└────────────────────────┬────────────────────────┘
                         │
                         ▼
              ┌──────────────────────┐
              │  Pipeline.build_plan │
              │   (compiles steps    │
              │    into static DAG)  │
              └──────────┬───────────┘
                         │
                         ▼
              ┌──────────────────────┐
              │   WorkflowPlan (DAG) │
              │   nodes, root_key    │
              └──────────┬───────────┘
                         │
            ┌────────────┼────────────┐
            ▼            ▼            ▼
      ┌────────┐  ┌─────────┐  ┌──────────────┐
      │ Local  │  │ Celery  │  │ Step         │
      │Backend │  │ Backend │  │ Functions    │
      └────────┘  └─────────┘  └──────────────┘

Core Concepts

Registering Workflows

Workflows are registered as plain functions:

from muflow import register_workflow

@register_workflow(name="ml.compute_features")
def compute_features(context):
    ds = context.kwargs["dataset_name"]
    # ... compute features ...
    context.save_json("features.json", {"dataset": ds, "features": [...]})

WorkflowContext

The WorkflowContext wraps a StorageBackend and provides file I/O, dependency access, and progress reporting:

from muflow import create_local_context

ctx = create_local_context(
    path="/tmp/workflow-output",
    kwargs={"param1": "value1"},
)

# Write outputs
ctx.save_json("result.json", {"accuracy": 0.95})
ctx.save_xarray("model.nc", xr.Dataset({"weights": [1, 2, 3]}))

# Read back
result = ctx.read_json("result.json")

# Access upstream dependencies
for key in ctx.dependency_keys():
    dep = ctx.dependency(key)
    data = dep.read_json("features.json")

Pipelines (Recommended for Multi-Step Workflows)

The Pipeline abstraction lets you declare the full DAG in one place. Individual workflows remain pure — they have no knowledge of the DAG topology.

from muflow import Pipeline, Step, ForEach

ml_pipeline = Pipeline(
    name="ml.full_pipeline",
    display_name="ML Training Pipeline",
    steps={
        "features": ForEach(
            workflow="ml.compute_features",
            over=lambda subject_key, kwargs: [
                {"dataset_name": ds} for ds in kwargs["datasets"]
            ],
        ),
        "train": Step(
            workflow="ml.train_model",
            after=["features"],
        ),
        "loo_cv": ForEach(
            workflow="ml.loo_cv",
            after=["features"],
            over=lambda subject_key, kwargs: [
                {"leave_out_index": i, "datasets": kwargs["datasets"]}
                for i in range(len(kwargs["datasets"]))
            ],
        ),
        "reports": ForEach(
            workflow="ml.generate_report",
            after=["train", "loo_cv"],
            over=lambda subject_key, kwargs: [
                {"format": fmt} for fmt in ("pdf", "xlsx", "csv")
            ],
        ),
    },
)

# Build the execution plan
plan = ml_pipeline.build_plan(
    subject_key="experiment:1",
    kwargs={"datasets": ["dataset_a", "dataset_b", "dataset_c"]},
)

# Execute on any backend
backend.submit_plan(plan)

This produces the following DAG (for N=3 datasets, 11 nodes total):

[features:0]  [features:1]  [features:2]     ← 3 parallel leaf nodes
      │              │              │
[train_model] [loo_cv:0] [loo_cv:1] [loo_cv:2]  ← 1 + 3 parallel
      │           │          │          │
[report_pdf] [report_xlsx] [report_csv]  ← 3 parallel
      │            │             │
           [sentinel/root]               ← auto-created

Step Types

  • Step(workflow, after) — a single job. Use kwargs_map to compute step-specific kwargs.
  • ForEach(workflow, over, after) — fan-out: over(subject_key, kwargs) returns a list of per-job kwargs dicts. One node is created per item.

Dependency Access Keys

When a downstream step references an upstream ForEach step, the access keys use colon-indexed notation:

# upstream "features" has 3 jobs → access keys are "features:0", "features:1", "features:2"
# upstream "train" has 1 job → access key is just "train"

@register_workflow(name="ml.generate_report")
def generate_report(context):
    model = context.dependency("train").read_json("model.json")
    for key in context.dependency_keys():
        if key.startswith("loo_cv:"):
            cv = context.dependency(key).read_json("cv_result.json")

WorkflowPlan

A static DAG representing the complete execution plan. Plans are compiled from a Pipeline definition once upfront and can be serialized as JSON.

from muflow import WorkflowPlan

# Build from a pipeline
plan = my_pipeline.build_plan("tag:1", {"param": "value"})

# Inspect the plan
print(f"Total nodes: {len(plan.nodes)}")
print(f"Leaf nodes: {[n.function for n in plan.leaf_nodes()]}")

# Walk through execution order
completed = set()
while not plan.is_complete(completed):
    ready = plan.ready_nodes(completed)
    for node in ready:
        execute(node)
        completed.add(node.key)

# Serialize to JSON
json_str = plan.to_json()

Content-Addressed Storage

muFlow uses deterministic, content-addressed storage prefixes. Same inputs always produce the same prefix, enabling automatic caching:

from muflow import compute_prefix

prefix = compute_prefix(
    {"workflow": "my.workflow", "subject": "data:123", "param": "value"},
)
# Returns: "muflow/my.workflow/a1b2c3d4..."

Caching

Both Pipeline.build_plan() and WorkflowPlanner.build_plan() accept an is_cached callback. Cached nodes are skipped during execution, and their dependents treat them as already completed:

def check_cache(workflow_name, subject_key, kwargs):
    prefix = compute_prefix({"workflow": workflow_name, "subject": subject_key, **kwargs})
    return storage.exists(f"{prefix}/manifest.json")

plan = ml_pipeline.build_plan(
    "experiment:1",
    {"datasets": ["a", "b", "c"]},
    is_cached=check_cache,
)
# Cached feature nodes are skipped → training starts immediately

Identity Keys

Use IdentityKey annotations to control which parameters affect caching. Only identity-keyed fields are included in the content-addressed hash:

from typing import Annotated
import pydantic
from muflow import IdentityKey, register_workflow

class TrainParams(pydantic.BaseModel):
    dataset_id: Annotated[int, IdentityKey()]  # affects hash
    display_name: str                           # does not affect hash

@register_workflow(name="ml.train", parameters=TrainParams)
def train(context):
    # context.kwargs is a validated TrainParams instance
    print(context.kwargs.dataset_id)

Execution Backends

  • LocalBackend — synchronous in-process execution (for testing)
  • CeleryBackend — Celery chord/group for parallel execution
  • StepFunctionsBackend — AWS Step Functions with Lambda
from muflow import LocalBackend

backend = LocalBackend(base_path="/tmp/results")
backend.submit_plan(plan)

Testing

pip install muflow[dev]
pytest

Testing Utilities

muFlow provides utilities for testing pipelines locally:

from muflow import run_plan_locally

result = run_plan_locally(
    pipeline=my_pipeline,
    subject_key="dataset:test",
    kwargs={"param": "value"},
    output_dir="/tmp/test_output",
)

assert result.success
data = result.read_json("result.json")
files = result.list_files()

Resource Management

Utilities for transparent resource fetching from local files or URLs:

from muflow import ResourceManager, is_url, resolve_uri

# Resolve a URI (downloads URLs to temp files)
local_path = resolve_uri("https://example.com/data.nc")

# Automatic cleanup with context manager
with ResourceManager() as rm:
    path1 = rm.resolve("/local/file.nc")
    path2 = rm.resolve("https://example.com/remote.nc")

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

muflow-0.1.0.tar.gz (64.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

muflow-0.1.0-py3-none-any.whl (51.3 kB view details)

Uploaded Python 3

File details

Details for the file muflow-0.1.0.tar.gz.

File metadata

  • Download URL: muflow-0.1.0.tar.gz
  • Upload date:
  • Size: 64.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for muflow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 4a70c511830ae3086c37c74afb42807d35cae8468d5cb60f4ddd307da8f3ccf7
MD5 8b0a3ba0a646f4a51166d4ef0b64bd37
BLAKE2b-256 a3ce1fb4e679f8363e21d7a2c6cb7d459d4fb616a425c5ff1aec5ccf9c391897

See more details on using hashes here.

Provenance

The following attestation bundles were made for muflow-0.1.0.tar.gz:

Publisher: publish.yml on ContactEngineering/muFlow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file muflow-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: muflow-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 51.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for muflow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6a7093287306cb3db871afa707ad8e29723e7c5698811845079dde89c72a8665
MD5 15a51c1d35e5a9c981dc8ce06ba30d1b
BLAKE2b-256 b12adb47a1dd82e18c7e914232d2f4e75901fcea71c92319e2265f4228cb1432

See more details on using hashes here.

Provenance

The following attestation bundles were made for muflow-0.1.0-py3-none-any.whl:

Publisher: publish.yml on ContactEngineering/muFlow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page