datarobot-model-metrics provides a framework to compute ML model metrics over time and produce aggregated metrics.

DataRobot Model Metrics Overview

The datarobot-model-metrics (dmm) library provides the tools necessary to create custom metrics, including the following operations:

  • Set up the environment
  • Create a data source
  • Create a custom metric
  • Evaluate a custom metric
  • Submit results to DataRobot

Review the following documentation to learn more about the datarobot-model-metrics interfaces used to perform custom metric operations:

  • Environment Setup
  • Data Sources
  • Custom Metrics
  • Metric Evaluator
  • DR Custom Metrics

For end-to-end examples, you can review the ./examples directory.

Environment setup

There are two primary ecosystems where you can develop a custom metric:

  1. Within the DataRobot application (via notebook or scheduled job).
  2. In a local development environment.

Environmental considerations

Python modules:

  • When running from within the DataRobot application, the ecosystem has all the required Python modules.
  • When running locally, you need to install the dmm module (built in this repository); installing it updates the Python environment with all the required modules.

Setting parameters:

  • When running from within the DataRobot application, parameters get set through environment variables.
  • When running locally, it is easier to pass values using arguments to set parameters than to set environment variables.

Initializing your application, as shown below, allows setting parameters using either method without significant changes.

Initializing your application

The CustomMetricArgumentParser is a class that wraps the standard argparse.ArgumentParser. It provides convenience functions that read values either from the environment or from normal argument parsing. When CustomMetricArgumentParser.parse_args() is called, it checks for missing values.
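
The environment-or-argument pattern described above can be sketched with plain argparse. This is a minimal illustration, not the dmm implementation: the helper names build_parser and parse_with_check are hypothetical, and only the DEPLOYMENT_ID and MAX_ROWS variable names and the 100000 default come from this document.

```python
import argparse
import os


def build_parser(env=None):
    """Build a parser whose defaults come from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser(description="env-or-argument sketch")
    # A command-line value wins; otherwise fall back to the environment.
    parser.add_argument("--deployment-id", default=env.get("DEPLOYMENT_ID"))
    parser.add_argument("--max-rows", type=int, default=int(env.get("MAX_ROWS", "100000")))
    return parser


def parse_with_check(argv, env=None):
    """Parse argv and fail if a required value is set by neither method."""
    args = build_parser(env).parse_args(argv)
    if args.deployment_id is None:
        raise ValueError("pass --deployment-id or set DEPLOYMENT_ID")
    return args


# The same code works with either way of setting parameters.
from_env = parse_with_check([], env={"DEPLOYMENT_ID": "abc123"})
from_args = parse_with_check(["--deployment-id", "xyz789"], env={"DEPLOYMENT_ID": "abc123"})
print(from_env.deployment_id, from_args.deployment_id, from_env.max_rows)
```

Passing the environment as a dictionary keeps the sketch easy to test; a real program would rely on os.environ.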

The log_manager provides a set of functions to help with logging. The DMM and the DataRobot public API client use standard Python logging primitives. A complete list of log classes with their current levels is available using get_log_levels(). The initialize_loggers() function initializes all the loggers with a format like the one shown below:

2024-08-09 02:19:50 PM - dmm.data_source.datarobot_source - INFO - fetching the next predictions dataframe... 2024-07-15 00:00:00 - 2024-08-09 14:19:46.643722
2024-08-09 02:19:56 PM - urllib3.connectionpool - DEBUG - https://staging.datarobot.com:443 "POST /api/v2/deployments/66a90a712ad81645df8c469c/predictionDataExports/ HTTP/1.1" 202 368
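
A layout like the sample lines above can be reproduced with the standard logging module. The sketch below is an approximation under stated assumptions: the format strings are inferred from the sample output, the configure_logging helper is hypothetical, and the [NAME:]LEVEL handling is only one plausible reading of the --log option.

```python
import io
import logging

# Assumed format strings, inferred from the sample log lines.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
DATE_FORMAT = "%Y-%m-%d %I:%M:%S %p"


def configure_logging(specs, stream=None):
    """Apply a list of [NAME:]LEVEL specs, e.g. ["WARNING", "demo.source:INFO"]."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
    logging.getLogger().addHandler(handler)
    for spec in specs:
        # "demo.source:INFO" -> ("demo.source", "INFO"); "WARNING" -> ("", "WARNING")
        name, _, level = spec.rpartition(":")
        # An empty name selects the root logger.
        logging.getLogger(name or None).setLevel(level.upper())
    return handler


stream = io.StringIO()
configure_logging(["WARNING", "demo.source:INFO"], stream)
logging.getLogger("demo.source").info("fetching the next predictions dataframe")
logging.getLogger("demo.other").info("suppressed: the root level is WARNING")
print(stream.getvalue(), end="")
```

Only the demo.source message is emitted, because every other logger inherits the WARNING level from the root logger.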

The following snippet shows how to set up your runtime environment using the previously mentioned classes:

import sys
from dmm import CustomMetricArgumentParser
from dmm.log_manager import initialize_loggers

parser = CustomMetricArgumentParser(description="My new custom metric")
parser.add_base_args()  # adds standard arguments
# Add more with standard ArgumentParser primitives, or some convenience functions such as add_environment_arg()

# Parse the program arguments (if any) to an argparse.Namespace.
args = parser.parse_args(sys.argv[1:])

# Initialize the logging based on the 'LOG' environment variable, or the --log option
initialize_loggers(args.log)

The standard/base arguments include:

  • BASE_URL - URL to the public API
  • API_KEY - token used for authentication to server at BASE_URL
  • DEPLOYMENT_ID - deployment identifier from application
  • CUSTOM_METRIC_ID - custom-metric identifier from application
  • DRY_RUN - flag to indicate whether to really report the custom-metric result
  • START_TS - start of time for calculations
  • END_TS - end of time for calculations
  • MAX_ROWS - maximum number of rows to process
  • LOG - initialization of logging, defaults to setting all dmm and datarobot modules to WARNING.

Here's an example of the help when using the CustomMetricArgumentParser:

(model-runner) $ python3 custom.py --help
usage: custom.py [-h] [--api-key KEY] [--base-url URL] [--deployment-id ID] [--custom-metric-id ID] [--dry-run] [--start-ts TIMESTAMP] [--end-ts TIMESTAMP] [--max-rows ROWS] [--required] [--log [[NAME:]LEVEL ...]]

My new custom metric

optional arguments:
  -h, --help            show this help message and exit
  --api-key KEY         API key used to authenticate to server. Settable via 'API_KEY', required.
  --base-url URL        URL for server. Settable via 'BASE_URL' (default: https://staging.datarobot.com/api/v2), required.
  --deployment-id ID    Deployment ID. Settable via 'DEPLOYMENT_ID' (default: None), required.
  --custom-metric-id ID
                        Custom metric ID. Settable via 'CUSTOM_METRIC_ID' (default: None), required.
  --dry-run             Dry run. Settable via 'DRY_RUN' (default: False).
  --start-ts TIMESTAMP  Start timestamp. Settable with 'START_TS', or 'LAST_SUCCESSFUL_RUN_TS' (when not dry run). Default is 2024-08-08 14:27:55.493027
  --end-ts TIMESTAMP    End timestamp. Settable with 'END_TS' or 'CURRENT_RUN_TS'. Default is 2024-08-09 14:27:55.493044.
  --max-rows ROWS       Maximum number of rows. Settable via 'MAX_ROWS' (default: 100000).
  --required            List the required properties and exit.
  --log [[NAME:]LEVEL ...]
                        Logging level list. Settable via 'LOG' (default: WARNING).
(model-runner) $ 

Other utilities

This section mentions some utilities that may help you develop.

save_to_csv()

During development, it is common to run your code over the same data multiple times to see how your changes impact the results. The save_to_csv() utility allows you to save your results to a CSV file, so you can compare the results between successive runs on the same data.
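
The signature of save_to_csv() is not shown here, so the following sketch uses only the standard csv module to illustrate the workflow of comparing two runs. The helper names, the column names, and the sample values are all hypothetical.

```python
import csv
import io


def results_to_csv(rows, fieldnames):
    """Serialize metric rows to CSV text (a file on disk would work the same way)."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


def changed_buckets(old_csv, new_csv, key="timestamp", value="value"):
    """Return the keys whose value differs between two saved runs."""
    old = {row[key]: row[value] for row in csv.DictReader(io.StringIO(old_csv))}
    new = {row[key]: row[value] for row in csv.DictReader(io.StringIO(new_csv))}
    return sorted(k for k in old.keys() | new.keys() if old.get(k) != new.get(k))


fields = ["timestamp", "value"]
run_1 = results_to_csv(
    [{"timestamp": "2023-09-14T13:00", "value": 0.5}, {"timestamp": "2023-09-14T14:00", "value": 0.7}],
    fields,
)
run_2 = results_to_csv(
    [{"timestamp": "2023-09-14T13:00", "value": 0.5}, {"timestamp": "2023-09-14T14:00", "value": 0.8}],
    fields,
)
changed = changed_buckets(run_1, run_2)
print(changed)
```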

DataRobotSource

The most commonly used data source is DataRobotSource. This data source connects to DataRobot to fetch selected data from the DataRobot platform.

DataRobotSource Initialization

Initialize DataRobotSource with the following mandatory parameters:

from dmm.data_source import DataRobotSource

source = DataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    deployment_id=deployment_id,
    start=start_of_export_window,
    end=end_of_export_window,
)

You can also provide the base_url and token parameters as environment variables: os.environ['DATAROBOT_ENDPOINT'] and os.environ['DATAROBOT_API_TOKEN'].

from dmm.data_source import DataRobotSource

source = DataRobotSource(
    deployment_id=deployment_id,
    start=start_of_export_window,
    end=end_of_export_window,
)

The following example initializes DataRobotSource with all parameters:

from dmm.data_source import DataRobotSource

source = DataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    client=None,
    deployment_id=deployment_id,
    model_id=model_id,
    start=start_of_export_window,
    end=end_of_export_window,
    max_rows=10000,
    delete_exports=False,
    use_cache=False,
    actuals_with_matched_predictions=True,
)

Parameters:

  • base_url: str - The DataRobot API URL; for example, https://app.datarobot.com/api/v2.
  • token: str - A DataRobot API token from Developer Tools.
  • client: Optional[dr.Client] - Use the dr.Client object instead of base_url and token.
  • deployment_id: str - The ID of the deployment evaluated by the custom metric.
  • model_id: Optional[str] - The ID of the model evaluated by the custom metric. If you don't specify a model ID, the champion model ID is used.
  • start: datetime - The start of the export window. Define the date you want to start retrieving data from.
  • end: datetime - The end of the export window. Define the date you want to retrieve data until.
  • max_rows: Optional[int] - The maximum number of rows to fetch at once when the requested data doesn't fit into memory.
  • delete_exports: Optional[bool] - If True, datasets with exported data created in the AI Catalog are automatically deleted. The default value is False.
  • use_cache: Optional[bool] - If True, use existing datasets stored in the AI Catalog for time ranges included in previous exports. The default value is False.
  • actuals_with_matched_predictions: Optional[bool] - If False, allow actuals export without matched predictions. The default value is True.

DataRobotSource Usage

Prediction Data Export

The get_prediction_data method returns a chunk of prediction data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.

prediction_df_1, prediction_chunk_id_1 = source.get_prediction_data()

print(prediction_df_1.head(5).to_string())
print(f"chunk id: {prediction_chunk_id_1}")

   DR_RESERVED_PREDICTION_TIMESTAMP  DR_RESERVED_PREDICTION_VALUE_high  DR_RESERVED_PREDICTION_VALUE_low date_non_unique date_random  id       年月日
0  2023-09-13 11:02:51.248000+00:00                           0.697782                          0.302218      1950-10-01  1949-01-27   1  1949-01-01
1  2023-09-13 11:02:51.252000+00:00                           0.581351                          0.418649      1959-04-01  1949-02-03   2  1949-02-01
2  2023-09-13 11:02:51.459000+00:00                           0.639347                          0.360653      1954-05-01  1949-03-28   3  1949-03-01
3  2023-09-13 11:02:51.459000+00:00                           0.627727                          0.372273      1951-09-01  1949-04-07   4  1949-04-01
4  2023-09-13 11:02:51.664000+00:00                           0.591612                          0.408388      1951-03-01  1949-05-16   5  1949-05-01
chunk id: 0

When the data source is exhausted, None and -1 are returned:

prediction_df_2, prediction_chunk_id_2 = source.get_prediction_data()

print(prediction_df_2)
print(f"chunk id: {prediction_chunk_id_2}")

None
chunk id: -1

The reset method resets the exhausted data source, allowing it to iterate from the beginning:

source.reset()
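
The chunking contract described above (data chunks until exhaustion, then None and -1, then reset) lends itself to a simple loop. The sketch below uses a hypothetical stand-in class, ChunkedSource, that mimics only that contract with plain lists instead of DataFrames; it is not part of dmm.

```python
class ChunkedSource:
    """Hypothetical stand-in that mimics the documented chunking contract."""

    def __init__(self, rows, max_rows):
        self._rows = list(rows)
        self._max_rows = max_rows
        self._next_chunk = 0

    def get_prediction_data(self):
        start = self._next_chunk * self._max_rows
        if start >= len(self._rows):
            return None, -1  # exhausted
        chunk_id = self._next_chunk
        self._next_chunk += 1
        return self._rows[start:start + self._max_rows], chunk_id

    def reset(self):
        self._next_chunk = 0


def iter_chunks(source):
    """Yield (chunk_id, chunk) pairs until the source reports exhaustion."""
    while True:
        chunk, chunk_id = source.get_prediction_data()
        if chunk is None:
            break
        yield chunk_id, chunk


source = ChunkedSource(range(5), max_rows=2)
first_pass = list(iter_chunks(source))
source.reset()  # without this, a second pass would yield nothing
second_pass = list(iter_chunks(source))
print(first_pass)
```

The same loop shape should apply to any method that follows this contract, such as get_actuals_data or get_data.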

The get_all_prediction_data method returns all prediction data available for a data source object in a single DataFrame:

prediction_df = source.get_all_prediction_data()

Actuals Data Export

The get_actuals_data method returns a chunk of actuals data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.

actuals_df_1, actuals_chunk_id_1 = source.get_actuals_data()

print(actuals_df_1.head(5).to_string())
print(f"chunk id: {actuals_chunk_id_1}")

     association_id                  timestamp label  actuals  predictions predicted_class
0                 1  2023-09-13 11:00:00+00:00   low        0     0.302218            high
194              57  2023-09-13 11:00:00+00:00   low        1     0.568564             low
192              56  2023-09-13 11:00:00+00:00   low        1     0.569865             low
190              55  2023-09-13 11:00:00+00:00   low        0     0.473282            high
196              58  2023-09-13 11:00:00+00:00   low        1     0.573861             low
chunk id: 0

To return raw data with the original column names, in the format of the data stored in PostgreSQL, set the return_original_column_names parameter to True:

actuals_df_1, actuals_chunk_id_1 = source.get_actuals_data(return_original_column_names=True)

print(actuals_df_1.head(5).to_string())
print(f"chunk id: {actuals_chunk_id_1}")

     id                  timestamp label  actuals         y predicted_class
0     1  2023-09-13 11:00:00+00:00   low        0  0.302218            high
194  57  2023-09-13 11:00:00+00:00   low        1  0.568564             low
192  56  2023-09-13 11:00:00+00:00   low        1  0.569865             low
190  55  2023-09-13 11:00:00+00:00   low        0  0.473282            high
196  58  2023-09-13 11:00:00+00:00   low        1  0.573861             low
chunk id: 0

To return all actuals data available for a source object in a single DataFrame, use the get_all_actuals_data method:

actuals_df = source.get_all_actuals_data()

When the data source is exhausted, None and -1 are returned:

actuals_df_2, actuals_chunk_id_2 = source.get_actuals_data()

print(actuals_df_2)
print(f"chunk id: {actuals_chunk_id_2}")

None
chunk id: -1

The reset method resets the exhausted data source, allowing it to iterate from the beginning:

source.reset()

Training Data Export

The get_training_data method returns all data used for training in one call. The returned data is a pandas DataFrame:

train_df = source.get_training_data()
print(train_df.head(5).to_string())

      y date_random date_non_unique       年月日
0  high  1949-01-27      1950-10-01  1949-01-01
1  high  1949-02-03      1959-04-01  1949-02-01
2   low  1949-03-28      1954-05-01  1949-03-01
3  high  1949-04-07      1951-09-01  1949-04-01
4  high  1949-05-16      1951-03-01  1949-05-01

Combined data export

The get_data method returns combined_data, which includes merged scoring data, predictions, and matched actuals. The MetricEvaluator uses this method as its main data export method:

df, chunk_id_1 = source.get_data()
print(df.head(5).to_string())
print(f"chunk id: {chunk_id_1}")

                          timestamp  predictions date_non_unique date_random  association_id       年月日 predicted_class label  actuals
0  2023-09-13 11:02:51.248000+00:00     0.302218      1950-10-01  1949-01-27               1  1949-01-01            high   low        0
1  2023-09-13 11:02:51.252000+00:00     0.418649      1959-04-01  1949-02-03               2  1949-02-01            high   low        0
2  2023-09-13 11:02:51.459000+00:00     0.360653      1954-05-01  1949-03-28               3  1949-03-01            high   low        1
3  2023-09-13 11:02:51.459000+00:00     0.372273      1951-09-01  1949-04-07               4  1949-04-01            high   low        0
4  2023-09-13 11:02:51.664000+00:00     0.408388      1951-03-01  1949-05-16               5  1949-05-01            high   low        0
chunk id: 0

The get_all_data method returns all combined data available for that source object in a single DataFrame:

df = source.get_all_data()

BatchDataRobotSource

The BatchDataRobotSource interface is for batch deployments.

The following example initializes BatchDataRobotSource with all parameters:

from dmm.data_source import BatchDataRobotSource

source = BatchDataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    client=None,
    deployment_id=deployment_id,
    model_id=model_id,
    batch_ids=batch_ids,
    max_rows=10000,
    delete_exports=False,
    use_cache=False,
)

The parameters for this class are analogous to those for DataRobotSource. The most important difference is that instead of the time range (start and end), you must provide batch IDs. In addition, a batch source doesn't support actuals export.

The get_prediction_data method returns a chunk of prediction data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.

prediction_df_1, prediction_chunk_id_1 = source.get_prediction_data()
print(prediction_df_1.head(5).to_string())
print(f"chunk id: {prediction_chunk_id_1}")

    AGE       B  CHAS     CRIM     DIS                  batch_id    DR_RESERVED_BATCH_NAME                         timestamp   INDUS  LSTAT  MEDV    NOX  PTRATIO  RAD     RM  TAX    ZN  id
0  65.2  396.90     0  0.00632  4.0900                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    2.31   4.98  24.0  0.538     15.3    1  6.575  296  18.0   1
1  78.9  396.90     0  0.02731  4.9671                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    7.07   9.14  21.6  0.469     17.8    2  6.421  242   0.0   2
2  61.1  392.83     0  0.02729  4.9671                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    7.07   4.03  34.7  0.469     17.8    2  7.185  242   0.0   3
3  45.8  394.63     0  0.03237  6.0622                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    2.18   2.94  33.4  0.458     18.7    3  6.998  222   0.0   4
4  54.2  396.90     0  0.06905  6.0622                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    2.18   5.33  36.2  0.458     18.7    3  7.147  222   0.0   5
chunk id: 0

prediction_df = source.get_all_prediction_data()

source.reset()

df, chunk_id_1 = source.get_data()

The get_training_data method returns all data used for training in one call. The returned data is a pandas DataFrame:

train_df = source.get_training_data()

Note: Actuals export for batches is not implemented yet.

DataFrameSource

If you aren't exporting data directly from DataRobot and instead have it available locally (for example, as a downloaded file), you can load the dataset into DataFrameSource. The DataFrameSource class wraps any pd.DataFrame to create a library-compatible source. This is the easiest way to interact with the library when bringing your own data:

source = DataFrameSource(
    df=pd.read_csv("./data_hour_of_week.csv"),
    max_rows=10000,
    timestamp_col="date"
)

df, chunk_id_1 = source.get_data()
print(df.head(5).to_string())
print(f"chunk id: {chunk_id_1}")

                  date         y
0  1959-12-31 23:59:57 -0.183669
1  1960-01-01 01:00:02  0.283993
2  1960-01-01 01:59:52  0.020663
3  1960-01-01 03:00:14  0.404304
4  1960-01-01 03:59:58  1.005252
chunk id: 0

In addition, it is possible to create new data source definitions. To define a new data source, you can customize and implement the DataSourceBase interface.

TimeBucket

The TimeBucket enum defines the required data aggregation granularity over time. By default, TimeBucket is set to TimeBucket.ALL. You can specify any of the following values: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or ALL. To change the TimeBucket value, use the init method, source.init(time_bucket):

# let's generate a dummy DataFrame with 2 rows per time bucket (Hour in this scenario)
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=2,
    prediction_value=1,
    with_actuals=True,
    with_predictions=True,
    time_bucket=TimeBucket.HOUR,
)
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1    0.999
1  01/06/2005 13:00:00.000000            1    0.999
2  01/06/2005 14:00:00.000000            1    0.999
3  01/06/2005 14:00:00.000000            1    0.999
4  01/06/2005 15:00:00.000000            1    0.999
5  01/06/2005 15:00:00.000000            1    0.999
6  01/06/2005 16:00:00.000000            1    0.999
7  01/06/2005 16:00:00.000000            1    0.999
8  01/06/2005 17:00:00.000000            1    0.999
9  01/06/2005 17:00:00.000000            1    0.999

# let's use DataFrameSource and load created DataFrame
source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="timestamp",
)
# init source with the selected TimeBucket
source.init(TimeBucket.HOUR)
df, _ = source.get_data()
print(df)
                    timestamp predictions actuals
0  01/06/2005 13:00:00.000000           1   0.999
1  01/06/2005 13:00:00.000000           1   0.999
df, _ = source.get_data()
print(df)
                    timestamp predictions actuals
2  01/06/2005 14:00:00.000000           1   0.999
3  01/06/2005 14:00:00.000000           1   0.999

source.init(TimeBucket.DAY)
df, _ = source.get_data()
print(df)
                    timestamp predictions actuals
0  01/06/2005 13:00:00.000000           1   0.999
1  01/06/2005 13:00:00.000000           1   0.999
2  01/06/2005 14:00:00.000000           1   0.999
3  01/06/2005 14:00:00.000000           1   0.999
4  01/06/2005 15:00:00.000000           1   0.999
5  01/06/2005 15:00:00.000000           1   0.999
6  01/06/2005 16:00:00.000000           1   0.999
7  01/06/2005 16:00:00.000000           1   0.999
8  01/06/2005 17:00:00.000000           1   0.999
9  01/06/2005 17:00:00.000000           1   0.999

The returned data chunks follow the selected TimeBucket. This is helpful in the MetricEvaluator. In addition to TimeBucket, the source respects the max_rows parameter when generating data chunks; for example, using the same dataset as in the example above (but with max_rows set to 3):

source = DataFrameSource(
    df=test_df,
    max_rows=3,
    timestamp_col="timestamp",
)
source.init(TimeBucket.DAY)
df, chunk_id = source.get_data()
print(df)
                    timestamp predictions actuals
0  01/06/2005 13:00:00.000000           1   0.999
1  01/06/2005 13:00:00.000000           1   0.999
2  01/06/2005 14:00:00.000000           1   0.999

In DataRobotSource, you can specify the TimeBucket and max_rows parameters for all export types except training data export, which is returned in one piece.
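
The interaction between TimeBucket and max_rows can be sketched with the standard library. This is an illustration under assumptions, not the dmm algorithm: the truncate and chunk_rows helpers are hypothetical, buckets are named by plain strings, and the way an oversized bucket is split beyond its first chunk is a guess, since only the first chunk appears in the example above.

```python
from datetime import datetime, timedelta
from itertools import groupby


def truncate(timestamp, bucket):
    """Map a timestamp to the start of its bucket (only a few buckets sketched)."""
    if bucket == "HOUR":
        return timestamp.replace(minute=0, second=0, microsecond=0)
    if bucket == "DAY":
        return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
    if bucket == "ALL":
        return None  # every row falls into the same bucket
    raise ValueError(f"unsupported bucket: {bucket}")


def chunk_rows(rows, bucket, max_rows):
    """Yield chunks that never cross a bucket boundary nor exceed max_rows.

    rows must be sorted by timestamp; each row is (timestamp, prediction, actual).
    """
    for _, group in groupby(rows, key=lambda row: truncate(row[0], bucket)):
        group = list(group)
        for start in range(0, len(group), max_rows):
            yield group[start:start + max_rows]


# Ten rows, two per hour, mirroring the generated DataFrame above.
rows = [(datetime(2005, 6, 1, 13) + timedelta(hours=i // 2), 1, 0.999) for i in range(10)]

hourly_sizes = [len(chunk) for chunk in chunk_rows(rows, "HOUR", max_rows=10000)]
daily_sizes = [len(chunk) for chunk in chunk_rows(rows, "DAY", max_rows=3)]
print(hourly_sizes, daily_sizes)
```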

Deployment

The Deployment class is a helper class which provides access to relevant deployment properties. This class is used inside the DataRobotSource to select the appropriate workflow to work with data.

import datarobot as dr
from dmm.data_source.datarobot.deployment import Deployment
dr.Client()
deployment = Deployment(deployment_id=deployment_id)

deployment_type = deployment.type()
target_column = deployment.target_column()
positive_class_label = deployment.positive_class_label()
negative_class_label = deployment.negative_class_label()
prediction_threshold = deployment.prediction_threshold()
.
.
.

Custom Metrics

The MetricBase provides an interface to define your own custom metrics.
See the examples of custom metrics located in the metric directory.

MetricBase

In MetricBase, you must define the type of data a metric requires, and the custom metric must inherit that definition:

class MetricBase(object):
    def __init__(
        self,
        name: str,
        description: str = None,
        need_predictions: bool = False,
        need_actuals: bool = False,
        need_scoring_data: bool = False,
        need_training_data: bool = False,
    ):
        self.name = name
        self.description = description
        self._need_predictions = need_predictions
        self._need_actuals = need_actuals
        self._need_scoring_data = need_scoring_data
        self._need_training_data = need_training_data

In addition, you must implement the scoring and reduction methods in MetricBase:

  • Scoring (score): Uses initialized data types to calculate a metric.
  • Reduction (reduce_func): Reduces multiple values in the same TimeBucket to one value.

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.array,
        actuals: np.array,
        fit_ctx=None,
        metadata=None,
    ) -> float:
        raise NotImplementedError

    def reduce_func(self) -> callable:
        return np.mean
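
To make the division of labor between score and reduce_func concrete, here is a small stand-alone sketch. The class below is hypothetical and does not inherit from MetricBase; it only mirrors the two methods, using statistics.mean in place of np.mean so that it runs without extra dependencies.

```python
from statistics import mean


class MeanAbsoluteErrorSketch:
    """Hypothetical metric that mirrors the score/reduce_func split."""

    def __init__(self, name="mean_absolute_error"):
        self.name = name

    def score(self, predictions, actuals, **kwargs):
        # Called once per data chunk.
        return mean(abs(p - a) for p, a in zip(predictions, actuals))

    def reduce_func(self):
        # Combines the per-chunk scores that fall into one time bucket.
        return mean


metric = MeanAbsoluteErrorSketch()
# Two chunks that belong to the same time bucket.
chunk_scores = [
    metric.score(predictions=[1.0, 1.0], actuals=[0.5, 1.0]),
    metric.score(predictions=[1.0], actuals=[0.0]),
]
bucket_value = metric.reduce_func()(chunk_scores)
print(chunk_scores, bucket_value)
```

Note that a mean of per-chunk means weights each chunk equally: here it gives 0.625, while the mean over all three rows is 0.5. When chunks differ in size, it may be worth choosing a reduction that accounts for that.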

ModelMetricBase and DataMetricBase

Two default classes can help you create your own custom metrics: ModelMetricBase and DataMetricBase.

ModelMetricBase is the base class for metrics that require actuals and predictions for metric calculation.

class ModelMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=False,
            need_predictions=True,
            need_actuals=True,
            need_training_data=need_training_data,
        )

    def score(
        self,
        prediction: np.array,
        actuals: np.array,
        fit_context=None,
        metadata=None,
        scoring_data=None,
    ) -> float:
        raise NotImplementedError

DataMetricBase is the base class for metrics that require scoring data for metric calculation.

class DataMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=True,
            need_predictions=False,
            need_actuals=False,
            need_training_data=need_training_data,
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        fit_ctx=None,
        metadata=None,
        predictions=None,
        actuals=None,
    ) -> float:
        raise NotImplementedError

LLM metrics

To support LLMs, a new type of metric was introduced. LLMMetricBase is the base class for metrics that require scoring data and predictions for metric calculation; in the LLM world, these correspond to prompts (user input) and completions (LLM response).

class LLMMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=True,
            need_predictions=True,
            need_actuals=False,
            need_training_data=need_training_data,
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.array,
        fit_ctx=None,
        metadata=None,
        actuals=None,
    ) -> float:
        raise NotImplementedError

SklearnMetric

To accelerate the implementation of custom metrics, you can use ready-made, proven metrics from scikit-learn. To create a custom metric this way, use the SklearnMetric class as the base class and provide the name of the scikit-learn metric.
See the example below:

from dmm.metric.sklearn_metric import SklearnMetric


class MedianAbsoluteError(SklearnMetric):
    """
    Metric that calculates the median absolute error of the difference between predictions and actuals
    """

    def __init__(self):
        super().__init__(
            metric="median_absolute_error",
        )

PromptSimilarityMetricBase

The PromptSimilarityMetricBase is designed to make it easy to compare LLM prompt and context vectors. This class is generally used with TextGen models where the prompt and context vectors are populated as described below.

The base class takes care of pulling the vectors from the scoring_data, and iterating over each entry. The prompt vector is pulled from the prompt_column (which defaults to _LLM_PROMPT_VECTOR) of the scoring_data. The context vectors are pulled from the context_column (which defaults to _LLM_CONTEXT) of the scoring_data. The context column contains a list of context dictionaries, and each context needs to have a vector element. Both the prompt_column and context_column are expected to be JSON encoded data.
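
The data layout described above can be illustrated without dmm. The sketch below decodes one row by hand and computes the minimum Euclidean distance with math.dist. The column names and the vector key come from the description above; the sample values and the helper name are hypothetical, and the base class may decode the data differently.

```python
import json
import math


def min_distance_for_row(row, prompt_column="_LLM_PROMPT_VECTOR", context_column="_LLM_CONTEXT"):
    """Decode the JSON-encoded columns of one row and return the smallest distance."""
    prompt_vector = json.loads(row[prompt_column])
    contexts = json.loads(row[context_column])  # a list of context dictionaries
    context_vectors = [context["vector"] for context in contexts]
    return min(math.dist(prompt_vector, vector) for vector in context_vectors)


# A made-up scoring row with a 2-dimensional prompt vector and two contexts.
row = {
    "_LLM_PROMPT_VECTOR": "[0.0, 0.0]",
    "_LLM_CONTEXT": json.dumps([{"vector": [3.0, 4.0]}, {"vector": [1.0, 0.0]}]),
}
distance = min_distance_for_row(row)
print(distance)
```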

A derived class must implement calculate_distance(); for this class, score() is already implemented. The calculate_distance function returns a single floating point value based on a single prompt_vector and a list of context_vectors. Using the PromptSimilarityMetricBase to calculate the minimum Euclidean distance is shown below:

from typing import List

import numpy as np

from dmm.metric import PromptSimilarityMetricBase

class EuclideanMinMetric(PromptSimilarityMetricBase):
    """Calculate the minimum Euclidean distance between a prompt vector, and a list of context vectors"""
    def calculate_distance(self, prompt_vector: np.array, context_vectors: List[np.array]) -> float:
        distances = [
            np.linalg.norm(prompt_vector - context_vector)
            for context_vector in context_vectors
        ]
        return min(distances)

# an instantiation would potentially look like this
scorer = EuclideanMinMetric(name=custom_metric.name, description="Euclidean minimum distance between prompt and context vectors")

Submit custom metric values with datarobot-model-metrics

The metrics mentioned above can provide the source of the custom metric definitions in the DataRobot platform.
The CustomMetric interface retrieves the metadata of an existing custom metric in DataRobot to report data to that custom metric.

You can initialize the metric by providing the parameters explicitly (metric_id, deployment_id, model_id, dr.Client()):

from dmm.custom_metric import CustomMetric


cm = CustomMetric.from_id(metric_id=METRIC_ID, deployment_id=DEPLOYMENT_ID, model_id=MODEL_ID, client=CLIENT)

You can also define these parameters as environment variables: os.environ["DEPLOYMENT_ID"], os.environ["CUSTOM_METRIC_ID"], os.environ['BASE_URL'], and os.environ['DATAROBOT_ENDPOINT']:

from dmm.custom_metric import CustomMetric


cm = CustomMetric.from_id()

For batch mode, you must specify it explicitly by setting is_batch=True:

from dmm.custom_metric import CustomMetric


cm = CustomMetric.from_id(is_batch=True)

The report method submits custom metric values to a custom metric defined in DataRobot. To use this method, report a dataframe in the shape of the output from the metric evaluator. For more information, see MetricEvaluator.

print(aggregated_metric_per_time_bucket.to_string())

                    timestamp  samples  median_absolute_error
1  01/06/2005 14:00:00.000000        2                  0.001

response = cm.report(df=aggregated_metric_per_time_bucket)
print(response.status_code)
202

The dry_run parameter determines whether the custom metric values transfer is a dry run (where the values aren't saved in the database) or a production data transfer. This parameter is set to False by default.

response = cm.report(df=aggregated_metric_per_time_bucket, dry_run=True)
print(response.status_code)
202

Metric Evaluator

The MetricEvaluator class calculates metric values over time using the selected source.
This class is used to "stream" data through the metric object, generating metric values.
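
The streaming idea can be sketched as a loop that scores each chunk and then reduces the scores per time bucket. This is a simplified model under assumptions, not the MetricEvaluator implementation: the evaluate function, the chunk tuple layout, and the output keys are hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean, median


def median_absolute_error(predictions, actuals):
    return median(abs(p - a) for p, a in zip(predictions, actuals))


def evaluate(chunks, score, reduce=mean):
    """Stream (bucket_timestamp, predictions, actuals) chunks through a metric."""
    scores = defaultdict(list)
    samples = defaultdict(int)
    for bucket, predictions, actuals in chunks:
        scores[bucket].append(score(predictions, actuals))  # one score call per chunk
        samples[bucket] += len(predictions)
    # One reduce call per bucket turns its chunk scores into a single value.
    return [
        {"timestamp": bucket, "samples": samples[bucket], "value": reduce(scores[bucket])}
        for bucket in sorted(scores)
    ]


chunks = [
    (datetime(2005, 6, 1, 13), [1.0, 1.0], [0.5, 0.5]),
    (datetime(2005, 6, 1, 13), [1.0], [0.0]),
    (datetime(2005, 6, 1, 14), [2.0, 2.0], [1.0, 1.0]),
]
results = evaluate(chunks, median_absolute_error)
for row in results:
    print(row)
```

Each output row has the same shape as the evaluator output shown in this document: a timestamp, a sample count, and a metric value.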

Metric Evaluator Initialization

Initialize the MetricEvaluator with the following mandatory parameters:

from datetime import datetime, timedelta

from dmm import MetricEvaluator, TimeBucket
from dmm.data_source.datarobot_source import DataRobotSource
from dmm.metric import MedianAbsoluteError

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

metric = MedianAbsoluteError()

metric_evaluator = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.MINUTE)

To use MetricEvaluator, create a metric class implementing the MetricBase interface, a source implementing DataSourceBase, and then specify the level of aggregation granularity.

Initialize MetricEvaluator with all parameters:

from dmm import ColumnName, MetricEvaluator, TimeBucket

metric_evaluator = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.HOUR,
    prediction_col=ColumnName.PREDICTIONS,
    actuals_col=ColumnName.ACTUALS,
    timestamp_col=ColumnName.TIMESTAMP,
    filter_actuals=False,
    filter_predictions=False,
    filter_scoring_data=False,
    segment_attribute=None,
    segment_value=None,
)

Parameters:

  • metric: Union[str, MetricBase, List[str], List[MetricBase]] - If a string or list of strings is passed, MetricEvaluator looks for matching sklearn metrics. If an object or list of objects is passed, each must implement the MetricBase interface.
  • source: DataSourceBase - The source to pull the data from: DataRobotSource, DataFrameSource, or any other source that implements the DataSourceBase interface.
  • time_bucket: TimeBucket - The time bucket size to use for evaluating metrics; determines the granularity of aggregation.
  • prediction_col: Optional[str] - The name of the column that contains predictions.
  • actuals_col: Optional[str] - The name of the column that contains actuals.
  • timestamp_col: Optional[str] - The name of the column that contains timestamps.
  • filter_actuals: Optional[bool] - If True, the metric evaluator removes missing actuals values before scoring. The default value is False.
  • filter_predictions: Optional[bool] - If True, the metric evaluator removes missing predictions values before scoring. The default value is False.
  • filter_scoring_data: Optional[bool] - If True, the metric evaluator removes missing scoring values before scoring. The default value is False.
  • segment_attribute: Optional[str] - The name of the column with segment values.
  • segment_value: Optional[Union[str, List[str]]] - A single value or a list of values of the segment attribute to segment on.

Metric Evaluator Usage

The score method returns a metric aggregated as defined by TimeBucket. The output, returned as a pandas DataFrame, contains the results per time bucket for all data from the source.

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(hours=3),
    end=datetime.utcnow(),
)
metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())

                          timestamp  samples  log_loss
0  2023-09-14 13:29:48.065000+00:00      499  0.539315
1  2023-09-14 14:01:51.484000+00:00      499  0.539397

# we can see the evaluator's statistics
stats = me.stats()
print(stats)
total rows: 998, score calls: 2, reduce calls: 2
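
To make the aggregation concrete, here is a minimal, dependency-free sketch of what scoring per time bucket means: rows are grouped by their timestamp truncated to the hour, and the metric is computed once per group. It is an illustration only and not the library's implementation. The helper names bucket_by_hour and mean_absolute_error are hypothetical, and using the start of the hour as the bucket label is a simplification.

```python
from collections import defaultdict
from datetime import datetime


def mean_absolute_error(predictions, actuals):
    """Average absolute difference between paired predictions and actuals."""
    return sum(abs(p - a) for p, a in zip(predictions, actuals)) / len(predictions)


def bucket_by_hour(rows):
    """Group (timestamp, prediction, actual) rows by the hour they fall in."""
    buckets = defaultdict(list)
    for ts, prediction, actual in rows:
        # Truncate the timestamp to the start of its hour to get the bucket key.
        key = ts.replace(minute=0, second=0, microsecond=0)
        buckets[key].append((prediction, actual))
    return buckets


rows = [
    (datetime(2023, 9, 14, 13, 5), 0.5, 1.0),
    (datetime(2023, 9, 14, 13, 40), 1.0, 1.0),
    (datetime(2023, 9, 14, 14, 10), 0.0, 1.0),
]

# One result row per bucket: bucket start, number of samples, metric value.
results = []
for bucket_start, pairs in sorted(bucket_by_hour(rows).items()):
    predictions, actuals = zip(*pairs)
    results.append((bucket_start, len(pairs), mean_absolute_error(predictions, actuals)))

for bucket_start, samples, value in results:
    print(bucket_start, samples, value)
```

Each tuple plays the role of one row in the DataFrame shown above: a bucket label, a sample count, and the metric value for that bucket.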

To pass more than one metric at a time, you can do the following:

metrics = [LogLossFromSklearn(), AsymmetricError(), RocAuc()]
me = MetricEvaluator(metric=metrics, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
stats = me.stats()
print(aggregated_metric_per_time_bucket.to_string())
print(stats)

                          timestamp  samples  log_loss  Asymmetric Error  roc_auc_score
0  2023-09-14 13:29:48.065000+00:00      499  0.539315          0.365571       0.787030
1  2023-09-14 14:01:51.484000+00:00      499  0.539397          0.365636       0.786837
total rows: 998, score calls: 6, reduce calls: 6

For your own data, you can provide the names of the columns to evaluate:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=5,
    rows_per_time_bucket=1,
    prediction_value=1,
    time_bucket=TimeBucket.DAY,
    prediction_col="my_pred_col",
    actuals_col="my_actuals_col",
    timestamp_col="my_timestamp_col"
)
print(test_df)
             my_timestamp_col  my_pred_col  my_actuals_col
0  01/06/2005 13:00:00.000000            1           0.999
1  02/06/2005 13:00:00.000000            1           0.999
2  03/06/2005 13:00:00.000000            1           0.999
3  04/06/2005 13:00:00.000000            1           0.999
4  05/06/2005 13:00:00.000000            1           0.999

source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="my_timestamp_col",
)

metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, 
                     source=source, 
                     time_bucket=TimeBucket.DAY,
                     prediction_col="my_pred_col", 
                     actuals_col="my_actuals_col", 
                     timestamp_col="my_timestamp_col"
                     )
aggregated_metric_per_time_bucket = me.score()

Metric Evaluator Data Filtering

If some data is missing, use the filtering flags. The following example uses data with missing actuals.

In this scenario without a flag, an exception is raised:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=5,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
# Assign through a single .loc call to avoid pandas chained assignment.
test_df.loc[2, "actuals"] = None
test_df.loc[5, "actuals"] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1    0.999
1  01/06/2005 13:00:00.000000            1    0.999
2  01/06/2005 13:00:00.000000            1      NaN
3  01/06/2005 13:00:00.000000            1    0.999
4  01/06/2005 13:00:00.000000            1    0.999
5  01/06/2005 14:00:00.000000            1      NaN
6  01/06/2005 14:00:00.000000            1    0.999
7  01/06/2005 14:00:00.000000            1    0.999
8  01/06/2005 14:00:00.000000            1    0.999
9  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
"ValueError: Could not apply metric median_absolute_error, make sure you are passing the right data (see the sklearn docs).
The error message was: Input contains NaN."

For the same dataset, compare the previous result with the result when you enable the filter_actuals flag:

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
0  01/06/2005 13:00:00.000000        4                  0.001
1  01/06/2005 14:00:00.000000        4                  0.001

Using the filter_actuals, filter_predictions, filter_scoring_data flags, you can filter out missing values from the data before calculating the metric. By default, these flags are set to False.
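
The effect of the filter_actuals flag on the ten-row dataset above can be reproduced with a small, dependency-free sketch. It is not the library's code: score_bucket is a hypothetical helper, None stands in for NaN, and the hour numbers 13 and 14 stand in for the two time buckets.

```python
from statistics import median

# (hour bucket, prediction, actual); None marks a missing actual.
rows = [
    (13, 1.0, 0.999), (13, 1.0, 0.999), (13, 1.0, None), (13, 1.0, 0.999), (13, 1.0, 0.999),
    (14, 1.0, None), (14, 1.0, 0.999), (14, 1.0, 0.999), (14, 1.0, 0.999), (14, 1.0, 0.999),
]


def score_bucket(pairs, filter_actuals=False):
    """Median absolute error for one bucket, optionally dropping missing actuals."""
    if filter_actuals:
        pairs = [(p, a) for p, a in pairs if a is not None]
    if not pairs:
        return None  # nothing left to score, so the bucket is skipped
    if any(a is None for _, a in pairs):
        raise ValueError("Input contains missing actuals")
    return len(pairs), median(abs(p - a) for p, a in pairs)


buckets = {}
for hour, prediction, actual in rows:
    buckets.setdefault(hour, []).append((prediction, actual))

# Without the flag, the missing value makes scoring fail.
try:
    score_bucket(buckets[13])
    unfiltered_failed = False
except ValueError:
    unfiltered_failed = True

# With the flag, each bucket is scored on its four remaining rows.
filtered = {hour: score_bucket(pairs, filter_actuals=True) for hour, pairs in buckets.items()}
print(unfiltered_failed, filtered)
```

In this sketch a bucket whose rows are all filtered out returns None and would simply be left out of the results.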

If all data needed to calculate the metric is missing in a data chunk, the evaluator skips that chunk and logs a message:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=4,
    rows_per_time_bucket=2,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
# Assign through a single .loc call to avoid pandas chained assignment.
test_df.loc[0, "actuals"] = None
test_df.loc[1, "actuals"] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1      NaN
1  01/06/2005 13:00:00.000000            1      NaN
2  01/06/2005 14:00:00.000000            1    0.999
3  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 2 rows out of 2 in the data chunk before scoring, due to missing values in ['actuals'] data"
"data chunk is empty, skipping scoring..."

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
1  01/06/2005 14:00:00.000000        2                  0.001

Metric Evaluator Segmented Analysis

Perform segmented analysis by defining the segment_attribute and each segment_value:

metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value="Down",
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]
0  2023-09-14 13:29:49.737000+00:00       49         0.594483
1  2023-09-14 14:01:52.437000+00:00       49         0.594483

# passing more than one segment value
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811

# passing more than one segment value and more than one metric
me = MetricEvaluator(metric=[LogLossFromSklearn(), RocAuc()],
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]  roc_auc_score [Down]  roc_auc_score [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811              0.783333                0.826632
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811              0.783333                0.826632
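
As a rough illustration of what segmenting does, the sketch below scores each requested segment value separately and names the output columns in the same "metric [value]" style as the tables above. It is a simplified stand-in, not the evaluator's implementation: score_by_segment and accuracy are hypothetical helpers, and a plain accuracy score replaces log loss to keep the example short.

```python
from collections import defaultdict


def accuracy(pairs):
    """Fraction of rows where the predicted label equals the actual label."""
    return sum(1 for predicted, actual in pairs if predicted == actual) / len(pairs)


def score_by_segment(rows, segment_attribute, segment_values):
    """Score each requested segment value separately; other values are ignored."""
    per_segment = defaultdict(list)
    for row in rows:
        value = row[segment_attribute]
        if value in segment_values:
            per_segment[value].append((row["prediction"], row["actual"]))
    # One output column per segment value, named like "accuracy [Down]".
    return {f"accuracy [{value}]": accuracy(pairs) for value, pairs in per_segment.items()}


rows = [
    {"insulin": "Down", "prediction": 1, "actual": 1},
    {"insulin": "Down", "prediction": 0, "actual": 1},
    {"insulin": "Steady", "prediction": 1, "actual": 1},
    {"insulin": "Up", "prediction": 0, "actual": 0},
]

scores = score_by_segment(rows, "insulin", ["Down", "Steady"])
print(scores)
```

Rows whose segment value was not requested ("Up" here) do not contribute to any column.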

Batch Metric Evaluator

The BatchMetricEvaluator class uses aggregation per batch instead of aggregation over time. For batches, you don't define TimeBucket:

from dmm.batch_metric_evaluator import BatchMetricEvaluator
from dmm.data_source.datarobot_source import BatchDataRobotSource
from dmm.metric import MissingValuesFraction

source = BatchDataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    batch_ids=BATCH_IDS,
    model_id=MODEL_ID,
)

feature_name = 'RAD'
metric = MissingValuesFraction(feature_name=feature_name)

missing_values_fraction_evaluator = BatchMetricEvaluator(metric=metric, source=source)

aggregated_metric_per_batch = missing_values_fraction_evaluator.score()
print(aggregated_metric_per_batch.to_string())
     batch_id   samples  Missing Values Fraction
0  <batch_id>       506                      0.0
1  <batch_id>       506                      0.0
2  <batch_id>       506                      0.0

Note: For batches, actuals and multiple segments are not supported.
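
Aggregating per batch rather than per time bucket can be pictured with the following dependency-free sketch, which groups rows by a batch identifier and computes the fraction of missing values of one feature in each group. It is only an illustration: the batch names, the sample values, and the missing_values_fraction helper are made up for this example.

```python
import math
from collections import defaultdict


def missing_values_fraction(values):
    """Share of values that are None or NaN."""
    missing = sum(
        1 for v in values if v is None or (isinstance(v, float) and math.isnan(v))
    )
    return missing / len(values)


# (batch id, value of the feature being checked)
rows = [
    ("batch-a", 4.0), ("batch-a", None), ("batch-a", 1.0), ("batch-a", float("nan")),
    ("batch-b", 2.0), ("batch-b", 3.0),
]

# Group by batch id instead of by timestamp.
per_batch = defaultdict(list)
for batch_id, value in rows:
    per_batch[batch_id].append(value)

# One result row per batch: batch id, number of samples, metric value.
results = [
    (batch_id, len(values), missing_values_fraction(values))
    for batch_id, values in per_batch.items()
]
print(results)
```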

Individual Metric Evaluator

The IndividualMetricEvaluator class evaluates metrics without data aggregation. It performs metric calculations on all exported data and returns a list of individual results. This evaluator allows submitting individual data points with a corresponding association ID, which is useful when you want to visualize your metric results alongside predictions and actuals. To use this evaluator with a custom metric, provide a score method that accepts, among others, the following parameters: timestamps and association_ids.

from itertools import zip_longest
from typing import List
from datetime import datetime
from datetime import timedelta

from dmm.individual_metric_evaluator import IndividualMetricEvaluator
from dmm.custom_metric import CustomMetric
from dmm.custom_metric import SingleMetricResult
from dmm.data_source import DataRobotSource
from dmm.metric.metric_base import LLMMetricBase
from nltk import sent_tokenize
import numpy as np
import pandas as pd

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

custom_metric = CustomMetric.from_id()

class SentenceCount(LLMMetricBase):
    """
    Calculates the total number of sentences created while working with the LLM model.
    Returns the sum of the number of sentences from prompts and completions.
    """

    def __init__(self):
        super().__init__(
            name=custom_metric.name,
            description="Calculates the total number of sentences created while working with the LLM model.",
            need_training_data=False,
        )
        self.prompt_column = "promptColumn"

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.ndarray,
        timestamps: np.ndarray,
        association_ids: np.ndarray,
        **kwargs,
    ) -> List[SingleMetricResult]:
        if self.prompt_column not in scoring_data.columns:
            raise ValueError(
                f"Prompt column {self.prompt_column} not found in the exported data, "
                f"modify 'PROMPT_COLUMN' runtime parameter"
            )
        prompts = scoring_data[self.prompt_column].to_numpy()

        sentence_count = []
        for prompt, completion, ts, a_id in zip_longest(
            prompts, predictions, timestamps, association_ids
        ):
            if not isinstance(prompt, str) or not isinstance(completion, str):
                continue
            value = len(sent_tokenize(prompt)) + len(sent_tokenize(completion))
            sentence_count.append(
                SingleMetricResult(value=value, timestamp=ts, association_id=a_id)
            )
        return sentence_count


sentence_count_evaluator = IndividualMetricEvaluator(
    metric=SentenceCount(),
    source=source,
)
metric_results = sentence_count_evaluator.score()
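
The row-by-row pattern used in the score method above can be tried without a deployment. The sketch below is a simplified stand-in: Result is a hypothetical replacement for SingleMetricResult, count_sentences is a naive regular-expression substitute for nltk's sent_tokenize, and the sample values are invented. It shows how zip_longest pads shorter inputs with None and how such incomplete rows are skipped.

```python
import re
from dataclasses import dataclass
from itertools import zip_longest


@dataclass
class Result:
    """Hypothetical stand-in for one unaggregated metric value."""
    value: int
    timestamp: str
    association_id: str


def count_sentences(text):
    """Naive sentence count: split on '.', '!' or '?' and ignore empty pieces."""
    return len([part for part in re.split(r"[.!?]+", text) if part.strip()])


prompts = ["What is drift? Explain briefly.", "Hi.", None]
completions = ["Drift is a change in data. It affects accuracy.", "Hello!"]
timestamps = ["t0", "t1", "t2"]
association_ids = ["a0", "a1", "a2"]

results = []
for prompt, completion, ts, a_id in zip_longest(
    prompts, completions, timestamps, association_ids
):
    # zip_longest pads the shorter inputs with None, so incomplete rows are skipped.
    if not isinstance(prompt, str) or not isinstance(completion, str):
        continue
    value = count_sentences(prompt) + count_sentences(completion)
    results.append(Result(value=value, timestamp=ts, association_id=a_id))

print(results)
```

Every result keeps its own timestamp and association ID, so nothing is aggregated across rows.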

DR Custom Metrics

The DR Custom Metrics module helps keep locally defined metrics synchronized with the metrics that already exist in DataRobot (DR). The module identifies custom metrics by unique name, so you can operate on metrics without knowing their IDs. This lets you define a metric early (for example, before creating the deployment) and synchronize it with DR at the appropriate time.

DRCustomMetric:

The DRCustomMetric class allows you to create new metrics or fetch existing metrics from DR. The logic is as follows:

  • Provide the custom metrics configuration in one of the supported formats: YAML, dict, YAML file, or JSON file.
  • The configuration contains the metadata that describes each custom metric.
  • The DRCustomMetric.sync() method retrieves information about the custom metrics that exist on the DR side. If a metric is defined locally but does not exist on the DR side, it is created there.
  • The DRCustomMetric.report_value() method reports a single value for a metric identified by its unique name.
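
The name-based synchronization described above can be sketched with an in-memory stand-in for the remote side. Everything in this block is hypothetical and exists only to illustrate the idea: FakeMetricRegistry is not a DataRobot client, and the sync function is not the DRCustomMetric.sync() implementation.

```python
import uuid


class FakeMetricRegistry:
    """In-memory stand-in for the remote side; not a DataRobot client."""

    def __init__(self, existing):
        self.metrics = dict(existing)  # metric name -> metric id

    def list(self):
        """Return a copy of the known name -> id mapping."""
        return dict(self.metrics)

    def create(self, config):
        """Register a new metric under its unique name and return its id."""
        metric_id = uuid.uuid4().hex
        self.metrics[config["name"]] = metric_id
        return metric_id


def sync(local_configs, registry):
    """Create every locally defined metric that the registry does not know yet."""
    remote = registry.list()
    created = []
    for config in local_configs:
        if config["name"] not in remote:
            registry.create(config)
            created.append(config["name"])
    return created


registry = FakeMetricRegistry({"existing metric": "id-1"})
local_configs = [{"name": "existing metric"}, {"name": "new metric"}]

created = sync(local_configs, registry)        # only the unknown metric is created
created_again = sync(local_configs, registry)  # a second sync has nothing to do
print(created, created_again, sorted(registry.list()))
```

Because the lookup key is the metric name, running the synchronization repeatedly does not create duplicates in this sketch.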

Example:

dr_cm = DRCustomMetric(
    dr_client=client, deployment_id=deployment_id, model_package_id=model_package_id
)

metric_config_yaml = """
     customMetrics:
       - name: new metric
         description: foo bar
         type: average
         timeStep: hour
         units: count
         directionality: lowerIsBetter
         isModelSpecific: yes
         baselineValue: 0
     """

dr_cm.set_config(config_yaml=metric_config_yaml)
dr_cm.sync()
dr_cm.get_dr_custom_metrics()
> [{"name": "existing metric", "id": "65ef19410239ff8015f05a94", ...}, 
>  {"name": "new metric", "id": "65ef197ce5d7b2176ceecf3a", ...}]

dr_cm.report_value("existing metric", 1)
dr_cm.report_value("new metric", 9)
