Data pipeline
Pipeline Vilma
Components
Provider
class Provider:
    """
    An abstract class used to represent a data provider.

    Examples of data providers:
    - RGB camera, providing frames
    - Humidity sensor, providing the current humidity level
    - Thermographic camera, providing a heatmap
    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_output : str
        the name of the queue where the provider publishes its data

    Methods
    -------
    pack_data(self, data_type, content, sensor_id)
        Based on the data type, chooses the right package format for the content
    run(self)
        Must be implemented by the user to define the provider's behaviour
    """
Collector
class Collector:
    """
    An abstract class used to represent a data collector.
    A collector is responsible for creating a dataset from the providers' input.
    ...

    Attributes
    ----------
    bypass : bool
        some applications cannot store data and/or create datasets for privacy
        reasons; in that case the collector can work in bypass mode, where it
        only forwards the received data
    ratio : float
        the fraction (in [0, 1]) of received samples to be collected, i.e.,
        if "ratio" is set to 0.5, half of the samples received by the collector
        are stored for dataset creation
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_input : str
        the name of the queue from which the collector consumes
    queue_output : str
        the name of the queue where the collector publishes its data
    data_api_config : dict
        a Python dictionary with the endpoints that map the class to the
        implemented API. It allows the user to use different APIs with the
        pipeline.
        Expected dictionary:
        {
            "base_url": "http://192.168.0.1",  # IP where the API is running
            "port": 3333,                      # the API server's port
            "endpoints": {
                "add_item": "collection"       # must accept an HTTP POST
            }
        }

    Methods
    -------
    run(self)
        Starts consuming the collector's input queue
    on_new_message(self, ch, method, properties, body)
        Based on the collector's configuration, performs the configured actions
        on each received message. It also forwards the message to the next
        component so the pipeline is not broken
    receive(self, callback)
        Creates a listener on the collector's input queue
    forward(self, sensor_id, message)
        Forwards the message to an individual queue related to the sensor id
    """
CollectorDataApi
class CollectorDataApi:
    """
    A class used by the data collector to persist data.
    It allows the user to plug in different database APIs, written in any
    language; the only requirement is to map the API endpoints in the
    CollectorDataApi constructor.
    ...

    Statics
    -------
    STATUS_FOR_LABELING : str
        default value: 'labeling-required'
        an internal status used to notify components about the sample status

    Attributes
    ----------
    storage_api : dict
        a Python dictionary with the endpoints of an API implemented to store
        the data in a database or any other persistence system
        Expected dictionary:
        {
            "base_url": "http://192.168.0.1",  # IP where the API is running
            "port": 3333,                      # the API server's port
            "endpoints": {
                "add_item": "collection"       # must accept an HTTP POST
            }
        }
        An API that persists the data in MongoDB is available in the examples.

    Methods
    -------
    _add_labeling_properties(self, message)
        Adds the labeling status and the creation timestamp to the message
    _insert_on_db(self, message)
        Makes a request to the API to persist the message
    store(self, message)
        Public method that runs the private methods above to store the
        message in the database
    """
Instancer
class Instancer:
    """
    An abstract class used to create instances based on multiple sensors (providers).
    The instancer is responsible for relating the data coming from the different
    providers, according to the needs of the estimator component.
    ...

    Attributes
    ----------
    number_of_providers : int
        the current number of providers included in the pipeline
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_input : str
        the name of the queue from which the instancer consumes
    queue_output : str
        the name of the queue where the instancer publishes the created instances

    Methods
    -------
    run(self)
        Must be implemented by the user to define the instancer's behaviour
    create_custom_instance(self)
        Must be implemented by the user to define how a custom instance is created
    create_simple_instance(self)
        Creates an instance directly from the data of the input queue
    create_instance_by_time_window(self, sensor_id, time_window_s)
        Creates an instance from the list of messages received in a time window
    create_instance_by_repetition(self, sensor_id, number_of_messages)
        Creates an instance from a defined number of received messages,
        e.g. when a model requires 7 images as its input
    create_instance_by_syncing_all_providers(self)
        Synchronizes all the providers and creates an instance including all of them
    forward(self, message)
        Forwards the message to the next component of the pipeline
    """
Messager
class Messager:
    """
    A class responsible for defining the communication between components.
    It is a wrapper around the RabbitMQ client library pika.
    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    queue_name : str
        the name of the queue to interact with

    Methods
    -------
    validate(self, message)
        Validates the received message against the defined schema
    publish_to_sibling_queue(self, message, unique_id, validate=True)
        Sends a message to a fragmented queue, i.e., to a component that expects
        multiple queues as its input. Used to publish each provider's data in an
        individual queue
    publish(self, message, exchange='', validate=True)
        Publishes the data to the Messager's queue
    get_message(self)
        Gets a message from the queue (FIFO)
    consume(self, callback)
        Listens to the queue and calls the user-provided callback for every
        new message
    """
Estimator
class Estimator:
    """
    A class that defines the behaviour of an estimator, the component that
    performs inference on the pipeline's data.
    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    input_queue : str
        the name of the queue the estimator consumes from
    output_queue : str
        the name of the queue the estimator publishes its results to
    input_evaluator_queue : str
        the name of the queue used by the evaluator to inform the estimator
        about new models
    actor : EstimatorActor
        the estimator actor, i.e., the implementation of the model inference

    Methods
    -------
    get_estimator_model(self)
        Must be implemented by the user to define the desired behaviour
    set_estimator_model(self, message)
        Must be implemented by the user to define the desired behaviour
    estimate(message)
        Must be implemented by the user to define the desired behaviour
    run(self)
        Provides the component's behaviour
    forward(self, message)
        Forwards the message to the next component of the pipeline
    """
EstimatorActor
class EstimatorActor:
    """
    An interface to implement the estimator actions. It is responsible for
    performing the inference inside the Estimator component.
    ...

    Methods
    -------
    update_model(self)
        Must be implemented by the user to define how the actor updates its model
    estimate(self, message)
        Must be implemented by the user to define how the actor performs
        inference on a message
    """
Evaluator
class Evaluator:
    """
    A class responsible for evaluating models and informing the interested
    components (e.g. an Estimator) about the current best model through the
    queue system.
    ...

    Attributes
    ----------
    queue_server : str
        the IPv4 address of a RabbitMQ queue server
    input_queue : str
        the name of the queue to be consumed
    output_queue : str
        the name of the queue where the Evaluator publishes its data
    evaluated : list of (key, value)
        a list of already evaluated models
    current_best : (key, value)
        the current best evaluated model (e.g. weights), i.e., the one
        recommended for use in an Estimator component

    Methods
    -------
    evaluate_models(self, message)
        Must be implemented by the user to define how models are evaluated
    include_evaluation_to_list(self, key, value)
        Appends the evaluation to the list: self.evaluated.append((key, value))
    is_included_in_evaluation_list(self, key)
        Checks whether a model, identified by its key, was already evaluated;
        skipping repeated evaluations can drastically reduce the component's
        processing time
    get_current_best_evaluated(self)
        Returns the current best evaluated model in the format (key, value)
        key: model path
        value: its evaluation value, e.g. mAP
    set_current_best_evaluated(self, key, value)
        Updates the current best evaluated model, which is announced to the
        interested components through the queue system
    run(self)
        Implements the behaviour of the component
    forward(self, message)
        Publishes the evaluated data to the next component.
        The message follows the format:
        {
            "key": "uniqueFilePath",
            "value": "the evaluation value of the model",
            "params": {
                "modelDefinition": "some extra data about the results"
            }
        }
    """
Hashes for pipeline_vilma-0.0.29-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 2885234104d2b47ce58d723ea4a757a879e249648149fd1268f206b8a3cf7945
MD5 | 13d1753c203bd7b67d1081b360188f79
BLAKE2b-256 | a2c0b56f1de8d8ad113944fc6876a3047f09827ab4c56980cb0f451c674a6789