A Dynamic Workflow Execution Tool for Python 🚀

DynaFlow

DynaFlow is a flexible library for executing JSON-defined workflows. It supports dynamic state management, error handling, and data manipulation with JSONPath, which makes it well suited to orchestrating complex workflows.

DynaFlow is based on the Amazon States Language (ASL), so it shares most of ASL's definitions. For more information about ASL, follow the links in the References section.

Features

  • State-Driven Execution: Includes core states like Task, Choice, Parallel, Map and more.
  • Dynamic Function Handling: Integrate with custom function registries for seamless execution.
  • Error Handling: Built-in support for Retry and Catch rules.
  • Data Transformation: Apply transformations at every stage using InputPath, Parameters, ResultPath, and more with JSONPath.
  • Nested Flows: Recursive execution for Parallel and Map states.
  • JSON Validation: Ensures flow definitions adhere to a strict schema.
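
To give a feel for what validating a flow definition involves, here is a minimal structural check in plain Python. It is only a sketch: `check_flow` is a hypothetical helper, not part of DynaFlow, and the library's own schema validation may be stricter or differ in detail.

```python
def check_flow(flow):
    """Return a list of structural problems found in a flow definition.

    Hypothetical helper for illustration; not DynaFlow's validator.
    """
    states = flow.get("States")
    if not isinstance(states, dict) or not states:
        return ["'States' must be a non-empty object"]

    problems = []
    start = flow.get("StartAt")
    if start not in states:
        problems.append(f"'StartAt' names unknown state {start!r}")

    for name, state in states.items():
        if "Type" not in state:
            problems.append(f"state {name!r} has no 'Type'")
        target = state.get("Next")
        if target is not None and target not in states:
            problems.append(f"state {name!r} points to unknown state {target!r}")
    return problems


good_flow = {"StartAt": "GetAges", "States": {"GetAges": {"Type": "Pass", "End": True}}}
bad_flow = {"StartAt": "GetAges", "States": {"GetAges": {"Type": "Pass", "Next": "Missing"}}}

print(check_flow(good_flow))
print(check_flow(bad_flow))
```

An empty list means no problems were found by these particular checks; it does not guarantee that DynaFlow will accept the definition.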

Installation

Requirements

  • Python 3.9 or higher
  • pip, pipenv or poetry

Install with pip

pip install py-dynaflow

Install from source

git clone https://github.com/sgg10/dynaflow.git
cd dynaflow
pip install -e .

Quick Start

Defining a Workflow

{
    "StartAt": "GetAges",
    "States": {
        "GetAges": {
            "Type": "Pass",
            "InputPath": "$",
            "Parameters": {"ages.$": "$.users[*].age"},
            "ResultPath": "$",
            "OutputPath": "$.ages",
            "End": true
        }
    }
}
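
Because the definition is plain JSON, it can be kept in a file or string and loaded with Python's standard `json` module before being handed to the executor as a dictionary. The sketch below only shows the loading step; the variable names are illustrative. Note that strict JSON does not allow a trailing comma after the last key, and that JSON `true` becomes Python `True`.

```python
import json

# The workflow definition as a JSON document (for example, read from a file).
flow_json = """
{
    "StartAt": "GetAges",
    "States": {
        "GetAges": {
            "Type": "Pass",
            "InputPath": "$",
            "Parameters": {"ages.$": "$.users[*].age"},
            "ResultPath": "$",
            "OutputPath": "$.ages",
            "End": true
        }
    }
}
"""

# json.loads returns a regular dict; JSON true/false map to True/False.
flow = json.loads(flow_json)
print(flow["StartAt"])
print(flow["States"]["GetAges"]["End"])
```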

Running a Workflow

from dynaflow import DynaFlow

flow = {
    "StartAt": "GetAges",
    "States": {
        "GetAges": {
            "Type": "Pass",
            "InputPath": "$",
            "Parameters": {"ages.$": "$.users[*].age"},
            "ResultPath": "$",
            "OutputPath": "$.ages",
            "End": True,
        }
    }
}

executor = DynaFlow(
    flow,
    function_database={},
    search_function=lambda db, params: db[params["Name"]]
)
result = executor.run({"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}]})
print(result) # -> [20, 30]
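
To make the data flow through the Pass state easier to follow, the sketch below reproduces each step by hand in plain Python, following the ASL processing order (InputPath, Parameters, ResultPath, OutputPath). It does not use DynaFlow or a JSONPath library, and it is not how the library is implemented; the intermediate variable names are illustrative only.

```python
raw_input = {"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}]}

# InputPath "$": select the whole input document.
effective_input = raw_input

# Parameters {"ages.$": "$.users[*].age"}: build a new payload whose "ages"
# key holds the age of every user (the ".$" suffix marks a JSONPath value).
parameters = {"ages": [user["age"] for user in effective_input["users"]]}

# A Pass state does no work of its own: its result is its effective input.
state_result = parameters

# ResultPath "$": the result replaces the whole document.
after_result_path = state_result

# OutputPath "$.ages": keep only the "ages" value as the state's output.
output = after_result_path["ages"]

print(output)  # -> [20, 30]
```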

Examples

1. Use custom functions

To execute flows that call functions, create a function catalog, which can be as simple as a dictionary. You must also define a search function that looks up the function in the catalog using the parameters given in the Function field of the Task state.

from dynaflow import DynaFlow


def get_ages(user_list: list) -> list[int]:
    return [user["age"] for user in user_list]


function_catalog = {"get_ages": get_ages}


def search_function(db, params):
    return db[params["Name"]]


flow = {
    "StartAt": "GetAges",
    "States": {
        "GetAges": {
            "Type": "Task",
            "Function": {"Name": "get_ages"},
            "Parameters": {"user_list.$": "$.users"},
            "ResultPath": "$.result",
            "End": True,
        }
    },
}

executor = DynaFlow(
    flow, function_database=function_catalog, search_function=search_function
)
result = executor.run(
    {"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}]}
)

print(result["result"])  # -> [20, 30]
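
A dictionary lookup raises a bare KeyError when a flow references a function that is not in the catalog. The variant below is a small sketch of a search function that reports the missing name and the available ones instead. It keeps the same `(db, params)` signature used above; the error type and message are arbitrary choices, and how DynaFlow surfaces exceptions raised by a search function is not covered here.

```python
def get_ages(user_list: list) -> list:
    return [user["age"] for user in user_list]


function_catalog = {"get_ages": get_ages}


def search_function(db, params):
    """Look up a function by the "Name" given in the Task state's Function field."""
    name = params["Name"]
    try:
        return db[name]
    except KeyError:
        # Replace the bare KeyError with a message listing the known names.
        raise LookupError(
            f"No function named {name!r} in the catalog; known names: {sorted(db)}"
        ) from None


found = search_function(function_catalog, {"Name": "get_ages"})
print(found([{"age": 20}, {"age": 30}]))  # -> [20, 30]

try:
    search_function(function_catalog, {"Name": "get_names"})
except LookupError as error:
    print(error)
```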

2. Use the FunctionRegistry library to manage the function catalog

To manage a catalog of functions for your flows more easily, you can install and use the FunctionRegistry library (https://github.com/sgg10/function-registry).

pip install function_registry

from dynaflow import DynaFlow
from function_registry import FunctionRegistry


# Create a function catalog
fc = FunctionRegistry()

@fc.save_version("get_ages", 1)
def get_ages(user_list: list) -> list[int]:
    return [user["age"] for user in user_list]

@fc.save_version("get_ages", 2)
def get_ages(user_list: list) -> list[int]:
    return [user["age"] for user in user_list if user["age"] > 18]


def search_function(db, params):
    return db.get_version(params["Name"], params["Version"])["function"]

flow = {
    "StartAt": "GetAges",
    "States": {
        "GetAges": {
            "Type": "Task",
            "Function": {"Name": "get_ages", "Version": 2},
            "Parameters": {"user_list.$": "$.users"},
            "ResultPath": "$.result_v2",
            "Next": "GetAgesV1",
        },
        "GetAgesV1": {
            "Type": "Task",
            "Function": {"Name": "get_ages", "Version": 1},
            "Parameters": {"user_list.$": "$.users"},
            "ResultPath": "$.result_v1",
            "End": True,
        },
    },
}

executor = DynaFlow(
    flow, function_database=fc, search_function=search_function
)

result = executor.run(
    {"users": [{"age": 20, "name": "Alice"}, {"age": 30, "name": "Bob"}, {"age": 15, "name": "Charlie"}]}
)

print(result["result_v1"])  # -> [20, 30, 15]
print(result["result_v2"])  # -> [20, 30]
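
The search function is what ties the Function field of a Task state to a concrete callable, so the same idea works with any catalog that can be queried by name and version. The sketch below uses a hypothetical `VersionedCatalog` class written for illustration (it is not the FunctionRegistry API) and adds one assumed convenience: when "Version" is omitted, the highest registered version is used.

```python
from collections import defaultdict


class VersionedCatalog:
    """Hypothetical stand-in for a versioned function registry."""

    def __init__(self):
        self._functions = defaultdict(dict)  # name -> {version: function}

    def register(self, name, version):
        def decorator(func):
            self._functions[name][version] = func
            return func
        return decorator

    def get(self, name, version=None):
        versions = self._functions.get(name)
        if not versions:
            raise LookupError(f"No function named {name!r}")
        if version is None:
            version = max(versions)  # assumed default: latest version
        if version not in versions:
            raise LookupError(f"{name!r} has no version {version!r}")
        return versions[version]


catalog = VersionedCatalog()


@catalog.register("get_ages", 1)
def get_ages_v1(user_list):
    return [user["age"] for user in user_list]


@catalog.register("get_ages", 2)
def get_ages_v2(user_list):
    return [user["age"] for user in user_list if user["age"] > 18]


def search_function(db, params):
    # "Version" is optional in this sketch; .get returns None when it is absent.
    return db.get(params["Name"], params.get("Version"))


users = [{"age": 20}, {"age": 30}, {"age": 15}]
print(search_function(catalog, {"Name": "get_ages", "Version": 1})(users))  # -> [20, 30, 15]
print(search_function(catalog, {"Name": "get_ages"})(users))  # -> [20, 30]
```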

Testing

To run the unit tests, execute the following command from the project root directory:

pytest tests

Contributing

All contributions to improve DynaFlow are welcome! To contribute, follow these steps:

1. Fork the repository

git clone https://github.com/sgg10/dynaflow.git
cd dynaflow

2. Create a new branch for your changes:

git checkout -b feature/my-feature

3. Make changes and test them locally.

4. Submit a pull request: Open a pull request describing your changes.

For bug reports or feature requests, please visit the issues page.

Documentation

For detailed information, visit the documentation.

References
