A Dynamic Workflow Execution Tool for Python 🚀
Project description
DynaFlow
DynaFlow is a flexible library for executing JSON-defined workflows. Its supports dynamic state management, error handling, and data manipulation (with JSONPath), making it ideal for orchestrating complex workflows.
DynaFlow lib is ASL-based, so it shares most of its definitions. For more information about ASL, you can follow the links in the References section.
Features
- State-Driven Execution: Includes core states like
Task,Choice,Parallel,Mapand more. - Dynamic Function Handling: Integrate with custom function registries for seamless execution.
- Error Handling: Built-in support for
RetryandCatchrules. - Data Transformation: Apply trasnformations at every stage using
InputPath,Parameters,ResultPath, and more with JSONPath. - Nested Flows: Recursive execution for
ParallelandMapstates. - JSON Validation: Ensures flow definitions adhere to a strict schema.
Installation
Requirements
- Python 3.9 or higher
- pip, pipeenv or poetry
Install with pip
pip install py-dynaflow
Install from source
git clone https://github.com/sgg10/dynaflow.git
cd dynaflow
pip install -e .
Quick Start
Defining a Workflow
{
"StartAt": "GetAges",
"States": {
"GetAges": {
"Type": "Pass",
"InputPath": "$",
"Parameters": {"ages.$": "$.users[*].age"},
"ResultPath": "$",
"OutputPath": "$.ages",
"End": true,
}
}
}
Running a Workflow
from dynaflow import DynaFlow
flow = {
"StartAt": "GetAges",
"States": {
"GetAges": {
"Type": "Pass",
"InputPath": "$",
"Parameters": {"ages.$": "$.users[*].age"},
"ResultPath": "$",
"OutputPath": "$.ages",
"End": True,
}
}
}
executor = DynaFlow(
flow,
function_database={},
search_function=lambda db, params: db[params["Name"]]
)
result = executor.run({"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}]})
print(result) # -> [20, 30]
Examples
1. Use a custom functions
To execute flows that require functions, you can create a simple function catalog with the help of a dictionary. You must also define a search function that will search for the function in the catalog based on the parameters provided in the definition of the Task state.
from dynaflow import DynaFlow
def get_ages(user_list: list) -> list[int]:
return [user["age"] for user in user_list]
function_catalog = {"get_ages": get_ages}
def search_function(db, params):
return db[params["Name"]]
flow = {
"StartAt": "GetAges",
"States": {
"GetAges": {
"Type": "Task",
"Function": {"Name": "get_ages"},
"Parameters": {"user_list.$": "$.users"},
"ResultPath": "$.result",
"End": True,
}
},
}
executor = DynaFlow(
flow, function_database=function_catalog, search_function=search_function
)
result = executor.run(
{"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}]}
)
print(result["result"]) # -> [20, 30]
2. Use Function Registry Library to easily manage the function catalog.
To easily manage a catalog of functions for the execution of your flows, you can install and use the [FunctionRegistry] library (https://github.com/sgg10/function-registry).
pip install function_registry
from dynaflow import DynaFlow
from function_registry import FunctionRegistry
# Create a function catalog
fc = FunctionRegistry()
@fc.save_version("get_ages", 1)
def get_ages(user_list: list) -> list[int]:
return [user["age"] for user in user_list]
@fc.save_version("get_ages", 2)
def get_ages(user_list: list) -> list[int]:
return [user["age"] for user in user_list if user["age"] > 18]
def search_function(db, params):
return db.get_version(params["Name"], params["Version"])["function"]
flow = {
"StartAt": "GetAges",
"States": {
"GetAges": {
"Type": "Task",
"Function": {"Name": "get_ages", "Version": 2},
"Parameters": {"user_list.$": "$.users"},
"ResultPath": "$.result_v2",
"Next": "GetAgesV1",
},
"GetAgesV1": {
"Type": "Task",
"Function": {"Name": "get_ages", "Version": 1},
"Parameters": {"user_list.$": "$.users"},
"ResultPath": "$.result_v1",
"End": True,
},
},
}
executor = DynaFlow(
flow, function_database=fc, search_function=search_function
)
result = executor.run(
{"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}, {"age": 15, "name": "Charlie"}]}
)
print(result["result_v1"]) # -> [20, 30, 15]
print(result["result_v2"]) # -> [20, 30]
Testing
To run the unit tests, execute the following command from the project root directory:
pytest tests
Contributing
All contributions to improve Function Registry are welcome! To contribute, follow these steps:
1. Fork the repository
git clone https://github.com/sgg10/dynaflow.git
cd dynaflow
2. Create a new branch for your changes:
git checkout -b feature/my-feature
3. Make changes and test them locally.
4. Submit a pull request: Open a pull request describing your changes.
For bug reports or feature requests, please visit the issues page
Documentation
For detailed information, visit the documentation. Which includes:
- Usage Guide: Step-by-step examples and use cases.
- API Reference: Technical details for each module and class.
References
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file py_dynaflow-1.0.0.tar.gz.
File metadata
- Download URL: py_dynaflow-1.0.0.tar.gz
- Upload date:
- Size: 26.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.9.18
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b8e7818a0913470bb93ed3227e64018965a4140181f2d1eef2645be13c66c4bc
|
|
| MD5 |
d41fdd79e46fabfbe9e5e12c18396ae8
|
|
| BLAKE2b-256 |
0b41ba063cece62482f793a8f6ac9eb2c0f718f9a247e0d94963425ac216a7e7
|
File details
Details for the file py_dynaflow-1.0.0-py3-none-any.whl.
File metadata
- Download URL: py_dynaflow-1.0.0-py3-none-any.whl
- Upload date:
- Size: 37.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.9.18
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f3c30acee60c3f8ef100f9a0dd8c45962b0f4252007d57ad97e805222ebe8a79
|
|
| MD5 |
57b0401f8b4271134bc5714000ae0beb
|
|
| BLAKE2b-256 |
1b612254413f403c89b810fdc41b3439526e136385488c34a88070a54befe958
|