datarobot-model-metrics provides a framework to compute ML model metrics over time and produce aggregated metrics.

DataRobot Model Metrics Overview

The datarobot-model-metrics (dmm) library provides the tools necessary to create custom metrics, including the following operations:

  • Create a data source
  • Create a custom metric
  • Evaluate a custom metric
  • Submit results to DataRobot

Review the following documentation to learn more about the datarobot-model-metrics interfaces used to perform custom metric operations:

  • Data Sources
  • Custom Metrics
  • Metric Evaluator

For end-to-end examples, you can review the ./examples directory.

DataRobotSource

The most commonly used data source is DataRobotSource. This data source connects to DataRobot to fetch selected data from the DataRobot platform.

DataRobotSource Initialization

Initialize DataRobotSource with the following mandatory parameters:

from dmm.data_source import DataRobotSource

source = DataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    deployment_id=deployment_id,
    start=start_of_export_window,
    end=end_of_export_window,
)

You can also provide the base_url and token parameters as environment variables: os.environ['DATAROBOT_ENDPOINT'] and os.environ['DATAROBOT_API_TOKEN'].

from dmm.data_source import DataRobotSource

source = DataRobotSource(
    deployment_id=deployment_id,
    start=start_of_export_window,
    end=end_of_export_window,
)

The following example initializes DataRobotSource with all parameters:

from dmm.data_source import DataRobotSource

source = DataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    client=None,
    deployment_id=deployment_id,
    model_id=model_id,
    start=start_of_export_window,
    end=end_of_export_window,
    max_rows=10000,
    delete_exports=False,
    use_cache=False,
    actuals_with_matched_predictions=True,
)
Parameters:

  • base_url (str): The DataRobot API URL; for example, https://app.datarobot.com/api/v2.
  • token (str): A DataRobot API token from Developer Tools.
  • client (Optional[dr.Client]): A dr.Client object to use instead of base_url and token.
  • deployment_id (str): The ID of the deployment evaluated by the custom metric.
  • model_id (Optional[str]): The ID of the model evaluated by the custom metric. If you don't specify a model ID, the champion model ID is used.
  • start (datetime): The start of the export window; the date from which to begin retrieving data.
  • end (datetime): The end of the export window; the date up to which data is retrieved.
  • max_rows (Optional[int]): The maximum number of rows to fetch at once when the requested data doesn't fit into memory.
  • delete_exports (Optional[bool]): If True, datasets with exported data created in the AI Catalog are automatically deleted. The default value is False.
  • use_cache (Optional[bool]): If True, reuse existing datasets stored in the AI Catalog for time ranges included in previous exports. The default value is False.
  • actuals_with_matched_predictions (Optional[bool]): If False, allow actuals export without matched predictions. The default value is True.

DataRobotSource Usage

Prediction Data Export

The get_prediction_data method returns a chunk of prediction data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.

prediction_df_1, prediction_chunk_id_1 = source.get_prediction_data()

print(prediction_df_1.head(5).to_string())
print(f"chunk id: {prediction_chunk_id_1}")

   DR_RESERVED_PREDICTION_TIMESTAMP  DR_RESERVED_PREDICTION_VALUE_high  DR_RESERVED_PREDICTION_VALUE_low date_non_unique date_random  id       年月日
0  2023-09-13 11:02:51.248000+00:00                           0.697782                          0.302218      1950-10-01  1949-01-27   1  1949-01-01
1  2023-09-13 11:02:51.252000+00:00                           0.581351                          0.418649      1959-04-01  1949-02-03   2  1949-02-01
2  2023-09-13 11:02:51.459000+00:00                           0.639347                          0.360653      1954-05-01  1949-03-28   3  1949-03-01
3  2023-09-13 11:02:51.459000+00:00                           0.627727                          0.372273      1951-09-01  1949-04-07   4  1949-04-01
4  2023-09-13 11:02:51.664000+00:00                           0.591612                          0.408388      1951-03-01  1949-05-16   5  1949-05-01
chunk id: 0

When the data source is exhausted, None and -1 are returned:

prediction_df_2, prediction_chunk_id_2 = source.get_prediction_data()

print(prediction_df_2)
print(f"chunk id: {prediction_chunk_id_2}")

None
chunk id: -1

The reset method resets the exhausted data source, allowing it to iterate from the beginning:

source.reset()

The get_all_prediction_data method returns all prediction data available for a data source object in a single DataFrame:

prediction_df = source.get_all_prediction_data()
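
If you need to control memory usage yourself, you can also iterate chunk by chunk until the source signals exhaustion. A minimal sketch based on the (DataFrame, chunk ID) return values shown above:

# iterate over prediction data until the source is exhausted;
# get_prediction_data() returns (None, -1) once all chunks have been consumed
chunks = []
while True:
    chunk_df, chunk_id = source.get_prediction_data()
    if chunk_id == -1:
        break
    print(f"chunk {chunk_id}: {len(chunk_df)} rows")
    chunks.append(chunk_df)

source.reset()  # allow the source to be iterated again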

Actuals Data Export

The get_actuals_data method returns a chunk of actuals data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.

actuals_df_1, actuals_chunk_id_1 = source.get_actuals_data()

print(actuals_df_1.head(5).to_string())
print(f"chunk id: {actuals_chunk_id_1}")

     association_id                  timestamp label  actuals  predictions predicted_class
0                 1  2023-09-13 11:00:00+00:00   low        0     0.302218            high
194              57  2023-09-13 11:00:00+00:00   low        1     0.568564             low
192              56  2023-09-13 11:00:00+00:00   low        1     0.569865             low
190              55  2023-09-13 11:00:00+00:00   low        0     0.473282            high
196              58  2023-09-13 11:00:00+00:00   low        1     0.573861             low
chunk id: 0

To return the raw data with the original column names (the format stored in PostgreSQL), set the return_original_column_names parameter to True:

actuals_df_1, actuals_chunk_id_1 = source.get_actuals_data(return_original_column_names=True)

print(actuals_df_1.head(5).to_string())
print(f"chunk id: {actuals_chunk_id_1}")

     id                  timestamp label  actuals         y predicted_class
0     1  2023-09-13 11:00:00+00:00   low        0  0.302218            high
194  57  2023-09-13 11:00:00+00:00   low        1  0.568564             low
192  56  2023-09-13 11:00:00+00:00   low        1  0.569865             low
190  55  2023-09-13 11:00:00+00:00   low        0  0.473282            high
196  58  2023-09-13 11:00:00+00:00   low        1  0.573861             low
chunk id: 0

To return all actuals data available for a source object in a single DataFrame, use the get_all_actuals_data method:

actuals_df = source.get_all_actuals_data()

When the data source is exhausted, None and -1 are returned:

actuals_df_2, actuals_chunk_id_2 = source.get_actuals_data()

print(actuals_df_2)
print(f"chunk id: {actuals_chunk_id_2}")

None
chunk id: -1

The reset method resets the exhausted data source, allowing it to iterate from the beginning:

source.reset()

Training Data Export

The get_training_data method returns all data used for training in one call. The returned data is a pandas DataFrame:

train_df = source.get_training_data()
print(train_df.head(5).to_string())

      y date_random date_non_unique       年月日
0  high  1949-01-27      1950-10-01  1949-01-01
1  high  1949-02-03      1959-04-01  1949-02-01
2   low  1949-03-28      1954-05-01  1949-03-01
3  high  1949-04-07      1951-09-01  1949-04-01
4  high  1949-05-16      1951-03-01  1949-05-01

Combined Data Export

The get_data method returns combined data, which includes merged scoring data, predictions, and matched actuals. The Metric Evaluator uses this method as its main data export method:

df, chunk_id_1 = source.get_data()
print(df.head(5).to_string())
print(f"chunk id: {chunk_id_1}")

                          timestamp  predictions date_non_unique date_random  association_id       年月日 predicted_class label  actuals
0  2023-09-13 11:02:51.248000+00:00     0.302218      1950-10-01  1949-01-27               1  1949-01-01            high   low        0
1  2023-09-13 11:02:51.252000+00:00     0.418649      1959-04-01  1949-02-03               2  1949-02-01            high   low        0
2  2023-09-13 11:02:51.459000+00:00     0.360653      1954-05-01  1949-03-28               3  1949-03-01            high   low        1
3  2023-09-13 11:02:51.459000+00:00     0.372273      1951-09-01  1949-04-07               4  1949-04-01            high   low        0
4  2023-09-13 11:02:51.664000+00:00     0.408388      1951-03-01  1949-05-16               5  1949-05-01            high   low        0
chunk id: 0

The get_all_data method returns all combined data available for the source object in a single DataFrame:

df = source.get_all_data()

BatchDataRobotSource

The BatchDataRobotSource interface is for batch deployments.

The following example initializes BatchDataRobotSource with all parameters:

from dmm.data_source import BatchDataRobotSource

source = BatchDataRobotSource(
    base_url=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    client=None,
    deployment_id=deployment_id,
    model_id=model_id,
    batch_ids=batch_ids,
    max_rows=10000,
    delete_exports=False,
    use_cache=False,
)

The parameters for this method are analogous to those for DataRobotSource. The most important difference is that instead of the time range (start and end), you must provide batch IDs. In addition, a batch source doesn't support actuals export.

The get_prediction_data method returns a chunk of prediction data with the appropriate chunk ID; the returned data chunk is a pandas DataFrame with the number of rows respecting the max_rows parameter. This method returns data until the data source is exhausted.

prediction_df_1, prediction_chunk_id_1 = source.get_prediction_data()
print(prediction_df_1.head(5).to_string())
print(f"chunk id: {prediction_chunk_id_1}")

    AGE       B  CHAS     CRIM     DIS                  batch_id    DR_RESERVED_BATCH_NAME                         timestamp   INDUS  LSTAT  MEDV    NOX  PTRATIO  RAD     RM  TAX    ZN  id
0  65.2  396.90     0  0.00632  4.0900                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    2.31   4.98  24.0  0.538     15.3    1  6.575  296  18.0   1
1  78.9  396.90     0  0.02731  4.9671                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    7.07   9.14  21.6  0.469     17.8    2  6.421  242   0.0   2
2  61.1  392.83     0  0.02729  4.9671                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    7.07   4.03  34.7  0.469     17.8    2  7.185  242   0.0   3
3  45.8  394.63     0  0.03237  6.0622                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    2.18   2.94  33.4  0.458     18.7    3  6.998  222   0.0   4
4  54.2  396.90     0  0.06905  6.0622                <batch_id>                    batch1  2023-06-23 09:47:47.060000+00:00    2.18   5.33  36.2  0.458     18.7    3  7.147  222   0.0   5
chunk id: 0

prediction_df = source.get_all_prediction_data()  # all prediction data in a single DataFrame

source.reset()  # reset the exhausted source to iterate from the beginning

df, chunk_id_1 = source.get_data()  # combined data export, chunk by chunk

The get_training_data method returns all data used for training in one call. The returned data is a pandas DataFrame:

train_df = source.get_training_data()

Note: Actuals export for batches is not implemented yet.

DataFrameSource

If you aren't exporting data directly from DataRobot and instead have the data available locally (for example, as a downloaded CSV file), you can load the dataset into a DataFrameSource. The DataFrameSource class wraps any pd.DataFrame to create a library-compatible source. This is the easiest way to interact with the library when bringing your own data:

import pandas as pd

from dmm.data_source import DataFrameSource

source = DataFrameSource(
    df=pd.read_csv("./data_hour_of_week.csv"),
    max_rows=10000,
    timestamp_col="date",
)

df, chunk_id_1 = source.get_data()
print(df.head(5).to_string())
print(f"chunk id: {chunk_id_1}")

                  date         y
0  1959-12-31 23:59:57 -0.183669
1  1960-01-01 01:00:02  0.283993
2  1960-01-01 01:59:52  0.020663
3  1960-01-01 03:00:14  0.404304
4  1960-01-01 03:59:58  1.005252
chunk id: 0

In addition, it is possible to create new data source definitions: to define a new data source, implement the DataSourceBase interface, as sketched below.
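
The following is an illustration only and not the library's DataSourceBase contract: it mimics the chunked (DataFrame, chunk ID) protocol shown above for a local CSV file. The class name and file path are hypothetical; a real source should subclass DataSourceBase and implement its abstract methods.

import pandas as pd


class CSVChunkSource:
    """Illustrative chunked source over a local CSV file (not a real DataSourceBase subclass)."""

    def __init__(self, path: str, max_rows: int = 10000):
        self._df = pd.read_csv(path)
        self._max_rows = max_rows
        self._next_chunk = 0

    def get_data(self):
        start = self._next_chunk * self._max_rows
        if start >= len(self._df):
            return None, -1  # exhausted, mirroring the sources above
        chunk = self._df.iloc[start:start + self._max_rows]
        chunk_id = self._next_chunk
        self._next_chunk += 1
        return chunk, chunk_id

    def reset(self):
        self._next_chunk = 0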

TimeBucket

The TimeBucket enum defines the required data aggregation granularity over time. By default, TimeBucket is set to TimeBucket.ALL. You can specify any of the following values: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or ALL. To change the TimeBucket value, call the init method, for example source.init(TimeBucket.HOUR):

# let's generate a dummy DataFrame with 2 rows per time bucket (Hour in this scenario)
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=2,
    prediction_value=1,
    with_actuals=True,
    with_predictions=True,
    time_bucket=TimeBucket.HOUR,
)
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1    0.999
1  01/06/2005 13:00:00.000000            1    0.999
2  01/06/2005 14:00:00.000000            1    0.999
3  01/06/2005 14:00:00.000000            1    0.999
4  01/06/2005 15:00:00.000000            1    0.999
5  01/06/2005 15:00:00.000000            1    0.999
6  01/06/2005 16:00:00.000000            1    0.999
7  01/06/2005 16:00:00.000000            1    0.999
8  01/06/2005 17:00:00.000000            1    0.999
9  01/06/2005 17:00:00.000000            1    0.999

# let's use DataFrameSource and load created DataFrame
source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="timestamp",
)
# init source with the selected TimeBucket
source.init(TimeBucket.HOUR)
df, _ = source.get_data()
print(df)
                    timestamp predictions actuals
0  01/06/2005 13:00:00.000000           1   0.999
1  01/06/2005 13:00:00.000000           1   0.999
df, _ = source.get_data()
print(df)
                    timestamp predictions actuals
2  01/06/2005 14:00:00.000000           1   0.999
3  01/06/2005 14:00:00.000000           1   0.999

source.init(TimeBucket.DAY)
df, _ = source.get_data()
print(df)
                    timestamp predictions actuals
0  01/06/2005 13:00:00.000000           1   0.999
1  01/06/2005 13:00:00.000000           1   0.999
2  01/06/2005 14:00:00.000000           1   0.999
3  01/06/2005 14:00:00.000000           1   0.999
4  01/06/2005 15:00:00.000000           1   0.999
5  01/06/2005 15:00:00.000000           1   0.999
6  01/06/2005 16:00:00.000000           1   0.999
7  01/06/2005 16:00:00.000000           1   0.999
8  01/06/2005 17:00:00.000000           1   0.999
9  01/06/2005 17:00:00.000000           1   0.999

The returned data chunks follow the selected TimeBucket. This is helpful in the MetricEvaluator. In addition to TimeBucket, the source respects the max_rows parameter when generating data chunks; for example, using the same dataset as in the example above (but with max_rows set to 3):

source = DataFrameSource(
    df=test_df,
    max_rows=3,
    timestamp_col="timestamp",
)
source.init(TimeBucket.DAY)
df, chunk_id = source.get_data()
print(df)
                    timestamp predictions actuals
0  01/06/2005 13:00:00.000000           1   0.999
1  01/06/2005 13:00:00.000000           1   0.999
2  01/06/2005 14:00:00.000000           1   0.999

In DataRobotSource, you can specify the TimeBucket and max_rows parameters for all export types except training data export, which is returned in one piece.

Deployment

The Deployment class is a helper class which provides access to relevant deployment properties. This class is used inside the DataRobotSource to select the appropriate workflow to work with data.

import datarobot as dr
from dmm.data_source.datarobot.deployment import Deployment
dr.Client()
deployment = Deployment(deployment_id=deployment_id)

deployment_type = deployment.type()
target_column = deployment.target_column()
positive_class_label = deployment.positive_class_label()
negative_class_label = deployment.negative_class_label()
prediction_threshold = deployment.prediction_threshold()
...

Custom Metrics

The MetricBase class provides an interface to define your own custom metrics.
See the examples of custom metrics located in the metric directory.

MetricBase

In MetricBase, you define which types of data a metric requires; a custom metric must inherit this definition:

class MetricBase(object):
    def __init__(
        self,
        name: str,
        description: str = None,
        need_predictions: bool = False,
        need_actuals: bool = False,
        need_scoring_data: bool = False,
        need_training_data: bool = False,
    ):
        self.name = name
        self.description = description
        self._need_predictions = need_predictions
        self._need_actuals = need_actuals
        self._need_scoring_data = need_scoring_data
        self._need_training_data = need_training_data

In addition, you must implement the scoring and reduction methods in MetricBase:

  • Scoring (score): Uses the initialized data types to calculate a metric value.
  • Reduction (reduce_func): Reduces multiple values in the same TimeBucket to one value.

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.array,
        actuals: np.array,
        fit_ctx=None,
        metadata=None,
    ) -> float:
        raise NotImplementedError

    def reduce_func(self) -> callable:
        return np.mean

ModelMetricBase and DataMetricBase

Two default classes can help you create your own custom metrics: ModelMetricBase and DataMetricBase.

ModelMetricBase is the base class for metrics that require actuals and predictions for metric calculation.

class ModelMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=False,
            need_predictions=True,
            need_actuals=True,
            need_training_data=need_training_data,
        )

    def score(
        self,
        prediction: np.array,
        actuals: np.array,
        fit_context=None,
        metadata=None,
        scoring_data=None,
    ) -> float:
        raise NotImplementedError

DataMetricBase is the base class for metrics that require scoring data for metric calculation.

class DataMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=True,
            need_predictions=False,
            need_actuals=False,
            need_training_data=need_training_data,
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        fit_ctx=None,
        metadata=None,
        predictions=None,
        actuals=None,
    ) -> float:
        raise NotImplementedError
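
As an illustration, the following sketch implements a simple data metric on top of DataMetricBase. The metric name, the feature-threshold logic, and the import path are assumptions for this example, not part of the library:

import numpy as np
import pandas as pd

from dmm.metric import DataMetricBase  # import path assumed for this sketch


class HighValueFraction(DataMetricBase):
    """Example metric: fraction of scoring rows where a chosen feature exceeds a threshold."""

    def __init__(self, feature_name: str, threshold: float):
        super().__init__(
            name=f"High Value Fraction ({feature_name})",
            description="Share of rows with the feature above a fixed threshold",
        )
        self._feature_name = feature_name
        self._threshold = threshold

    def score(
        self,
        scoring_data: pd.DataFrame,
        fit_ctx=None,
        metadata=None,
        predictions=None,
        actuals=None,
    ) -> float:
        # score a single chunk of scoring data
        return float((scoring_data[self._feature_name] > self._threshold).mean())

    def reduce_func(self) -> callable:
        # average the per-chunk values that fall into the same time bucket
        return np.mean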

LLM Metrics

To support LLM use cases, a new type of metric was introduced: LLMMetricBase is the base class for metrics that require scoring data and predictions for metric calculation, which in the LLM world translate to prompts (user input) and completions (LLM responses).

class LLMMetricBase(MetricBase):
    def __init__(
        self, name: str, description: str = None, need_training_data: bool = False
    ):
        super().__init__(
            name=name,
            description=description,
            need_scoring_data=True,
            need_predictions=True,
            need_actuals=False,
            need_training_data=need_training_data,
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.array,
        fit_ctx=None,
        metadata=None,
        actuals=None,
    ) -> float:
        raise NotImplementedError
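
For illustration, a minimal LLM metric built on LLMMetricBase could score the average completion length; as above, the import path and the metric itself are assumptions for this sketch:

import numpy as np
import pandas as pd

from dmm.metric import LLMMetricBase  # import path assumed for this sketch


class MeanCompletionLength(LLMMetricBase):
    """Example metric: average word count of the LLM completions (predictions) in a chunk."""

    def __init__(self):
        super().__init__(
            name="Mean Completion Length",
            description="Average number of words in LLM completions",
        )

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.array,
        fit_ctx=None,
        metadata=None,
        actuals=None,
    ) -> float:
        # predictions hold the completions; scoring_data would hold the prompts
        return float(np.mean([len(str(completion).split()) for completion in predictions]))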

SklearnMetric

To accelerate the implementation of custom metrics, you can use ready-made, proven metrics from scikit-learn. To create a custom metric this way, use the SklearnMetric class as the base class and provide the name of an sklearn metric.
See the example below:

from dmm.metric.sklearn_metric import SklearnMetric


class MedianAbsoluteError(SklearnMetric):
    """
    Metric that calculates the median absolute error of the difference between predictions and actuals
    """

    def __init__(self):
        super().__init__(
            metric="median_absolute_error",
        )

Submit custom metric values with datarobot-model-metrics

The metrics described above can serve as the source of values for custom metric definitions in the DataRobot platform.
The CustomMetric interface retrieves the metadata of an existing custom metric in DataRobot so that you can report data to it.

You can initialize the metric by providing the parameters explicitly (metric_id, deployment_id, model_id, and a dr.Client object):

from dmm.custom_metric import CustomMetric


cm = CustomMetric.from_id(metric_id=METRIC_ID, deployment_id=DEPLOYMENT_ID, model_id=MODEL_ID, client=CLIENT)

You can also define these parameters as environment variables: os.environ['DEPLOYMENT_ID'], os.environ['CUSTOM_METRIC_ID'], os.environ['BASE_URL'], and os.environ['DATAROBOT_ENDPOINT']:

from dmm.custom_metric import CustomMetric


cm = CustomMetric.from_id()

For batch mode, you must specify it explicitly:

from dmm.custom_metric import CustomMetric


cm = CustomMetric.from_id(is_batch=True)

The report method submits custom metric values to a custom metric defined in DataRobot. To use this method, pass a DataFrame in the shape of the output from the metric evaluator. For more information, see MetricEvaluator.

print(aggregated_metric_per_time_bucket.to_string())

                    timestamp  samples  median_absolute_error
1  01/06/2005 14:00:00.000000        2                  0.001

response = cm.report(df=aggregated_metric_per_time_bucket)
print(response.status_code)
202

The dry_run parameter determines whether the custom metric values transfer is a dry run (the values aren't saved in the database) or a production data transfer. This parameter is set to False by default.

response = cm.report(df=aggregated_metric_per_time_bucket, dry_run=True)
print(response.status_code)
202
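
Putting the pieces together, a typical flow computes aggregated values with the MetricEvaluator (described in the next section) and then submits them with report. This is a sketch that assumes DEPLOYMENT_ID is available in the script and that the deployment and custom metric IDs are set as environment variables:

from datetime import datetime, timedelta

from dmm import MetricEvaluator, TimeBucket
from dmm.custom_metric import CustomMetric
from dmm.data_source import DataRobotSource
from dmm.metric import MedianAbsoluteError

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

me = MetricEvaluator(metric=MedianAbsoluteError(), source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()

cm = CustomMetric.from_id()  # DEPLOYMENT_ID and CUSTOM_METRIC_ID are read from the environment
response = cm.report(df=aggregated_metric_per_time_bucket)
print(response.status_code)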

Metric Evaluator

The MetricEvaluator class calculates metric values over time using the selected source.
This class is used to "stream" data through the metric object, generating metric values.

Metric Evaluator Initialization

Initialize the MetricEvaluator with the following mandatory parameters:

from datetime import datetime, timedelta

from dmm import MetricEvaluator, TimeBucket
from dmm.data_source.datarobot_source import DataRobotSource
from dmm.metric import MedianAbsoluteError

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

metric = MedianAbsoluteError()

metric_evaluator = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.MINUTE)

To use MetricEvaluator, create a metric class implementing the MetricBase interface, a source implementing DataSourceBase, and then specify the level of aggregation granularity.

Initialize MetricEvaluator with all parameters:

from dmm import ColumnName, MetricEvaluator, TimeBucket

metric_evaluator = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.HOUR,
    prediction_col=ColumnName.PREDICTIONS,
    actuals_col=ColumnName.ACTUALS,
    timestamp_col=ColumnName.TIMESTAMP,
    filter_actuals=False,
    filter_predictions=False,
    filter_scoring_data=False,
    segment_attribute=None,
    segment_value=None,
)
Parameters:

  • metric (Union[str, MetricBase, List[str], List[MetricBase]]): If a string or a list of strings is passed, MetricEvaluator looks for matching sklearn metrics; if an object or a list of objects is passed, they must implement the MetricBase interface.
  • source (DataSourceBase): The source to pull data from: DataRobotSource, DataFrameSource, or any other source that implements the DataSourceBase interface.
  • time_bucket (TimeBucket): The time bucket size to use for evaluating metrics; determines the granularity of aggregation.
  • prediction_col (Optional[str]): The name of the column that contains predictions.
  • actuals_col (Optional[str]): The name of the column that contains actuals.
  • timestamp_col (Optional[str]): The name of the column that contains timestamps.
  • filter_actuals (Optional[bool]): If True, the metric evaluator removes missing actuals values before scoring. The default value is False.
  • filter_predictions (Optional[bool]): If True, the metric evaluator removes missing predictions values before scoring. The default value is False.
  • filter_scoring_data (Optional[bool]): If True, the metric evaluator removes missing scoring data values before scoring. The default value is False.
  • segment_attribute (Optional[str]): The name of the column with segment values.
  • segment_value (Optional[Union[str, List[str]]]): A single value or a list of values of the segment attribute to segment on.

Metric Evaluator Usage

The score method returns a metric aggregated as defined by TimeBucket. The output, returned as a pandas DataFrame, contains the results per time bucket for all data from the source.

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(hours=3),
    end=datetime.utcnow(),
)
metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())

                          timestamp  samples  log_loss
0  2023-09-14 13:29:48.065000+00:00      499  0.539315
1  2023-09-14 14:01:51.484000+00:00      499  0.539397

# we can see the evaluator's statistics
stats = me.stats()
print(stats)
total rows: 998, score calls: 2, reduce calls: 2

To pass more than one metric at a time, you can do the following:

metrics = [LogLossFromSklearn(), AsymmetricError(), RocAuc()]
me = MetricEvaluator(metric=metrics, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
stats = me.stats()
print(aggregated_metric_per_time_bucket.to_string())
print(stats)

                          timestamp  samples  log_loss  Asymmetric Error  roc_auc_score
0  2023-09-14 13:29:48.065000+00:00      499  0.539315          0.365571       0.787030
1  2023-09-14 14:01:51.484000+00:00      499  0.539397          0.365636       0.786837
total rows: 998, score calls: 6, reduce calls: 6

For your own data, you can provide the names of the columns to evaluate:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=5,
    rows_per_time_bucket=1,
    prediction_value=1,
    time_bucket=TimeBucket.DAY,
    prediction_col="my_pred_col",
    actuals_col="my_actuals_col",
    timestamp_col="my_timestamp_col"
)
print(test_df)
             my_timestamp_col  my_pred_col  my_actuals_col
0  01/06/2005 13:00:00.000000            1           0.999
1  02/06/2005 13:00:00.000000            1           0.999
2  03/06/2005 13:00:00.000000            1           0.999
3  04/06/2005 13:00:00.000000            1           0.999
4  05/06/2005 13:00:00.000000            1           0.999

source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="timestamp",
)

metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, 
                     source=source, 
                     time_bucket=TimeBucket.DAY,
                     prediction_col="my_pred_col", 
                     actuals_col="my_actuals_col", 
                     timestamp_col="my_timestamp_col"
                     )
aggregated_metric_per_time_bucket = me.score()

Metric Evaluator Data Filtering

If some data is missing, use the filtering flags. The following example uses data with missing actuals.

In this scenario, without a flag, an exception is raised:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=5,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df["actuals"].loc[2] = None
test_df["actuals"].loc[5] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1    0.999
1  01/06/2005 13:00:00.000000            1    0.999
2  01/06/2005 13:00:00.000000            1      NaN
3  01/06/2005 13:00:00.000000            1    0.999
4  01/06/2005 13:00:00.000000            1    0.999
5  01/06/2005 14:00:00.000000            1      NaN
6  01/06/2005 14:00:00.000000            1    0.999
7  01/06/2005 14:00:00.000000            1    0.999
8  01/06/2005 14:00:00.000000            1    0.999
9  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
"ValueError: Could not apply metric median_absolute_error, make sure you are passing the right data (see the sklearn docs).
The error message was: Input contains NaN."

For the same dataset, compare the previous result with the result when you enable the filter_actuals flag:

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
0  01/06/2005 13:00:00.000000        4                  0.001
1  01/06/2005 14:00:00.000000        4                  0.001

Using the filter_actuals, filter_predictions, filter_scoring_data flags, you can filter out missing values from the data before calculating the metric. By default, these flags are set to False.

If all the data needed to calculate the metric is missing in a data chunk, that chunk is skipped and an appropriate message is logged:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=4,
    rows_per_time_bucket=2,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df["actuals"].loc[0] = None
test_df["actuals"].loc[1] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1      NaN
1  01/06/2005 13:00:00.000000            1      NaN
2  01/06/2005 14:00:00.000000            1    0.999
3  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 2 rows out of 2 in the data chunk before scoring, due to missing values in ['actuals'] data"
"data chunk is empty, skipping scoring..."

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
1  01/06/2005 14:00:00.000000        2                  0.001

Metric Evaluator Segmented Analysis

Perform segmented analysis by defining the segment_attribute and each segment_value:

metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value="Down",
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]
0  2023-09-14 13:29:49.737000+00:00       49         0.594483
1  2023-09-14 14:01:52.437000+00:00       49         0.594483

# passing more than one segment value
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811

# passing more than one segment value and more than one metric
me = MetricEvaluator(metric=[LogLossFromSklearn(), RocAuc()],
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]  roc_auc_score [Down]  roc_auc_score [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811              0.783333                0.826632
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811              0.783333                0.826632

Batch Metric Evaluator

The BatchMetricEvaluator class uses aggregation per batch instead of aggregation over time. For batches, you don't define TimeBucket:

from dmm.batch_metric_evaluator import BatchMetricEvaluator
from dmm.data_source.datarobot_source import BatchDataRobotSource
from dmm.metric import MissingValuesFraction

source = BatchDataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    batch_ids=BATCH_IDS,
    model_id=MODEL_ID,
)

feature_name = 'RAD'
metric = MissingValuesFraction(feature_name=feature_name)

missing_values_fraction_evaluator = BatchMetricEvaluator(metric=metric, source=source)

aggregated_metric_per_batch = missing_values_fraction_evaluator.score()
print(aggregated_metric_per_batch.to_string())
     batch_id   samples  Missing Values Fraction
0  <batch_id>       506                      0.0
1  <batch_id>       506                      0.0
2  <batch_id>       506                      0.0

Note: For batches, actuals and multiple segments are not supported.

DR Custom Metrics

The DR Custom Metrics module provides better synchronization with existing metrics on the DataRobot side. The logic of this module is based on unique names for custom metrics, so you can operate on metrics without knowing their IDs. Thanks to this, you can define a metric early (for example, before creating the deployment) and synchronize it with DataRobot at the appropriate time.

DRCustomMetric:

The DRCustomMetric class allows you to create new custom metrics or fetch existing ones from DataRobot. The logic is as follows:

  • You provide the custom metric configuration in one of the supported formats: YAML string, dict, YAML file, or JSON file.
  • The configuration contains the metadata describing each custom metric.
  • The DRCustomMetric.sync() method retrieves information about existing custom metrics on the DataRobot side; if a metric is defined locally but does not exist on the DataRobot side, it is created there.
  • The report_value() method reports a single value to a metric identified by its unique name.

Example:

dr_cm = DRCustomMetric(
    dr_client=client, deployment_id=deployment_id, model_package_id=model_package_id
)

metric_config_yaml = f"""
     customMetrics:
       - name: new metric
         description: foo bar
         type: average
         timeStep: hour
         units: count
         directionality: lowerIsBetter
         isModelSpecific: yes
         baselineValue: 0
     """

dr_cm.set_config(config_yaml=metric_config_yaml)
dr_cm.sync()
dr_cm.get_dr_custom_metrics()
> [{"name": "existing metric", "id": "65ef19410239ff8015f05a94", ...}, 
>  {"name": "new metric", "id": "65ef197ce5d7b2176ceecf3a", ...}]

dr_cm.report_value("existing metric", 1)
dr_cm.report_value("new metric", 9)
