Skip to main content

Minimal workflows

Project description

marque

marque (mark) is a minimal library for building workflows and pipelines in python. It has basic primitives for storing long-term data (keep()/recall()), passing runtime context (put()/get()), and dynamically extending the flow as it's executing (push()).

  • Flow: Defines the workflow and orchestrates it
  • Context: Runtime object storage for flexible dependencies
  • Step: Individual function in the workflow
  • Scope: A storage object provided to each step for artifacts/tagging
    • Tag: A str, float | None pair you can use to label a scope
    • Artifact: Arbitrary data you want to keep and recall later

The output of a flow is essentially a set of Scope objects which represent all the important data generated during the workflow run. You can use the search() function to find prior scopes and use their data to prepare new steps.

Here is a basic example:

from random import Random

from marque import Flow, repeat
from marque.storage import PolarsStorage


def add(flow: Flow):
    a, b = flow.get(int, ["a", "b"])            # pull values from the current context
    flow.tag(f"{a} + {b}")                      # add tags for faster analytics/filtering
    flow.keep("data", {"answer": a + b})        # persist data to recall later

def sub(flow: Flow):
    a, b = flow.get(int, ["a", "b"])
    flow.tag(f"{a} - {b}")
    flow.keep("data", {"answer": a - b})

def simple_math(flow: Flow):
    random = flow.get(Random)           # pull context values using only types

    a = random.randint(10, 100)
    b = random.randint(10, 100)

    flow.push(
        random.choice([add, sub]),      # extend the workflow with a new step
        a=a, b=b                        # pass required context values
    )

def inspect_add(flow: Flow):
    for scope in flow.search(                       # search prior scopes and filter
        "*",                                        # glob syntax is supported
        where=lambda tag: "+" in tag.content
    ):
        for tag in scope.tags:
            flow.log(tag.content)

def inspect_sub(flow: Flow):
    for scope in flow.search(sub):                  # or just pass the function
        flow.log(scope.recall("data")["answer"])    # and pull data from the scope

flow = (
    Flow("test", PolarsStorage("test.parquet"))     # persist data to a parquet file
    .fail_fast()                                    # don't ignore errors
    .put(random=Random(42))                         # prepare a seeded random generator
    .push(repeat(simple_math, 5))                   # add 5 steps for simple_math
    .push([inspect_add, inspect_sub])               # run after everything else
)

flow()                 # Execute our flow

print(flow.run)        # The run id (2-part slug)
print(flow.logs)       # list of any logs
print(flow.scopes)     # list of all scopes
print(flow.errors)     # any errors if we don't fail_fast

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

marque-0.1.1.tar.gz (10.8 kB view details)

Uploaded Source

Built Distribution

marque-0.1.1-py3-none-any.whl (11.5 kB view details)

Uploaded Python 3

File details

Details for the file marque-0.1.1.tar.gz.

File metadata

  • Download URL: marque-0.1.1.tar.gz
  • Upload date:
  • Size: 10.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.0.0 CPython/3.12.2

File hashes

Hashes for marque-0.1.1.tar.gz
Algorithm Hash digest
SHA256 bee34aa2d22a874cf7f0a1e14ab58a632746cd03d848d142999f16a075c57e49
MD5 772cd345efcdd1e4baa10aa5bab240d3
BLAKE2b-256 40175f4bc1a3fdb3cb071833af7d87cd198106fab2a8da948f35a5ae5d7c3ee2

See more details on using hashes here.

File details

Details for the file marque-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: marque-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 11.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.0.0 CPython/3.12.2

File hashes

Hashes for marque-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f97d0bfb744c96c810627b3f2d88403b5313a6dc06a97e5478cd1b319dd2f99d
MD5 03c6e88868e7f6a05b182368691b9196
BLAKE2b-256 57c7c8a9450b05eab01a1272ee4538a7fc5ae80a3742548a6249bbe576e7b1df

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page