Pipelines for machine learning workloads.
Pipeline SDK

Created by mystic.ai
Find loads of premade models in production for free in Catalyst: https://www.mystic.ai/explore
About
Pipeline is a Python library that provides a simple way to construct computational graphs for AI/ML. The library is suitable for both development and production environments, and supports inference, training, and fine-tuning. It is also a direct interface to Catalyst, which provides a compute engine to run pipelines at scale on enterprise GPUs. Besides Catalyst, this SDK can also be used with Pipeline Core on a privately hosted cluster.
The syntax for defining AI/ML pipelines is similar to sessions in TensorFlow v1 and Flows in Prefect. In future releases we will move away from this syntax to a C-based graph compiler that interprets Python (and other languages) directly, allowing users of the API to compose graphs in a way that is more native to the chosen language.
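To make the "build a graph first, run it later" idea concrete, here is a toy sketch in plain Python. It is not the Pipeline SDK implementation and uses none of its classes; `ToyGraph`, `Node`, and `toy_pipe` are hypothetical names invented for this illustration. It only shows how a `with` block can record decorated function calls as graph steps and replay them once real inputs arrive.

```python
# Toy illustration only: NOT how pipeline-ai is implemented.
from typing import Any, Callable, Dict, List, Optional, Tuple


class Node:
    """Placeholder for a value that only exists when the graph runs."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyGraph:
    """Records calls made inside a `with` block and replays them later."""

    _active: Optional["ToyGraph"] = None

    def __init__(self) -> None:
        self.inputs: List[Node] = []
        self.steps: List[Tuple[Callable[..., Any], Tuple[Node, ...], Node]] = []
        self.result: Optional[Node] = None

    def __enter__(self) -> "ToyGraph":
        ToyGraph._active = self
        return self

    def __exit__(self, *exc: Any) -> None:
        ToyGraph._active = None

    def variable(self) -> Node:
        node = Node(f"input_{len(self.inputs)}")
        self.inputs.append(node)
        return node

    def output(self, node: Node) -> None:
        self.result = node

    def run(self, *values: Any) -> Any:
        # Bind concrete values to the input placeholders, then replay the steps in order.
        env: Dict[str, Any] = {n.name: v for n, v in zip(self.inputs, values)}
        for func, args, out in self.steps:
            env[out.name] = func(*(env[a.name] for a in args))
        return env[self.result.name]


def toy_pipe(func: Callable[..., Any]) -> Callable[..., Any]:
    """While a graph is being built, record the call instead of executing it."""

    def wrapper(*args: Any) -> Any:
        graph = ToyGraph._active
        if graph is None:
            return func(*args)  # outside a `with` block: behave like a normal function
        out = Node(f"step_{len(graph.steps)}")
        graph.steps.append((func, args, out))
        return out

    return wrapper


@toy_pipe
def add(a: int, b: int) -> int:
    return a + b


@toy_pipe
def double(x: int) -> int:
    return 2 * x


with ToyGraph() as builder:
    a = builder.variable()
    b = builder.variable()
    builder.output(double(add(a, b)))

print(builder.run(2, 3))  # 10
```

Nothing is computed inside the `with` block; `add` and `double` only execute when `run` is called with concrete values, which is the property the session-style syntax relies on.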
Installation
:warning: You must be using python==3.10.
python -m pip install pipeline-ai
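If you want to confirm the interpreter version before installing, a small check like the following may help. It is only a sketch using the standard library; the `python_version_ok` helper is a hypothetical name, and the required version is taken from the note above.

```python
import sys

REQUIRED = (3, 10)  # major, minor, as stated in the installation note


def python_version_ok(version_info=sys.version_info) -> bool:
    """Return True when the interpreter's major.minor matches REQUIRED."""
    return tuple(version_info[:2]) == REQUIRED


if not python_version_ok():
    found = f"{sys.version_info.major}.{sys.version_info.minor}"
    print(f"Warning: found Python {found}, expected 3.10")
```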
Models
Below are some popular models that have been premade by the community on Catalyst. You can find more models in the explore section of Catalyst, and the source code for these models is also referenced in the table.
Model | Category | Description | Source |
---|---|---|---|
meta/llama2-7B | LLM | A 7B parameter LLM created by Meta (vllm accelerated) | source |
meta/llama2-13B | LLM | A 13B parameter LLM created by Meta (vllm accelerated) | source |
meta/llama2-70B | LLM | A 70B parameter LLM created by Meta (vllm accelerated) | source |
runwayml/stable-diffusion-1.5 | Vision | Text -> Image | source |
stabilityai/stable-diffusion-xl-refiner-1.0 | Vision | SDXL Text -> Image | source |
matthew/e5_large-v2 | LLM | Text embedding | source |
matthew/musicgen_large | Audio | Music generation | source |
matthew/blip | Vision | Image captioning | source |
Examples and tutorials
Tutorial | Description |
---|---|
Keyword schemas | Set default, min, max, and various other constraints on your inputs with schemas |
Entity objects | Use entity objects to persist values and store things |
Cold start optimisations | Premade functions to do heavy tasks separately |
Input/output types | Defining what goes in and out of your pipes |
Files and directories | Inputting or outputting files from your runs |
Pipeline building | Building pipelines - how it works |
Virtual environments | Creating a virtual environment for your pipeline to run in |
GPUs and Accelerators | Add hardware definitions to your pipelines |
Runs | Running a pipeline remotely - how it works |
Below is some sample Python code that demonstrates various features of the Pipeline SDK by creating a simple pipeline that can be run locally or on Catalyst.
from pathlib import Path
from typing import List

import torch
from diffusers import StableDiffusionPipeline

from pipeline import Pipeline, Variable, pipe, entity
from pipeline.cloud import compute_requirements, environments, pipelines
from pipeline.objects import File
from pipeline.objects.graph import InputField, InputSchema


class ModelKwargs(InputSchema):  # TUTORIAL: Keyword schemas
    height: int | None = InputField(default=512, ge=64, le=1024)
    width: int | None = InputField(default=512, ge=64, le=1024)
    num_inference_steps: int | None = InputField(default=50)
    num_images_per_prompt: int | None = InputField(default=1, ge=1, le=4)
    guidance_scale: float | None = InputField(default=7.5)


@entity  # TUTORIAL: Entity objects
class StableDiffusionModel:
    @pipe(on_startup=True, run_once=True)  # TUTORIAL: Cold start optimisations
    def load(self):
        model_id = "runwayml/stable-diffusion-v1-5"
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.pipe = StableDiffusionPipeline.from_pretrained(
            model_id,
        )
        self.pipe = self.pipe.to(device)

    @pipe
    def predict(self, prompt: str, kwargs: ModelKwargs) -> List[File]:  # TUTORIAL: Input/output types
        defaults = kwargs.to_dict()
        images = self.pipe(prompt, **defaults).images

        output_images = []
        for i, image in enumerate(images):
            path = Path(f"/tmp/sd/image-{i}.jpg")
            path.parent.mkdir(parents=True, exist_ok=True)
            image.save(str(path))
            output_images.append(File(path=path, allow_out_of_context_creation=True))  # TUTORIAL: Files

        return output_images


with Pipeline() as builder:  # TUTORIAL: Pipeline building
    prompt = Variable(str)
    kwargs = Variable(ModelKwargs)

    model = StableDiffusionModel()
    model.load()
    output = model.predict(prompt, kwargs)

    builder.output(output)

my_pl = builder.get_pipeline()

environments.create_environment(  # TUTORIAL: Virtual environments
    "stable-diffusion",
    python_requirements=[
        "torch==2.0.1",
        "transformers==4.30.2",
        "diffusers==0.19.3",
        "accelerate==0.21.0",
    ],
)

pipelines.upload_pipeline(
    my_pl,
    "stable-diffusion:latest",
    environment_id_or_name="stable-diffusion",
    required_gpu_vram_mb=10_000,
    accelerators=[
        compute_requirements.Accelerator.nvidia_l4,  # TUTORIAL: GPUs and Accelerators
    ],
)

output = pipelines.run_pipeline(  # TUTORIAL: Runs
    "stable-diffusion:latest",
    prompt="A photo of a cat",
    kwargs=dict(),
)
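The `ModelKwargs` schema above attaches a default and optional bounds to each input. As a rough illustration of what such constraints mean, the sketch below reproduces the same numbers in plain Python. It does not use the SDK and says nothing about how `InputField` validates values internally; `Bound` and `resolve_kwargs` are hypothetical helpers, and treating `ge`/`le` as inclusive "greater than or equal" / "less than or equal" limits is an assumption based on the parameter names.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Bound:
    """Hypothetical helper: a default plus optional inclusive lower/upper limits."""

    default: float
    ge: Optional[float] = None  # assumed to mean "greater than or equal"
    le: Optional[float] = None  # assumed to mean "less than or equal"

    def check(self, value: Optional[float]) -> float:
        if value is None:
            return self.default  # missing value falls back to the default
        if self.ge is not None and value < self.ge:
            raise ValueError(f"{value} is below the minimum {self.ge}")
        if self.le is not None and value > self.le:
            raise ValueError(f"{value} is above the maximum {self.le}")
        return value


# Same defaults and limits as the ModelKwargs fields in the sample.
BOUNDS: Dict[str, Bound] = {
    "height": Bound(512, ge=64, le=1024),
    "width": Bound(512, ge=64, le=1024),
    "num_inference_steps": Bound(50),
    "num_images_per_prompt": Bound(1, ge=1, le=4),
    "guidance_scale": Bound(7.5),
}


def resolve_kwargs(**given: float) -> Dict[str, float]:
    """Fill in defaults and reject unknown or out-of-range values."""
    unknown = set(given) - set(BOUNDS)
    if unknown:
        raise KeyError(f"unknown fields: {sorted(unknown)}")
    return {name: bound.check(given.get(name)) for name, bound in BOUNDS.items()}


print(resolve_kwargs(height=768))
```

Under these assumptions, passing an empty `kwargs`, as the `run_pipeline` call in the sample does, would resolve every field to its default.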
Development
This project uses Poetry, so first set up Poetry on your machine.
Once that is done, please run
./setup.sh
With this you should be good to go. This sets up dependencies, pre-commit hooks and pre-push hooks.
You can manually run the pre-commit hooks with
pre-commit run --all-files
To run tests manually please run
pytest
License
Pipeline is licensed under Apache Software License Version 2.0.