A framework for working with ML model pipelines

ML Pipeline Engine

Untangle and speed up your data processing pipelines without using a complex DAG framework

Key benefits

  • Fast in-memory pipeline runtime. Suitable for online transaction processing
  • No graph DSL needed. Just use Python type annotations to declare a node's dependencies
  • Pipeline execution engine based on topological analysis of the graph, with MVCC support
  • Synchronous and asynchronous processor nodes are supported
  • Asynchronous pipeline interface enabled by default
  • Simple but powerful control flow operators: no need to declare branches or node groups, just declare the right dependencies
  • Built-in pipeline visualizer
  • Support for pipeline lifecycle events
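
To illustrate how annotation-declared dependencies and topological analysis can fit together, here is a minimal standard-library sketch. It is not the engine's implementation: the `Input` class below is a hypothetical stand-in for the library's mark, the nodes are plain classes rather than `ProcessorBase` subclasses, and `dependencies`, `collect_graph`, `ready_batches` and `run` are names made up for this illustration. The nodes mirror the usage example in this README, with the asynchronous node simplified to a synchronous `AddConst`.

```python
import inspect
from graphlib import TopologicalSorter


class Input:
    """Hypothetical stand-in for the library's Input mark: stores the upstream node."""

    def __init__(self, node):
        self.node = node


class InvertNumber:
    def process(self, num: float) -> float:
        return -num


class AddConst:
    def process(self, num: Input(InvertNumber), const: float = 0.2) -> float:
        return num + const


class DoubleNumber:
    def process(self, num: Input(InvertNumber)) -> float:
        return num * 2


class AddNumbers:
    def process(self, num1: Input(AddConst), num2: Input(DoubleNumber)) -> float:
        return num1 + num2


def dependencies(node):
    """Map parameter name -> upstream node class, read from Input annotations."""
    params = inspect.signature(node.process).parameters
    return {
        name: param.annotation.node
        for name, param in params.items()
        if isinstance(param.annotation, Input)
    }


def collect_graph(output_node):
    """Walk back from the output node; return {node: set of upstream nodes}."""
    graph, stack = {}, [output_node]
    while stack:
        node = stack.pop()
        if node not in graph:
            graph[node] = set(dependencies(node).values())
            stack.extend(graph[node])
    return graph


def ready_batches(graph):
    """Group nodes into batches whose members do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorter.get_ready()
        batches.append({node.__name__ for node in ready})
        sorter.done(*ready)
    return batches


def run(output_node, **input_kwargs):
    """Execute nodes sequentially in topological order (no concurrency here)."""
    results = {}
    for node in TopologicalSorter(collect_graph(output_node)).static_order():
        deps = dependencies(node)
        # A node without Input-marked parameters is treated as the input node.
        kwargs = {name: results[up] for name, up in deps.items()} if deps else input_kwargs
        results[node] = node().process(**kwargs)
    return results[output_node]
```

In this sketch, `ready_batches(collect_graph(AddNumbers))` puts `AddConst` and `DoubleNumber` in the same batch, which is the kind of information an engine could use to run independent nodes concurrently.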

Usage

"""
main.py
"""

import asyncio
import time

from ml_pipeline_engine.chart import PipelineChart
from ml_pipeline_engine.dag_builders.annotation import build_dag
from ml_pipeline_engine.dag_builders.annotation.marks import Input
from ml_pipeline_engine.node import ProcessorBase
from ml_pipeline_engine.parallelism import threads_pool_registry


# 1. Setup thread pool

threads_pool_registry.auto_init()


# 2. Define nodes and their dependencies


class InvertNumber(ProcessorBase):
    def process(self, num: float) -> float:
        return -num


class AsyncAddConst(ProcessorBase):
    async def process(self, num: Input(InvertNumber), const: float = 0.2) -> float:
        await asyncio.sleep(2)

        return num + const


class DoubleNumber(ProcessorBase):
    def process(self, num: Input(InvertNumber)) -> float:
        time.sleep(2)

        return num * 2


class AddNumbers(ProcessorBase):
    def process(self, num1: Input(AsyncAddConst), num2: Input(DoubleNumber)) -> float:
        return num1 + num2


# 3. Define pipeline

pipeline = PipelineChart(
    "example_pipeline",
    build_dag(input_node=InvertNumber, output_node=AddNumbers),
)

# 4. Run it


async def main():
    start = time.time()

    result = await pipeline.run(input_kwargs=dict(num=3.0))

    end = time.time()

    assert result.error is None
    assert result.value == -8.8

    # The execution engine analyzed the graph topology and ran the
    # independent AsyncAddConst and DoubleNumber nodes concurrently
    assert end - start < 2.1


if __name__ == "__main__":
    asyncio.run(main())

See additional usage examples in docs: docs/examples/.
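
The timing assertion in the example above holds only if the two middle nodes overlap in time. The sketch below reproduces that effect with plain `asyncio`, independent of the engine: the coroutine is awaited directly, while the blocking function is moved to a worker thread with `asyncio.to_thread`. How the engine itself schedules synchronous nodes is not shown here; the function names and the shortened 0.2 s delays are assumptions made for this illustration.

```python
import asyncio
import time


async def async_add_const(num: float, const: float = 0.2) -> float:
    await asyncio.sleep(0.2)  # non-blocking wait, yields to the event loop
    return num + const


def double_number(num: float) -> float:
    time.sleep(0.2)  # blocking wait, so it has to run outside the event loop thread
    return num * 2


async def run_branches(num: float):
    """Run both branches concurrently and return (sum of results, elapsed seconds)."""
    inverted = -num
    start = time.perf_counter()
    added, doubled = await asyncio.gather(
        async_add_const(inverted),
        asyncio.to_thread(double_number, inverted),  # requires Python 3.9+
    )
    elapsed = time.perf_counter() - start
    return added + doubled, elapsed
```

Calling `asyncio.run(run_branches(3.0))` returns a value close to -8.8, and the elapsed time stays near the longer of the two delays rather than their sum.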

Development

Environment setup

Clone the project

git clone https://github.com/tochka-public/ml-pipeline-engine.git

Go to the project directory

cd ml-pipeline-engine

Use Python >= 3.9 and the Poetry package manager to install the ml-pipeline-engine dependencies

poetry install --no-root

If you plan to contribute, install the pre-commit hooks to keep the code format consistent

pre-commit install -f --hook-type pre-commit --hook-type pre-push

Run tests

python -m pytest tests
