Workflow orchestration primitives for illumo demo

illumo-flow

Workflow orchestration primitives featuring declarative DSL wiring, routing control, and fail-fast execution.

Installation

pip install illumo-flow

Quick Example

from illumo_flow import Flow, FunctionNode

# Define lightweight callables (payload-first; shared context access is opt-in)
def extract(payload):
    return {"customer_id": 42, "source": "demo"}

def transform(payload):
    return {**payload, "normalized": True}

def load(payload):
    return f"stored:{payload['customer_id']}"

nodes = {
    "extract": FunctionNode(extract, name="extract", outputs="$ctx.data.raw"),
    "transform": FunctionNode(
        transform,
        name="transform",
        inputs="$ctx.data.raw",
        outputs="$ctx.data.normalized",
    ),
    "load": FunctionNode(
        load,
        name="load",
        inputs="$ctx.data.normalized",
        outputs="$ctx.data.persisted",
    ),
}

flow = Flow.from_dsl(
    nodes=nodes,
    entry="extract",
    edges=["extract >> transform", "transform >> load"],
)

context = {}
flow.run(context)
print(context["data"]["persisted"])  # stored:42

# Flow.run returns the mutated context; per-node outputs remain
# available under `context["payloads"]`.

Examples & CLI

The GitHub repository ships reference examples and a CLI (e.g. python -m examples linear_etl). Clone the repo if you want to explore them locally:

git clone https://github.com/kitfactory/illumo-flow.git
cd illumo-flow
python -m examples linear_etl

YAML Configuration

Flows can also be defined in configuration files:

flow:
  entry: extract
  nodes:
    extract:
      type: illumo_flow.core.FunctionNode
      name: extract
      context:
        inputs:
          callable: examples.ops.extract
        outputs: $ctx.data.raw
    transform:
      type: illumo_flow.core.FunctionNode
      name: transform
      context:
        inputs:
          callable: examples.ops.transform
          payload: $ctx.data.raw
        outputs: $ctx.data.normalized
    load:
      type: illumo_flow.core.FunctionNode
      name: load
      context:
        inputs:
          callable: examples.ops.load
          payload: $ctx.data.normalized
        outputs: $ctx.data.persisted
  edges:
    - extract >> transform
    - transform >> load

context.inputs.callable supplies the Python callable path for each node. Literal strings are imported at build time, while expressions (e.g. $ctx.registry.my_func) are evaluated during execution.
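The split between build-time imports and run-time expressions can be sketched in plain Python. This is only an illustration, not illumo-flow's own code: resolve_callable and is_expression are hypothetical helpers, and json.dumps stands in for a path such as examples.ops.extract.

```python
import importlib


def resolve_callable(path):
    """Import a dotted path such as 'package.module.func' and return the object."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def is_expression(value):
    """Assumption: a string starting with '$' is an expression, not an import path."""
    return isinstance(value, str) and value.startswith("$")


# A literal string can be imported as soon as the flow is built.
dumps = resolve_callable("json.dumps")

# An expression is only recognised here; its lookup has to wait for a context.
deferred = is_expression("$ctx.registry.my_func")
```

Resolving literal paths early means a misspelled module name fails when the flow is built rather than halfway through a run.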

Expressions

  • $ctx.* accesses the shared context (e.g. $ctx.data.raw). Writing ctx.* or the shorthand $.path is automatically normalized to the same form.
  • $payload.* reads from context["payloads"]
  • $joins.* reads from context["joins"]
  • $env.VAR reads environment variables
  • Template strings like "Hello {{ $ctx.user.name }}" are supported in inputs definitions

To load a flow from a configuration file:

from illumo_flow import Flow

flow = Flow.from_config("./flow.yaml")
context = {}
flow.run(context)
print(context["data"]["persisted"])
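The expression forms listed under Expressions can be pictured with a small resolver. It is a minimal sketch under stated assumptions, not the library's evaluator: normalize and resolve are hypothetical names, template strings are left out, and only dictionary lookups are handled.

```python
import os


def normalize(expr):
    """Rewrite the 'ctx.*' and '$.path' spellings to the canonical '$ctx.*' form."""
    if expr.startswith("$."):
        return "$ctx." + expr[2:]
    if expr.startswith("ctx."):
        return "$" + expr
    return expr


def resolve(expr, context):
    """Look up a dotted expression against the context or the environment."""
    scope, _, path = normalize(expr).partition(".")
    if scope == "$env":
        return os.environ.get(path)
    roots = {
        "$ctx": context,
        "$payload": context.get("payloads", {}),
        "$joins": context.get("joins", {}),
    }
    value = roots[scope]
    # Walk the dotted path one key at a time.
    for key in (path.split(".") if path else []):
        value = value[key]
    return value


context = {"data": {"raw": {"customer_id": 42}}, "payloads": {"extract": "ok"}}
print(resolve("$ctx.data.raw.customer_id", context))  # 42
print(resolve("$.data.raw.customer_id", context))     # 42
print(resolve("$payload.extract", context))           # ok
```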

Payload vs Context

  • Flow resolves each node's payload from the declared inputs.
  • Nodes return the next payload; Flow stores it under context["payloads"][node_id] and writes to the paths declared in outputs.
  • Treat the payload as the primary contract. Only nodes created with allow_context_access=True can reach the shared dictionary (e.g., for metrics via context.setdefault("metrics", {})); all other nodes operate purely on payloads.
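The bookkeeping described above can be imitated in a few lines. This sketch assumes nothing about illumo-flow's internals: run_node and write_path are hypothetical helpers that only mirror the documented behaviour (the callable sees the payload, and the result lands under context["payloads"] and at the outputs path).

```python
def write_path(context, expr, value):
    """Store value at a '$ctx.a.b' path, creating intermediate dicts as needed."""
    prefix = "$ctx."
    path = expr[len(prefix):] if expr.startswith(prefix) else expr
    keys = path.split(".")
    target = context
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = value


def run_node(context, node_id, func, payload, outputs):
    """Call func with the payload only, then record the result in both places."""
    result = func(payload)
    context.setdefault("payloads", {})[node_id] = result
    write_path(context, outputs, result)
    return result


def extract(payload):
    # The callable never touches the shared context.
    return {"customer_id": 42, "source": "demo"}


context = {}
run_node(context, "extract", extract, None, "$ctx.data.raw")
```

After the call, the same dictionary is reachable as context["payloads"]["extract"] and as context["data"]["raw"].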

Branching

  • To route dynamically, return a dictionary mapping successor identifiers to payloads (e.g. {"approve": payload}). Only the listed successors are executed.
  • Returning an empty dictionary {} stops downstream execution; returning multiple keys broadcasts to all corresponding successors.
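The three return shapes described above look like this as plain node callables. The successor names (approve, reject, email, audit) and the amount threshold are made up for illustration; the sketch assumes nodes with those names are wired as successors of the routing node.

```python
def classify(payload):
    """Route to exactly one successor based on the order amount."""
    if payload["amount"] <= 100:
        return {"approve": payload}
    return {"reject": payload}


def notify_all(payload):
    """Broadcast the same payload to two successors."""
    return {"email": payload, "audit": payload}


def stop(payload):
    """Return an empty mapping so that no successor runs."""
    return {}


small = classify({"amount": 30})
large = classify({"amount": 500})
```

Because routing is just a return value, each callable can be unit-tested without building a Flow.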

Testing (repository clone)

pytest

The suite in tests/test_flow_examples.py validates the sample DSL flows using the src layout configured in pyproject.toml.

Highlights

  • DSL edges such as A >> B, (A & B) >> C
  • Payload-first callable interface (only nodes created with allow_context_access=True get explicit access to the shared context)
  • Branching via returned mappings (e.g. {successor: payload})
  • Built-in join handling (nodes with multiple parents automatically wait for all inputs)
  • Examples covering ETL, dynamic routing, fan-out/fan-in, timeout handling, and early stop
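To make the edge notation and the join rule concrete, here is a rough sketch of how an edge string could be expanded into parent/child pairs. It is not the library's parser: expand_edge and parents_of are hypothetical, and the sketch handles only a single >> per string with grouping on the left-hand side.

```python
def expand_edge(edge):
    """Split 'A >> B' or '(A & B) >> C' into (parent, child) pairs."""
    left, right = (side.strip() for side in edge.split(">>"))
    parents = [name.strip() for name in left.strip("()").split("&")]
    return [(parent, right) for parent in parents]


def parents_of(edges):
    """Collect the parents of each node; more than one parent marks a join."""
    table = {}
    for edge in edges:
        for parent, child in expand_edge(edge):
            table.setdefault(child, set()).add(parent)
    return table


pairs = expand_edge("(A & B) >> C")
table = parents_of(["start >> A", "start >> B", "(A & B) >> C"])
joins = sorted(node for node, parents in table.items() if len(parents) > 1)
```

Here C is the only node with two parents, so it is the one that would wait for both A and B before running.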
