Skip to main content

Фреймворк для работы с пайплайном ML моделей

Project description

ML Pipeline Engine

Графовый движок для конвейеров ML-моделей

Что нужно, чтобы сделать свой пайплайн?

  1. Написать классы узлов
  2. Связать узлы посредством указания зависимости

Поддерживаемые типы узлов

Протоколы

  1. DataSource
  2. FeatureBase
  3. MLModelBase
  4. ProcessorBase
  5. FeatureVectorizerBase

Примеры

Простейший пример на основе функций

def invert_number(num: float):
    return -num

def add_const(num: Input(invert_number), const: float = 0.1):
    return num + const

def double_number(num: Input(add_const)):
    return num * 2

build_dag(input_node=invert_number, output_node=double_number).run(pipeline_context(num=2.5))

Пример, который реализует переключение нод по условию (Аналог switch)

 def ident(num: float):
     return num

 def switch_node(num: Input(ident)):
     if num < 0.0:
         return 'invert'
     if num == 0.0:
         return 'const'
     if num == 1.0:
         return 'double'
     return 'add_sub_chain'

 def const_noinput():
     return 10.0

 def add_100(num: Input(ident)):
     return num + 100

 def add_some_const(num: Input(ident), const: float = 9.0):
     return num + const

 def invert_number(num: Input(ident)):
     return -num

 def sub_ident(num: Input(add_some_const), num_ident: Input(ident)):
     return num - num_ident

 def double_number(num: Input(ident)):
     return num * 2

 SomeSwitchCase = SwitchCase(
     switch=switch_node,
     cases=[
         ('const', const_noinput),
         ('double', double_number),
         ('invert', invert_number),
         ('add_sub_chain', sub_ident),
     ],
 )

 def case_node(num: SomeSwitchCase, num2: Input(ident), num3: Input(add_100)):
     return num + num2 + num3

 def out(num: Input(case_node)):
     return num

build_dag(input_node=ident, output_node=out).run(pipeline_context(num=input_num))

Пример с использованием классов как узлов графа

class SomeInput(NodeBase):
    name = 'input'

    def process(self, base_num: int, other_num: int) -> dict:
        return {
            'base_num': base_num,
            'other_num': other_num,
        }

class SomeDataSource(NodeBase):
    name = 'some_data_source'

    def collect(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100

class SomeFeature(NodeBase):
    name = 'some_feature'

    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
        return ds_value + inp['other_num'] + 10

class SomeVectorizer(NodeBase):
    name = 'some_vectorizer'

    def vectorize(self, feature_value: Input(SomeFeature)) -> int:
        return feature_value + 20

class SomeMLModel(NodeBase):
    name = 'some_model'

    def predict(self, vec_value: Input(SomeVectorizer)):
        return (vec_value + 30) / 100


build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))

Пример с использованием Generic-нод, которые под собой хранят общее поведение и не зависимы от конкретной модели

class SomeInput(NodeBase):
    name = 'input'

    def process(self, base_num: int, other_num: int) -> dict:
        return {
            'base_num': base_num,
            'other_num': other_num,
        }

class SomeDataSource(NodeBase):
    name = 'some_data_source'

    def collect(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100

class SomeCommonFeature(NodeBase):
    name = 'some_feature'

    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
        return ds_value + inp['other_num'] + 10

# Пример Generic-ноды
class GenericVectorizer(NodeBase):
    name = 'some_vectorizer'

    def vectorize(self, feature_value: InputGeneric(NodeLike)) -> int:
        return feature_value + 20

class AnotherFeature(NodeBase):
    name = 'another_feature'

    def extract(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100_000

# Первый переопределенный подграф
SomeParticularVectorizer = build_node(  # noqa
    GenericVectorizer,
    feature_value=Input(SomeCommonFeature),
)

# Второй переопределенный подграф
AnotherParticularVectorizer = build_node(  # noqa
    GenericVectorizer,
    feature_value=Input(AnotherFeature),
)

class SomeMLModel(NodeBase):
    name = 'some_model'

    def predict(self, vec_value: Input(SomeParticularVectorizer)):
        return (vec_value + 30) / 100

class AnotherMlModel(NodeBase):
    name = 'another_model'

    def predict(self, vec_value: Input(AnotherParticularVectorizer)):
        return (vec_value + 30) / 100

# Первый граф
build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))

# Второй граф
build_dag(input_node=SomeInput, output_node=AnotherMlModel).run(pipeline_context(base_num=10, other_num=5))

Пример указания ретрая

class HierarchyException(Exception):
    pass


class SecondHierarchyException(Exception):
    pass


class SomeMlModel(NodeBase):

    node_type = 'ml_model'
    delay = 1.1
    attempts = 5
    exceptions = (SecondHierarchyException, )

    def predict(self, *args, **kwargs):
        ...

Пример запуска чарта с хранилищем артефактов

from ml_pipeline_engine.artifact_store.store.filesystem import FileSystemArtifactStore


def invert_number(num: float):
    return -num

def add_const(num: Input(invert_number), const: float = 0.1):
    return num + const

def double_number(num: Input(add_const)):
    return num * 2

dag = build_dag(input_node=invert_number, output_node=double_number)

chart = PipelineChart(
   model_name='name',
   entrypoint=dag,
   # Тут может быть указан артефакт стор, который пишет данные в S3
   artifact_store=... if not is_local() else FileSystemArtifactStore,
   # Добавление эвентов позволяет отслеживать полезную нагрузку, результаты и ошибки узлов для построения графиков
   event_managers=[DatabaseEventManager],
)

result = chart.run(
   input_kwargs=dict(
      ...,
   ),
)

Установка либы для разработки

Первоначальная настройка

  1. Устанавливаем Python>=3.8

  2. Устанавливаем зависимости для тестирования:

    pip3 install -e ".[tests]"
    
  3. Подключаем pre-commit hooks

    pre-commit install -f
    
  4. Запуск тестов

    python -m pytest tests
    

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ml_pipeline_engine-1.1.3.tar.gz (27.8 kB view details)

Uploaded Source

Built Distribution

ml_pipeline_engine-1.1.3-py3-none-any.whl (41.1 kB view details)

Uploaded Python 3

File details

Details for the file ml_pipeline_engine-1.1.3.tar.gz.

File metadata

  • Download URL: ml_pipeline_engine-1.1.3.tar.gz
  • Upload date:
  • Size: 27.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.9.18

File hashes

Hashes for ml_pipeline_engine-1.1.3.tar.gz
Algorithm Hash digest
SHA256 da37bb61494b9198af5b518eabf5c70f88e7bf11a4efa15a37a9d683c859563a
MD5 c28f6dd87adbd14727c3eb6dc42dc93e
BLAKE2b-256 60973c8203bcbe28fdbe15ac5f076fd19528f9ceb584b720de19c35ee908a664

See more details on using hashes here.

File details

Details for the file ml_pipeline_engine-1.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for ml_pipeline_engine-1.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 23ee4694eecbfe5b2193bdd97951648750f5b6ad4a2690a2dac6a872caeb9c15
MD5 f90d0665fb2a2bad4e16dad7977be891
BLAKE2b-256 130f8cf0026edf4871ce9932a81b18e5a4977a93dd45c9221ae705c06b4be6ef

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page