A simple framework to build and run flows
Project description
Pipeline development framework, easy to experiment and compare different pipelines, quick to deploy to workflow orchestration tools
Most of the current workflow orchestrators focus on executing the already-developed pipelines in production. This library focuses on the pipeline development process. It aims to make it easy to develop pipeline, and once the user reaches a good pipeline, it aims to make it easy to export to other production-grade workflow orchestrators. Notable features:
- Manage pipeline experiment: store pipeline run outputs, compare pipelines & visualize.
- Support pipeline as code: allow complex customization.
- Support pipeline as configuration - suitable for plug-and-play when pipeline is more stable.
- Fast pipeline execution, auto-cache & run from cache when necessary.
- Allow version control of artifacts with git-lfs/dvc/god...
- Export pipeline to compatible workflow orchestration tools (e.g. Argo workflow, Airflow, Kubeflow...).
Install
pip install theflow
Quick start
(A code walk-through of this session is stored in examples/10-minutes-quick-start.ipynb
. You can run it with Google Colab (TODO - attach the link).)
Pipeline can be defined as code. You initialize all the ops in self.initialize
and route them in self.run
.
from theflow import Compose
# Define some operations used inside the pipeline
# Operation 1: normal class-based Python object
class IncrementBy(Compose):
x: int
def run(self, y):
return self.x + y
# Operation 2: normal Python function
def decrement_by_5(x):
return x - 5
# Declare flow
class MathFlow(Compose):
increment: Compose
decrement: Compose
def run(self, x):
# Route the operations in the flow
y = self.increment(x)
y = self.decrement(y)
y = self.increment(y)
return y
flow = MathFlow(increment=IncrementBy(x=10), decrement=decrement_by_5)
You run the pipeline by directly calling it. The output is the same object returned by self.run
.
output = flow(x=5)
print(f"{output=}, {type(output)=}") # output=5, type(output)=int
You can investigate pipeline's last run through the last_run
property.
flow.last_run.id() # id of the last run
flow.last_run.logs() # list all information of each step
# [TODO] flow.last_run.visualize(path="vis.png") # export the graph in `vis.png` file
Future features
- Arguments management
- Cache
- cache by runs, organized by root task, allow reproducible
- specify the files
- the keys are like
lru_cache
, takes in the original input key, specify the cache, but the cache should be file-backed, for run-after-run execution. - cli command to manipulate cache
- Compare pipeline in a result folder
- Dynamically create reproducible config
- Support pipeline branching and merging
- Support single process or multi-processing pipeline running
- Can synchronize changes in the workflow, allowing logs from different run to be compatible with each other
- Compare different runs
- Same cache directory
- Compare evaluation result based on kwargs
- CLI List runs
- CLI Delete unnecessary runs
- Add coverage, pre-commit, CI...
License
MIT License.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file theflow-0.4.9.tar.gz
.
File metadata
- Download URL: theflow-0.4.9.tar.gz
- Upload date:
- Size: 38.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.4
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d19fa8eec6d2905d8c6584fd01fc72e9b9f98a7984e6512bdc7e08d9fc6c0335 |
|
MD5 | f8eed66815cc71d9cd35d985ad258b6a |
|
BLAKE2b-256 | 5960ee8892ef0764c64a6ac6d599b938594c3cc6ad36a0e4d757e924fb7362d7 |
File details
Details for the file theflow-0.4.9-py3-none-any.whl
.
File metadata
- Download URL: theflow-0.4.9-py3-none-any.whl
- Upload date:
- Size: 37.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.4
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1fed30cdf6d6f2f50952ea24387e8aadfc5110243ac76186492b4ee099afc77b |
|
MD5 | 766fcef2c451ed9e5bf4f3e55c4817f2 |
|
BLAKE2b-256 | 57b2299c9e0865589c0a109b16b0dd92c2ba0ba95ee31c8f69fbe732772222a3 |