Workflow orchestration primitives for illumo demo

illumo-flow

Workflow orchestration primitives featuring declarative DSL wiring, routing control, and fail-fast execution.

Installation

pip install illumo-flow

Quick Example

from illumo_flow import Flow, FunctionNode

# Define lightweight callables (each receives a payload and returns the next payload)
def extract(payload):
    return {"customer_id": 42, "source": "demo"}

def transform(payload, ctx):
    return {**payload, "normalized": True}

def load(payload, ctx):
    return f"stored:{payload['customer_id']}"

nodes = {
    "extract": FunctionNode(extract, name="extract", outputs="$ctx.data.raw"),
    "transform": FunctionNode(
        transform,
        name="transform",
        inputs="$ctx.data.raw",
        outputs="$ctx.data.normalized",
    ),
    "load": FunctionNode(
        load,
        name="load",
        inputs="$ctx.data.normalized",
        outputs="$ctx.data.persisted",
    ),
}

flow = Flow.from_dsl(
    nodes=nodes,
    entry="extract",
    edges=["extract >> transform", "transform >> load"],
)

context = {}
flow.run(context)
print(context["data"]["persisted"])  # stored:42

# Flow.run returns the mutated context; per-node outputs also remain
# available under `context["payloads"]`.
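
To make the resulting context shape concrete, the sketch below mimics the run above without importing the library. The helpers `read_path`, `write_path`, and `run_linear` are hypothetical names written for illustration; they are not illumo-flow APIs, and the real engine may store data differently. The sketch assumes that `$ctx.a.b` maps to `context["a"]["b"]` and that every callable takes a single payload argument.

```python
PREFIX = "$ctx."


def _keys(expr):
    """Split a '$ctx.a.b' style path into ['a', 'b'] (assumed mapping)."""
    return (expr[len(PREFIX):] if expr.startswith(PREFIX) else expr).split(".")


def read_path(context, expr):
    """Read the value stored at a '$ctx.a.b' style path."""
    value = context
    for key in _keys(expr):
        value = value[key]
    return value


def write_path(context, expr, value):
    """Write a value at a '$ctx.a.b' style path, creating nested dicts as needed."""
    *parents, leaf = _keys(expr)
    target = context
    for key in parents:
        target = target.setdefault(key, {})
    target[leaf] = value


def run_linear(steps, context):
    """Run (node_id, func, inputs, outputs) steps in order (toy runner)."""
    payloads = context.setdefault("payloads", {})
    for node_id, func, inputs, outputs in steps:
        payload = read_path(context, inputs) if inputs else None
        result = func(payload)
        payloads[node_id] = result            # per-node output
        write_path(context, outputs, result)  # declared output path
    return context


steps = [
    ("extract", lambda _: {"customer_id": 42, "source": "demo"},
     None, "$ctx.data.raw"),
    ("transform", lambda p: {**p, "normalized": True},
     "$ctx.data.raw", "$ctx.data.normalized"),
    ("load", lambda p: f"stored:{p['customer_id']}",
     "$ctx.data.normalized", "$ctx.data.persisted"),
]

context = run_linear(steps, {})
print(context["data"]["persisted"])  # stored:42
print(sorted(context["payloads"]))   # ['extract', 'load', 'transform']
```

Each result lands in two places in this sketch: under the node's own key in `context["payloads"]` and at the path named by its `outputs` declaration.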

Examples & CLI

The GitHub repository ships reference examples and a CLI (e.g. python -m examples linear_etl). Clone the repo if you want to explore them locally:

git clone https://github.com/kitfactory/illumo-flow.git
cd illumo-flow
python -m examples linear_etl

YAML Configuration

Flows can also be defined in configuration files:

flow:
  entry: extract
  nodes:
    extract:
      type: illumo_flow.core.FunctionNode
      name: extract
      context:
        inputs:
          callable: examples.ops.extract
        outputs: $ctx.data.raw
    transform:
      type: illumo_flow.core.FunctionNode
      name: transform
      context:
        inputs:
          callable: examples.ops.transform
          payload: $ctx.data.raw
        outputs: $ctx.data.normalized
    load:
      type: illumo_flow.core.FunctionNode
      name: load
      context:
        inputs:
          callable: examples.ops.load
          payload: $ctx.data.normalized
        outputs: $ctx.data.persisted
  edges:
    - extract >> transform
    - transform >> load

context.inputs.callable supplies the Python callable path for each node. Literal strings are imported at build time, while expressions (e.g. $ctx.registry.my_func) are evaluated during execution.
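
The distinction between literal paths and expressions can be sketched with the standard library alone. `import_callable` and `resolve_callable` below are hypothetical helpers, not part of illumo-flow, and the real loader may behave differently. A literal dotted path is imported with `importlib`, while a `$ctx.` expression is looked up in the context that is passed in. When each branch runs (build time or execution time) is left to the caller in this sketch.

```python
import importlib


def import_callable(dotted_path):
    """Import 'package.module.attr' and return the attribute."""
    module_name, _, attr = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attr', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def resolve_callable(spec, context):
    """Import literal paths; look '$ctx.' expressions up in the context."""
    prefix = "$ctx."
    if spec.startswith(prefix):
        value = context
        for key in spec[len(prefix):].split("."):
            value = value[key]
        return value
    return import_callable(spec)


# Literal string: resolved by importing the module.
dumps = resolve_callable("json.dumps", {})
print(dumps({"ok": True}))  # {"ok": true}

# Expression: resolved from a registry stored in the context.
registry_ctx = {"registry": {"my_func": str.upper}}
my_func = resolve_callable("$ctx.registry.my_func", registry_ctx)
print(my_func("demo"))  # DEMO
```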

Expressions

  • $ctx.* accesses the shared context (e.g. $ctx.data.raw). Writing ctx.* or the shorthand $.path is automatically normalized to the same form.
  • $payload.* reads per-node outputs from context["payloads"].
  • $joins.* reads from context["joins"].
  • $env.VAR reads environment variables.
  • Template strings such as "Hello {{ $ctx.user.name }}" are supported in inputs definitions.
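
The expression forms listed above can be illustrated with a small resolver. This is a minimal sketch under stated assumptions, not the library's evaluator: `normalize`, `resolve`, and `render` are hypothetical names, paths are assumed to be dot-separated dictionary keys, and a missing environment variable is assumed to resolve to `None`.

```python
import os
import re

_TEMPLATE = re.compile(r"\{\{\s*(.+?)\s*\}\}")


def normalize(expr):
    """Rewrite the 'ctx.*' and '$.path' shorthands to the '$ctx.*' form."""
    if expr.startswith("$."):
        return "$ctx." + expr[2:]
    if expr.startswith("ctx."):
        return "$" + expr
    return expr


def resolve(expr, context):
    """Resolve one expression against the context or the environment."""
    scope, _, path = normalize(expr).partition(".")
    if scope == "$env":
        return os.environ.get(path)
    roots = {
        "$ctx": context,
        "$payload": context.get("payloads", {}),
        "$joins": context.get("joins", {}),
    }
    if scope not in roots:
        raise ValueError(f"unknown expression scope: {expr!r}")
    value = roots[scope]
    for key in path.split("."):
        value = value[key]
    return value


def render(template, context):
    """Replace each '{{ expression }}' placeholder with its resolved value."""
    return _TEMPLATE.sub(lambda m: str(resolve(m.group(1), context)), template)


context = {
    "user": {"name": "Ada"},
    "data": {"raw": {"customer_id": 42}},
    "payloads": {"extract": {"customer_id": 42}},
}
print(resolve("$.data.raw", context))                    # {'customer_id': 42}
print(resolve("$payload.extract.customer_id", context))  # 42
print(render("Hello {{ $ctx.user.name }}", context))     # Hello Ada
```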

A flow defined in a YAML file is loaded with Flow.from_config:

from illumo_flow import Flow

flow = Flow.from_config("./flow.yaml")
context = {}
flow.run(context)
print(context["data"]["persisted"])

Payload vs Context

  • Flow resolves each node's payload from the declared inputs.
  • Nodes return the next payload; Flow stores it under context["payloads"][node_id] and writes to the paths declared in outputs.
  • Within a node, use the context view helpers (ctx.get(...), ctx.write(...), ctx.route(...)) when shared state needs to be read or updated. Direct mutation of the underlying dictionary is discouraged.
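
The split between payload and shared state can be sketched with a stand-in view object. `ContextView` below is hypothetical: it borrows the method names `get`, `write`, and `route` from the list above, but the signatures, the `"routing"` storage key, and the path handling are assumptions rather than the library's actual interface.

```python
class ContextView:
    """Hypothetical constrained view over the context (not the library class)."""

    def __init__(self, context):
        self._context = context

    @staticmethod
    def _keys(path):
        prefix = "$ctx."
        return (path[len(prefix):] if path.startswith(prefix) else path).split(".")

    def get(self, path, default=None):
        """Read a nested value, returning default when the path is missing."""
        value = self._context
        for key in self._keys(path):
            if not isinstance(value, dict) or key not in value:
                return default
            value = value[key]
        return value

    def write(self, path, value):
        """Write a nested value, creating intermediate dicts as needed."""
        *parents, leaf = self._keys(path)
        target = self._context
        for key in parents:
            target = target.setdefault(key, {})
        target[leaf] = value

    def route(self, next_node, confidence=None, reason=None):
        """Record a routing decision (the 'routing' key is an assumption)."""
        self._context["routing"] = {
            "next": next_node,
            "confidence": confidence,
            "reason": reason,
        }


def classify(payload, ctx):
    """Node callable: returns the next payload, touches shared state via the view."""
    threshold = ctx.get("$ctx.config.threshold", 100)
    ctx.write("$ctx.metrics.seen", ctx.get("$ctx.metrics.seen", 0) + 1)
    decision = "review" if payload["amount"] > threshold else "approve"
    ctx.route(decision, reason="amount check")
    return {**payload, "checked": True}


context = {"config": {"threshold": 50}}
result = classify({"amount": 75}, ContextView(context))
print(result)                      # {'amount': 75, 'checked': True}
print(context["metrics"]["seen"])  # 1
print(context["routing"]["next"])  # review
```

The node never touches the dictionary directly in this sketch; every read and write goes through the view, which is the pattern the list above recommends.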

Testing (repository clone)

pytest

The suite in tests/test_flow_examples.py validates the sample DSL flows using the src layout configured in pyproject.toml.

Documentation

Highlights

  • DSL edges such as A >> B, (A & B) >> C
  • Payload-first callable interface with a constrained context view helper
  • Routing metadata via Routing(next, confidence, reason)
  • Built-in join handling (nodes with multiple parents automatically wait for all inputs)
  • Examples covering ETL, dynamic routing, fan-out/fan-in, timeout handling, and early stop
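
The edge syntax and join behavior mentioned above can be sketched as follows. This is an illustrative parser, not the library's DSL implementation: `parse_edge`, `build_parents`, and `ready_nodes` are hypothetical names, only a single `>>` per edge string is handled, and a join is modeled as a node becoming ready once all of its parents have finished.

```python
import re


def parse_edge(edge):
    """Parse 'A >> B' or '(A & B) >> C' into (sources, targets) name lists."""
    left, sep, right = edge.partition(">>")
    if not sep:
        raise ValueError(f"edge is missing '>>': {edge!r}")

    def names(side):
        # Split on whitespace, '&', and parentheses; drop empty fragments.
        return [name for name in re.split(r"[\s&()]+", side) if name]

    return names(left), names(right)


def build_parents(edges):
    """Map each node to the set of parents it must wait for."""
    parents = {}
    for edge in edges:
        sources, targets = parse_edge(edge)
        for target in targets:
            parents.setdefault(target, set()).update(sources)
        for source in sources:
            parents.setdefault(source, set())
    return parents


def ready_nodes(parents, finished):
    """Return unfinished nodes whose parents have all finished."""
    return sorted(
        node for node, deps in parents.items()
        if node not in finished and deps <= finished
    )


parents = build_parents(["start >> A", "start >> B", "(A & B) >> C"])
print(ready_nodes(parents, set()))                # ['start']
print(ready_nodes(parents, {"start", "A"}))       # ['B']
print(ready_nodes(parents, {"start", "A", "B"}))  # ['C']
```

Node `C` stays blocked while only `A` has finished, which mirrors the "wait for all inputs" behavior described for joins.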
