
Data pipeline

Project description

Pipeline Vilma

Introduction

Pipeline Vilma is a data pipeline designed to be used in Machine Learning projects. The main goal is to create a pipeline that lets the user focus on annotating information and leave the rest of the processing to the pipeline.

The design is based on components communicating through messages, so you can either use all of the components together as designed, or connect individual components to your existing data pipeline.

Components

Provider

class Provider:
"""
An abstract class used to represent a data provider.
Examples of data providers:
- RGB camera, providing frames
- Humidity sensor, providing the current humidity level
- Thermographic camera, providing a heat map
...

Attributes
----------
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_output : str
    the name of the queue where the provider will publish its data
sensor_id : str
    a unique identification of the provider, e.g., rgbcamera, microphone, etc.

Methods
-------
pack_data(self, data_type, content)
    Based on the data type, choose the right package format for the content

run(self, loop_interval=0.016)
    Main loop method that should be implemented in order to describe the
    provider's behaviour

forward(self, message)
    Forward the package to the next component of the pipeline (queue_output)
"""

Collector

class Collector:
"""
An abstract class used to represent a data collector.
This component is responsible for creating a dataset from the providers' input
...

Attributes
----------
bypass : bool
    some applications cannot store data and/or create a dataset for privacy
    reasons, therefore the collector can work in bypass mode where it only
    forwards the received data
ratio : float
    the fraction (between 0 and 1) of samples to be collected, i.e.,
    if "ratio" is set to 0.5, half of the samples received by the collector
    will be stored for dataset creation
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_input: str
    the name of the queue from which the collector consumes
queue_output : str
    the name of the queue where the collector will publish its data
data_api_config: dict
    expects a Python dictionary with the endpoints that map the class to a
    user-implemented API. It allows the component to support different data API
    solutions (like different databases, languages, implementations) as long
    as the expected endpoints are supported.

    Expected dictionary:
        {
            "base_url": "http://192.168.0.1" # IP where the API is running
            "port": 3333,                    # The API's server portclass
            "endpoints": {
                "add_item": "/collection",
                "get_item": "/collection/<ID>",
                "mark_trained": "/collection/<ID>",
                "count_labeled": "/collector/counter",
                "get_training_set": "/dataset/"
            }
        }

    Endpoints description:
    - add_item
        - Method type: HTTP POST
        - URL format: http://host:port/resource
        - Data: supports the pipeline's JSON message structure, e.g. ImageBase64
        - Request example:
            - http://192.168.0.1:3333/collection
            - Data: {"x": data, "y": data }
        - Response example:
            - The JSON message that was sent
    - get_item
        - Method type: HTTP GET
        - URL format: http://host:port/resource/<ID>
        - Request example:
            - http://192.168.0.1:3333/collection/1
        - Response example:
            - A pipeline-supported JSON message item
    - mark_trained
        - Method type: HTTP PUT
        - URL format: http://host:port/resource/<ID>
        - Data: {"status": "readyForTraining"}
        - Request example:
            - http://192.168.0.1:3333/collection/1
            - Data: {"status": "readyForTraining"}
        - Response example:
            {"message": "success"}
    - count_labeled:
        - Method type: HTTP GET
        - URL format: http://host:port/resource
        - Request example:
            - http://192.168.0.1:3333/collection-counter
        - Response example:
            - { "readyForTraining": 123 }
    - get_training_set:
        - Method type: HTTP GET
        - URL format: http://host:port/resource
        - Request example:
            - http://192.168.0.1:3333/training-set
        - Response example:
            - { "readyForTraining": ["itemId1", "itemId2"] }

Methods
-------
run(self)
    Starts consuming the collector's input queue

on_new_message(self, ch, method, properties, body)
    Based on component's configuration, perform the actions on each
    received message.
    Finally, forwards the current message state to the next component
    of the pipeline.

receive(self, callback)
    Creates a listener on the collector's input queue

forward(self, sensor_id, message)
    Forward the message to an individual queue related to the unique
    sensor id

"""

CollectorDataApi

class CollectorDataApi:
"""
A class used to link an HTTP data API to the Collector component.
It is responsible for allowing users to create different API
implementations and, as a result, use any database.
To connect it to the pipeline, the API endpoints described
in Collector must be implemented.
...

Statics
----------
STATUS_FOR_LABELING : str
    default value: 'labeling-required'
    an internal status used to notify other components about a
    data item waiting for labeling
STATUS_READY_FOR_TRAINING : str
    default value: 'readyForTraining'
    an internal status used to notify other components
    about a data item ready for training

Attributes
----------
storage_api : dict
    expects a Python dictionary with the endpoints to an API
    implemented to store the data in a database or any other
    persistence system

    An API persisting the data in MongoDB is available in the examples.

Methods
-------
def _add_labeling_properties(self, message)
    Include the labeling status and the creation timestamp in the message

def _insert_on_db(self, message)
    Make a request to the API to persist the message

def add_item(self, message)
def get_item(self, item_id)
def mark_trained(self, item_id, status)
def get_training_data(self)
def count_labeled(self)

def store(self, message)
    public method responsible for executing the private ones and storing
    the message in the database

"""

FileManager

class FileManager:
"""
A class used to create and manage the files required for the training and
evaluation process.
To connect it to the pipeline, the API endpoints described in CollectorDataApi
must be implemented.
...

Attributes
----------
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_output : str
    the name of the queue where FileManager will publish its data
retrain_counter : int
    FileManager is designed to work together with a Trainer component,
    therefore it is possible to define a minimum number of files to trigger
    this interaction. If retrain_counter is set to 20, every time 20 new labeled
    instances are ready for training, this data is turned into physical files
    and then forwarded to the training process.
data_api : dict
    expects a Python dictionary with the endpoints to an API implemented
    to store the data in a database or any other persistence system

    An API persisting the data in MongoDB is available in the examples.

Methods
-------
def run(self, loop_interval=0.016)
    Run the component with the following behaviour:
    - Get the number of ready-for-training items through the DataApi
    - If the number of items is greater than retrain_counter, download
    the data and forward it to the next component, which is usually a training
    component
    - Mark the forwarded items as sent to training

def mark_data_as_included_in_the_training_process(self, item_ids)
    Set the items on the DataApi as "already in training set"

def _download_data_and_labels(self)
    Download the information from the DataApi and call a method, to be implemented
    by the user, that stores this data in a physical file, i.e., the user defines
    the file format, e.g. a JPG for images, an MP3 for sound, etc.

def store_on_filemanager(self, name, x, y)
    Store an item identified by its name, together with its X data (e.g., for an
    image, the Base64-encoded image) and its Y data (e.g., for image detection,
    the annotated bounding boxes)

def get_dataset_paths(self)
    Provide the absolute paths where the items and their annotations were
    stored. It is expected to return a tuple in the format (items, labels)

def forward_to_trainer(self, message)
    Forward the set of ready items to a training component
"""

Instancer

class Instancer:
"""
An abstract class used to create instances based on multiple sensors (providers).
The instancer is responsible for creating data relationships between the
different providers, based on the needs of the estimator component

Attributes
----------
number_of_providers : int
    the current number of providers included in the pipeline
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_input: str
    the name of the queue from which the instancer consumes
queue_output : str
    the name of the queue where the instancer will publish its data

Methods
-------
run(self)
    Should be implemented by the user, in order to describe the component's
    behaviour

create_custom_instance(self)
    Should be implemented by the user, in order to describe how a custom
    instance is created

create_simple_instance(self)
    Create an instance directly from the data from the input queue

create_instance_by_time_window(self, sensor_id, time_window_s)
    Create an instance based on the list of messages received in a time window

create_instance_by_repetition(self, sensor_id, number_of_messages)
    Create an instance based on a defined number of received messages.
    e.g.: a model requires 7 images as its input.

create_instance_by_syncing_all_providers(self)
    Synchronize all the providers and create an instance including all of them.

forward(self, message)
    Forward the message to the next component of the pipeline

"""

Messager

class Messager:
"""
A class responsible for defining the communication between components.
It is a wrapper around the RabbitMQ library pika

Attributes
----------
server_url : str
    the IPv4 of a RabbitMQ queue server
queue_name: str
    the name of the queue to interact with

Methods
-------
validate(self, message)
    Validate the received message, based on the defined schema

publish_to_sibling_queue(self, message, unique_id, validate=True)
    Send a message to a fragmented-queue component, i.e., to a component which
    expects multiple queues as its input. Used to publish providers' data in
    individual queues

publish(self, message, exchange='', validate=True)
    Publish the data to the Messager's queue

get_message(self)
    Get a message from the queue, FIFO-based.

consume(self, callback)
    Provide a listener on the queue, so that for every new message on the queue,
    the callback (implemented by the user) is called

"""

Estimator

class Estimator:
"""
A class responsible for defining the behaviour of an estimator that
performs inferences on the pipeline's data

Attributes
----------
queue_server : str
    the IPv4 of a RabbitMQ queue server
input_queue_instancer : str
    the name of the queue from which instances produced by the Instancer
    are consumed
input_evaluator_queue : str
    the name of the queue used by the evaluator to inform the estimator about
    new models
output_queue : str
    the name of the queue where the estimator publishes its results

Methods
-------
get_estimator_model(self)
    Should be implemented by the user, in order to describe the desired
    behaviour

set_estimator_model(self, message)
    Should be implemented by the user, in order to describe the desired
    behaviour

update_model(self)
    Should be implemented by the user, in order to describe the desired
    behaviour

estimate(self, message)
    Should be implemented by the user, in order to describe the desired
    behaviour

run(self)
    Provide the component's behaviour:
        - Check for new models available
        - Get a message from the queue
        - Make an inference
        - Forward the results

forward(self, message)
    Forward the message to the next component of the pipeline

"""

Evaluator

class Evaluator:
"""
An abstract class responsible for evaluating the quality of trained models.

It gets a list of models from a trainer component and forwards the best one to an
Estimator component, therefore inference is always done with the best
available model.

Attributes
----------

queue_server : str
    the IPv4 of a RabbitMQ queue server
input_queue : Messager
    the name of the queue to be consumed
output_queue : Messager
    the name of the queue to which the Evaluator publishes its data
evaluated : list(key,value)
    a list of already evaluated models
current_best : (key,value)
    the current best evaluated model (e.g. its weights), i.e., the one recommended
    to be used on an Estimator component

Methods
-------

evaluate_models(self, message)
    Should be implemented by the user, in order to describe the evaluation
    behaviour

include_evaluation_to_list(self, key, value):
    append the (key, value) evaluation to the list of already evaluated models

is_included_in_evaluation_list(self, key):
    check if a model, identified by its key, was already evaluated;
    this can drastically reduce the processing time of the component

get_current_best_evaluated(self):
    return the current best evaluated model in the format (key, value)
    key: model path
    value: its evaluation value, e.g. mAP

set_current_best_evaluated(self, key, value):
    update the current best evaluated model, which is announced to the interested
    components through the queue system

run(self):
    implements the behaviour of the component

forward(self, message):
    publish the evaluated data to the next component
    the message follows the format:
    {
        "key": "uniqueFilePath",
        "value" "the evaluation value of the model",
        "params": {
            "modelDefinition": "some extra data about the results"
        }
    }
"""
