ipipeline
A micro framework for building and executing pipelines from different domains.
Features
- Simplicity: high-level interfaces that can be used to perform complex tasks.
- Flexibility: freedom to build the pipeline according to the requirements of the problem.
- Scalability: pipeline execution through concurrency or parallelism (coming soon).
Installation
ipipeline is installed from the Python Package Index (PyPI).
pip install ipipeline
Documentation
To learn how this package works, follow the documentation (coming soon).
Contribution
To learn how to contribute to this repository, follow the contribution file.
License
To learn about the legal rights linked to this repository, follow the license file.
Example
This example is divided into sections that explain the main features of the package. For questions about a specific detail, the package contains docstrings for all modules, classes, methods and functions.
Imports
The ipipeline package tries to keep things simple: all the work is done through the pipeline interface, imported as Pipeline, and the execution interface, imported as SequentialExecutor.
import logging
from ipipeline.control import SequentialExecutor
from ipipeline.structure import Pipeline
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(name)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.INFO
)
Tasks
The functions below represent the user tasks, which must be executed in a certain order to form a workflow: data is extracted from somewhere, transformed in two different ways, and finally loaded to somewhere. Although this example only contains functions, the methods of an instance can also be used (a sketch follows the pipeline definition below).
def extract() -> list:
return [1, 2]
def transform1(x: int) -> int:
return x + 1
def transform2(y: int) -> int:
return y * 2
def load(x: int, y: int, z: int) -> None:
print(f'loading - x: {x}, y: {y}, z: {z}')
Pipeline
A pipeline is the entry point for the user tasks: the nodes (tasks) and connections (relationships between tasks) added through it are represented as a graph (workflow). The graph is used by the executor and is not visible to the user.
pipeline = Pipeline('p1', tags=['example'])
pipeline.add_node(
'n1', extract, outputs=['x', 'y'], tags=['extract']
)
pipeline.add_node(
'n2', transform1, inputs={'x': 'c.x'}, outputs=['x'], tags=['transform1']
)
pipeline.add_node(
'n3', transform2, inputs={'y': 'c.y'}, outputs=['y'], tags=['transform2']
)
pipeline.add_node(
'n4', load, inputs={'x': 'c.x', 'y': 'c.y', 'z': 8}, tags=['load']
)
pipeline.add_conn('c1', 'n1', 'n2')
pipeline.add_conn('c2', 'n1', 'n3')
pipeline.add_conn('c3', 'n2', 'n4')
pipeline.add_conn('c4', 'n3', 'n4')
Based on the defined workflow, the pipeline was built with four nodes and four connections. Two aspects deserve attention here: the inputs and outputs parameters of the add_node method.
The outputs parameter, when declared, indicates that, during the pipeline execution, the return values of the function must be stored in the catalog under specific names. For example, the outputs parameter of the 'n1' node stores two items in the catalog, named 'x' and 'y', which are obtained from the returns of the function.
The inputs parameter, when declared, indicates that, during the pipeline execution, the function receives a dictionary with its arguments. For example, the inputs parameter of the 'n4' node builds a dictionary in which the 'x' and 'y' values are obtained from the catalog ('c.<item_id>') and the 'z' value is passed directly. The 'c.' prefix assumes the existence of an item in the catalog stored by a predecessor node.
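To make the catalog semantics concrete, the sketch below shows how an executor could resolve the inputs of a node. This is only an illustration: the resolve_inputs function and the plain-dictionary catalog are assumptions, not the package's internal code.

def resolve_inputs(inputs: dict, catalog: dict) -> dict:
    # values prefixed with 'c.' are looked up in the catalog by item id,
    # any other value is passed to the function as a literal argument
    arguments = {}
    for param, value in inputs.items():
        if isinstance(value, str) and value.startswith('c.'):
            arguments[param] = catalog[value[2:]]
        else:
            arguments[param] = value
    return arguments

For the 'n4' node, resolving {'x': 'c.x', 'y': 'c.y', 'z': 8} against a catalog {'x': 2, 'y': 4} would produce {'x': 2, 'y': 4, 'z': 8}, which matches the load call seen later in the logs.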
The connections determine the order in which the nodes are executed. For example, the 'c1' connection indicates a relationship between the 'n1' node (source) and the 'n2' node (destination), where the 'n2' node depends on the execution of the 'n1' node. A node can depend on another node even if it does not use the outputs of its predecessor.
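As mentioned in the Tasks section, bound methods can be added as nodes in the same way as functions. A minimal sketch with a hypothetical Extractor class (the class and the 'n5' node are not part of the example above):

class Extractor:
    def extract(self) -> list:
        return [1, 2]

extractor = Extractor()
pipeline.add_node(
    'n5', extractor.extract, outputs=['a', 'b'], tags=['extract']
)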
Executor
An executor is responsible for executing a pipeline according to the topological order (execution order) of the graph. The connections between the nodes are therefore expected to form a DAG (Directed Acyclic Graph); if they do not, an error is raised. Behind the scenes, a catalog is created to store the node returns that are requested by other nodes during the execution.
executor = SequentialExecutor(pipeline)
executor.execute_pipeline(executor.obtain_topo_order())
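The grouped topological order seen in the logs below can be derived with Kahn's algorithm. The sketch below is a conceptual illustration of how such an order is obtained, not the package's implementation:

def topo_order(nodes: list, conns: list) -> list:
    # conns are (source, destination) pairs; nodes whose predecessors
    # have all been visited form the next group of the order
    indegree = {node: 0 for node in nodes}
    for _, dst in conns:
        indegree[dst] += 1
    order, remaining = [], set(nodes)
    while remaining:
        group = sorted(n for n in remaining if indegree[n] == 0)
        if not group:
            raise ValueError('the connections do not form a DAG')
        for node in group:
            remaining.remove(node)
            for src, dst in conns:
                if src == node:
                    indegree[dst] -= 1
        order.append(group)
    return order

Calling it with the four nodes and the (source, destination) pairs of the example returns [['n1'], ['n2', 'n3'], ['n4']], the same order shown in the first log line below.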
Below are the log results generated by the execution. It is recommended to turn off the logs in cases where there are many nodes or the pipeline is called many times inside a loop.
[2021-10-28 15:24:59] INFO ipipeline.control.execution - topo_order: [['n1'], ['n2', 'n3'], ['n4']]
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n1, tags: ['extract']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n2, tags: ['transform1']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n3, tags: ['transform2']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n4, tags: ['load']
loading - x: 2, y: 4, z: 8
According to the defined workflow, the nodes were executed in the expected order. The inner lists of the topological order must always be executed in order; however, the elements within them can be executed simultaneously. Since the SequentialExecutor class was used in this example, the nodes were executed as if the topological order were a flat list.
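Until native concurrency or parallelism arrives, the grouped order already allows a hand-rolled concurrent run. The sketch below is not part of the package; execute_node stands for any callable that executes a single node given its identifier:

from concurrent.futures import ThreadPoolExecutor

def execute_groups(topo_order: list, execute_node) -> None:
    with ThreadPoolExecutor() as pool:
        for group in topo_order:
            # the whole group finishes before the next group starts
            list(pool.map(execute_node, group))

This preserves the guarantee stated above: the groups run strictly in order, while the nodes inside a group may run simultaneously.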
CLI
The package provides a CLI with two commands called project and execution. The project command creates a project in the file system that provides a standard structure for organizing the tasks that interact with the package. Let's assume the project path is the home directory and the project name is iexample; the command would then be entered like this:
python -m ipipeline project ~ iexample
The result of this command would be the following structure:
iexample
|
|----iexample
|    |
|    |----config
|    |
|    |----group
|    |
|    |----task
|    |
|    |----__init__.py
|    |
|    |----__main__.py
|    |
|    |----exception.py
|
|----io
|
|----requirement
|
|----test
|
|----.gitignore
|
|----CONTRIBUTING.md
|
|----LICENSE.md
|
|----MANIFEST.in
|
|----README.md
|
|----setup.py
The example code provided would fit into this structure as follows:
- The log configuration is moved to the config package.
- The functions are moved to the task package.
- The pipeline is moved to the group package.
- The executor is moved to the main module (see the sketch below).
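Under that layout, the __main__ module could look like the sketch below. The iexample.group.etl module path and the build_pipeline wrapper are assumptions (the wrapper is described in the next paragraphs):

from ipipeline.control import SequentialExecutor

from iexample.group.etl import build_pipeline

executor = SequentialExecutor(build_pipeline())
executor.execute_pipeline(executor.obtain_topo_order())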
With these modifications, this project can be executed with the following command:
python iexample
Another option to execute this project, without having to deal with the execution interface, is the execution command. For this, the pipeline building process must be wrapped in a function that returns the pipeline instance. Let's suppose that the wrapper function is called build_pipeline and the module where it was declared is called etl (inside the iexample.group package); the command would then be as follows:
python -m ipipeline execution iexample.group.etl build_pipeline sequential
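For reference, a sketch of the etl module (iexample/group/etl.py) that wraps the pipeline built earlier. The iexample.task.etl import path is an assumption about where the task functions were moved:

from ipipeline.structure import Pipeline

from iexample.task.etl import extract, load, transform1, transform2

def build_pipeline() -> Pipeline:
    pipeline = Pipeline('p1', tags=['example'])
    pipeline.add_node('n1', extract, outputs=['x', 'y'], tags=['extract'])
    pipeline.add_node('n2', transform1, inputs={'x': 'c.x'}, outputs=['x'], tags=['transform1'])
    pipeline.add_node('n3', transform2, inputs={'y': 'c.y'}, outputs=['y'], tags=['transform2'])
    pipeline.add_node('n4', load, inputs={'x': 'c.x', 'y': 'c.y', 'z': 8}, tags=['load'])
    pipeline.add_conn('c1', 'n1', 'n2')
    pipeline.add_conn('c2', 'n1', 'n3')
    pipeline.add_conn('c3', 'n2', 'n4')
    pipeline.add_conn('c4', 'n3', 'n4')
    return pipeline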