ipipeline

A micro framework for building and executing pipelines from different domains.

Features

  • Simplicity: high-level interfaces that can be used to perform complex tasks.

  • Flexibility: freedom to build the pipeline according to the requirements of the problem.

  • Scalability: pipeline execution through concurrency or parallelism (coming soon).

Installation

ipipeline is installed from the Python Package Index (PyPI).

pip install ipipeline

Documentation

To learn how this package works, follow the documentation (coming soon).

Contribution

To learn how to contribute to this repository, follow the contribution file.

License

To learn about the legal rights linked to this repository, follow the license file.

Example

This example is divided into sections to explain the main features of the package. If questions arise about a specific detail, the package contains docstrings for all modules, classes, methods and functions.

Imports

The ipipeline package tries to keep things simple, so all the work is done through the pipeline interface, imported as Pipeline, and the execution interface, imported as SequentialExecutor.

import logging

from ipipeline.control import SequentialExecutor
from ipipeline.structure import Pipeline


logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(name)s - %(message)s', 
    datefmt='%Y-%m-%d %H:%M:%S', 
    level=logging.INFO
)

Tasks

The functions below represent the user tasks that need to be executed in a certain order, forming a workflow with the following idea: data is extracted from somewhere, transformed in two different ways, and finally loaded somewhere. Although this example only contains functions, the methods of an instance can also be used, as sketched after the functions below.

def extract() -> list:
    return [1, 2]


def transform1(x: int) -> int:
    return x + 1


def transform2(y: int) -> int:
    return y * 2


def load(x: int, y: int, z: int) -> None:
    print(f'loading - x: {x}, y: {y}, z: {z}')
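
As mentioned above, bound methods can serve as tasks just like plain functions. A minimal sketch, where the Extractor class is hypothetical and not part of this example:

class Extractor:
    def extract(self) -> list:
        return [1, 2]

# A bound method would then be registered like a plain function:
# pipeline.add_node('n1', Extractor().extract, outputs=['x', 'y'])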

Pipeline

A pipeline is the entry point for the user tasks; through it, the nodes (tasks) and connections (relationships between tasks) that are added are represented as a graph (workflow). The graph is used by the executor and is not visible to the user.

pipeline = Pipeline('p1', tags=['example'])
pipeline.add_node(
    'n1', extract, outputs=['x', 'y'], tags=['extract']
)
pipeline.add_node(
    'n2', transform1, inputs={'x': 'c.x'}, outputs=['x'], tags=['transform1']
)
pipeline.add_node(
    'n3', transform2, inputs={'y': 'c.y'}, outputs=['y'], tags=['transform2']
)
pipeline.add_node(
    'n4', load, inputs={'x': 'c.x', 'y': 'c.y', 'z': 8}, tags=['load']
)
pipeline.add_conn('c1', 'n1', 'n2')
pipeline.add_conn('c2', 'n1', 'n3')
pipeline.add_conn('c3', 'n2', 'n4')
pipeline.add_conn('c4', 'n3', 'n4')

Based on the workflow defined, the pipeline was built with four nodes and four connections. Two aspects deserve attention here: the inputs and outputs parameters of the add_node method.

The outputs parameter, when declared, indicates that during the pipeline execution the function's returns must be stored in the catalog under specific names. For example, the outputs parameter of the 'n1' node stores two items in the catalog with the names 'x' and 'y', obtained from the returns of the function.
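
As an illustration only (not the package's actual internals), this mapping amounts to spreading the returns of extract over the declared names:

# Illustration only: declaring outputs=['x', 'y'] for 'n1' amounts to
# storing the two returns of extract() under the two catalog names.
catalog = {}
catalog['x'], catalog['y'] = extract()  # catalog == {'x': 1, 'y': 2}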

The inputs parameter, when declared, indicates that during the pipeline execution the function receives a dictionary with its arguments. For example, the inputs parameter of the 'n4' node builds a dictionary where the 'x' and 'y' values are obtained from the catalog and the 'z' value is used directly. The 'c.' prefix assumes the existence of an item ('c.<item_id>') or a list of items ('c.[<item_id>, ..., <item_id>]') in the catalog, stored by the predecessor nodes.
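
The list form can be useful when a function takes a single argument holding several catalog items. A hypothetical sketch (the print_values function and the 'n5' node are not part of the example above, and the call is kept commented out so the example is unchanged):

def print_values(values: list) -> None:
    print(values)

# pipeline.add_node(
#     'n5', print_values, inputs={'values': 'c.[x, y]'}, tags=['print']
# )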

The connections determine the order in which the nodes are executed. For example, the 'c1' connection indicates a relationship between the 'n1' node (source) and the 'n2' node (destination) where the 'n2' node depends on the execution of the 'n1' node. A node can depend on another node even if it does not use the outputs of its predecessor.
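
For instance, a pure ordering dependency could hypothetically be expressed like this (the 'n6', 'n7' and 'c6' identifiers are illustrative and kept commented out so the example above is unchanged):

# pipeline.add_node('n6', extract, tags=['setup'])
# pipeline.add_node('n7', load, inputs={'x': 0, 'y': 0, 'z': 0}, tags=['load'])
# pipeline.add_conn('c6', 'n6', 'n7')  # 'n7' runs after 'n6' despite sharing no data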

Executor

An executor is responsible for executing a pipeline according to the topological order (execution order) of the graph. Therefore, the connections between the nodes are expected to form a DAG (Directed Acyclic Graph); if they do not, an error is raised. Behind the scenes, a catalog is created to store the node returns that are requested by other nodes during the execution.

executor = SequentialExecutor(pipeline)
topo_order = executor.obtain_topo_order()
executor.execute_pipeline(topo_order)
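
To see why the DAG requirement matters, consider this hypothetical change (kept commented out; the exact exception type is not documented here):

# pipeline.add_conn('c7', 'n4', 'n1')  # would close the cycle n1 -> n2 -> n4 -> n1
# executor.obtain_topo_order()  # expected to raise an error on a cyclic graph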

Below are the logs generated by the execution. It is recommended to turn off the logs in cases where there are many nodes or the pipeline is called many times inside a loop.

[2021-10-28 15:24:59] INFO ipipeline.control.execution - topo_order: [['n1'], ['n2', 'n3'], ['n4']]
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node.id: n1, node.tags: ['extract']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node.id: n2, node.tags: ['transform1']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node.id: n3, node.tags: ['transform2']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node.id: n4, node.tags: ['load']
loading - x: 2, y: 4, z: 8

According to the defined workflow, the nodes were executed in the expected order. The inner lists of the topological order must always be executed in sequence; however, the elements within each inner list can be executed simultaneously. Since the SequentialExecutor class was used in this example, the nodes were executed as if the topological order were a flat list.
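
As recommended above, the logs can be reduced with the standard logging module. A minimal sketch, assuming the package loggers live under the 'ipipeline' namespace (as the records above suggest):

# Raise the threshold of the assumed 'ipipeline' logger namespace so that
# the INFO records emitted during execution are suppressed.
logging.getLogger('ipipeline').setLevel(logging.WARNING)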

CLI

The package provides a CLI with two commands: project and execution. The project command creates a project in the file system with a standard structure for organizing the tasks that interact with the package. Assuming the project path is the home directory and the project name is iexample, the command would be entered like this:

python -m ipipeline project ~ iexample

The result of this command would be the following structure:

iexample
|
|----iexample
|    |
|    |----config
|    |
|    |----group
|    |
|    |----task
|    |
|    |----__init__.py
|    |
|    |----__main__.py
|    |
|    |----exception.py
|
|----io
|
|----requirements
|
|----tests
|
|----.gitignore
|
|----CONTRIBUTING.md
|
|----LICENSE.md
|
|----MANIFEST.in
|
|----README.md
|
|----setup.py

The example code provided would fit into this structure as follows:

  • The configuration is moved to the config package.

  • The functions are moved to the task package.

  • The pipeline is moved to the group package.

  • The executor is moved to the main module.

With these modifications, this project can be executed with the following command:

python -m iexample
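
For instance, the main module might look like the following sketch (the iexample.group.etl module and the build_pipeline function are assumptions, introduced in the next paragraph):

# iexample/iexample/__main__.py - hypothetical layout after the moves above.
from ipipeline.control import SequentialExecutor

from iexample.group.etl import build_pipeline


pipeline = build_pipeline()
executor = SequentialExecutor(pipeline)
executor.execute_pipeline(executor.obtain_topo_order())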

Another option to execute this project, without having to deal with the execution interface, is the execution command. For this, the pipeline building process must be wrapped in a function that returns the pipeline instance. Suppose the wrapper function is called build_pipeline and the module where it is declared is called etl (inside the iexample.group package); the command would then be as follows:

python -m ipipeline execution iexample.group.etl build_pipeline sequential
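
A minimal sketch of such a wrapper, reusing the pipeline built earlier; the iexample.task.etl import path is an assumption about where the task functions were moved:

# iexample/iexample/group/etl.py - hypothetical module holding the wrapper.
from ipipeline.structure import Pipeline

from iexample.task.etl import extract, transform1, transform2, load


def build_pipeline() -> Pipeline:
    # Rebuild the same pipeline shown in the example above and return it,
    # as expected by the execution command.
    pipeline = Pipeline('p1', tags=['example'])
    pipeline.add_node('n1', extract, outputs=['x', 'y'], tags=['extract'])
    pipeline.add_node('n2', transform1, inputs={'x': 'c.x'}, outputs=['x'], tags=['transform1'])
    pipeline.add_node('n3', transform2, inputs={'y': 'c.y'}, outputs=['y'], tags=['transform2'])
    pipeline.add_node('n4', load, inputs={'x': 'c.x', 'y': 'c.y', 'z': 8}, tags=['load'])
    pipeline.add_conn('c1', 'n1', 'n2')
    pipeline.add_conn('c2', 'n1', 'n3')
    pipeline.add_conn('c3', 'n2', 'n4')
    pipeline.add_conn('c4', 'n3', 'n4')
    return pipeline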
