Pipeline library for AI workflows.
Octopipes
Octopipes is a pipeline library for AI workflows. It not only allows easy definition of multi-step pipelines, but also handles testing and collecting information across multiple workflows.
pip install octopipes
Introduction
When working with multi-step (AI) pipelines, octopipes helps you define and work with workflows easily. For instance, it allows adding post-workflow hooks that can clean up GPU memory (or do anything else).
Workflows are defined through the Workflow class. Each workflow can contain a chain of processes (some common ones are already defined).
Every step of the pipeline is saved and can be used later. To make different outputs easier to manage, an output handler can be provided for each specific step (some basic ones, such as BboxesHandler, are already defined as well).
To run multiple workflows on the same input, you can use the AggregateFlows class. The library also provides a way to read datasets and run benchmarks.
To keep octopipes ML-library agnostic, it does not require pytorch or tensorflow to be installed, as it can work with both just fine.
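As a rough illustration of how a cleanup hook can stay framework-agnostic, the sketch below only touches PyTorch when it happens to be installed. The function name free_memory is hypothetical and is not part of octopipes; the way hooks are actually registered is not shown here.

```python
import gc
import importlib.util


def free_memory() -> bool:
    """Run garbage collection and, if PyTorch with CUDA is present, empty its cache.

    Returns True when the CUDA cache was cleared, False otherwise.
    """
    gc.collect()
    if importlib.util.find_spec("torch") is None:
        return False  # PyTorch is not installed; nothing more to do.
    import torch  # Imported lazily so this module works without PyTorch.

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        return True
    return False


cleared = free_memory()
print(cleared)
```

Because the import happens inside the function, the same code runs unchanged on machines with or without a deep learning framework.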
Get started
Workflows
To add steps to a workflow, the class provides the add method, which takes as input the function (process) and, optionally, an OutputHandler. The whole workflow acts like the | (pipe) operator in Unix terminals, successively feeding the output of one process as the input of the next.
p1 -> p2 -> p3
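The chaining idea itself can be sketched in plain Python, independent of octopipes. The helper run_chain and the processes p1, p2 and p3 below are hypothetical and only illustrate how each output becomes the next input.

```python
from functools import reduce
from typing import Any, Callable, Iterable


def run_chain(processes: Iterable[Callable[[Any], Any]], value: Any) -> Any:
    """Feed `value` through each process in order, like `p1 | p2 | p3`."""
    return reduce(lambda acc, process: process(acc), processes, value)


# Hypothetical processes (illustrative only).
p1 = lambda x: x + 1
p2 = lambda x: x * 10
p3 = str

result = run_chain([p1, p2, p3], 2)
print(result)  # prints 30 (the string "30": (2 + 1) * 10, then str)
```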
This shows how to define a simple workflow:
from octopipes.workflow import Workflow
# Define a workflow with two steps
wf = Workflow('wf_name').add(lambda x: x ** 2).add(lambda x: x - 4)
print(wf.nsteps)
# output: 2
# We can now run the workflow on a specific starting input value (in this case 4).
# `wf` first applies x ** 2 to the input, then applies x - 4 to the result of the previous step.
# So the first iteration yields 16 (4 ** 2) and the second yields 12 (16 - 4).
wf_iter = wf(4)
for result in wf_iter:
    pass
# Return a frozen instance of the workflow run.
# This is especially important when working with memory-intensive GPU workflows.
frozen_res = wf_iter.freeze()
# Get the duration recap of each step of the workflow
wf_iter.recap()
# Define a workflow with some metadata attached
# this metadata can then be used to differentiate
# workflows with the same name but different params
wf = Workflow('wf_name', metadata={'thresh': 0.4}).add(lambda x: x ** 2)
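To make the step-by-step iteration and the duration recap more concrete, here is a minimal plain-Python sketch of the same idea. It is not how octopipes is implemented; iter_steps and durations are hypothetical names used only for illustration.

```python
import time
from typing import Any, Callable, Iterator, Sequence


def iter_steps(steps: Sequence[Callable[[Any], Any]], value: Any,
               durations: list[float]) -> Iterator[Any]:
    """Yield the result of every step and record how long each one took."""
    for step in steps:
        start = time.perf_counter()
        value = step(value)
        durations.append(time.perf_counter() - start)
        yield value


steps = [lambda x: x ** 2, lambda x: x - 4]
durations: list[float] = []
results = list(iter_steps(steps, 4, durations))
print(results)  # [16, 12]
```

Yielding after each step is what lets a caller inspect or store intermediate results instead of only the final one.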
Output handlers
When adding a new step whose output you want processed in a particular way, you can pass a class that implements the OutputHandler interface.
The interface has 3 methods:
- output_on_image: used when outputting the results on an image (Used in computer vision mostly)
- len_output: gives the size of the result
- to_json: returns a serialized json object of the result
The library already provides some basic ones such as:
- BboxesHandler
- SegmentationMaskHandler
- CmapBboxesHandler
- CirclesHandler
Handlers are added as follows. If none is supplied, DefaultHandler is used (in most cases a handler needs to be passed):
wf = Workflow('wf_name').add(some_func, some_handler)
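A custom handler could look roughly like the sketch below. It deliberately does not import or subclass the real OutputHandler: the constructor and the method signatures are assumptions, and only the three method names come from the list above. ScoreListHandler is a hypothetical example for a step that returns a list of float scores.

```python
import json
from typing import Any


class ScoreListHandler:
    """Hypothetical handler for a step that returns a list of float scores."""

    def __init__(self, output: list[float]) -> None:
        # Assumed: the handler wraps the raw output of a step.
        self.output = output

    def output_on_image(self, image: Any) -> Any:
        # Nothing to draw for plain scores; return the image unchanged.
        return image

    def len_output(self) -> int:
        # Size of the result: the number of scores.
        return len(self.output)

    def to_json(self) -> str:
        # Serialized JSON representation of the result.
        return json.dumps({"scores": self.output})


handler = ScoreListHandler([0.9, 0.4])
print(handler.len_output(), handler.to_json())
```

Check the OutputHandler definition in the library for the exact signatures before adapting this.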
AggregateFlows
AggregateFlows allows running multiple workflows on the same input. This is usually used either when benchmarking multiple pipelines at the same time or when selecting the "best" output out of different workflows.
from octopipes.workflow import Workflow
from octopipes.aggreage_flows import AggregateFlows
wf1 = Workflow('wf_name').add(some_func, some_handler)
wf2 = Workflow('wf_name').add(some_func, some_handler)
flows = AggregateFlows(input, workflows=[wf1, wf2])
flows.run_workflows()
# get results for wf1
flows.results[0]
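The underlying pattern, running several pipelines on one input and then picking a "best" result, can be sketched without octopipes as follows. The helper run_all, the two pipelines, and the selection rule (highest value wins) are all hypothetical choices made for illustration.

```python
from typing import Any, Callable


def run_all(value: Any, pipelines: dict[str, Callable[[Any], Any]]) -> dict[str, Any]:
    """Run every pipeline on the same input and collect results by name."""
    return {name: pipeline(value) for name, pipeline in pipelines.items()}


# Two hypothetical pipelines that measure the same text differently.
pipelines = {
    "count_words": lambda text: len(text.split()),
    "count_chars": lambda text: len(text),
}
results = run_all("run every pipeline once", pipelines)

# Pick the pipeline whose result is largest (an arbitrary rule for this sketch).
best_name = max(results, key=results.get)
print(results, best_name)
```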
Benchmark
As the name suggests, Benchmark allows testing your workflows on a dataset and then easily calculating your metrics. Depending on the batch size of the dataset loader, the tests are run simultaneously (as many processes as the batch size). Note, however, that as of now a single AggregateFlows is run synchronously.
If you're using pytorch or tensorflow, some memory-freeing hooks might be needed. For that, you can pass an instance of DefaultAggregateFlowsFactory with the specific hooks needed. Otherwise, you can always implement your own AggregateFlowsFactory.
Here's a simple example:
from octopipes.workflow import Workflow
from octopipes.benchmark import Benchmark
dataloader = ...
wf1 = Workflow('wf1_name').add(some_func, some_handler)
wf2 = Workflow('wf2_name').add(some_func, some_handler)
bench = Benchmark(dataloader=dataloader, workflows=[wf1, wf2])
bench.run_tests()
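Conceptually, a benchmark iterates over batches of labelled samples, runs each pipeline, and computes a metric per pipeline. The plain-Python sketch below shows that idea sequentially; it does not reproduce the parallel execution described above, and the accuracy helper, the dataset, and the pipelines are hypothetical.

```python
from typing import Callable, Iterable

Sample = tuple[int, int]  # (input, expected output)


def accuracy(pipeline: Callable[[int], int], batches: Iterable[list[Sample]]) -> float:
    """Fraction of samples for which the pipeline returns the expected output."""
    correct = total = 0
    for batch in batches:
        for x, expected in batch:
            correct += pipeline(x) == expected
            total += 1
    return correct / total if total else 0.0


# Hypothetical dataset in two batches, labelled with x ** 2.
batches = [[(0, 0), (1, 1)], [(2, 4), (3, 9)]]
pipelines = {
    "square": lambda x: x ** 2,
    "double": lambda x: 2 * x,
}
scores = {name: accuracy(p, batches) for name, p in pipelines.items()}
print(scores)  # {'square': 1.0, 'double': 0.5}
```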
Requirements & Installation
The module is tested against Python versions >=3.10. However, this requirement is only due to the use of type hinting, so the module can be altered to work on lower versions of the interpreter.
Installing the package is pretty standard:
git clone https://github.com/octomiro/octopipes
cd octopipes
pip install -r requirements.txt
# or requirements.in
pip install -r dev-requirements.in
# Running tests
pytest
# or
tox
Contributions
PRs are more than welcome! If the change is big enough to require some discussion, it's better to open an issue for it first. To keep the history of the repo clean, all PRs are rebased instead of merged, so make sure everything is correct before submitting anything.
Why Octopipes?
This library was born from a need to define, benchmark, and debug workflows that use foundational models in an easy way. In the course of our work, we did not find anything that met that need in terms of flexibility or features, so we created octopipes internally and decided to open source it.
Octopipes is developed and maintained by octomiro, an AI company that makes ERP systems intelligent.