A framework for working with ML model pipelines

ML Pipeline Engine

Untangle and speed up your data processing pipelines without using a complex DAG framework

Key benefits

  • Fast in-memory pipeline runtime, suitable for online transaction processing
  • No graph DSL needed. Just use Python type annotations to declare a node's dependencies
  • Robust pipeline execution engine based on topological analysis, with MVCC support
  • Synchronous and asynchronous processor nodes supported
  • Asynchronous pipeline interface enabled by default
  • Simple but powerful control flow operators: no need to declare branches or node groups, just declare the right dependencies
  • Built-in pipeline visualizer
  • Support for pipeline lifecycle events
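
The idea of declaring dependencies through type annotations and ordering nodes by topological analysis can be sketched with the standard library alone. This is an illustration under stated assumptions, not the engine's actual implementation: the Input marker, the node classes, and the dependencies and build_graph helpers below are hypothetical stand-ins defined only for this example.

```python
import inspect
from graphlib import TopologicalSorter


class Input:
    """Hypothetical marker meaning 'this argument is the output of `node`'."""

    def __init__(self, node):
        self.node = node


class Invert:
    def process(self, num: float) -> float:
        return -num


class Double:
    def process(self, num: Input(Invert)) -> float:
        return num * 2


class AddConst:
    def process(self, num: Input(Invert), const: float = 0.2) -> float:
        return num + const


class Add:
    def process(self, a: Input(AddConst), b: Input(Double)) -> float:
        return a + b


def dependencies(node):
    """Collect the node classes referenced by Input(...) annotations."""
    params = inspect.signature(node.process).parameters.values()
    return [p.annotation.node for p in params if isinstance(p.annotation, Input)]


def build_graph(output_node):
    """Walk backwards from the output node, recording each node's predecessors."""
    graph, stack = {}, [output_node]
    while stack:
        node = stack.pop()
        if node not in graph:
            graph[node] = dependencies(node)
            stack.extend(graph[node])
    return graph


graph = build_graph(Add)

# static_order() yields every node after all of its predecessors.
order = [node.__name__ for node in TopologicalSorter(graph).static_order()]
print(order)
```

Invert always comes first and Add last. Double and AddConst depend only on Invert, so a scheduler is free to run them concurrently.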

Usage

"""
main.py
"""

import asyncio
import time

from ml_pipeline_engine.chart import PipelineChart
from ml_pipeline_engine.dag_builders.annotation import build_dag
from ml_pipeline_engine.dag_builders.annotation.marks import Input
from ml_pipeline_engine.node import ProcessorBase
from ml_pipeline_engine.parallelism import threads_pool_registry


# 1. Set up the thread pool

threads_pool_registry.auto_init()


# 2. Define nodes and their dependencies


class InvertNumber(ProcessorBase):
    def process(self, num: float) -> float:
        return -num


class AsyncAddConst(ProcessorBase):
    async def process(self, num: Input(InvertNumber), const: float = 0.2) -> float:
        await asyncio.sleep(2)

        return num + const


class DoubleNumber(ProcessorBase):
    def process(self, num: Input(InvertNumber)) -> float:
        time.sleep(2)

        return num * 2


class AddNumbers(ProcessorBase):
    def process(self, num1: Input(AsyncAddConst), num2: Input(DoubleNumber)) -> float:
        return num1 + num2


# 3. Define pipeline

pipeline = PipelineChart(
    "example_pipeline",
    build_dag(input_node=InvertNumber, output_node=AddNumbers),
)

# 4. Run it


async def main():
    start = time.time()

    result = await pipeline.run(input_kwargs=dict(num=3.0))

    end = time.time()

    assert result.error is None
    assert result.value == -8.8

    # The execution engine uses concurrency based on graph topology analysis,
    # so the AsyncAddConst and DoubleNumber nodes ran in parallel
    assert end - start < 2.1


if __name__ == "__main__":
    asyncio.run(main())

See additional usage examples in docs: docs/examples/.
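
The timing assertion in main.py holds because the two middle nodes overlap in time: one awaits, the other blocks in a worker thread. The sketch below reproduces that effect with plain asyncio and shorter sleeps. It makes no claim about how ml-pipeline-engine schedules nodes internally. The function names are hypothetical, and the use of asyncio.to_thread for the blocking branch is an assumption made for illustration.

```python
import asyncio
import math
import time


def double_number(num: float) -> float:
    time.sleep(0.3)  # blocking work, standing in for DoubleNumber
    return num * 2


async def add_const(num: float, const: float = 0.2) -> float:
    await asyncio.sleep(0.3)  # non-blocking wait, standing in for AsyncAddConst
    return num + const


async def run_branches(num: float):
    inverted = -num
    start = time.perf_counter()
    # The coroutine runs on the event loop while the blocking function
    # runs in a worker thread, so the two waits overlap.
    added, doubled = await asyncio.gather(
        add_const(inverted),
        asyncio.to_thread(double_number, inverted),
    )
    elapsed = time.perf_counter() - start
    return added + doubled, elapsed


value, elapsed = asyncio.run(run_branches(3.0))
print(value, round(elapsed, 2))
```

Run sequentially, the two branches would take about 0.6 seconds. Run concurrently, the elapsed time stays close to the 0.3 seconds of the slower branch.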

Development

Environment setup

Clone the project

git clone https://github.com/tochka-public/ml-pipeline-engine.git

Go to the project directory

cd ml-pipeline-engine

Use Python >= 3.9 and the Poetry package manager to install the ml-pipeline-engine dependencies

poetry install --no-root

If you plan to contribute, install the pre-commit hooks to keep code formatting consistent

pre-commit install -f --hook-type pre-commit --hook-type pre-push

Run tests

python -m pytest tests
