Yet another ETL pipeline runner for Python, using multiprocessing and directed acyclic graphs.
Project description
PypeRunner is an easy-to-use yet powerful workflow pipeline tool written in pure Python, featuring parallel processing of tasks, smart caching of results, and reproducible runs. PypeRunner allows the creation of complex workflows with branching and merging of several concurrent execution flows.
Features
Easy-to-use extract, transform, load (ETL) pipeline tool
Reproducible pipeline runs for data science / machine learning tasks
Easy creation of pipelines using functional API chaining (see below)
Parallel processing of steps
Caching of previously run steps to speed up processing
Re-run of steps (and all subsequent steps) when parameters are changed
Save & read pipelines to/from YAML
Graphical output of pipeline run
Installation
Install from conda-forge via
conda install pyperunner -c conda-forge
Install from PyPI via
pip install pyperunner
Or from source via
git clone https://github.com/glichtner/pyperunner.git
cd pyperunner
python setup.py install
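To verify the installation, check that the package imports cleanly (a minimal smoke test; nothing beyond the package name is assumed):

python -c "import pyperunner"

If this exits without an ImportError, pyperunner is ready to use.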
Quickstart
PypeRunner has three basic components:
Task: Definition of the work to do (Python classes or functions)
Pipeline: A collection of tasks that are connected in a directed fashion
Runner: The executor of a pipeline
Hello world example
from pyperunner import Runner, Pipeline, task
@task("Hello", receives_input=False)
def hello():
print("in hello()")
return "Hello"
@task("World")
def world(data):
hello = data["Hello()"]
print("in world()")
return f"{hello} world"
# instantiate and connect tasks
hello = hello()
world = world()(hello)
# create pipeline and set root element
pipeline = Pipeline("hello-world-example", [hello])
# print a summary of the pipeline
pipeline.summary()
# run pipeline
runner = Runner(data_path="data/", log_path="log/")
runner.run(pipeline)
# get pipeline results object from the pipeline that was just run
results = runner.results()
# show the results
for task_name in results:
    print(f"Output of task '{task_name}' was '{results[task_name]}'")
Running this script outputs the following:
~/pyperunner/examples$ python hello-world-func.py
+---------+
| Hello() |
+---------+
*
*
*
+---------+
| World() |
+---------+
2021-01-03 20:55:47 INFO MainProcess root Storing pipeline parameters in examples/log/hello-world-example_210103T205547/pipeline.yaml
2021-01-03 20:55:47 INFO MainProcess root Storing pipeline data in examples/data
2021-01-03 20:55:47 INFO Process-1 Hello() Starting
2021-01-03 20:55:47 INFO Process-1 Hello() in hello()
2021-01-03 20:55:47 INFO Process-1 Hello() Finished: Status.SUCCESS
2021-01-03 20:55:47 INFO Process-2 World() Starting
2021-01-03 20:55:47 INFO Process-2 World() in world()
2021-01-03 20:55:47 INFO Process-2 World() Finished: Status.SUCCESS
2021-01-03 20:55:47 INFO MainProcess root Pipeline run finished
Output of task 'Hello()' was 'Hello'
Output of task 'World()' was 'Hello world'
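Individual outputs can also be looked up directly, using the same "Name(tag)" keys that the loop above iterates over (and that the world task uses to read its input):

# direct lookup by task name; the key pattern matches data["Hello()"] above
print(results["World()"])  # Hello world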
Note that if you re-run the script, pyperunner will detect that the current configuration has already been run and will skip the execution of these tasks:
~/pyperunner/examples$ python hello-world.py
2021-01-03 20:56:36 INFO MainProcess root No need to execute task "Hello()", skipping it
2021-01-03 20:56:36 INFO MainProcess root No need to execute task "World()", skipping it
2021-01-03 20:56:36 INFO MainProcess root Pipeline run finished
If you need to reprocess outputs, just add the force_reload=True parameter to the pipeline run:
runner.run(pipeline, force_reload=True)
Or to run just a specific task again, use the reload=True parameter when initializing the task:
# instantiate and connect tasks
hello = hello()
world = world(reload=True)(hello)
Note that pyperunner determines which tasks it must re-execute: all tasks that depend on a reloaded task are automatically re-executed, and a task is skipped entirely only if no successor task requires its output. If a task has been executed previously and its output is required by a successor, the output is read from disk instead of being recomputed:
~/pyperunner/examples$ python hello-world.py
2021-01-03 20:57:26 INFO Process-1 Hello() Starting
2021-01-03 20:57:26 INFO Process-1 Hello() Loading output from disk, skipping processing
2021-01-03 20:57:26 INFO Process-1 Hello() Finished: Status.SUCCESS
2021-01-03 20:57:26 INFO Process-2 World() Starting
2021-01-03 20:57:26 INFO Process-2 World() in world()
2021-01-03 20:57:26 INFO Process-2 World() Finished: Status.SUCCESS
2021-01-03 20:57:26 INFO MainProcess root Pipeline run finished
On each run, the pipeline is automatically stored as a YAML file in the log path to ensure reproducibility:
pipeline:
  name: hello-world-example
  tasks:
    Hello():
      hash: 22179f3afd85ab64dd32c63bc21a9eb4
      module: __main__
      name: Hello
      params: {}
      parents: []
      tag: ''
    World():
      hash: f7d904856f2aa4fda20e05521298397f
      module: __main__
      name: World
      params: {}
      parents:
      - Hello()
      tag: ''
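Because the stored definition is plain YAML, it can be inspected with any YAML parser. Below is a minimal sketch using PyYAML; the run directory name is the example timestamp from the log output above:

import yaml

# load the pipeline definition stored by the run above
with open("log/hello-world-example_210103T205547/pipeline.yaml") as f:
    spec = yaml.safe_load(f)

print(spec["pipeline"]["name"])                         # hello-world-example
print(list(spec["pipeline"]["tasks"]))                  # ['Hello()', 'World()']
print(spec["pipeline"]["tasks"]["World()"]["parents"])  # ['Hello()']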
Additionally, a graphical representation of the run is saved in the log path.
Documentation
The API Reference provides API-level documentation.
Examples
Look in the examples/ directory for some example scripts.
Multiple paths pipeline
# Create pipeline
pipeline = Pipeline("my-pipeline")
# Create first stream of tasks: LoadData(database) --> ProcessData(normalize-l2)
load_db = LoadData(
    "database",
    database={"host": "localhost", "username": "user", "password": "password"},
    wait=10,
)
norm_l2 = ProcessData("normalize-l2", norm="l2", axis=0, wait=1)(load_db)
# Create second stream of tasks:
# LoadData(csv) --> ProcessData(normalize-l1) --> AugmentData(augment)
load_csv = LoadData("csv", filename="data.csv", wait=1)
norm_l1 = ProcessData("normalize-l1", norm="l1", wait=1)(load_csv)
augment = AugmentData("augment", types=["rotate", "noise"], wait=1)(norm_l1)
# Combine outputs of both streams (ProcessData(normalize-l2)
# and AugmentData(augment)), additionally add output from ProcessData(normalize-l1)
evaluate = Evaluate("both", wait=1)([norm_l1, norm_l2, augment])
# Add the roots of both streams to the pipeline
pipeline.add(load_db)
pipeline.add(load_csv)
# print a summary of the pipeline
pipeline.summary()
# Run pipeline
runner = Runner(data_path="data/", log_path="log/")
runner.run(pipeline, force_reload=False)
pipeline.summary() prints the following ASCII summary:
+---------------+
| LoadData(csv) |
+---------------+
*
*
*
+--------------------+ +---------------------------+
| LoadData(database) | | ProcessData(normalize-l1) |
+--------------------+ +---------------------------+
* *** ***
* **** ***
* ** ****
+---------------------------+ +----------------------+ ****
| ProcessData(normalize-l2) | | AugmentData(augment) | *******
+---------------------------+**** +----------------------+ *******
******* * *******
******* * *******
**** * ****
+----------------+
| Evaluate(both) |
+----------------+
Notice how multiple tasks run simultaneously:
2021-01-03 19:09:05 INFO Process-1 LoadData(csv) Starting
2021-01-03 19:09:05 INFO Process-2 LoadData(database) Starting
2021-01-03 19:09:06 INFO Process-1 LoadData(csv) Finished: Status.SUCCESS
2021-01-03 19:09:06 INFO Process-3 ProcessData(normalize-l1) Starting
2021-01-03 19:09:07 INFO Process-3 ProcessData(normalize-l1) Finished: Status.SUCCESS
2021-01-03 19:09:07 INFO Process-4 AugmentData(augment) Starting
2021-01-03 19:09:08 INFO Process-4 AugmentData(augment) Finished: Status.SUCCESS
2021-01-03 19:09:15 INFO Process-2 LoadData(database) Finished: Status.SUCCESS
2021-01-03 19:09:15 INFO Process-5 ProcessData(normalize-l2) Starting
2021-01-03 19:09:16 INFO Process-5 ProcessData(normalize-l2) Finished: Status.SUCCESS
2021-01-03 19:09:16 INFO Process-6 Evaluate(both) Starting
2021-01-03 19:09:17 INFO Process-6 Evaluate(both) Finished: Status.SUCCESS
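The same branching and merging also works with the function API from the quickstart. Here is a minimal sketch; the task names and return values are illustrative only, but every call used appears in the examples above:

from pyperunner import Runner, Pipeline, task

@task("Left", receives_input=False)
def left():
    return [1, 2, 3]

@task("Right", receives_input=False)
def right():
    return [4, 5, 6]

@task("Merge")
def merge(data):
    # data maps each parent task's name to its output,
    # as in the hello world example
    return data["Left()"] + data["Right()"]

# instantiate tasks; passing a list of parents merges both streams
left = left()
right = right()
merge = merge()([left, right])

# add the roots of both streams to the pipeline
pipeline = Pipeline("merge-example")
pipeline.add(left)
pipeline.add(right)

runner = Runner(data_path="data/", log_path="log/")
runner.run(pipeline)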
Change Log
Version History
- 0.1.2
Added LICENSE to source dist
- 0.1.1
Fixed setup.py for conda installation
- 0.1.0
PypeRunner’s first release
File details
Details for the file pyperunner-0.1.2.tar.gz.
File metadata
- Download URL: pyperunner-0.1.2.tar.gz
- Upload date:
- Size: 142.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/51.3.3 requests-toolbelt/0.9.1 tqdm/4.56.0 CPython/3.7.9
File hashes

Algorithm | Hash digest
---|---
SHA256 | beb73a64290beb5553cbc3bd1b128191658d8e94d7913fcf4e1e3edff88dfb03
MD5 | 0cc08ca021f2da254c95f55b57dcb24a
BLAKE2b-256 | cf77529295472902ae0907caeeaaa68c0142cd9a9c679c43958aab3ed370c2f2