Skip to main content

Фреймворк для работы с пайплайном ML моделей

Project description

ML Pipeline Engine

Графовый движок для конвейеров ML-моделей

Что нужно, чтобы сделать свой пайплайн?

  1. Написать классы узлов
  2. Связать узлы посредством указания зависимости

Поддерживаемые типы узлов

Протоколы

  1. DataSource
  2. FeatureBase
  3. MLModelBase
  4. ProcessorBase
  5. FeatureVectorizerBase

Примеры

Простейший пример на основе функций

def invert_number(num: float):
    return -num

def add_const(num: Input(invert_number), const: float = 0.1):
    return num + const

def double_number(num: Input(add_const)):
    return num * 2

build_dag(input_node=invert_number, output_node=double_number).run(pipeline_context(num=2.5))

Пример, который реализует переключение нод по условию (Аналог switch)

 def ident(num: float):
     return num

 def switch_node(num: Input(ident)):
     if num < 0.0:
         return 'invert'
     if num == 0.0:
         return 'const'
     if num == 1.0:
         return 'double'
     return 'add_sub_chain'

 def const_noinput():
     return 10.0

 def add_100(num: Input(ident)):
     return num + 100

 def add_some_const(num: Input(ident), const: float = 9.0):
     return num + const

 def invert_number(num: Input(ident)):
     return -num

 def sub_ident(num: Input(add_some_const), num_ident: Input(ident)):
     return num - num_ident

 def double_number(num: Input(ident)):
     return num * 2

 SomeSwitchCase = SwitchCase(
     switch=switch_node,
     cases=[
         ('const', const_noinput),
         ('double', double_number),
         ('invert', invert_number),
         ('add_sub_chain', sub_ident),
     ],
 )

 def case_node(num: SomeSwitchCase, num2: Input(ident), num3: Input(add_100)):
     return num + num2 + num3

 def out(num: Input(case_node)):
     return num

build_dag(input_node=ident, output_node=out).run(pipeline_context(num=input_num))

Пример с использованием классов как узлов графа

class SomeInput(NodeBase):
    name = 'input'

    def process(self, base_num: int, other_num: int) -> dict:
        return {
            'base_num': base_num,
            'other_num': other_num,
        }

class SomeDataSource(NodeBase):
    name = 'some_data_source'

    def collect(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100

class SomeFeature(NodeBase):
    name = 'some_feature'

    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
        return ds_value + inp['other_num'] + 10

class SomeVectorizer(NodeBase):
    name = 'some_vectorizer'

    def vectorize(self, feature_value: Input(SomeFeature)) -> int:
        return feature_value + 20

class SomeMLModel(NodeBase):
    name = 'some_model'

    def predict(self, vec_value: Input(SomeVectorizer)):
        return (vec_value + 30) / 100


build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))

Пример с использованием Generic-нод, которые под собой хранят общее поведение и не зависимы от конкретной модели

class SomeInput(NodeBase):
    name = 'input'

    def process(self, base_num: int, other_num: int) -> dict:
        return {
            'base_num': base_num,
            'other_num': other_num,
        }

class SomeDataSource(NodeBase):
    name = 'some_data_source'

    def collect(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100

class SomeCommonFeature(NodeBase):
    name = 'some_feature'

    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
        return ds_value + inp['other_num'] + 10

# Пример Generic-ноды
class GenericVectorizer(NodeBase):
    name = 'some_vectorizer'

    def vectorize(self, feature_value: InputGeneric(NodeLike)) -> int:
        return feature_value + 20

class AnotherFeature(NodeBase):
    name = 'another_feature'

    def extract(self, inp: Input(SomeInput)) -> int:
        return inp['base_num'] + 100_000

# Первый переопределенный подграф
SomeParticularVectorizer = build_node(  # noqa
    GenericVectorizer,
    feature_value=Input(SomeCommonFeature),
)

# Второй переопределенный подграф
AnotherParticularVectorizer = build_node(  # noqa
    GenericVectorizer,
    feature_value=Input(AnotherFeature),
)

class SomeMLModel(NodeBase):
    name = 'some_model'

    def predict(self, vec_value: Input(SomeParticularVectorizer)):
        return (vec_value + 30) / 100

class AnotherMlModel(NodeBase):
    name = 'another_model'

    def predict(self, vec_value: Input(AnotherParticularVectorizer)):
        return (vec_value + 30) / 100

# Первый граф
build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))

# Второй граф
build_dag(input_node=SomeInput, output_node=AnotherMlModel).run(pipeline_context(base_num=10, other_num=5))

Пример указания ретрая

class HierarchyException(Exception):
    pass


class SecondHierarchyException(Exception):
    pass


class SomeMlModel(NodeBase):

    node_type = 'ml_model'
    delay = 1.1
    attempts = 5
    exceptions = (SecondHierarchyException, )

    def predict(self, *args, **kwargs):
        ...

Пример запуска чарта с хранилищем артефактов

from ml_pipeline_engine.artifact_store.store.filesystem import FileSystemArtifactStore


def invert_number(num: float):
    return -num

def add_const(num: Input(invert_number), const: float = 0.1):
    return num + const

def double_number(num: Input(add_const)):
    return num * 2

dag = build_dag(input_node=invert_number, output_node=double_number)

chart = PipelineChart(
   model_name='name',
   entrypoint=dag,
   # Тут может быть указан артефакт стор, который пишет данные в S3
   artifact_store=... if not is_local() else FileSystemArtifactStore,
   # Добавление эвентов позволяет отслеживать полезную нагрузку, результаты и ошибки узлов для построения графиков
   event_managers=[DatabaseEventManager],
)

result = chart.run(
   input_kwargs=dict(
      ...,
   ),
)

Установка либы для разработки

Первоначальная настройка

  1. Устанавливаем Python>=3.8

  2. Устанавливаем зависимости для тестирования:

    pip3 install -e ".[tests]"
    
  3. Подключаем pre-commit hooks

    pre-commit install -f
    
  4. Запуск тестов

    python -m pytest tests
    

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ml_pipeline_engine-1.2.1.tar.gz (27.8 kB view details)

Uploaded Source

Built Distribution

ml_pipeline_engine-1.2.1-py3-none-any.whl (41.1 kB view details)

Uploaded Python 3

File details

Details for the file ml_pipeline_engine-1.2.1.tar.gz.

File metadata

  • Download URL: ml_pipeline_engine-1.2.1.tar.gz
  • Upload date:
  • Size: 27.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.9.18

File hashes

Hashes for ml_pipeline_engine-1.2.1.tar.gz
Algorithm Hash digest
SHA256 a25fda58f243be52a70ccc63537885c8f716b66b179cda70b856a5fb376a7b86
MD5 c6a0f951eedde9b8306175428a559879
BLAKE2b-256 29272e43f50d18a6b7cc91aef961a50a7f49dc5da766cba57fa61b4d42e4dc25

See more details on using hashes here.

File details

Details for the file ml_pipeline_engine-1.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for ml_pipeline_engine-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 6a07894526fdfb707cc8e6ea70737c2cf2aaa696f802fa932e9426a4db4d4d8a
MD5 780ffccd04072e424f1f5e07316aa79d
BLAKE2b-256 1f5d2ebc2bf5ee5cc175bb39b48b3f59195b27b5a8bab7881355cd2410301215

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page