Data pipeline

Project description

Pipeline Vilma

Components

Provider

class Provider:
"""
An abstract class used to represent a data provider.
Examples of data providers:
- RGB camera, providing frames
- Humidity sensor, providing the current humidity level
- Thermographic camera, providing a heatmap
...

Attributes
----------
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_output : str
    the name of the queue where the provider will publish its data

Methods
-------
pack_data(self, data_type, content, sensor_id)
    Based on the data type, choose the right package format for the content

run(self)
    Should be implemented by the user, in order to describe the provider's
    behaviour

"""

Collector

class Collector:
"""
An abstract class used to represent a data collector.
A collector is responsible for creating a dataset from the providers' input
...

Attributes
----------
bypass : bool
    some applications cannot store data and/or create datasets for privacy
    reasons; the collector can therefore work in bypass mode, where it only
    forwards the received data
ratio : float
    the fraction (between [0,1]) of samples to be collected, i.e.,
    if "ratio" is set to 0.5, half of the samples received by the collector
    will be stored for dataset creation
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_input : str
    the name of the queue from which the collector consumes
queue_output : str
    the name of the queue where the collector will publish its data
data_api_config : dict
    expects a Python dictionary with the endpoints mapping the class to the
    implemented API. It allows the user to use different APIs together with
    the pipeline.
    Expected dictionary:
        {
            "base_url": "http://192.168.0.1",  # IP where the API is running
            "port": 3333,                      # The API's server port
            "endpoints": {
                "add_item": "collection"       # Must be an HTTP POST
            }
        }


Methods
-------
run(self)
    Starts consuming the collector's input queue

on_new_message(self, ch, method, properties, body)
    Based on the collector's configuration, perform the actions on each
    received message.
    Also forwards the current message to the next component so as not to
    break the pipeline

receive(self, callback)
    Creates a listener on the collector's input queue

forward(self, sensor_id, message)
    Forward the message to an individual queue related to the sensor id

"""

CollectorDataApi

class CollectorDataApi:
"""
A class used to connect a data collector to a storage API.
It is responsible for allowing the user to create different database APIs, in
different languages, only requiring the API endpoints to be mapped into the
CollectorDataApi constructor.
...

Statics
----------
STATUS_FOR_LABELING: str
    default value: 'labeling-required'
    an internal status used to notify components about the sample status

Attributes
----------
storage_api : dict
    expects a Python dictionary with the endpoints of an API implemented
    to store the data in a database or any other persistence system.
    Expected dictionary:
        {
            "base_url": "http://192.168.0.1",  # IP where the API is running
            "port": 3333,                      # The API's server port
            "endpoints": {
                "add_item": "collection"       # Must be an HTTP POST
            }
        }

    An API persisting the data in a MongoDB is available in the examples.

Methods
-------
_add_labeling_properties(self, message)
    Include the labeling status and creation timestamp in the message

_insert_on_db(self, message)
    Make a request to the API to persist the message

store(self, message)
    public method responsible for executing the private ones and storing
    the message in the database

"""

Instancer

class Instancer:
"""
An abstract class used to create instances based on multiple sensors (providers).
The instancer is responsible for creating data relationships from the different
providers, based on the needs of the estimator component

...

Attributes
----------
number_of_providers : int
    the current number of providers included in the pipeline
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_input : str
    the name of the queue from which the instancer consumes
queue_output : str
    the name of the queue where the instancer will publish its data

Methods
-------
run(self)
    Should be implemented by the user, in order to describe the instancer's
    behaviour

create_custom_instance(self)
    Should be implemented by the user, in order to describe a custom
    instance-creation behaviour

create_simple_instance(self)
    Create an instance directly from the data from the input queue

create_instance_by_time_window(self, sensor_id, time_window_s)
    Create an instance based on the list of messages received in a time window

create_instance_by_repetition(self, sensor_id, number_of_messages)
    Create an instance based on a defined number of received messages.
    e.g.: a model requires 7 images as its input.

create_instance_by_syncing_all_providers(self)
    Synchronize all the providers and create an instance including all of them.

forward(self, message)
    Forward the message to the next component of the pipeline

"""

Messager

class Messager:
"""
A class responsible for defining the communication between components.
It is a wrapper around the RabbitMQ library pika

...

Attributes
----------
queue_server : str
    the IPv4 of a RabbitMQ queue server
queue_name : str
    the name of the queue to interact with

Methods
-------
validate(self, message)
    Validate the received message, based on the defined schema

publish_to_sibling_queue(self, message, unique_id, validate=True)
    Send a message to a fragmented queue, i.e., to a component that expects
    multiple queues as its input. Used to publish provider data to an
    individual queue

publish(self, message, exchange='', validate=True)
    Publish the data to the Messager's queue

get_message(self)
    Get a message from the queue, FIFO-based.

consume(self, callback)
    Provides a listener on the queue, so that for every new message the
    callback (provided by the user) is called

"""

Estimator

class Estimator:
"""
A class responsible for defining the behaviour of an estimator, which
performs inferences on the pipeline's data

...

Attributes
----------
queue_server : str
    the IPv4 of a RabbitMQ queue server
input_queue : str
    the name of the queue from which the estimator consumes
output_queue : str
    the name of the queue where the estimator publishes its results
input_evaluator_queue : str
    the name of the queue used by the evaluator to inform the estimator about
    new models
actor : object
    the estimator actor, i.e., the implementation of the model inference

Methods
-------
get_estimator_model(self)
    Should be implemented by the user, in order to describe the desired
    behaviour

set_estimator_model(self, message)
    Should be implemented by the user, in order to describe the desired
    behaviour

run(self)
    Should be implemented by the user, in order to describe the desired
    behaviour

consume(self)
    Creates a listener for the input queue and calls the implementation of
    self.estimate()

forward(self, message)
    Forward the message to the next component of the pipeline

"""


Built Distribution

pipeline_vilma-0.0.23-py3-none-any.whl (17.0 kB, Python 3)
