A framework for working with ML model pipelines

Project description

ML Pipeline Engine

Untangle and speed up your data processing pipelines without adopting a complex DAG framework

Key benefits

  • Fast in-memory pipeline runtime. Suitable for online transaction processing
  • No graph DSL needed. Just use Python type annotations to declare a node's dependencies
  • Pipeline execution engine based on topological analysis, with MVCC support
  • Synchronous and asynchronous processor nodes supported
  • Asynchronous pipeline interface enabled by default
  • Simple but powerful control flow operators: no need to declare branches or node groups, just declare the right dependencies
  • Built-in pipeline visualizer
  • Support for pipeline lifecycle events
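
To make the annotation-based approach from the list above concrete, here is a minimal sketch of how dependencies could be read from type annotations and turned into an execution order. It is not the engine's implementation: `Dep`, `dependencies`, `build_graph`, and `execution_layers` are hypothetical names introduced for illustration, and the ordering relies on the standard library's `graphlib.TopologicalSorter` (Python 3.9+).

```python
import inspect
from dataclasses import dataclass
from graphlib import TopologicalSorter  # standard library, Python >= 3.9


@dataclass(frozen=True)
class Dep:
    """Hypothetical marker: this parameter is the output of `node`."""
    node: type


class Invert:
    def process(self, num: float) -> float:
        return -num


class AddConst:
    def process(self, num: Dep(Invert), const: float = 0.2) -> float:
        return num + const


class Double:
    def process(self, num: Dep(Invert)) -> float:
        return num * 2


class Add:
    def process(self, num1: Dep(AddConst), num2: Dep(Double)) -> float:
        return num1 + num2


def dependencies(node: type) -> list:
    """Collect the upstream nodes named by Dep(...) annotations on `process`."""
    params = inspect.signature(node.process).parameters.values()
    return [p.annotation.node for p in params if isinstance(p.annotation, Dep)]


def build_graph(output_node: type) -> dict:
    """Walk the annotations backwards, starting from the output node."""
    graph, stack = {}, [output_node]
    while stack:
        node = stack.pop()
        if node not in graph:
            graph[node] = dependencies(node)
            stack.extend(graph[node])
    return graph


def execution_layers(graph: dict) -> list:
    """Group nodes into layers whose dependencies are already satisfied."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    layers = []
    while sorter.is_active():
        ready = sorter.get_ready()
        layers.append(sorted(n.__name__ for n in ready))
        sorter.done(*ready)
    return layers


layers = execution_layers(build_graph(Add))
print(layers)  # [['Invert'], ['AddConst', 'Double'], ['Add']]
```

Nodes that land in the same layer do not depend on each other, which is what makes it safe to run them concurrently.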

Usage

"""
main.py
"""

import asyncio
import time

from ml_pipeline_engine.chart import PipelineChart
from ml_pipeline_engine.dag_builders.annotation import build_dag
from ml_pipeline_engine.dag_builders.annotation.marks import Input
from ml_pipeline_engine.node import ProcessorBase
from ml_pipeline_engine.parallelism import threads_pool_registry


# 1. Set up the thread pool

threads_pool_registry.auto_init()


# 2. Define nodes and their dependencies


class InvertNumber(ProcessorBase):
    def process(self, num: float) -> float:
        return -num


class AsyncAddConst(ProcessorBase):
    async def process(self, num: Input(InvertNumber), const: float = 0.2) -> float:
        await asyncio.sleep(2)

        return num + const


class DoubleNumber(ProcessorBase):
    def process(self, num: Input(InvertNumber)) -> float:
        time.sleep(2)

        return num * 2


class AddNumbers(ProcessorBase):
    def process(self, num1: Input(AsyncAddConst), num2: Input(DoubleNumber)) -> float:
        return num1 + num2


# 3. Define pipeline

pipeline = PipelineChart(
    "example_pipeline",
    build_dag(input_node=InvertNumber, output_node=AddNumbers),
)

# 4. Run it


async def main():
    start = time.time()

    result = await pipeline.run(input_kwargs=dict(num=3.0))

    end = time.time()

    assert result.error is None
    assert result.value == -8.8

    # Based on graph topology analysis, the execution engine ran the
    # independent AsyncAddConst and DoubleNumber nodes concurrently
    assert end - start < 2.1


if __name__ == "__main__":
    asyncio.run(main())
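
The timing assertion in the example above holds because the two middle nodes only depend on `InvertNumber`, so their two-second waits can overlap. The sketch below reproduces that effect with plain `asyncio`, assuming nothing about the engine's internals: the function names are stand-ins for the nodes, the sleeps are shortened to 0.5 s, and the blocking function is offloaded with `asyncio.to_thread` (Python 3.9+).

```python
import asyncio
import time


def double_number(num: float) -> float:
    time.sleep(0.5)  # blocking work, stands in for the synchronous node
    return num * 2


async def add_const(num: float, const: float = 0.2) -> float:
    await asyncio.sleep(0.5)  # non-blocking wait, stands in for the async node
    return num + const


async def run(num: float) -> float:
    inverted = -num
    # Both branches depend only on `inverted`, so they can overlap:
    # the coroutine runs on the event loop, the blocking call in a worker thread.
    with_const, doubled = await asyncio.gather(
        add_const(inverted),
        asyncio.to_thread(double_number, inverted),
    )
    return with_const + doubled


start = time.perf_counter()
result = asyncio.run(run(3.0))
elapsed = time.perf_counter() - start

# (-3.0 + 0.2) + (-3.0 * 2) is approximately -8.8; elapsed is close to one
# sleep (about 0.5 s), not the 1.0 s a sequential run would take.
print(result, round(elapsed, 1))
```

Run sequentially, the same two branches would take roughly the sum of both sleeps, which is the difference the `end - start < 2.1` check in the original example is designed to catch.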

See additional usage examples in docs: docs/examples/.

Development

Environment setup

Clone the project

git clone https://github.com/tochka-public/ml-pipeline-engine.git

Go to the project directory

cd ml-pipeline-engine

Use Python >= 3.9 and the poetry package manager to install the ml-pipeline-engine dependencies

poetry install --no-root

If you plan to contribute, install the pre-commit hooks to keep the code format consistent

pre-commit install -f --hook-type pre-commit --hook-type pre-push

Run tests

python -m pytest tests

Download files

Source Distribution

ml_pipeline_engine-2.3.0.tar.gz (1.3 MB)

  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.22

Hashes for ml_pipeline_engine-2.3.0.tar.gz

  • SHA256: 28d2d5f31321f956f1a8a55fc247efac37446ab61689740df1222d26834e4964
  • MD5: 65d34383cdf3593cfc2c3e7509e214fc
  • BLAKE2b-256: 8004878754906c8565570e9757570c92a46322caabd66d92f7e0172e794e1bcb

Built Distribution

ml_pipeline_engine-2.3.0-py3-none-any.whl (1.4 MB)

  • Tags: Python 3

Hashes for ml_pipeline_engine-2.3.0-py3-none-any.whl

  • SHA256: 6dfe2cb52adb156b8a94aa83f5cdc78f5068ec62af2811fa371ebde5c6f26148
  • MD5: 6c9c9273d7cda2dea9f8bc94a7569959
  • BLAKE2b-256: 9f6d20823f18937be9929c3736ab7a702cde1b4eeed5a4b9c5596907dfcab025
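
A downloaded file can be checked against the SHA256 digests listed above with the standard library's `hashlib`. This is a minimal sketch: `sha256_of` and `verify` are hypothetical helper names, and the local path in the usage comment is an assumption about where the archive was saved.

```python
import hashlib
import tempfile
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash the file in chunks so large archives are not loaded into memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected_sha256: str) -> bool:
    """Return True when the file's SHA256 matches the published digest."""
    return sha256_of(path) == expected_sha256.strip().lower()


# Usage against the source distribution (assumes the file is in the current directory):
# verify(
#     Path("ml_pipeline_engine-2.3.0.tar.gz"),
#     "28d2d5f31321f956f1a8a55fc247efac37446ab61689740df1222d26834e4964",
# )

# Self-contained demonstration on a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.bin"
    sample.write_bytes(b"hello")
    demo_ok = verify(sample, hashlib.sha256(b"hello").hexdigest())
    demo_bad = verify(sample, "0" * 64)

print(demo_ok, demo_bad)  # True False
```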
