
Manage your prompts

Project description

A pipeline development framework: easy to experiment with and compare different pipelines, and quick to deploy to workflow orchestration tools.


Most current workflow orchestrators focus on executing already-developed pipelines in production. This library focuses on the pipeline development process instead. It aims to make pipelines easy to develop and, once you have a good pipeline, easy to export to production-grade workflow orchestrators. Notable features:

  • Manage pipeline experiments: store pipeline run outputs, compare pipelines, and visualize them.
  • Support pipeline as code: allows complex customization.
  • Support pipeline as configuration: suitable for plug-and-play once the pipeline is more stable.
  • Fast pipeline execution: auto-cache and run from cache when necessary.
  • Allow version control of artifacts with git-lfs/dvc/god...
  • Export pipeline to compatible workflow orchestration tools (e.g. Argo workflow, Airflow, Kubeflow...).
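
To make the "pipeline as configuration" idea concrete, here is a minimal plain-Python sketch. It does not use theflow and does not show theflow's configuration format: the `OPS` registry, the `CONFIG` layout, and `build_pipeline` are all hypothetical names chosen for illustration.

```python
from functools import partial


def increment_by(y, x):
    """Add x to the incoming value y."""
    return x + y


def decrement_by(y, x):
    """Subtract x from the incoming value y."""
    return y - x


# Hypothetical registry mapping names used in the config to callables.
OPS = {"increment_by": increment_by, "decrement_by": decrement_by}

# Hypothetical config layout: an ordered list of named steps with parameters.
CONFIG = [
    {"name": "increment1", "op": "increment_by", "params": {"x": 10}},
    {"name": "decrement", "op": "decrement_by", "params": {"x": 5}},
    {"name": "increment2", "op": "increment_by", "params": {"x": 10}},
]


def build_pipeline(config):
    """Turn the config into a callable that applies each step in order."""
    steps = [(s["name"], partial(OPS[s["op"]], **s["params"])) for s in config]

    def run(value):
        for _name, step in steps:
            value = step(value)
        return value

    return run


pipeline = build_pipeline(CONFIG)
print(pipeline(5))  # 20
```

Swapping a step then only means editing the config entry, which is the plug-and-play property the feature list describes.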

Install

pip install theflow

Quick start

(A code walk-through of this section is stored in examples/10-minutes-quick-start.ipynb. You can run it with Google Colab (TODO - attach the link).)

A pipeline can be defined as code. You initialize all the ops in self.initialize and route them in self.run (the example below declares the ops as class attributes and passes them in at construction instead). In self.run, you give each step a name through _ff_name, which theflow uses to identify the edge in the flow graph.

from theflow import Composable

# Define some operations used inside the pipeline
# Operation 1: normal class-based Python object
class IncrementBy(Composable):

  x: int

  def run(self, y):
    return self.x + y

# Operation 2: normal Python function
def decrement_by_5(x):
  return x - 5

# Declare flow
class MathFlow(Composable):

  increment: Composable
  decrement: Composable

  def run(self, x):
    # Route the operations in the flow
    y = self.increment(x, _ff_name="increment1")   # associate _ff_name
    y = self.decrement(y, _ff_name="decrement")
    y = self.increment(y, _ff_name="increment2")
    return y

flow = MathFlow(increment=IncrementBy(x=10), decrement=decrement_by_5)

You run the pipeline by directly calling it. The output is the same object returned by self.run.

output = flow(x=5)
print(f"{output=}, {type(output)=}")      # output=20, type(output)=<class 'int'>
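
The value returned for x=5 can be checked by tracing the three routed steps by hand. The sketch below repeats the same arithmetic in plain Python, without importing theflow; `increment_by` and `math_flow` are hypothetical stand-ins for `IncrementBy.run` and `MathFlow.run`, not part of the library.

```python
def increment_by(x, y):
    """Mirror IncrementBy.run: add the configured x to the input y."""
    return x + y


def decrement_by_5(x):
    """Same function as in the example above."""
    return x - 5


def math_flow(x, increment_x=10):
    """Apply the steps in the order MathFlow.run routes them."""
    y = increment_by(increment_x, x)   # increment1: 10 + 5 = 15
    y = decrement_by_5(y)              # decrement:  15 - 5 = 10
    y = increment_by(increment_x, y)   # increment2: 10 + 10 = 20
    return y


result = math_flow(5)
print(result)  # 20
```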

You can inspect the pipeline's last run through the last_run property.

flow.last_run.id()                        # id of the last run
flow.last_run.logs()                      # list all information of each step
# [TODO] flow.last_run.visualize(path="vis.png")   # export the graph in `vis.png` file

Future features

  • Arguments management
  • Cache
    • cache by runs, organized by root task, to allow reproducibility
    • specify the files
    • the keys work like lru_cache: take the original input key and specify the cache, but the cache should be file-backed for run-after-run execution
    • CLI command to manipulate the cache
  • Compare pipelines in a result folder
  • Dynamically create reproducible config
  • Support pipeline branching and merging
  • Support single process or multi-processing pipeline running
  • Allow debugging
  • Can synchronize changes in the workflow, allowing logs from different runs to be compatible with each other
  • Compare different runs
    • Same cache directory
    • Compare evaluation result based on kwargs
  • CLI List runs
  • CLI Delete unnecessary runs
  • Add coverage, pre-commit, CI...
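
As an illustration of the planned file-backed cache with lru_cache-like keys, here is a minimal standard-library sketch. It is not theflow's implementation: the `file_cache` decorator, the `.pkl` file naming, and the key scheme are assumptions made for the example, and pickle-based keys are only reliable for simple, picklable inputs.

```python
import functools
import hashlib
import pickle
import tempfile
from pathlib import Path


def file_cache(cache_dir):
    """Hypothetical decorator: memoize results as pickle files under cache_dir."""
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Derive the key from the function name and its inputs,
            # in the spirit of how lru_cache keys on the call arguments.
            raw = pickle.dumps((func.__qualname__, args, sorted(kwargs.items())))
            path = cache_dir / (hashlib.sha256(raw).hexdigest() + ".pkl")
            if path.exists():
                # Cache hit: load the stored result instead of recomputing.
                return pickle.loads(path.read_bytes())
            result = func(*args, **kwargs)
            path.write_bytes(pickle.dumps(result))
            return result

        return wrapper

    return decorator


calls = []  # records how many times the function body actually runs
cache_root = tempfile.mkdtemp()


@file_cache(cache_root)
def slow_square(n):
    calls.append(n)
    return n * n


first = slow_square(4)   # computed and written to disk
second = slow_square(4)  # served from the file cache
print(first, second, len(calls))  # 16 16 1
```

Because the results live on disk rather than in memory, a later process pointed at the same directory would also get the cached value, which is the run-after-run behavior the list item asks for.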

License

MIT License.

