SDK enabling data collection from model serving code for our MPM solution

Project description

pulsar_data_collection

Pulsar data collection SDK is an open-source Python library for pushing, processing and collecting features, predictions and metadata. It is designed to work with different data storages; at this point only InfluxDB is implemented.

Getting started

Install Pulsar Data Collection with pip:

python3 -m pip install --upgrade pip
python3 -m pip install --upgrade pulsar-data-collection

Components

The data collection SDK has two core components: the storage engine and data capture. The storage engine is currently implemented only for InfluxDB; it handles ingestion into the database and digestion (retrieval) from it.

Data Capture

The DataCapture class ingests a dataset into the database with the parameters and format needed for later digestion and metrics calculation, without significantly changing the data.

It takes storage_engine (only influxdb is available right now), operation_type (DATABASE_OPERATION_TYPE_INSERT_PREDICTION or DATABASE_OPERATION_TYPE_METRICS) and login_url (an object of the DatabaseLogin class) as input parameters.

The operation type DATABASE_OPERATION_TYPE_INSERT_PREDICTION is used for any ingestion operation to the database. It requires additional parameters that describe the input dataset: model_id, model_version, data_id, y_name and pred_name. The operation type DATABASE_OPERATION_TYPE_METRICS, which is commonly used for retrieving a dataset ready for metrics calculation, does not require these parameters.
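The rule above (extra parameters are needed only when inserting predictions) can be illustrated with a small check. This is a minimal sketch and not part of the SDK: the helper missing_parameters is hypothetical, and the string values given to the two constants are placeholders chosen for this example.

```python
# Stand-ins for the SDK constants. In real code they are imported from
# pulsar_data_collection.data_capture; the values below are placeholders.
DATABASE_OPERATION_TYPE_INSERT_PREDICTION = "INSERT_PREDICTION"
DATABASE_OPERATION_TYPE_METRICS = "METRICS"

# Parameters the description lists as required for ingestion.
REQUIRED_FOR_INSERT = ("model_id", "model_version", "data_id", "y_name", "pred_name")


def missing_parameters(operation_type, **params):
    """Return the required parameter names that were not supplied (hypothetical helper)."""
    if operation_type == DATABASE_OPERATION_TYPE_INSERT_PREDICTION:
        return [name for name in REQUIRED_FOR_INSERT if params.get(name) is None]
    if operation_type == DATABASE_OPERATION_TYPE_METRICS:
        # Retrieval for metrics calculation needs no dataset description.
        return []
    raise ValueError(f"unknown operation type: {operation_type!r}")


print(missing_parameters(DATABASE_OPERATION_TYPE_INSERT_PREDICTION, model_id="m1", model_version="1"))
print(missing_parameters(DATABASE_OPERATION_TYPE_METRICS))
```

Running such a check before constructing DataCapture gives a readable error in the serving code instead of a failure during ingestion.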

The last, and probably one of the most important, classes to work with is DataWithPrediction. It takes two input parameters: prediction, which is the prediction value of the model, and data_points, which is the features dataset. The push method of DataCapture takes a DataWithPrediction object as a required parameter and ingests it into the database after transforming the data: adding a timestamp, changing the name of the prediction column, combining features with the prediction into a single dataset, creating the InfluxDB unique cache, etc.
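To make the transformations listed above more concrete, here is a minimal sketch of the same idea using plain Python dicts instead of dataframes. It is not the SDK's implementation: the function combine_for_ingestion, the "timestamp" key and the sample column names are all assumptions made for illustration.

```python
from datetime import datetime, timezone


def combine_for_ingestion(data_points, predictions, pred_name, timestamp=None):
    """Merge feature rows with predictions and stamp each row (hypothetical helper)."""
    if len(data_points) != len(predictions):
        raise ValueError("data_points and predictions must have the same length")
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)
    rows = []
    for features, prediction in zip(data_points, predictions):
        row = dict(features)         # copy so the caller's data is not mutated
        row[pred_name] = prediction  # store the prediction under the configured column name
        row["timestamp"] = timestamp
        rows.append(row)
    return rows


rows = combine_for_ingestion(
    data_points=[{"age": 31, "plan": "basic"}, {"age": 47, "plan": "pro"}],
    predictions=[0, 1],
    pred_name="churn_pred",
)
for row in rows:
    print(row)
```

Each resulting row carries the features, the prediction under the name given by pred_name, and the time of ingestion, which is the shape needed for later filtering by time.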

List of methods of DataCapture class:

  • push(data: DataWithPrediction) - preprocesses the data and ingests it into the database;
  • collect(filters: dict) - retrieves data from the database;
  • collect_eval_timestamp() - retrieves the newest evaluation timestamp in the database;
  • push_eval_timestamp(eval_df: df) - ingests a new evaluation timestamp into the database;
  • push_metrics(metrics_df: df) - ingests a metrics dataframe into the database after the metrics have been calculated.

Example usage

Initialize Database credentials:

from pulsar_data_collection.data_capture import DatabaseLogin

# replace each <...> placeholder with the connection details of your database
database_login = DatabaseLogin(db_host=<db_host>, db_port=<db_port>, db_user=<db_user>, db_password=<db_password>, protocol=<db_protocol>)
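In serving code, credentials are often read from the environment rather than hard-coded. The sketch below shows one way to assemble the keyword arguments shown above. The helper database_login_kwargs and the PULSAR_DB_* variable names are hypothetical, and converting the port to an integer is an assumption about what DatabaseLogin expects. The defaults are InfluxDB's usual port 8086 and plain http.

```python
import os


def database_login_kwargs(environ=os.environ):
    """Collect DatabaseLogin keyword arguments from environment variables.

    Hypothetical helper: the variable names are not defined by the SDK.
    """
    return {
        "db_host": environ.get("PULSAR_DB_HOST", "localhost"),
        "db_port": int(environ.get("PULSAR_DB_PORT", "8086")),
        "db_user": environ["PULSAR_DB_USER"],          # required, raises KeyError if unset
        "db_password": environ["PULSAR_DB_PASSWORD"],  # required, raises KeyError if unset
        "protocol": environ.get("PULSAR_DB_PROTOCOL", "http"),
    }


# Demonstration with an explicit mapping instead of the real environment.
example = database_login_kwargs({"PULSAR_DB_USER": "pulsar", "PULSAR_DB_PASSWORD": "secret"})
print(sorted(example))
```

With the SDK installed, the result could then be passed on as DatabaseLogin(**database_login_kwargs()).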

Initialize the DataCapture class, using the constant that matches the operation type. For inserting data into the database:

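from pulsar_data_collection.data_capture import (
    DataCapture,
    DataWithPrediction,  # assumed to be exported from the same module as DataCapture
    DATABASE_OPERATION_TYPE_INSERT_PREDICTION,
)

# prediction is the model output, to_predict is the features dataset it was computed from
dat_predict = DataWithPrediction(prediction=prediction, data_points=to_predict)

dat_capture = DataCapture(
    storage_engine="influxdb",
    model_id=<model_id>,
    model_version=<model_version>,
    data_id=<data_id>,
    y_name=<y_name>,
    pred_name=<pred_name>,
    operation_type=DATABASE_OPERATION_TYPE_INSERT_PREDICTION,
    login_url=database_login,
)

# preprocess the data and write it to the database
dat_capture.push(dat_predict)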

For collecting data from the database:

from pulsar_data_collection.data_capture import DataCapture, DATABASE_OPERATION_TYPE_METRICS

dat_capture = DataCapture(
    storage_engine="influxdb",
    operation_type=DATABASE_OPERATION_TYPE_METRICS,
    login_url=database_login
)

dat_capture.collect()

Collecting the newest prediction data that has not been processed yet:

import pandas as pd

# retrieve the timestamp of the last evaluation period
last_eval_timestamp = dat_capture.collect_eval_timestamp()

# if a previous period exists, collect only the data that was not collected before
if last_eval_timestamp:
    last_eval_timestamp_str = last_eval_timestamp.strftime('%Y-%m-%d %H:%M:%S')
    db_df = pd.DataFrame(dat_capture.collect({"time": f">= '{last_eval_timestamp_str}'"}).get("prediction"))
else:
    db_df = pd.DataFrame(dat_capture.collect().get("prediction"))
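The branching above can be folded into a small helper that builds the filter dictionary. This is a sketch only: build_time_filter is a hypothetical name, and it assumes that passing an empty dictionary to collect behaves the same as passing no filters, which the description does not confirm.

```python
from datetime import datetime


def build_time_filter(last_eval_timestamp):
    """Return a filter selecting rows at or after the last evaluation (hypothetical helper)."""
    if last_eval_timestamp is None:
        # No previous evaluation: no time restriction.
        return {}
    formatted = last_eval_timestamp.strftime("%Y-%m-%d %H:%M:%S")
    return {"time": f">= '{formatted}'"}


print(build_time_filter(None))
print(build_time_filter(datetime(2023, 1, 15, 9, 30, 0)))
```

Under that assumption, the collection step reduces to a single call such as dat_capture.collect(build_time_filter(last_eval_timestamp)).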

Example of pushing calculated metrics:

dat_capture.push_metrics(df_result_drift)

Example of pushing the timestamp when metrics were calculated:

dat_capture.push_eval_timestamp(eval_timestamp_df)

TODO: add use cases of input dataframes: metrics, prediction, datapoint

About PulsarML

PulsarML is a project that helps you monitor your models and gain powerful insights into their performance.

We released two open-source packages:

  • pulsar-data-collection : lightweight Python SDK enabling data collection of features, predictions and metadata from the serving code or micro-service of an ML model
  • pulsar-metrics : library for evaluating and monitoring data and concept drift with an extensive set of metrics. It also offers the possibility to use custom metrics defined by the user.

We also created pulsar demo, an example use case showing how to leverage both packages to implement model monitoring and performance management.

Want to interact with the community? Join our Slack channel.

Powered by Rocket Science Development

Contributing

  1. Fork this repository, develop, and test your changes
  2. Open an issue
  3. Submit a pull request with a reference to the issue

Download files

Source Distribution

pulsar_data_collection-0.1.1.tar.gz (9.2 kB), uploaded as Source

  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.2 CPython/3.10.6 Linux/5.15.0-1024-azure

  Algorithm    Hash digest
  SHA256       461c58501514d66be6c01476c0b8c39e04f59e337ce6e3289c34df6d6483f924
  MD5          71545ffb534d3870f8c2e2280c1d2f9a
  BLAKE2b-256  c110f8461f17b377a784c51372df9def51b9660caebffe0d4acdecff6469556f

Built Distribution

pulsar_data_collection-0.1.1-py3-none-any.whl (8.6 kB), uploaded for Python 3

  Algorithm    Hash digest
  SHA256       c7e57a4f04a04aac652982862875645919cec26319f1f6b0db32fb487cb42932
  MD5          e97362375776aba13b9a73f7844c8131
  BLAKE2b-256  b75608e52ef276bd735a3ac6c9656d7154627413fea44378ddf99736fcbc5ca2
