Application workflow framework
pyapp-flow
A simple application level workflow library.
Allows complex processes to be broken into smaller specific steps, greatly simplifying testing and re-use.
Installation
pip install pyapp-flow
Usage
from pathlib import Path
from typing import Sequence

import pyapp_flow as flow

# Define steps:

@flow.step(name="Load Names", output="names")
def load_names(root_path: Path) -> Sequence[str]:
    """
    Read a sequence of names from a file
    """
    with (root_path / "names.txt").open() as f_in:
        return [name.strip() for name in f_in.readlines()]

@flow.step(name="Say hello")
def say_hi(name: str):
    print(f"Hello {name}")

# Define a workflow:

greet_everybody = (
    flow.Workflow(name="Greet everybody in names file")
    .nodes(
        load_names,
        flow.ForEach("name", in_var="names").loop(say_hi),
    )
)

# Execute workflow:

context = flow.WorkflowContext(root_path=Path())
greet_everybody(context)
All nodes within the workflow follow a simple interface:

def node_function(context: flow.WorkflowContext):
    ...

or, expressed as a type alias using typing:

from typing import Any, Callable
NodeFunction = Callable[[flow.WorkflowContext], Any]

The step decorator simplifies definition of a step by handling loading and saving
of state variables from the WorkflowContext.
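
To make the loading and saving of state variables concrete, here is a minimal sketch of the idea in plain Python. It is not pyapp-flow's implementation: sketch_step is a hypothetical stand-in, a plain dict stands in for the WorkflowContext, and matching arguments to state variables by parameter name is an assumption based on the usage example above.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def sketch_step(output: Optional[str] = None) -> Callable:
    """Hypothetical stand-in for a step decorator (not the library's code)."""

    def decorator(func: Callable) -> Callable[[Dict[str, Any]], Any]:
        param_names = list(inspect.signature(func).parameters)

        def node(state: Dict[str, Any]) -> Any:
            # Load each argument from the state by parameter name (assumption).
            kwargs = {name: state[name] for name in param_names}
            result = func(**kwargs)
            # Save the return value back under the output name, if one was given.
            if output is not None:
                state[output] = result
            return result

        return node

    return decorator


@sketch_step(output="greeting")
def make_greeting(name: str) -> str:
    return f"Hello {name}"


state = {"name": "Ada"}
make_greeting(state)
print(state["greeting"])
```

The decorated function stays a plain function of its inputs, which is what makes steps easy to test in isolation.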
Reference
Workflow
At the basic level a workflow is an object that holds a series of nodes to be called in sequence. The workflow object also includes helper methods to generate and append the nodes defined in the Builtin Nodes section of the documentation.
Just like every node in pyApp-Flow, a workflow is called with a WorkflowContext
object. This means workflows can be nested in workflows or called from a for-each
node.
The one key aspect of a workflow object relates to context variable scope. When a workflow is triggered, the context scope is copied, and any changes made to the variables are discarded when the workflow ends. However, just like Python scoping, only the reference to each variable is copied, meaning mutable objects (e.g. lists and dicts) can still be modified.
workflow = (
    flow.Workflow(name="My Workflow")
    .nodes(...)
)
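
The scoping rule described above behaves much like a shallow copy of a dict. The following sketch uses plain dicts rather than the library, so the variable names are purely illustrative:

```python
# The outer scope, as it exists before a nested workflow is triggered.
outer_scope = {"count": 1, "messages": ["start"]}

# Entering the nested workflow: only the references are copied (shallow copy).
inner_scope = dict(outer_scope)

# Rebinding a variable only affects the inner scope.
inner_scope["count"] = 2

# Mutating a shared object is visible from both scopes.
inner_scope["messages"].append("inner step ran")

# Leaving the nested workflow: the inner scope is discarded.
del inner_scope

print(outer_scope)
```

The rebinding of count is lost when the inner scope is discarded, while the appended message survives because both scopes refer to the same list.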
WorkflowContext
The workflow context object holds the state of the workflow including handling variable scoping and helper methods for logging progress.
Properties
- state: Direct access to state variables in the current scope.
- depth: Current scope depth.
- indent: Helper that returns a string indent for use when formatting messages.
Methods
- format: Format a string using values from the context state. Most name values for nodes/workflows use this method to allow values from the scope to be included, e.g.:
  context.format("Current path {working_path}")
- push_state/pop_state: Used to step into or out of a lower state scope. Typically these methods are not called directly but via a with block, e.g.:
  with context:
      pass  # Separate variable scope
- Logging wrappers: Wrappers around an internal workflow logger that handle indentation to make reading the log easier.
  - log
  - debug
  - info
  - warning
  - error
  - exception
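
The properties and methods listed above can be pictured with a small stand-in class. This is only a sketch under stated assumptions, not the real WorkflowContext: SketchContext is hypothetical, the depth is assumed to start at 1, and the indent is assumed to be two spaces per nested level.

```python
from typing import Any, Dict, List


class SketchContext:
    """Hypothetical stand-in illustrating scope handling (not the library's code)."""

    def __init__(self, **variables: Any) -> None:
        self._stack: List[Dict[str, Any]] = [dict(variables)]

    @property
    def state(self) -> Dict[str, Any]:
        # Variables of the current (innermost) scope.
        return self._stack[-1]

    @property
    def depth(self) -> int:
        # Assumption: the outermost scope has depth 1.
        return len(self._stack)

    @property
    def indent(self) -> str:
        # Assumption: two spaces per nested level.
        return "  " * (self.depth - 1)

    def format(self, message: str) -> str:
        # Fill {placeholders} from the current scope.
        return message.format(**self.state)

    def push_state(self) -> None:
        self._stack.append(dict(self.state))

    def pop_state(self) -> None:
        self._stack.pop()

    def __enter__(self) -> "SketchContext":
        self.push_state()
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.pop_state()


context = SketchContext(working_path="/tmp/job")
with context:
    context.state["working_path"] = "/tmp/job/nested"
    nested_message = context.indent + context.format("Current path {working_path}")
outer_message = context.indent + context.format("Current path {working_path}")
print(nested_message)
print(outer_message)
```

Using the with block guarantees that pop_state runs even if a node raises, which is why calling push_state and pop_state directly is rarely needed.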
Builtin Nodes
Modify context variables
- SetVar: Set one or more variables into the context.
  flow.SetVar(my_var="foo")
- SetGlobalVar: Set one or more variables globally in the context.
  flow.SetGlobalVar(my_var="foo")
- Append: Append a value to a list in the context object (the list is created if it does not exist).
  flow.Append("messages", "Operation failed to add {my_var}")
- CaptureErrors: Capture and store any errors raised by node(s) within the capture block to a variable within the context.
  flow.CaptureErrors("errors").nodes(my_flaky_step)
  This node also has a try_all argument that controls the behaviour when an error is captured. If True, every node is called even if they all raise errors. This is useful for running a number of separate checks that may each fail.
  flow.CaptureErrors(
      "errors",
      try_all=True,
  ).nodes(
      my_first_check,
      my_second_check,
  )
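
The effect of try_all can be sketched in plain Python. This is an illustration only, not the library's implementation: capture_errors and the check functions are hypothetical, and stopping at the first error when try_all is False is an assumption inferred from the description above.

```python
from typing import Callable, List, Sequence


def capture_errors(
    checks: Sequence[Callable[[], None]], try_all: bool = False
) -> List[Exception]:
    """Run each check and collect any exceptions that are raised."""
    errors: List[Exception] = []
    for check in checks:
        try:
            check()
        except Exception as ex:
            errors.append(ex)
            # Assumption: without try_all, stop at the first captured error.
            if not try_all:
                break
    return errors


def failing_check() -> None:
    raise ValueError("first check failed")


def passing_check() -> None:
    return None


def other_failing_check() -> None:
    raise KeyError("third check failed")


checks = [failing_check, passing_check, other_failing_check]
stop_early = capture_errors(checks)
run_everything = capture_errors(checks, try_all=True)
print(len(stop_early), len(run_everything))
```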
Provide feedback
- LogMessage: Insert a message, with optional values from the context, into the runtime log at an optional level.
  flow.LogMessage("State of my_var is {my_var}", level=logging.INFO)
  This node also has helper methods:
  - LogMessage.debug
  - LogMessage.info
  - LogMessage.warning
  - LogMessage.error
Branching
Branching nodes utilise a fluent interface for defining the nodes within each branch.
- Conditional/If: Analogous to an if statement. It accepts either a context variable that can be interpreted as a bool, or a function/lambda that accepts a WorkflowContext object and returns a bool.
  # With context variable
  (
      If("is_successful")
      .true(log_message("Process successful :)"))
      .false(log_message("Process failed :("))
  )

  # With lambda
  (
      If(lambda context: len(context.state.errors) == 0)
      .true(log_message("Process successful :)"))
      .false(log_message("Process failed :("))
  )
- Switch: Analogous to the switch statement found in many languages, or to a Python dict lookup with a default fallback.
  Like the conditional node, switch accepts a context variable or a function/lambda that accepts a WorkflowContext, except that it returns any hashable object.
  # With context variable
  (
      Switch("my_var")
      .case("foo", log_message("Found foo!"))
      .case("bar", log_message("Found bar!"))
      .default(log_message("Found neither."))
  )

  # With lambda
  (
      Switch(lambda context: context.state["my_var"])
      .case("foo", log_message("Found foo!"))
      .case("bar", log_message("Found bar!"))
  )
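
Both branching nodes first resolve a value (from a variable name or from a callable) and then pick a branch. The sketch below shows that two-step idea with plain dicts and strings. The helper names resolve, sketch_if, and sketch_switch are hypothetical and do not reflect how the library is implemented.

```python
from typing import Any, Callable, Dict, Hashable, Optional, Union

State = Dict[str, Any]
Selector = Union[str, Callable[[State], Any]]


def resolve(selector: Selector, state: State) -> Any:
    """Return the selector's value: call it, or look it up by variable name."""
    if callable(selector):
        return selector(state)
    return state[selector]


def sketch_if(selector: Selector, state: State, when_true: str, when_false: str) -> str:
    # The resolved value is interpreted as a bool.
    return when_true if bool(resolve(selector, state)) else when_false


def sketch_switch(
    selector: Selector,
    state: State,
    cases: Dict[Hashable, str],
    default: Optional[str] = None,
) -> Optional[str]:
    # A dict lookup with a default fallback.
    return cases.get(resolve(selector, state), default)


state = {"is_successful": True, "errors": ["boom"], "my_var": "bar"}
cases = {"foo": "Found foo!", "bar": "Found bar!"}

by_name = sketch_if("is_successful", state, "Process successful", "Process failed")
by_lambda = sketch_if(
    lambda s: len(s["errors"]) == 0, state, "Process successful", "Process failed"
)
found = sketch_switch("my_var", state, cases, "Found neither.")
missing = sketch_switch(lambda s: "baz", state, cases, "Found neither.")
print(by_name, by_lambda, found, missing, sep=" | ")
```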
Iteration
- ForEach: Analogous to a for loop, this node iterates through a sequence and calls each of the child nodes.
  All nodes within a for-each loop are in a nested context scope.
  # With a single target variable
  (
      flow.ForEach("message", in_var="messages")
      .loop(log_message("- {message}"))
  )

  # With multiple target variables
  (
      flow.ForEach("name, age", in_var="students")
      .loop(log_message("- {name} is {age} years old."))
  )
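
The multiple-target form mirrors tuple unpacking in a Python for loop. A rough sketch of the behaviour, using a hypothetical sketch_for_each helper and plain dicts for scopes (an illustration, not the library's code):

```python
from typing import Any, Callable, Dict, Iterable, List


def sketch_for_each(
    target: str,
    values: Iterable[Any],
    state: Dict[str, Any],
    body: Callable[[Dict[str, Any]], None],
) -> None:
    """Call body once per value, each time in its own nested scope."""
    names = [name.strip() for name in target.split(",")]
    for value in values:
        scope = dict(state)  # nested scope: the outer state is left untouched
        if len(names) == 1:
            scope[names[0]] = value
        else:
            # Unpack each item across the target names, like `for name, age in ...`.
            scope.update(zip(names, value))
        body(scope)


lines: List[str] = []
state = {"students": [("Ada", 36), ("Alan", 41)]}
sketch_for_each(
    "name, age",
    state["students"],
    state,
    lambda scope: lines.append("- {name} is {age} years old.".format(**scope)),
)
print("\n".join(lines))
```

Because each iteration works on its own copy of the scope, the loop variables do not leak into the outer state.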
Error Handling
- TryExcept: Similar to the standard Python try/except/finally block.
  Can handle single or multiple exceptions in a block, with a finally block as a fallback.
  # With a single exception
  (
      flow.TryExcept(log_message("could this fail?"))
      .except_on(FileNotFoundError, log_message("Yes it did"))
  )

  # With multiple exceptions
  (
      flow.TryExcept(log_message("could this fail?"))
      .except_on([FileNotFoundError, TimeoutError], log_message("Yes it did"))
      .except_on(RuntimeError, log_message("Another failure reason"))
  )

  # Finally
  (
      flow.TryExcept(log_message("could this fail?"))
      .and_finally(log_message("Always do this"))
  )
- TryUntil: Try a set of nodes until one of them does not raise an exception.
  (
      TryUntil()
      .nodes(
          resolve_state_a,
          resolve_state_b,
      )
      .default(
          fallback_state,
      )
  )
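
The try-until pattern amounts to calling candidates in order and stopping at the first one that succeeds. The sketch below illustrates this with plain functions. sketch_try_until is hypothetical, and raising a RuntimeError when every candidate fails and no default is given is an assumption, since the behaviour in that case is not described here.

```python
from typing import Any, Callable, Optional, Sequence


def sketch_try_until(
    candidates: Sequence[Callable[[], Any]],
    default: Optional[Callable[[], Any]] = None,
) -> Any:
    """Return the result of the first candidate that does not raise."""
    for candidate in candidates:
        try:
            return candidate()
        except Exception:
            continue  # try the next candidate
    if default is None:
        # Assumption: with no default, failure of every candidate is an error.
        raise RuntimeError("every candidate failed and no default was given")
    return default()


def resolve_state_a() -> str:
    raise FileNotFoundError("state a is unavailable")


def resolve_state_b() -> str:
    return "state b"


def fallback_state() -> str:
    return "fallback"


first_success = sketch_try_until([resolve_state_a, resolve_state_b], default=fallback_state)
used_default = sketch_try_until([resolve_state_a], default=fallback_state)
print(first_success, used_default)
```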