
Data pipeline


Pipeline Vilma

Components

Provider

class Provider:
    """
    An abstract class used to represent a data provider.
    Examples of data providers:
    - RGB camera, providing frames
    - Humidity sensor, providing the current humidity level
    - Thermographic camera, providing a heatmap
    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_output : str
        the name of the queue where the provider publishes its data

    Methods
    -------
    pack_data(self, data_type, content, sensor_id)
        Chooses the right package format for the content based on the data type

    run(self)
        Must be implemented by the user to describe the provider's behaviour

    """

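A minimal sketch of a user-defined provider, assuming a humidity sensor that is read through a callable. It does not import pipeline_vilma: `HumidityProvider`, the `"scalar"` data type, the JSON field names and the in-memory `OUTPUT_QUEUE` (standing in for the RabbitMQ queue named by queue_output) are hypothetical, and `run()` takes two hypothetical optional parameters so the example terminates. It only shows how `run()` can read, pack with `pack_data()` and publish.

```python
import json
import queue
import time

# Hypothetical in-memory stand-in for the RabbitMQ queue named by queue_output.
OUTPUT_QUEUE = queue.Queue()


class HumidityProvider:
    """Hypothetical provider that publishes humidity readings."""

    def __init__(self, sensor_id, read_sensor):
        self.sensor_id = sensor_id
        self.read_sensor = read_sensor  # callable returning the current humidity

    def pack_data(self, data_type, content, sensor_id):
        # Assumed package format per data type; the library's schema may differ.
        if data_type != "scalar":
            raise ValueError(f"unsupported data type: {data_type}")
        return json.dumps({
            "sensor_id": sensor_id,
            "data_type": data_type,
            "content": float(content),
            "timestamp": time.time(),
        })

    def run(self, samples=3, period_s=0.0):
        # User-defined behaviour: read, pack and publish a fixed number of samples.
        for _ in range(samples):
            reading = self.read_sensor()
            OUTPUT_QUEUE.put(self.pack_data("scalar", reading, self.sensor_id))
            time.sleep(period_s)
```

For example, `HumidityProvider("humidity-01", lambda: 41.5).run(samples=2)` leaves two JSON packages in `OUTPUT_QUEUE`.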
Collector

class Collector:
    """
    An abstract class used to represent a data collector.
    A collector is responsible for creating a dataset from the providers' input
    ...

    Attributes
    ----------
    bypass : bool
        some applications cannot store data or create datasets for privacy
        reasons; in bypass mode the collector only forwards the received data
    ratio : float
        the fraction (in [0, 1]) of received samples to be collected, e.g.,
        if "ratio" is set to 0.5, half of the samples received by the collector
        are stored for dataset creation
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_input : str
        the name of the queue from which the collector consumes
    queue_output : str
        the name of the queue where the collector forwards the received data
    data_api_config : dict
        a Python dictionary with the endpoints that map the class to the
        implemented API. It allows the user to use different APIs together
        with the pipeline
        Expected dictionary:
            {
                "base_url": "http://192.168.0.1",  # IP where the API is running
                "port": 3333,                      # the API server's port
                "endpoints": {
                    "add_item": "collection"       # must be an HTTP POST
                }
            }


    Methods
    -------
    run(self)
        Starts consuming the collector's input queue

    on_new_message(self, ch, method, properties, body)
        Performs the configured actions on each received message, then
        forwards the message to the next component so the pipeline is
        not broken

    receive(self, callback)
        Creates a listener on the collector's input queue

    forward(self, sensor_id, message)
        Forwards the message to an individual queue associated with the
        sensor id

    """

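The bypass and ratio options can be illustrated with a small stand-alone sketch. `SamplingCollector` is hypothetical and uses a simplified `on_new_message(sensor_id, message)` signature instead of the pika-style `(ch, method, properties, body)` callback; plain lists and dicts stand in for the storage API and the per-sensor output queues. It assumes each message is stored independently with probability `ratio`.

```python
import random


class SamplingCollector:
    """Hypothetical collector illustrating the bypass and ratio options."""

    def __init__(self, ratio=1.0, bypass=False, seed=None):
        if not 0.0 <= ratio <= 1.0:
            raise ValueError("ratio must be in [0, 1]")
        self.ratio = ratio
        self.bypass = bypass
        self.rng = random.Random(seed)
        self.stored = []     # stand-in for the dataset storage API
        self.forwarded = {}  # stand-in for the per-sensor output queues

    def forward(self, sensor_id, message):
        # One individual queue per sensor id, as described for forward().
        self.forwarded.setdefault(sensor_id, []).append(message)

    def on_new_message(self, sensor_id, message):
        # Store a random fraction `ratio` of the samples unless in bypass mode.
        if not self.bypass and self.rng.random() < self.ratio:
            self.stored.append(message)
        # Always forward, so the pipeline is not broken.
        self.forward(sensor_id, message)
```

Because each message is sampled independently, `ratio=0.5` keeps about half of the samples rather than exactly half; keeping every k-th message would be a deterministic alternative.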
CollectorDataApi

class CollectorDataApi:
    """
    A class used by a data collector to persist its samples.
    It allows the user to implement database APIs in different languages;
    only the API endpoints need to be mapped in the CollectorDataApi
    constructor.
    ...

    Statics
    -------
    STATUS_FOR_LABELING : str
        default value: 'labeling-required'
        an internal status used to notify components about the sample status

    Attributes
    ----------
    storage_api : dict
        a Python dictionary with the endpoints of an API implemented to store
        the data in a database or any other persistence system
        Expected dictionary:
            {
                "base_url": "http://192.168.0.1",  # IP where the API is running
                "port": 3333,                      # the API server's port
                "endpoints": {
                    "add_item": "collection"       # must be an HTTP POST
                }
            }

        An example API that persists the data in MongoDB is available in the
        examples.

    Methods
    -------
    _add_labeling_properties(self, message)
        Adds the labeling status and creation timestamp to the message

    _insert_on_db(self, message)
        Makes a request to the API to persist the message

    store(self, message)
        Public method that runs the private methods above and stores the
        message in the database

    """

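A sketch of how the documented methods could fit together using only the standard library, assuming the URL is built as `{base_url}:{port}/{endpoint}` and the message is POSTed as JSON. The class name `SketchCollectorDataApi`, the `status`/`created_at` field names and the injectable `opener` parameter are hypothetical; the opener lets the request be inspected without a running API.

```python
import json
import time
import urllib.request


class SketchCollectorDataApi:
    """Hypothetical re-implementation of the documented behaviour."""

    STATUS_FOR_LABELING = "labeling-required"

    def __init__(self, storage_api, opener=urllib.request.urlopen):
        self.storage_api = storage_api
        self.opener = opener  # injectable so the sketch can run without a server

    def _endpoint_url(self, name):
        # Assumed URL layout: {base_url}:{port}/{endpoint}
        api = self.storage_api
        return f"{api['base_url']}:{api['port']}/{api['endpoints'][name]}"

    def _add_labeling_properties(self, message):
        # Assumed field names; the real library may use different keys.
        labeled = dict(message)
        labeled["status"] = self.STATUS_FOR_LABELING
        labeled["created_at"] = time.time()
        return labeled

    def _insert_on_db(self, message):
        # Build an HTTP POST with a JSON body for the "add_item" endpoint.
        request = urllib.request.Request(
            self._endpoint_url("add_item"),
            data=json.dumps(message).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        return self.opener(request)

    def store(self, message):
        return self._insert_on_db(self._add_labeling_properties(message))
```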
Instancer

class Instancer:
    """
    An abstract class used to create instances based on multiple sensors
    (providers).
    The instancer is responsible for relating the data from the different
    providers, according to the needs of the estimator component

    ...

    Attributes
    ----------
    number_of_providers : int
        the current number of providers included in the pipeline
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_input : str
        the name of the queue from which the instancer consumes
    queue_output : str
        the name of the queue where the instancer publishes its instances

    Methods
    -------
    run(self)
        Must be implemented by the user to describe the instancer's behaviour

    create_custom_instance(self)
        Must be implemented by the user to describe how a custom instance
        is created

    create_simple_instance(self)
        Creates an instance directly from the data in the input queue

    create_instance_by_time_window(self, sensor_id, time_window_s)
        Creates an instance from the list of messages received within a time
        window

    create_instance_by_repetition(self, sensor_id, number_of_messages)
        Creates an instance from a defined number of received messages,
        e.g., for a model that requires 7 images as its input

    create_instance_by_syncing_all_providers(self)
        Synchronizes all the providers and creates an instance including all
        of them

    forward(self, message)
        Forwards the message to the next component of the pipeline

    """

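The three built-in instancing strategies can be sketched with per-sensor buffers. `InstanceBuffer` and its method names are hypothetical, and messages are assumed to carry a timestamp in seconds. The time-window variant groups whatever is already buffered within `time_window_s` of the oldest sample; unlike a real implementation, it does not wait for the window to close.

```python
from collections import defaultdict, deque


class InstanceBuffer:
    """Hypothetical buffer illustrating three instancing strategies."""

    def __init__(self, number_of_providers):
        self.number_of_providers = number_of_providers
        # sensor_id -> deque of (timestamp_s, payload), oldest first
        self.buffers = defaultdict(deque)

    def add(self, sensor_id, timestamp_s, payload):
        self.buffers[sensor_id].append((timestamp_s, payload))

    def by_repetition(self, sensor_id, number_of_messages):
        # Emit an instance only once `number_of_messages` samples are buffered.
        buf = self.buffers[sensor_id]
        if len(buf) < number_of_messages:
            return None
        return [buf.popleft()[1] for _ in range(number_of_messages)]

    def by_time_window(self, sensor_id, time_window_s):
        # Group every buffered sample within `time_window_s` of the oldest one.
        buf = self.buffers[sensor_id]
        if not buf:
            return None
        start = buf[0][0]
        instance = []
        while buf and buf[0][0] - start <= time_window_s:
            instance.append(buf.popleft()[1])
        return instance

    def by_syncing_all_providers(self):
        # Emit one sample per provider once every provider has data.
        ready = [s for s, buf in self.buffers.items() if buf]
        if len(ready) < self.number_of_providers:
            return None
        return {s: self.buffers[s].popleft()[1] for s in sorted(ready)}
```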
Messager

class Messager:
    """
    A class responsible for defining the communication between components.
    It is a wrapper around pika, the RabbitMQ client library

    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_name : str
        the name of the queue to interact with

    Methods
    -------
    validate(self, message)
        Validates the received message against the defined schema

    publish_to_sibling_queue(self, message, unique_id, validate=True)
        Sends a message to a fragmented queue, i.e., to a component that
        expects multiple queues as its input. Used to publish each provider's
        data to an individual queue

    publish(self, message, exchange='', validate=True)
        Publishes the data to the Messager's queue

    get_message(self)
        Gets a message from the queue in FIFO order

    consume(self, callback)
        Provides a listener on the queue, so the user-provided callback is
        called for every new message on the queue

    """

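To show the publish and consume semantics without a RabbitMQ server, the sketch below replaces the broker with in-process deques. `InMemoryMessager`, the `BROKER` dict, the `REQUIRED_KEYS` schema and the `{queue_name}_{unique_id}` naming of sibling queues are all assumptions; exchanges are omitted, and `consume()` drains the queue and returns instead of blocking.

```python
import json
from collections import deque

# Hypothetical in-process broker: queue name -> deque of JSON strings.
BROKER = {}

# Hypothetical schema: keys every message must carry.
REQUIRED_KEYS = {"sensor_id", "content"}


class InMemoryMessager:
    """Hypothetical stand-in for Messager, backed by BROKER instead of RabbitMQ."""

    def __init__(self, queue_name):
        self.queue_name = queue_name
        BROKER.setdefault(queue_name, deque())

    def validate(self, message):
        missing = REQUIRED_KEYS - set(message)
        if missing:
            raise ValueError(f"message is missing keys: {sorted(missing)}")

    def publish(self, message, validate=True, queue_name=None):
        if validate:
            self.validate(message)
        target = queue_name or self.queue_name
        BROKER.setdefault(target, deque()).append(json.dumps(message))

    def publish_to_sibling_queue(self, message, unique_id, validate=True):
        # Assumed naming scheme for each provider's individual queue.
        self.publish(message, validate, queue_name=f"{self.queue_name}_{unique_id}")

    def get_message(self):
        # FIFO: oldest message first; None when the queue is empty.
        q = BROKER[self.queue_name]
        return json.loads(q.popleft()) if q else None

    def consume(self, callback):
        # Call the user callback for every message currently in the queue.
        while (message := self.get_message()) is not None:
            callback(message)
```

With pika 1.x, the corresponding calls on a `BlockingChannel` are `basic_publish(exchange=..., routing_key=..., body=...)` for publishing, `basic_get(queue=..., auto_ack=True)` for fetching a single message, and `basic_consume(queue=..., on_message_callback=...)` followed by `start_consuming()` for a blocking listener.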
Estimator

class Estimator:
    """
    A class that defines the behaviour of an estimator, the component that
    performs inference on the pipeline's data

    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    input_queue : str
        the name of the queue from which the estimator consumes
    output_queue : str
        the name of the queue where the estimator publishes its results
    input_evaluator_queue : str
        the name of the queue used by the evaluator to inform the estimator
        about new models
    actor : EstimatorActor
        the estimator actor, i.e., the implementation of the model inference

    Methods
    -------
    get_estimator_model(self)
        Must be implemented by the user to describe the desired behaviour

    set_estimator_model(self, message)
        Must be implemented by the user to describe the desired behaviour

    estimate(self, message)
        Must be implemented by the user to describe the desired behaviour

    run(self)
        Implements the component's behaviour

    forward(self, message)
        Forwards the message to the next component of the pipeline

    """

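A sketch of an Estimator loop, assuming in-memory deques for the three queues and an evaluator message in the format documented for `Evaluator.forward`, whose `"key"` is taken as the model path. `MODEL_STORE`, `ThresholdActor`, `SketchEstimator` and its `run_once()` method are hypothetical; the "model" is a single threshold so the control flow stays visible.

```python
from collections import deque

# Hypothetical model store: model path -> threshold acting as the "weights".
MODEL_STORE = {"models/v1": 50.0, "models/v2": 70.0}


class ThresholdActor:
    """Hypothetical actor: flags readings above the loaded threshold."""

    def __init__(self, model_path):
        self.model_path = model_path
        self.update_model()

    def update_model(self):
        # (Re)load the model identified by self.model_path.
        self.threshold = MODEL_STORE[self.model_path]

    def estimate(self, message):
        return {**message, "alert": message["content"] > self.threshold}


class SketchEstimator:
    """Hypothetical Estimator loop over in-memory queues."""

    def __init__(self, actor):
        self.actor = actor
        self.input_queue = deque()            # data from the previous component
        self.input_evaluator_queue = deque()  # new-model notices from the evaluator
        self.output_queue = deque()           # results for the next component

    def get_estimator_model(self):
        return self.actor.model_path

    def set_estimator_model(self, message):
        # Evaluator messages carry the model path under "key".
        self.actor.model_path = message["key"]
        self.actor.update_model()

    def estimate(self, message):
        return self.actor.estimate(message)

    def forward(self, message):
        self.output_queue.append(message)

    def run_once(self):
        # Apply pending model updates first so new data uses the latest model.
        while self.input_evaluator_queue:
            self.set_estimator_model(self.input_evaluator_queue.popleft())
        while self.input_queue:
            self.forward(self.estimate(self.input_queue.popleft()))
```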
EstimatorActor

class EstimatorActor:
    """
    An interface to implement the estimator actions. It is responsible for
    performing the inference inside the Estimator component

    ...

    Methods
    -------
    update_model(self)
        Must be implemented by the user to describe how the model is updated

    estimate(self, message)
        Must be implemented by the user to describe how the inference is
        performed
    """

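The interface can be expressed with Python's `abc` module, keeping the documented signatures `update_model(self)` and `estimate(self, message)`. `EstimatorActorInterface` and `LinearActor` are hypothetical names, and the model is assumed to be a JSON file holding `weights` and `bias`; `update_model()` simply re-reads that file.

```python
import json
from abc import ABC, abstractmethod


class EstimatorActorInterface(ABC):
    """Hypothetical ABC mirroring the documented EstimatorActor methods."""

    @abstractmethod
    def update_model(self):
        """Load or reload the model used for inference."""

    @abstractmethod
    def estimate(self, message):
        """Run inference on one message and return the result."""


class LinearActor(EstimatorActorInterface):
    """Hypothetical actor whose "model" is a JSON file with weights and bias."""

    def __init__(self, model_path):
        self.model_path = model_path
        self.weights, self.bias = [], 0.0
        self.update_model()

    def update_model(self):
        # Re-read the model file, e.g. after the evaluator announces new weights.
        with open(self.model_path, encoding="utf-8") as f:
            model = json.load(f)
        self.weights, self.bias = model["weights"], model["bias"]

    def estimate(self, message):
        # Linear score: dot(weights, features) + bias.
        features = message["content"]
        score = sum(w * x for w, x in zip(self.weights, features)) + self.bias
        return {"sensor_id": message["sensor_id"], "score": score}
```

Declaring the methods abstract means a subclass that forgets one of them fails at instantiation time rather than in the middle of the pipeline.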
Evaluator

class Evaluator:
    """
    A class responsible for evaluating models and informing the interested
    components, e.g. the Estimator, about the current best model

    ...

    Attributes
    ----------

    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    input_queue : str
        the name of the queue to be consumed
    output_queue : str
        the name of the queue where the Evaluator publishes its data
    evaluated : list of (key, value)
        a list of already evaluated models
    current_best : (key, value)
        the best model evaluated so far (e.g., its weights), i.e., the one
        recommended for use in an Estimator component

    Methods
    -------

    evaluate_models(self, message)
        Must be implemented by the user to describe the desired behaviour

    include_evaluation_to_list(self, key, value)
        Appends the pair (key, value) to the list of evaluated models

    is_included_in_evaluation_list(self, key)
        Checks whether a model, identified by its key, has already been
        evaluated; skipping models already evaluated can drastically reduce
        the component's processing time

    get_current_best_evaluated(self)
        Returns the current best evaluated model in the format (key, value)
        key: the model path
        value: its evaluation value, e.g. mAP

    set_current_best_evaluated(self, key, value)
        Updates the current best evaluated model, which is announced to the
        interested components through the queue system

    run(self)
        Implements the behaviour of the component

    forward(self, message)
        Publishes the evaluated data to the next component. The message
        follows the format:
        {
            "key": "uniqueFilePath",
            "value": "the evaluation value of the model",
            "params": {
                "modelDefinition": "some extra data about the results"
            }
        }
    """

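A sketch of the evaluation bookkeeping, assuming a higher score is better (as with mAP) and that `evaluate_models` receives a list of model paths rather than a queue message. `SketchEvaluator`, the `score_model` callable and the `published` list (standing in for the output queue) are hypothetical; published messages follow the documented `key`/`value`/`params` format.

```python
class SketchEvaluator:
    """Hypothetical evaluator keeping the best model by a higher-is-better score."""

    def __init__(self, score_model):
        self.score_model = score_model  # callable: model path -> score, e.g. mAP
        self.evaluated = []             # list of (key, value)
        self.current_best = None        # (key, value) or None
        self.published = []             # stand-in for the output queue

    def include_evaluation_to_list(self, key, value):
        self.evaluated.append((key, value))

    def is_included_in_evaluation_list(self, key):
        return any(k == key for k, _ in self.evaluated)

    def get_current_best_evaluated(self):
        return self.current_best

    def set_current_best_evaluated(self, key, value):
        # Record the new best model and announce it to interested components.
        self.current_best = (key, value)
        self.forward({"key": key, "value": value, "params": {}})

    def forward(self, message):
        self.published.append(message)

    def evaluate_models(self, model_paths):
        for key in model_paths:
            if self.is_included_in_evaluation_list(key):
                continue  # skip models that were already evaluated
            value = self.score_model(key)
            self.include_evaluation_to_list(key, value)
            best = self.get_current_best_evaluated()
            if best is None or value > best[1]:
                self.set_current_best_evaluated(key, value)
```

The linear scan in `is_included_in_evaluation_list` mirrors the documented list of `(key, value)` pairs; a set of keys would make the lookup constant-time when many models are evaluated.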


Download files


Files for pipeline-vilma, version 0.0.27

Filename                                  Size     File type   Python version
pipeline_vilma-0.0.27-py3-none-any.whl    17.8 kB  Wheel       py3
