Data pipeline
Project description
Pipeline Vilma
Introduction
Pipeline Vilma is a data pipeline designed for Machine Learning projects. The main goal is to create a pipeline that lets the user focus on annotating data while leaving the rest of the process to the pipeline.
The design is based on components communicating through messages, so you can use them all together, as designed, or connect individual components to your existing data pipeline.
Components
Provider
class Provider:
"""
An abstract class used to represent a data provider.
Examples of data providers:
- RGB Camera, providing frames
- Humidity sensor, providing actual humidity level
- Thermographic camera, providing a heatmap
...
Attributes
----------
queue_server : str
the IPv4 of a RabbitMQ queue server
queue_output : str
the name of the queue where the provider will publish its data
sensor_id : str
a unique identifier for the provider, e.g., rgbcamera, microphone, etc.
Methods
-------
pack_data(self, data_type, content)
Based on the data type, choose the right package format for the content
run(self, loop_interval=0.016)
Main loop method that should be implemented in order to describe the
provider's behaviour
forward(self, message)
Forward the package to the next component of the pipeline (queue_output)
"""
Collector
class Collector:
"""
An abstract class used to represent a data collector.
This component is responsible for creating a dataset from the providers' input
...
Attributes
----------
bypass : bool
some applications cannot store data and/or create datasets for privacy
reasons; in that case the collector can work in bypass mode, where it only
forwards the received data
ratio : float
the fraction (in [0, 1]) of samples to be collected, i.e., if "ratio" is
set to 0.5, half of the samples received by the collector will be stored
for dataset creation
queue_server : str
the IPv4 of a RabbitMQ queue server
queue_input: str
the name of the queue the collector consumes from
queue_output : str
the name of the queue where the collector will publish its data
data_api_config: dict
expects a Python dictionary with the endpoints mapping the class to a
user-implemented API. This allows the component to support different data
API solutions (different databases, languages, implementations) as long
as the expected endpoints are supported.
Expected dictionary:
{
"base_url": "http://192.168.0.1" # IP where the API is running
"port": 3333, # The API's server portclass
"endpoints": {
"add_item": "/collection",
"get_item": "/collection/<ID>",
"mark_trained": "/collection/<ID>",
"count_labeled": "/collector/counter",
"get_training_set": "/dataset/"
}
}
Endpoints description:
- add_item
- Method type: HTTP POST
- URL format: http://host:port/resource
- Data: supports the pipeline's JSON message structure, e.g. ImageBase64
- Request example:
- http://192.168.0.1:3333/collection
- Data: {"x": data, "y": data }
- Response example:
- The JSON message sent
- get_item
- Method type: HTTP GET
- URL format: http://host:port/resource/<ID>
- Request example:
- http://192.168.0.1:3333/collection/1
- Response example:
- A pipeline-supported JSON message item
- mark_trained
- Method type: HTTP PUT
- URL format: http://host:port/resource/<ID>
- Data: {"status": "readyForTraining"}
- Request example:
- http://192.168.0.1:3333/collection/1
- Data: {"status": "readyForTraining"}
- Response example:
{"message": "success"}
- count_labeled:
- Method type: HTTP GET
- URL format: http://host:port/resource
- Request example:
- http://192.168.0.1:3333/collector/counter
- Response example:
- { "readyForTraining": 123 }
- get_training_set:
- Method type: HTTP GET
- URL format: http://host:port/resource
- Request example:
- http://192.168.0.1:3333/dataset/
- Response example:
- { "readyForTraining": ["itemId1", "itemId2"] }
Methods
-------
run(self)
Starts consuming the collector's input queue
on_new_message(self, ch, method, properties, body)
Based on the component's configuration, perform the actions on each
received message.
Finally, forwards the current message state to the next component
of the pipeline.
receive(self, callback)
Creates a listener on the collector's input queue
forward(self, sensor_id, message)
Forward the message to an individual queue related to the unique
sensor id
"""
CollectorDataApi
class CollectorDataApi:
"""
A class used to link an HTTP Data API to Collector component.
It is responsible for allowing users to create different API
implementations and, by that, use any database.
To connect it to the pipeline, the API endpoints described in
Collector must be implemented.
...
Statics
----------
STATUS_FOR_LABELING : str
default value: 'labeling-required'
an internal status used to notify other components about a
data item waiting for labeling
STATUS_READY_FOR_TRAINING : str
default value: 'readyForTraining'
an internal status used to notify other components
about a data item ready for training
Attributes
----------
storage_api : dict
expects a Python dictionary with the endpoints of an API
implemented to store the data in a database or any other
persistence system.
An example API persisting the data in MongoDB is available in the examples.
Methods
-------
def _add_labeling_properties(self, message)
Add the labeling status and creation timestamp to the message
def _insert_on_db(self, message)
Make a request to the API to persist the message
def add_item(self, message)
def get_item(self, item_id)
def mark_trained(self, item_id, status)
def get_training_data(self)
def count_labeled(self)
def store(self, message)
public method responsible for executing the private ones and storing
the message in the database
"""
FileManager
class FileManager:
"""
A class used to create and manage the files required for the training and
evaluation process.
To connect it to the pipeline, the API endpoints described in
CollectorDataApi must be implemented.
...
Attributes
----------
queue_server : str
the IPv4 of a RabbitMQ queue server
queue_output : str
the name of the queue where FileManager will publish its data
retrain_counter : int
FileManager is designed to work together with a Trainer component, so it
is possible to define a minimum number of files that triggers this
interaction. If retrain_counter is set to 20, then every time 20 new
labeled instances are ready for training, this data is turned into
physical files and forwarded to the training process.
data_api : dict
expects a Python dictionary with the endpoints of an API implemented
to store the data in a database or any other persistence system.
An example API persisting the data in MongoDB is available in the examples.
Methods
-------
def run(self, loop_interval=0.016)
Run the component with the following behaviour:
- Get the number of ready-for-training items through the DataApi
- If the number of items is greater than retrain_counter, download the
data and forward it to the next component, which is usually a training
component
- Mark the forwarded items as sent to training
def mark_data_as_included_in_the_training_process(self, item_ids)
Set the items on the DataApi as "already in training set"
def _download_data_and_labels(self)
Download the information from the DataApi and call a user-implemented
method to store this data in a physical file, i.e., the user is expected
to define the file format, e.g. JPG for images, MP3 for sound, etc.
def store_on_filemanager(self, name, x, y)
Store an item identified by its name, together with its X data (e.g., for
an image, the Base64-encoded image) and its Y data (e.g., for image
detection, the annotated bounding boxes)
def get_dataset_paths(self)
Provide the absolute paths where the items and their annotations were
stored. It is expected to return a tuple in the format (items, labels)
def forward_to_trainer(self, message)
Forward the set of ready items to a training component
"""
Instancer
class Instancer:
"""
An abstract class used to create instances based on multiple sensors
(providers). The instancer is responsible for creating data relationships
between the different providers, based on the needs of the estimator
component
Attributes
----------
number_of_providers : int
the current number of providers included in the pipeline
queue_server : str
the IPv4 of a RabbitMQ queue server
queue_input: str
the name of the queue the instancer consumes from
queue_output : str
the name of the queue where the instancer will publish its data
Methods
-------
run(self)
Should be implemented by the user in order to describe the instancer's
behaviour
create_custom_instance(self)
Should be implemented by the user in order to describe how a custom
instance is created
create_simple_instance(self)
Create an instance directly from the data from the input queue
create_instance_by_time_window(self, sensor_id, time_window_s)
Create an instance based on the list of messages received in a time window
create_instance_by_repetition(self, sensor_id, number_of_messages)
Create an instance based on a defined number of received messages.
e.g.: a model requires 7 images as its input.
create_instance_by_syncing_all_providers(self)
Synchronize all the providers and create an instance including all of them.
forward(self, message)
Forward the message to the next component of the pipeline
"""
Messager
class Messager:
"""
A class responsible for defining the communication between components.
It is a wrapper around the RabbitMQ library pika
Attributes
----------
server_url : str
the IPv4 of a RabbitMQ queue server
queue_name: str
the name of the queue to interact with
Methods
-------
validate(self, message)
Validate the received message, based on the defined schema
publish_to_sibling_queue(self, message, unique_id, validate=True)
Send a message to a fragmented-queue component, i.e., to a component
which expects multiple queues as its input. Used to publish providers'
data in individual queues
publish(self, message, exchange='', validate=True)
Publish the data to the Messager's queue
get_message(self)
Get a message from the queue, FIFO-based.
consume(self, callback)
Provide a listener on the queue, so that for every new message the
user-implemented callback is called
"""
Estimator
class Estimator:
"""
A class responsible for defining the behaviour of an estimator that
performs inferences on the pipeline's data
Attributes
----------
queue_server : str
the IPv4 of a RabbitMQ queue server
input_queue_instancer : str
the name of the queue from which instances are consumed
input_evaluator_queue : str
the name of the queue used by the evaluator to inform the estimator
about new models
output_queue : str
the name of the queue where results are published
Methods
-------
get_estimator_model(self)
Should be implemented by the user in order to describe the desired
behaviour
set_estimator_model(self, message)
Should be implemented by the user in order to describe the desired
behaviour
update_model(self)
Should be implemented by the user in order to describe the desired
behaviour
estimate(self, message)
Should be implemented by the user in order to describe the desired
behaviour
run(self)
Provide the component's behaviour:
- Check for new models available
- Get a message from the queue
- Make an inference
- Forward the results
forward(self, message)
Forward the message to the next component of the pipeline
"""
Evaluator
class Evaluator:
"""
An abstract class responsible for evaluating the quality of trained models.
It gets a list of models from a trainer component and forwards the best one
to an Estimator component, so inference is always done with the best
possible results.
Attributes
----------
queue_server : str
the IPv4 of a RabbitMQ queue server
input_queue : Messager
the name of the queue to be consumed
output_queue : Messager
the name of the queue which Evaluator publish its data
evaluated : list(key,value)
a list of already evaluated models
current_best : (key,value)
the current best evaluated model (e.g. its weights), i.e., the one
recommended for use by an Estimator component
Methods
-------
evaluate_models(self, message)
Should be implemented by the user in order to describe the evaluation
behaviour
include_evaluation_to_list(self, key, value)
append the (key, value) pair to the list of evaluated models
is_included_in_evaluation_list(self, key):
check if a model, identified by its key, was already evaluated;
this can drastically reduce the component's processing time
get_current_best_evaluated(self):
return the current best evaluated model in the format (key,value)
key: the model path
value: its evaluation value, e.g. mAP
set_current_best_evaluated(self, key, value):
update the current best evaluated model, which is communicated to the
interested components via the queue system
run(self):
implements the behaviour of the component
forward(self, message):
publish the evaluated data to next component
the message follows the format:
{
"key": "uniqueFilePath",
"value" "the evaluation value of the model",
"params": {
"modelDefinition": "some extra data about the results"
}
}
"""
Built Distribution
Hashes for pipeline_vilma-0.0.30-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 9f06f05f704fac2a886cd86e845d3aa6e6195d72ac97c1a4aaac3b2705ba5827
MD5 | c352e6b34f9a33f8c0f00e3291ad8008
BLAKE2b-256 | 99d184d9cd8dfd2b7d2fb72db9c7ce0bc8ea0b1b9de51cf2d819bf7f8cbb6102