Фреймворк для работы с пайплайном ML моделей
Project description
ML Pipeline Engine
Графовый движок для конвейеров ML-моделей
Что нужно, чтобы сделать свой пайплайн?
- Написать классы узлов
- Связать узлы посредством указания зависимости
Поддерживаемые типы узлов
Примеры
Простейший пример на основе функций
def invert_number(num: float):
return -num
def add_const(num: Input(invert_number), const: float = 0.1):
return num + const
def double_number(num: Input(add_const)):
return num * 2
build_dag(input_node=invert_number, output_node=double_number).run(pipeline_context(num=2.5))
Пример, который реализует переключение нод по условию (Аналог switch)
def ident(num: float):
return num
def switch_node(num: Input(ident)):
if num < 0.0:
return 'invert'
if num == 0.0:
return 'const'
if num == 1.0:
return 'double'
return 'add_sub_chain'
def const_noinput():
return 10.0
def add_100(num: Input(ident)):
return num + 100
def add_some_const(num: Input(ident), const: float = 9.0):
return num + const
def invert_number(num: Input(ident)):
return -num
def sub_ident(num: Input(add_some_const), num_ident: Input(ident)):
return num - num_ident
def double_number(num: Input(ident)):
return num * 2
SomeSwitchCase = SwitchCase(
switch=switch_node,
cases=[
('const', const_noinput),
('double', double_number),
('invert', invert_number),
('add_sub_chain', sub_ident),
],
)
def case_node(num: SomeSwitchCase, num2: Input(ident), num3: Input(add_100)):
return num + num2 + num3
def out(num: Input(case_node)):
return num
build_dag(input_node=ident, output_node=out).run(pipeline_context(num=input_num))
Пример с использованием классов как узлов графа
class SomeInput(NodeBase):
name = 'input'
def process(self, base_num: int, other_num: int) -> dict:
return {
'base_num': base_num,
'other_num': other_num,
}
class SomeDataSource(NodeBase):
name = 'some_data_source'
def collect(self, inp: Input(SomeInput)) -> int:
return inp['base_num'] + 100
class SomeFeature(NodeBase):
name = 'some_feature'
def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
return ds_value + inp['other_num'] + 10
class SomeVectorizer(NodeBase):
name = 'some_vectorizer'
def vectorize(self, feature_value: Input(SomeFeature)) -> int:
return feature_value + 20
class SomeMLModel(NodeBase):
name = 'some_model'
def predict(self, vec_value: Input(SomeVectorizer)):
return (vec_value + 30) / 100
build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))
Пример с использованием Generic-нод, которые под собой хранят общее поведение и не зависимы от конкретной модели
class SomeInput(NodeBase):
name = 'input'
def process(self, base_num: int, other_num: int) -> dict:
return {
'base_num': base_num,
'other_num': other_num,
}
class SomeDataSource(NodeBase):
name = 'some_data_source'
def collect(self, inp: Input(SomeInput)) -> int:
return inp['base_num'] + 100
class SomeCommonFeature(NodeBase):
name = 'some_feature'
def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
return ds_value + inp['other_num'] + 10
# Пример Generic-ноды
class GenericVectorizer(NodeBase):
name = 'some_vectorizer'
def vectorize(self, feature_value: InputGeneric(NodeLike)) -> int:
return feature_value + 20
class AnotherFeature(NodeBase):
name = 'another_feature'
def extract(self, inp: Input(SomeInput)) -> int:
return inp['base_num'] + 100_000
# Первый переопределенный подграф
SomeParticularVectorizer = build_node( # noqa
GenericVectorizer,
feature_value=Input(SomeCommonFeature),
)
# Второй переопределенный подграф
AnotherParticularVectorizer = build_node( # noqa
GenericVectorizer,
feature_value=Input(AnotherFeature),
)
class SomeMLModel(NodeBase):
name = 'some_model'
def predict(self, vec_value: Input(SomeParticularVectorizer)):
return (vec_value + 30) / 100
class AnotherMlModel(NodeBase):
name = 'another_model'
def predict(self, vec_value: Input(AnotherParticularVectorizer)):
return (vec_value + 30) / 100
# Первый граф
build_dag(input_node=SomeInput, output_node=SomeMLModel).run(pipeline_context(base_num=10, other_num=5))
# Второй граф
build_dag(input_node=SomeInput, output_node=AnotherMlModel).run(pipeline_context(base_num=10, other_num=5))
Пример указания ретрая
class HierarchyException(Exception):
pass
class SecondHierarchyException(Exception):
pass
class SomeMlModel(NodeBase):
node_type = 'ml_model'
delay = 1.1
attempts = 5
exceptions = (SecondHierarchyException, )
def predict(self, *args, **kwargs):
...
Пример запуска чарта с хранилищем артефактов
from ml_pipeline_engine.artifact_store.store.filesystem import FileSystemArtifactStore
def invert_number(num: float):
return -num
def add_const(num: Input(invert_number), const: float = 0.1):
return num + const
def double_number(num: Input(add_const)):
return num * 2
dag = build_dag(input_node=invert_number, output_node=double_number)
chart = PipelineChart(
model_name='name',
entrypoint=dag,
# Тут может быть указан артефакт стор, который пишет данные в S3
artifact_store=... if not is_local() else FileSystemArtifactStore,
# Добавление эвентов позволяет отслеживать полезную нагрузку, результаты и ошибки узлов для построения графиков
event_managers=[DatabaseEventManager],
)
result = chart.run(
input_kwargs=dict(
...,
),
)
Установка либы для разработки
Первоначальная настройка
-
Устанавливаем Python>=3.8
-
Устанавливаем зависимости для тестирования:
pip3 install -e ".[tests]"
-
Подключаем pre-commit hooks
pre-commit install -f
-
Запуск тестов
python -m pytest tests
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
ml_pipeline_engine-1.1.0.tar.gz
(27.8 kB
view details)
Built Distribution
File details
Details for the file ml_pipeline_engine-1.1.0.tar.gz
.
File metadata
- Download URL: ml_pipeline_engine-1.1.0.tar.gz
- Upload date:
- Size: 27.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b012dab24987c12cdcbcf0f38528c5ff6f519c05865ca2dfccfabdf8c0f1b2b2 |
|
MD5 | 7ab88794162d7179ec0c3679ce5df67e |
|
BLAKE2b-256 | 2cb13f637e229e64f5b391167aaf6e103e3e4bce1c39498c2f5c8e00f9e2b14f |
File details
Details for the file ml_pipeline_engine-1.1.0-py3-none-any.whl
.
File metadata
- Download URL: ml_pipeline_engine-1.1.0-py3-none-any.whl
- Upload date:
- Size: 41.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 509c3e6162be717ff0667e0bbd57ddd496bf1171ab3f95124a9bc870f9574a36 |
|
MD5 | 87680b5df0984e5c8c6d385e14316283 |
|
BLAKE2b-256 | 6cdc23c130ef751e4d0d35ba7139366795b4d3c6735f8978d0345f29baff459c |