Application workflow framework

pyapp-flow

A simple application-level workflow library.

Allows complex processes to be broken into smaller specific steps, greatly simplifying testing and re-use.

Installation

pip install pyapp-flow

Usage

from pathlib import Path
from typing import Sequence
import pyapp_flow as flow

# Define steps:

@flow.step(name="Load Names", output="names")
def load_names(root_path: Path) -> Sequence[str]:
    """
    Read a sequence of names from a file
    """
    with (root_path / "names.txt").open() as f_in:
        return [name.strip() for name in f_in.readlines()]


@flow.step(name="Say hello")
def say_hi(name: str):
    print(f"Hello {name}")


# Define a workflow:

greet_everybody = (
    flow.Workflow(name="Greet everybody in names file")
    .nodes(load_names)
    .foreach("name", "names", say_hi)
)


# Execute workflow:

context = flow.WorkflowContext(root_path=Path())
greet_everybody(context)

All nodes within the workflow follow a simple interface of:

def node_function(context: WorkflowContext):
    ...

or, expressed as a type alias:

NodeFunction = Callable[[WorkflowContext], Any]

The step decorator simplifies definition of a step by handling loading and saving of state variables from the WorkflowContext.
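To make the idea concrete, here is a minimal sketch of how a decorator of this kind could work. It is not pyapp-flow's implementation: `FakeContext` and `sketch_step` are hypothetical stand-ins, and the only behaviour assumed is what the paragraph above describes (arguments are loaded from the context state by parameter name, and the return value is saved under the output name).

```python
import inspect
from typing import Any, Callable, Optional


class FakeContext:
    """Hypothetical stand-in for WorkflowContext: just a dict of state."""

    def __init__(self, **variables: Any) -> None:
        self.state = dict(variables)


def sketch_step(output: Optional[str] = None) -> Callable:
    """Wrap a plain function so it can be called with a context object."""

    def decorator(func: Callable) -> Callable[[FakeContext], Any]:
        arg_names = list(inspect.signature(func).parameters)

        def node(context: FakeContext) -> Any:
            # Load each argument from the context state by parameter name.
            kwargs = {name: context.state[name] for name in arg_names}
            result = func(**kwargs)
            # Save the result back into the state if an output name was given.
            if output is not None:
                context.state[output] = result
            return result

        return node

    return decorator


@sketch_step(output="total")
def add(a: int, b: int) -> int:
    return a + b


context = FakeContext(a=2, b=3)
add(context)
print(context.state["total"])  # 5
```

The wrapped function now matches the node interface (it takes only a context), while the undecorated logic stays a plain function that is easy to test in isolation.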

Reference

Workflow

At its most basic, a workflow is an object that holds a series of nodes to be called in sequence. The workflow object also includes helper methods to generate and append the nodes defined in the Builtin Nodes section.

Just like every node in pyapp-flow, a workflow is called with a WorkflowContext object. This means workflows can be nested in other workflows or called from a for-each node.

The one key aspect of a workflow object relates to context variable scope. When a workflow is triggered, the context scope is copied, and any changes made to the variables are discarded when the workflow ends. However, just as with Python scoping, only the reference to each variable is copied, so mutable objects (e.g. lists and dicts) can be modified in place.

workflow = (
    Workflow(name="My Workflow")
    .nodes(...)
)
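The scoping rule can be illustrated with plain dictionaries. This sketch does not use pyapp-flow; `run_in_child_scope` is a hypothetical helper that assumes only what is described above: the scope is shallow-copied on entry and thrown away on exit.

```python
from typing import Any, Callable, Dict


def run_in_child_scope(
    state: Dict[str, Any], body: Callable[[Dict[str, Any]], None]
) -> None:
    """Run body against a shallow copy of state, then discard the copy."""
    child = dict(state)  # copies references, not the objects themselves
    body(child)


def body(scope: Dict[str, Any]) -> None:
    scope["count"] = 99         # rebinding: only the child scope sees this
    scope["new_var"] = "temp"   # new variable: discarded with the child scope
    scope["items"].append("b")  # mutation: the list is shared with the parent


state = {"count": 1, "items": ["a"]}
run_in_child_scope(state, body)

print(state["count"])      # 1
print(state["items"])      # ['a', 'b']
print("new_var" in state)  # False
```

In practice this means a nested workflow can collect results by appending to a list owned by the outer scope, but cannot hand back a value by assigning a new variable.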

WorkflowContext

The workflow context object holds the state of the workflow, handles variable scoping, and provides helper methods for logging progress.

Properties

  • state

    Direct access to state variables in the current scope.

  • depth

    Current scope depth

  • indent

    Helper that returns an indent string for use when formatting messages.

Methods

  • format

    Format a string using values from the context state. Most name values for nodes and workflows are passed through this method so that they can include values from the current scope, e.g.:

    context.format("Current path {working_path}")
    
  • push_state/pop_state

    Used to step into or out of a nested state scope. These methods are typically not called directly; instead, use the context in a with block, e.g.:

    with context:
        pass  # Separate variable scope 
    
  • Logging wrappers

    Wrappers around an internal workflow logger that handle indentation to make reading the log easier.

    • log
    • debug
    • info
    • warning
    • error
    • exception
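The following sketch shows one way the properties and methods listed above could fit together. `SketchContext` is a hypothetical stand-in written for illustration, not the real WorkflowContext; in particular, the starting depth of 1 and the two-space indent per nested scope are assumptions.

```python
from typing import Any, Dict, List


class SketchContext:
    """Hypothetical stand-in for WorkflowContext (not the real class)."""

    def __init__(self, **variables: Any) -> None:
        self._scopes: List[Dict[str, Any]] = [dict(variables)]

    @property
    def state(self) -> Dict[str, Any]:
        # Variables in the current (innermost) scope.
        return self._scopes[-1]

    @property
    def depth(self) -> int:
        return len(self._scopes)

    @property
    def indent(self) -> str:
        # Assumed convention: two spaces per nested scope.
        return "  " * (self.depth - 1)

    def format(self, message: str) -> str:
        # Fill {placeholders} from the current scope.
        return message.format(**self.state)

    def push_state(self) -> None:
        # Enter a nested scope that starts as a shallow copy of the current one.
        self._scopes.append(dict(self.state))

    def pop_state(self) -> None:
        # Leave the nested scope, discarding its variables.
        self._scopes.pop()

    def __enter__(self) -> "SketchContext":
        self.push_state()
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.pop_state()


context = SketchContext(working_path="/tmp/data")
print(context.format("Current path {working_path}"))  # Current path /tmp/data

with context:
    context.state["working_path"] = "/tmp/other"
    inner_depth = context.depth        # 2
    inner_indent = context.indent      # two spaces

print(context.state["working_path"])  # /tmp/data
```

Using a with block guarantees that pop_state runs even if a node raises, which is why the paired methods rarely need to be called by hand.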

Builtin Nodes

Modify context variables

  • SetVar

    Set one or more variables in the context.

    SetVar(my_var="foo")
    
  • Append

    Append a value to a list in the context object (will create the list if it does not exist).

    Append("messages", "Operation failed to add {my_var}")
    
  • CaptureErrors

    Capture and store any errors raised by node(s) within the capture block to a variable within the context.

    CaptureErrors("errors", my_flaky_step)
    

    This node also has a try_all argument that controls the behaviour when an error is captured. If True, every node is called even if they all raise errors, which is useful for running a number of separate tests that may fail.

    CaptureErrors(
        "errors", 
        my_first_check, 
        my_second_check, 
        try_all=True
    )
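The effect of try_all can be sketched without the library. `capture_errors_sketch` below is a hypothetical function, not the CaptureErrors node itself, and it assumes that without try_all the remaining nodes are skipped after the first error.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], Any]


def capture_errors_sketch(
    state: State, target: str, *nodes: Node, try_all: bool = False
) -> None:
    """Call each node in turn, storing raised exceptions in state[target]."""
    errors: List[Exception] = state.setdefault(target, [])
    for node in nodes:
        try:
            node(state)
        except Exception as ex:
            errors.append(ex)
            if not try_all:
                break  # assumed default: stop at the first failure


def my_first_check(state: State) -> None:
    raise ValueError("first check failed")


def my_second_check(state: State) -> None:
    raise KeyError("second check failed")


stop_early: State = {}
capture_errors_sketch(stop_early, "errors", my_first_check, my_second_check)
print(len(stop_early["errors"]))  # 1

run_all: State = {}
capture_errors_sketch(
    run_all, "errors", my_first_check, my_second_check, try_all=True
)
print(len(run_all["errors"]))  # 2
```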
    

Provide feedback

  • LogMessage

    Insert a message, with optional values from the context, into the runtime log at an optional level.

    LogMessage("State of my_var is {my_var}", level=logging.INFO)
    

Branching

Branching nodes utilise a fluent interface for defining the nodes within each branch.

  • Conditional / If

    Analogous to an if statement. It accepts either a context variable that can be interpreted as a bool, or a function/lambda that accepts a WorkflowContext object and returns a bool.

    # With context variable
    (
        If("is_successful")
        .true(log_message("Process successful :)"))
        .false(log_message("Process failed :("))
    )
    
    # With Lambda
    (
        If(lambda context: len(context.state.errors) == 0)
        .true(log_message("Process successful :)"))
        .false(log_message("Process failed :("))
    )
    
  • Switch

    Analogous to the switch statement found in many languages or, in Python, to a dict lookup with a default fallback.

    Like the conditional node, Switch can accept either a context variable or a function/lambda that accepts a WorkflowContext; here the value may be any hashable object rather than a bool.

    # With context variable
    (
        Switch("my_var")
        .case("foo", log_message("Found foo!"))
        .case("bar", log_message("Found bar!"))
        .default(log_message("Found neither."))
    )
    
    # With Lambda
    (
        Switch(lambda context: context.state["my_var"])
        .case("foo", log_message("Found foo!"))
        .case("bar", log_message("Found bar!"))
    )
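The dict-lookup analogy, together with the "variable name or callable" convention shared by both branching nodes, can be sketched in a few lines. `resolve` and `switch_sketch` are hypothetical helpers operating on a plain dict; they return the selected branch instead of executing nodes and are not part of pyapp-flow.

```python
from typing import Any, Callable, Dict, Hashable, Union

State = Dict[str, Any]
Selector = Union[str, Callable[[State], Hashable]]


def resolve(selector: Selector, state: State) -> Hashable:
    """Return the variable named by selector, or the result of calling it."""
    if callable(selector):
        return selector(state)
    return state.get(selector)


def switch_sketch(
    selector: Selector, cases: Dict[Hashable, Any], default: Any, state: State
) -> Any:
    """Pick the branch whose key matches, falling back to default."""
    return cases.get(resolve(selector, state), default)


cases = {"foo": "Found foo!", "bar": "Found bar!"}

# Selecting by context variable name
by_name = switch_sketch("my_var", cases, "Found neither.", {"my_var": "bar"})
print(by_name)  # Found bar!

# Selecting with a lambda; no case matches, so the default is used
by_lambda = switch_sketch(
    lambda state: state["my_var"], cases, "Found neither.", {"my_var": "baz"}
)
print(by_lambda)  # Found neither.

# A conditional is the same resolution step followed by bool()
is_ok = bool(resolve(lambda state: len(state["errors"]) == 0, {"errors": []}))
print(is_ok)  # True
```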
    

Iteration

  • ForEach

    Analogous to a for loop, this node iterates through a sequence and calls each of the child nodes for every item.

    All nodes within a for-each loop are in a nested context scope.

    # With a single target variable
    (
        ForEach("message", in_var="messages")
        .loop(log_message("- {message}"))
    )
    
    # With multiple target variables
    (
        ForEach("name, age", in_var="students")
        .loop(log_message("- {name} is {age} years old."))
    )
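As a rough illustration of the multiple-target form, the sketch below splits the target string on commas and unpacks each item into a nested scope. `for_each_sketch` is a hypothetical function working on a plain dict; how the real ForEach node parses its targets is an assumption here.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], Any]


def for_each_sketch(
    state: State, target_vars: str, in_var: str, *nodes: Node
) -> None:
    """Call every node once per item, binding the item in a nested scope."""
    names = [name.strip() for name in target_vars.split(",")]
    for value in state[in_var]:
        scope = dict(state)  # nested scope: discarded after each iteration
        if len(names) == 1:
            scope[names[0]] = value
        else:
            # Unpack the item across the target names, like "for a, b in ..."
            scope.update(zip(names, value))
        for node in nodes:
            node(scope)


lines: List[str] = []
state: State = {"students": [("Ada", 36), ("Alan", 41)]}

for_each_sketch(
    state,
    "name, age",
    "students",
    lambda scope: lines.append("- {name} is {age} years old.".format(**scope)),
)

print(lines)            # ['- Ada is 36 years old.', '- Alan is 41 years old.']
print("name" in state)  # False
```

Because each iteration runs in its own scope, the loop variables never leak into the surrounding workflow.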
    
