Skip to main content

Фреймворк для работы с пайплайном ML моделей

Project description

ML Pipeline Engine

Графовый движок для конвейеров ML-моделей

Что нужно, чтобы сделать свой пайплайн?

  1. Написать классы узлов
  2. Связать узлы посредством указания зависимости

Поддерживаемые типы узлов

Протоколы

  1. DataSource
  2. FeatureBase
  3. MLModelBase
  4. ProcessorBase
  5. FeatureVectorizerBase

Примеры

Простейший пример на основе функций

def invert_number(num: float):
    return -num

def add_const(num: Input(invert_number), const: float = 0.1):
    return num + const

def double_number(num: Input(add_const)):
    return num * 2

build_dag(input_node=invert_number, output_node=double_number).run(pipeline_context(num=2.5))

Пример, который реализует переключение нод по условию (Аналог switch)

 def ident(num: float):
     return num

 def switch_node(num: Input(ident)):
     if num < 0.0:
         return 'invert'
     if num == 0.0:
         return 'const'
     if num == 1.0:
         return 'double'
     return 'add_sub_chain'

 def const_noinput():
     return 10.0

 def add_100(num: Input(ident)):
     return num + 100

 def add_some_const(num: Input(ident), const: float = 9.0):
     return num + const

 def invert_number(num: Input(ident)):
     return -num

 def sub_ident(num: Input(add_some_const), num_ident: Input(ident)):
     return num - num_ident

 def double_number(num: Input(ident)):
     return num * 2

 SomeSwitchCase = SwitchCase(
     switch=switch_node,
     cases=[
         ('const', const_noinput),
         ('double', double_number),
         ('invert', invert_number),
         ('add_sub_chain', sub_ident),
     ],
 )

 def case_node(num: SomeSwitchCase, num2: Input(ident), num3: Input(add_100)):
     return num + num2 + num3

 def out(num: Input(case_node)):
     return num

build_dag(input_node=ident, output_node=out).run(pipeline_context(num=input_num))

Пример с использованием классов как узлов графа

class SomeInput(NodeBase):
    name = 'input'

    def process(self, base_num: int, other_num: int) -> dict:
        return {
            'base_num': base_num,
            'other_num': other_num,
        }

class SomeDataSource(NodeBase):
    name = 'some_data_source'

    def collect(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100

class SomeFeature(NodeBase):
    name = 'some_feature'

    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
        return ds_value + inp['other_num'] + 10

class SomeVectorizer(NodeBase):
    name = 'some_vectorizer'

    def vectorize(self, feature_value: Input(SomeFeature)) -> int:
        return feature_value + 20

class SomeMLModel(NodeBase):
    name = 'some_model'

    def predict(self, vec_value: Input(SomeVectorizer)):
        return (vec_value + 30) / 100


build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))

Пример с использованием Generic-нод, которые под собой хранят общее поведение и не зависимы от конкретной модели

class SomeInput(NodeBase):
    name = 'input'

    def process(self, base_num: int, other_num: int) -> dict:
        return {
            'base_num': base_num,
            'other_num': other_num,
        }

class SomeDataSource(NodeBase):
    name = 'some_data_source'

    def collect(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100

class SomeCommonFeature(NodeBase):
    name = 'some_feature'

    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
        return ds_value + inp['other_num'] + 10

# Пример Generic-ноды
class GenericVectorizer(NodeBase):
    name = 'some_vectorizer'

    def vectorize(self, feature_value: InputGeneric(NodeLike)) -> int:
        return feature_value + 20

class AnotherFeature(NodeBase):
    name = 'another_feature'

    def extract(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100_000

# Первый переопределенный подграф
SomeParticularVectorizer = build_node(  # noqa
    GenericVectorizer,
    feature_value=Input(SomeCommonFeature),
)

# Второй переопределенный подграф
AnotherParticularVectorizer = build_node(  # noqa
    GenericVectorizer,
    feature_value=Input(AnotherFeature),
)

class SomeMLModel(NodeBase):
    name = 'some_model'

    def predict(self, vec_value: Input(SomeParticularVectorizer)):
        return (vec_value + 30) / 100

class AnotherMlModel(NodeBase):
    name = 'another_model'

    def predict(self, vec_value: Input(AnotherParticularVectorizer)):
        return (vec_value + 30) / 100

# Первый граф
build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))

# Второй граф
build_dag(input_node=SomeInput, output_node=AnotherMlModel).run(pipeline_context(base_num=10, other_num=5))

Пример указания ретрая

class HierarchyException(Exception):
    pass


class SecondHierarchyException(Exception):
    pass


class SomeMlModel(NodeBase):

    node_type = 'ml_model'
    delay = 1.1
    attempts = 5
    exceptions = (SecondHierarchyException, )

    def predict(self, *args, **kwargs):
        ...

Пример запуска чарта с хранилищем артефактов

from ml_pipeline_engine.artifact_store.store.filesystem import FileSystemArtifactStore


def invert_number(num: float):
    return -num

def add_const(num: Input(invert_number), const: float = 0.1):
    return num + const

def double_number(num: Input(add_const)):
    return num * 2

dag = build_dag(input_node=invert_number, output_node=double_number)

chart = PipelineChart(
   model_name='name',
   entrypoint=dag,
   # Тут может быть указан артефакт стор, который пишет данные в S3
   artifact_store=... if not is_local() else FileSystemArtifactStore,
   # Добавление эвентов позволяет отслеживать полезную нагрузку, результаты и ошибки узлов для построения графиков
   event_managers=[DatabaseEventManager],
)

result = chart.run(
   input_kwargs=dict(
      ...,
   ),
)

Визуализация пайплайна

ml_pipeline_engine build-static --dag_path 'ml_pipeline_engine.visualization.sample:sample_dag' --target_dir ./public

Установка либы для разработки

Первоначальная настройка

  1. Устанавливаем Python>=3.8

  2. Устанавливаем зависимости для тестирования:

    pip3 install -e ".[tests]"
    
  3. Подключаем pre-commit hooks

    pre-commit install -f
    
  4. Запуск тестов

    python -m pytest tests
    

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ml_pipeline_engine-1.6.0.tar.gz (1.3 MB view details)

Uploaded Source

Built Distribution

ml_pipeline_engine-1.6.0-py3-none-any.whl (1.4 MB view details)

Uploaded Python 3

File details

Details for the file ml_pipeline_engine-1.6.0.tar.gz.

File metadata

  • Download URL: ml_pipeline_engine-1.6.0.tar.gz
  • Upload date:
  • Size: 1.3 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.9.19

File hashes

Hashes for ml_pipeline_engine-1.6.0.tar.gz
Algorithm Hash digest
SHA256 da0a8bb21c57f8d4ca07ec05b535a99be3cc3d85ac83c5f99eaaed78b9f80dc6
MD5 b66469e0bc150d3fb35b27867794cf3e
BLAKE2b-256 5a17796461af77accbfad63c4848868439a8cfc0f5969640c2499aa2e22725c0

See more details on using hashes here.

File details

Details for the file ml_pipeline_engine-1.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for ml_pipeline_engine-1.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 27746a44610a564b74406e3baabdd76022c4a04b371ab20d881634823c286130
MD5 b42d887657c0addd08376cf7ddb882ed
BLAKE2b-256 50073a879f463a18638d9dd8d5d8757c27d6478ec56a758857359021f6a331ec

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page