
ipipeline

A micro framework for building and executing pipelines from different domains.

Features

  • Simplicity: high-level interfaces that can be used to perform complex tasks.

  • Flexibility: freedom to build the pipeline according to the requirements of the problem.

  • Scalability: pipeline execution through concurrency or parallelism (coming soon).

Installation

ipipeline can be installed from the Python Package Index (PyPI):

pip install ipipeline

Documentation

To learn how this package works, follow the documentation (coming soon).

Contribution

To learn how to contribute to this repository, follow the contribution file.

License

To learn about the legal rights linked to this repository, follow the license file.

Example

This example is divided into sections that explain the main features of the package. For questions about a specific detail, the package contains docstrings for all modules, classes, methods, and functions.

Imports

The ipipeline package tries to keep things simple; all the work is done through the pipeline interface, imported as Pipeline, and the execution interface, imported as SequentialExecutor.

import logging

from ipipeline.control import SequentialExecutor
from ipipeline.structure import Pipeline


logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(name)s - %(message)s', 
    datefmt='%Y-%m-%d %H:%M:%S', 
    level=logging.INFO
)

Tasks

The functions below represent the user tasks, which must be executed in a certain order. Together they form a workflow with the following idea: data is extracted from somewhere, transformed in two different ways, and finally loaded somewhere. Although this example only contains plain functions, the methods of an instance can also be used, as the sketch after the code below shows.

def extract() -> list:
    return [1, 2]


def transform1(x: int) -> int:
    return x + 1


def transform2(y: int) -> int:
    return y * 2


def load(x: int, y: int, z: int) -> None:
    print(f'loading - x: {x}, y: {y}, z: {z}')
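A bound method works the same way as a plain function. A minimal sketch (the Extractor class is hypothetical, not part of the package):

class Extractor:
    def extract(self) -> list:
        return [1, 2]


extractor = Extractor()
# extractor.extract is a callable and could be passed to add_node
# in place of the extract function above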

Pipeline

A pipeline is the entry point for the user tasks: the nodes (tasks) and connections (relationships between tasks) added through it are represented as a graph (workflow). The graph is used by the executor and is not visible to the user.

pipeline = Pipeline('p1', tags=['example'])
pipeline.add_node(
    'n1', extract, outputs=['x', 'y'], tags=['extract']
)
pipeline.add_node(
    'n2', transform1, inputs={'x': 'c.x'}, outputs=['x'], tags=['transform1']
)
pipeline.add_node(
    'n3', transform2, inputs={'y': 'c.y'}, outputs=['y'], tags=['transform2']
)
pipeline.add_node(
    'n4', load, inputs={'x': 'c.x', 'y': 'c.y', 'z': 8}, tags=['load']
)
pipeline.add_conn('c1', 'n1', 'n2')
pipeline.add_conn('c2', 'n1', 'n3')
pipeline.add_conn('c3', 'n2', 'n4')
pipeline.add_conn('c4', 'n3', 'n4')

Based on the workflow defined, the pipeline was built with four nodes and four connections. Two aspects deserve attention here: the inputs and outputs parameters of the add_node method.

The outputs parameter, when declared, indicates that during the pipeline execution the function's return values must be stored in the catalog under specific names. For example, the outputs parameter of the 'n1' node stores two items in the catalog, named 'x' and 'y', which are obtained from the return values of the function.

The inputs parameter, when declared, indicates that during the pipeline execution the function receives its arguments from a dictionary. For example, the inputs parameter of the 'n4' node builds a dictionary where the 'x' and 'y' values are obtained from the catalog ('c.<item_id>') and the 'z' value is passed directly. The 'c.' prefix assumes the existence of an item in the catalog stored by a predecessor node.
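Conceptually, the catalog behaves like a dictionary that the nodes write to and read from. The sketch below is not the package's internal code, only an illustration of the inputs/outputs semantics described above, applied to the four nodes:

catalog = {}

# 'n1': outputs=['x', 'y'] stores the two return values under those names
catalog['x'], catalog['y'] = extract()

# 'n2' and 'n3': each one reads an item ('c.x', 'c.y') and overwrites it
catalog['x'] = transform1(x=catalog['x'])
catalog['y'] = transform2(y=catalog['y'])

# 'n4': the 'c.' references are resolved through the catalog
# and the literal 8 is passed directly
load(x=catalog['x'], y=catalog['y'], z=8)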

The connections determine the order in which the nodes are executed. For example, the 'c1' connection indicates a relationship between the 'n1' node (source) and the 'n2' node (destination), where the 'n2' node depends on the execution of the 'n1' node. A node can depend on another node even if it does not use the outputs of its predecessor.
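For instance, a hypothetical 'n5' node could be forced to wait for 'n4' even though it consumes nothing from the catalog (this node is not part of the example executed below, and the sketch assumes that add_node accepts any callable, as the calls above suggest):

pipeline.add_node('n5', lambda: print('notifying'), tags=['notify'])
pipeline.add_conn('c5', 'n4', 'n5')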

Executor

An executor is responsible for executing a pipeline based on the topological order (execution order) of its graph. The connections between the nodes are therefore expected to form a DAG (Directed Acyclic Graph); if they do not, an error is raised. Behind the scenes, a catalog is created to store the node returns that are requested by other nodes during the execution.

executor = SequentialExecutor(pipeline)
executor.execute_pipeline(executor.obtain_topo_order())

Below are the log results generated by the execution. It is recommended to turn off the logs when there are many nodes or the pipeline is called many times inside a loop.

[2021-10-28 15:24:59] INFO ipipeline.control.execution - topo_order: [['n1'], ['n2', 'n3'], ['n4']]
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n1, tags: ['extract']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n2, tags: ['transform1']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n3, tags: ['transform2']
[2021-10-28 15:24:59] INFO ipipeline.control.execution - node - id: n4, tags: ['load']
loading - x: 2, y: 4, z: 8
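As recommended above, the logs can be turned off when they become noisy. One way to do it with only the standard logging API ('ipipeline' is the common prefix of the logger names shown above):

logging.getLogger('ipipeline').setLevel(logging.WARNING)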

According to the defined workflow, the nodes were executed in the expected order. The inner lists of the topological order must always be executed in order; however, the elements within each inner list can be executed simultaneously. Since the SequentialExecutor class was used in this example, the nodes were executed as if the topological order were a flat list.
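A conceptual sketch of this traversal (not the package's internal code, only an illustration of how the topological order is consumed):

topo_order = [['n1'], ['n2', 'n3'], ['n4']]

for group in topo_order:
    # the inner lists must run strictly in this order
    for node_id in group:
        # the SequentialExecutor runs these one by one; a concurrent
        # executor could run the nodes of a group at the same time
        print(f'executing node: {node_id}')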

CLI

The package provides a CLI with two commands: project and execution. The project command creates a project in the file system with a standard structure for organizing the tasks that interact with the package. Assuming the project path is the home directory and the project name is iexample, the command would be entered like this:

python -m ipipeline project ~ iexample

The result of this command would be the following structure:

iexample
|
|----iexample
|    |
|    |----config
|    |
|    |----group
|    |
|    |----task
|    |
|    |----__init__.py
|    |
|    |----__main__.py
|    |
|    |----exception.py
|
|----io
|
|----requirement
|
|----test
|
|----.gitignore
|
|----CONTRIBUTING.md
|
|----LICENSE.md
|
|----MANIFEST.in
|
|----README.md
|
|----setup.py

The example code provided would fit into this structure as follows:

  • The log configuration is moved to the config package.

  • The functions are moved to the task package.

  • The pipeline is moved to the group package.

  • The executor is moved to the main module.

With these modifications, the project can be executed with the following command:

python iexample
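As an illustration, the main module could look like this. This is a hypothetical iexample/__main__.py, assuming the pipeline is exposed through a wrapper function such as the build_pipeline used in the next section:

from ipipeline.control import SequentialExecutor

from iexample.group.etl import build_pipeline


pipeline = build_pipeline()
executor = SequentialExecutor(pipeline)
executor.execute_pipeline(executor.obtain_topo_order())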

Another option to execute this project, without having to deal with the execution interface, is the execution command. For this, the pipeline building process must be wrapped in a function that returns the pipeline instance. Supposing the wrapper function is called build_pipeline and is declared in a module called etl (inside the iexample.group package), the command would be as follows:

python -m ipipeline execution iexample.group.etl build_pipeline sequential
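Based on the earlier example, the wrapper could be a minimal sketch like this (the task import path is hypothetical; it depends on where the functions were placed inside the task package):

from ipipeline.structure import Pipeline

# hypothetical module path inside the task package
from iexample.task.etl import extract, load, transform1, transform2


def build_pipeline() -> Pipeline:
    pipeline = Pipeline('p1', tags=['example'])
    pipeline.add_node('n1', extract, outputs=['x', 'y'], tags=['extract'])
    pipeline.add_node('n2', transform1, inputs={'x': 'c.x'}, outputs=['x'], tags=['transform1'])
    pipeline.add_node('n3', transform2, inputs={'y': 'c.y'}, outputs=['y'], tags=['transform2'])
    pipeline.add_node('n4', load, inputs={'x': 'c.x', 'y': 'c.y', 'z': 8}, tags=['load'])
    pipeline.add_conn('c1', 'n1', 'n2')
    pipeline.add_conn('c2', 'n1', 'n3')
    pipeline.add_conn('c3', 'n2', 'n4')
    pipeline.add_conn('c4', 'n3', 'n4')

    return pipeline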
