
ipipeline

ipipeline is a micro framework to build and execute pipelines from different domains.

Features

  • Simplicity: high-level interfaces that can be used to perform complex tasks.

  • Flexibility: freedom to build according to the requirements of the problem.

  • Scalability: execution through concurrency or parallelism (coming soon).

Installation

ipipeline is installed from the Python Package Index (PyPI).

pip install ipipeline
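
If an exact release is needed, the version shown on this page can be pinned explicitly:

pip install ipipeline==0.17.0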

Documentation

To learn how this package works, follow the documentation (coming soon).

Contribution

To learn how to contribute to this repository, follow the contribution file.

License

To learn about the legal rights linked to this repository, follow the license file.

Example

The ipipeline package tries to keep things simple, so all the work is done through a few interfaces.

import logging

from ipipeline.control import SequentialExecutor
from ipipeline.structure import Catalog, Pipeline


logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(name)s - %(message)s', 
    datefmt='%Y-%m-%d %H:%M:%S', 
    level=logging.INFO
)

Tasks

The functions below represent tasks that must be executed in a certain order, forming a flow with the following idea: data is extracted from a source, transformed in different ways, and finally loaded to a destination. The focus of this example is not on the body of each function, but on the dependencies between them.

def extract_data(path: str, encoding: str = None) -> list:
    data = [2, 4]

    return data


def transform_data1(data: list) -> list:
    sum_data = [num + 2 for num in data]

    return sum_data


def transform_data2(data: list) -> list:
    sub_data = [num - 2 for num in data]

    return sub_data


def load_data(sum_data: list, sub_data: list, path: str) -> None:
    pass

Pipeline

A pipeline stores a flow of tasks as nodes (tasks) and links (dependencies). The links between the nodes must form a directed acyclic graph, which is used to find a linear ordering for the execution.

pipeline = Pipeline('p1', tags=['example'])
pipeline.add_node(
    'n1', 
    extract_data, 
    pos_inputs=['src_path'], 
    key_inputs={'encoding': 'encoding'}, 
    outputs=['data'], 
    tags=['extract']
)
pipeline.add_node(
    'n2', 
    transform_data1, 
    pos_inputs=['data'], 
    outputs=['sum_data'], 
    tags=['transform1']
)
pipeline.add_node(
    'n3', 
    transform_data2, 
    pos_inputs=['data'], 
    outputs=['sub_data'], 
    tags=['transform2']
)
pipeline.add_node(
    'n4', 
    load_data, 
    pos_inputs=['sum_data', 'sub_data', 'dst_path'], 
    tags=['load']
)
pipeline.add_link('l1', 'n1', 'n2')
pipeline.add_link('l2', 'n1', 'n3')
pipeline.add_link('l3', 'n2', 'n4')
pipeline.add_link('l4', 'n3', 'n4')

The pipeline produces the graph shown below.

[graph: n1 -> n2, n1 -> n3, n2 -> n4, n3 -> n4]
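
The linear ordering mentioned above is a topological ordering of this graph. As an illustration only (this is not how ipipeline implements it), the standard library can compute such an ordering for the same set of links:

from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (the same links as above).
dependencies = {'n1': set(), 'n2': {'n1'}, 'n3': {'n1'}, 'n4': {'n2', 'n3'}}
print(list(TopologicalSorter(dependencies).static_order()))  # e.g. ['n1', 'n2', 'n3', 'n4']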

Catalog

A catalog stores the items (arguments) of an execution. When a node is executed, its return value is stored in the catalog under the name defined in the outputs parameter, creating a key:value pair. This pair is made available as an argument to every other node that depends on it. Therefore, the pos_inputs and key_inputs parameters are references to the keys of the arguments stored in the catalog. It is also possible to set default arguments in the catalog before the execution takes place, as shown below.

catalog = Catalog('c1', tags=['example'])
catalog.set_item('src_path', 'src/file')
catalog.set_item('dst_path', 'dst/file')
catalog.set_item('encoding', 'utf-8') 
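
Conceptually, the catalog acts as a mapping from names to arguments: each node reads its pos_inputs and key_inputs from that mapping and writes its outputs back into it. The rough sketch below illustrates the idea with a plain dict; it is not how the package stores items internally.

# Rough illustration of the catalog idea using a plain dict.
items = {'src_path': 'src/file', 'dst_path': 'dst/file', 'encoding': 'utf-8'}
items['data'] = extract_data(items['src_path'], encoding=items['encoding'])  # n1 writes 'data'
items['sum_data'] = transform_data1(items['data'])                           # n2 reads 'data'
items['sub_data'] = transform_data2(items['data'])                           # n3 reads 'data'
load_data(items['sum_data'], items['sub_data'], items['dst_path'])           # n4 reads both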

Executor

An executor is responsible for executing a pipeline following the topological ordering of the graph built from the relationships between the nodes. The result of the execution is the catalog populated with the return values of the functions.

executor = SequentialExecutor()
ordering = executor.get_ordering(pipeline)
catalog = executor.execute_pipeline(pipeline, catalog, ordering)

The log generated while executing is shown below.

[2022-07-01 09:30:00] INFO ipipeline.control.executors - ordering: [['n1'], ['n2', 'n3'], ['n4']]
[2022-07-01 09:30:00] INFO ipipeline.control.executors - pipeline.id: p1, pipeline.tags: ['example']
[2022-07-01 09:30:00] INFO ipipeline.control.executors - catalog.id: c1, catalog.tags: ['example']
[2022-07-01 09:30:00] INFO ipipeline.control.executors - node.id: n1, node.tags: ['extract']
[2022-07-01 09:30:00] INFO ipipeline.control.executors - node.id: n2, node.tags: ['transform1']
[2022-07-01 09:30:00] INFO ipipeline.control.executors - node.id: n3, node.tags: ['transform2']
[2022-07-01 09:30:00] INFO ipipeline.control.executors - node.id: n4, node.tags: ['load']

The ordering is a list of groups: the groups must be executed in sequence, while the nodes within each group can be executed simultaneously. As the sequential executor was used in this case, the benefit of simultaneous execution was not realized, but new executors will soon be created to take advantage of it.
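
Until those executors are available, the sketch below shows, outside of the package, how an ordering such as [['n1'], ['n2', 'n3'], ['n4']] could be consumed: groups run one after another, while the nodes inside a group run in worker threads. The run_node callable is hypothetical and stands in for whatever executes a single node by its id.

from concurrent.futures import ThreadPoolExecutor

def run_groups(ordering: list, run_node) -> None:
    # Groups must run in sequence; nodes within a group may run simultaneously.
    for group in ordering:
        with ThreadPoolExecutor() as pool:
            list(pool.map(run_node, group))  # run_node is a hypothetical per-node callable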

CLI

The package provides a CLI with two commands, project and execution. The project command builds a project in the file system with a standard structure for organizing the code. Assuming the project path is the home directory and the project name is example, the command would be entered like this:

python -m ipipeline project ~ example

The execution command executes a pipeline given the modules and functions that build the pipeline and the catalog. The building process can be wrapped in separate functions called, for example, build_pipeline and build_catalog. Assuming both functions are in the main module of the example project, the command would be as follows:

python -m ipipeline execution SequentialExecutor example.__main__ example.__main__ build_pipeline build_catalog
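
For reference, the example.__main__ module assumed by the command above could wrap the earlier snippets in two builder functions; the names build_pipeline and build_catalog match the command, and everything else mirrors the example (a sketch, not output of the project command):

# example/__main__.py (sketch): builders referenced by the execution command.
# The task functions (extract_data, transform_data1, transform_data2, load_data)
# are assumed to be defined in or imported into this module.
from ipipeline.structure import Catalog, Pipeline


def build_pipeline() -> Pipeline:
    pipeline = Pipeline('p1', tags=['example'])
    pipeline.add_node(
        'n1', extract_data, pos_inputs=['src_path'],
        key_inputs={'encoding': 'encoding'}, outputs=['data'], tags=['extract']
    )
    # ... the remaining nodes and links are added as in the example above ...
    return pipeline


def build_catalog() -> Catalog:
    catalog = Catalog('c1', tags=['example'])
    catalog.set_item('src_path', 'src/file')
    catalog.set_item('dst_path', 'dst/file')
    catalog.set_item('encoding', 'utf-8')
    return catalog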

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ipipeline-0.17.0.tar.gz (13.0 kB)

Uploaded Source

Built Distribution

ipipeline-0.17.0-py3-none-any.whl (22.1 kB)

Uploaded Python 3

File details

Details for the file ipipeline-0.17.0.tar.gz.

File metadata

  • Download URL: ipipeline-0.17.0.tar.gz
  • Upload date:
  • Size: 13.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.5

File hashes

Hashes for ipipeline-0.17.0.tar.gz

  • SHA256: 55d14dd5bf48366279ab278d8e194505806f4a51ca0d51b78ef5951f397fe883
  • MD5: 4bb21fc843bad5bd34646c316ff94886
  • BLAKE2b-256: 2f008b9d6ba16d0b75baf6650f8708ea7de8deedc3c69a8e1148fb6e72fa2bc6


File details

Details for the file ipipeline-0.17.0-py3-none-any.whl.

File metadata

  • Download URL: ipipeline-0.17.0-py3-none-any.whl
  • Upload date:
  • Size: 22.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.5

File hashes

Hashes for ipipeline-0.17.0-py3-none-any.whl

  • SHA256: 3d841743525f145e6ec0e62ba1bdffda0d55d73995706903abee9c55a656947b
  • MD5: 9b4fdad7faa0b4992675a46f12ee04fd
  • BLAKE2b-256: 975ab853cfd8620e59715e09c89a57cdc100c456f126881cbffa5f98e22fbba3

