
A framework for working with ML model pipelines


ML Pipeline Engine

Untangle and speed up your data processing pipelines without using a complex DAG framework

Key benefits

  • Fast in-memory pipeline runtime. Suitable for online transaction processing
  • No graph DSL needed. Just use Python type annotations to declare a node's dependencies
  • Pipeline execution engine based on strong topological analysis, with MVCC support
  • Synchronous and asynchronous processor nodes supported
  • Asynchronous pipeline interface enabled by default
  • Simple but powerful control flow operators: no need to declare branches or node groups, just declare the right dependencies
  • Built-in pipeline visualizer
  • Support for pipeline lifecycle events
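
The idea of declaring dependencies through type annotations and ordering nodes by topological analysis can be illustrated with the standard library alone. The sketch below is not the engine's implementation: the `Dep` marker and the helper functions `dependencies`, `build_graph`, and `execution_levels` are hypothetical names invented for this illustration, and `graphlib.TopologicalSorter` stands in for whatever analysis the engine performs internally.

```python
import inspect
from graphlib import TopologicalSorter


class Dep:
    """Hypothetical marker meaning 'this argument is the output of `node`'."""

    def __init__(self, node):
        self.node = node


def load(num: float) -> float:
    return -num


def add_const(num: Dep(load), const: float = 0.2) -> float:
    return num + const


def double(num: Dep(load)) -> float:
    return num * 2


def total(a: Dep(add_const), b: Dep(double)) -> float:
    return a + b


def dependencies(func):
    """Return the functions `func` depends on, read from its annotations."""
    params = inspect.signature(func).parameters.values()
    return [p.annotation.node for p in params if isinstance(p.annotation, Dep)]


def build_graph(output):
    """Walk annotations backwards from the output node to collect the graph."""
    graph, stack = {}, [output]
    while stack:
        node = stack.pop()
        if node not in graph:
            graph[node] = dependencies(node)
            stack.extend(graph[node])
    return graph


def execution_levels(graph):
    """Group nodes into levels; nodes in one level do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    levels = []
    while sorter.is_active():
        ready = sorter.get_ready()
        levels.append(sorted(f.__name__ for f in ready))
        sorter.done(*ready)
    return levels


levels = execution_levels(build_graph(total))
print(levels)
```

Nodes that land in the same level have no dependency on one another, which is what makes it safe for a runtime to execute them concurrently.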


Usage

"""
main.py
"""

import asyncio
import time

from ml_pipeline_engine.chart import PipelineChart
from ml_pipeline_engine.dag_builders.annotation import build_dag
from ml_pipeline_engine.dag_builders.annotation.marks import Input
from ml_pipeline_engine.node import ProcessorBase
from ml_pipeline_engine.parallelism import threads_pool_registry


# 1. Set up the thread pool

threads_pool_registry.auto_init()


# 2. Define nodes and their dependencies


class InvertNumber(ProcessorBase):
    def process(self, num: float) -> float:
        return -num


class AsyncAddConst(ProcessorBase):
    async def process(self, num: Input(InvertNumber), const: float = 0.2) -> float:
        await asyncio.sleep(2)

        return num + const


class DoubleNumber(ProcessorBase):
    def process(self, num: Input(InvertNumber)) -> float:
        time.sleep(2)

        return num * 2


class AddNumbers(ProcessorBase):
    def process(self, num1: Input(AsyncAddConst), num2: Input(DoubleNumber)) -> float:
        return num1 + num2


# 3. Define pipeline

pipeline = PipelineChart(
    "example_pipeline",
    build_dag(input_node=InvertNumber, output_node=AddNumbers),
)

# 4. Run it


async def main():
    start = time.time()

    result = await pipeline.run(input_kwargs=dict(num=3.0))

    end = time.time()

    assert result.error is None
    assert result.value == -8.8

    # The execution engine ran nodes concurrently, based on graph topology analysis,
    # so the AsyncAddConst and DoubleNumber nodes were run in parallel
    assert end - start < 2.1


if __name__ == "__main__":
    asyncio.run(main())
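
Why the run above can finish in about 2 seconds rather than 4 can be sketched with plain asyncio. This is only an illustration of the general technique (awaiting a coroutine while a blocking function runs in a worker thread), not the engine's actual scheduling code; the function names are hypothetical, and the sleeps are shortened to 0.3 seconds to keep the sketch quick.

```python
import asyncio
import time


async def async_add_const(num: float, const: float = 0.2) -> float:
    # Non-blocking wait: the event loop stays free while this sleeps
    await asyncio.sleep(0.3)
    return num + const


def double_number(num: float) -> float:
    # Blocking wait: must run in a worker thread to avoid stalling the loop
    time.sleep(0.3)
    return num * 2


async def run_both(num: float):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Start both branches at once; the sync one goes to the default thread pool
    added, doubled = await asyncio.gather(
        async_add_const(num),
        loop.run_in_executor(None, double_number, num),
    )
    elapsed = time.perf_counter() - start
    return added + doubled, elapsed


total, elapsed = asyncio.run(run_both(-3.0))
print(f"total={total:.1f}, elapsed={elapsed:.2f}s")
```

Because the two branches overlap, the elapsed time is close to the longer of the two waits instead of their sum.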

See additional usage examples in docs/examples/.

Development

Environment setup

Clone the project

git clone https://github.com/tochka-public/ml-pipeline-engine.git

Go to the project directory

cd ml-pipeline-engine

Use Python 3.9 or later and the Poetry package manager to install the ml-pipeline-engine dependencies

poetry install --no-root

To contribute, install the pre-commit hooks to keep the code format consistent

pre-commit install -f --hook-type pre-commit --hook-type pre-push

Run tests

python -m pytest tests
