# DataRobot Provider for Apache Airflow
This package provides operators, sensors, and a hook to integrate DataRobot into Apache Airflow. Using these components, you can build the essential DataRobot pipeline: create a project, train models, deploy a model, and score predictions against the model deployment.
## Install the Airflow provider
The DataRobot provider for Apache Airflow requires an environment with the following dependencies installed:

- Apache Airflow >= 2.3
- DataRobot Python API Client >= 3.2.0
To install the DataRobot provider, run the following command:

```
pip install airflow-provider-datarobot
```
## Create a connection from Airflow to DataRobot
The next step is to create a connection from Airflow to DataRobot:

1. In the Airflow user interface, click **Admin > Connections** to add an Airflow connection.

2. On the **List Connection** page, click **+ Add a new record**.

3. In the **Add Connection** dialog box, configure the following fields:

   Field | Description |
   ---|---|
   Connection Id | `datarobot_default` (this name is used by default in all operators) |
   Connection Type | DataRobot |
   API Key | A DataRobot API key, created in the DataRobot Developer Tools, from the **API Keys** section. |
   DataRobot endpoint URL | `https://app.datarobot.com/api/v2` by default |

4. Click **Test** to establish a test connection between Airflow and DataRobot.

5. When the connection test is successful, click **Save**.
## Create preconfigured connections to DataRobot
You can create preconfigured connections to store and manage credentials for use with Airflow operators, replicating the connections on the DataRobot side. The currently supported credential types are:
Credentials | Description |
---|---|
DataRobot Basic Credentials | Login/password pairs |
DataRobot GCP Credentials | Google Cloud Service account key |
DataRobot AWS Credentials | AWS access keys |
DataRobot Azure Storage Credentials | Azure Storage secret |
DataRobot OAuth Credentials | OAuth tokens |
DataRobot JDBC DataSource | JDBC connection attributes |
After creating a preconfigured connection through the Airflow UI or API, you can access your stored credentials with `GetOrCreateCredentialOperator` or `GetOrCreateDataStoreOperator` to replicate them in DataRobot and retrieve the corresponding `credentials_id` or `datastore_id`.
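For example, a minimal DAG sketch using both operators (the `datarobot_provider.operators.datarobot` import path and the task IDs are assumptions for illustration; check your provider version for the exact module layout):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import (  # assumed import path
    GetOrCreateCredentialOperator,
    GetOrCreateDataStoreOperator,
)

with DAG(dag_id="replicate_connections_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # Resolves a credential name from the DAG run config to a DataRobot
    # credentials_id, creating the credential in DataRobot if it doesn't exist.
    get_credential_id = GetOrCreateCredentialOperator(task_id="get_credential_id")

    # Resolves a connection name from the DAG run config to a DataRobot
    # datastore_id, creating the data store in DataRobot if it doesn't exist.
    get_datastore_id = GetOrCreateDataStoreOperator(task_id="get_datastore_id")
```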
## JSON configuration for the DAG run
Operators and sensors use parameters from the config JSON submitted when triggering the DAG; for example:
```json
{
    "training_data": "s3-presigned-url-or-local-path-to-training-data",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted"
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {
        "intake_settings": {
            "type": "s3",
            "url": "s3://path/to/scoring-data/Diabetes10k.csv",
            "credential_id": "<credential_id>"
        },
        "output_settings": {
            "type": "s3",
            "url": "s3://path/to/results-dir/Diabetes10k_predictions.csv",
            "credential_id": "<credential_id>"
        }
    }
}
```
These config values are accessible in the `execute()` method of any operator in the DAG through the `context["params"]` variable; for example, to get the training data, you could use the following:

```python
def execute(self, context: Dict[str, Any]) -> str:
    ...
    training_data = context["params"]["training_data"]
    ...
```
## Modules

### Operators

#### GetOrCreateCredentialOperator
Fetches a credential by name. This operator attempts to find a DataRobot credential with the provided name. If the credential doesn't exist, the operator creates it using the Airflow preconfigured connection with the same connection name.
Returns a credential ID.
Required config parameters:

Parameter | Type | Description |
---|---|---|
`credentials_param_name` | `str` | The name of the parameter in the config file for the credential name. |
#### GetOrCreateDataStoreOperator

Fetches a DataStore by connection name. If the DataStore does not exist, the operator attempts to create it using the Airflow preconfigured connection with the same connection name.

Returns a datastore ID.
Required config params:

Parameter | Type | Description |
---|---|---|
`connection_param_name` | `str` | The name of the parameter in the config file for the connection name. |
#### CreateDatasetFromDataStoreOperator

Loads a dataset from a JDBC connection into the DataRobot AI Catalog.

Returns a dataset ID.

Required config params:
Parameter | Type | Description |
---|---|---|
`datarobot_jdbc_connection` | `str` | The existing preconfigured DataRobot JDBC connection name. |
`dataset_name` | `str` | The name of the loaded dataset. |
`table_schema` | `str` | The database table schema. |
`table_name` | `str` | The source table name. |
`do_snapshot` | `bool` | If `True`, creates a snapshot dataset. If `False`, creates a remote dataset. If unset, uses the server default (`True`). Creating snapshots from non-file sources may be disabled by the *Disable AI Catalog Snapshots* permission. |
`persist_data_after_ingestion` | `bool` | If `True`, enforce saving all data (for download and sampling) and allow a user to view the extended data profile (which includes data statistics like min, max, median, mean, histogram, etc.). If `False`, don't enforce saving data. If unset, uses the server default (`True`). The data schema (feature names and types) will still be available. Setting this parameter to `False` and `do_snapshot` to `True` results in an error. |
#### UploadDatasetOperator

Uploads a local file to the DataRobot AI Catalog.

Returns a dataset ID.

Required config params:
Parameter | Type | Description |
---|---|---|
`dataset_file_path` | `str` | The local path to the training dataset. |
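A minimal sketch of this operator in a DAG (import path assumed; `dataset_file_path` is supplied in the DAG run config as described above):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import UploadDatasetOperator  # assumed import path

with DAG(dag_id="upload_dataset_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # Uploads the file at the config's "dataset_file_path" to the AI Catalog;
    # the resulting dataset ID is pushed to XCom for downstream tasks.
    upload_dataset = UploadDatasetOperator(task_id="upload_dataset")
```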
#### UpdateDatasetFromFileOperator

Creates a new dataset version from a file.

Returns a dataset version ID when the new version uploads successfully.

Required config params:
Parameter | Type | Description |
---|---|---|
`dataset_id` | `str` | The DataRobot AI Catalog dataset ID. |
`dataset_file_path` | `str` | The local path to the training dataset. |
#### CreateDatasetVersionOperator

Creates a new version of an existing dataset in the AI Catalog.

Returns a dataset version ID.

Required config params:
Parameter | Type | Description |
---|---|---|
`dataset_id` | `str` | The DataRobot AI Catalog dataset ID. |
`datasource_id` | `str` | The existing DataRobot datasource ID. |
`credential_id` | `str` | The existing DataRobot credential ID. |
#### CreateOrUpdateDataSourceOperator

Creates a data source or updates it if it already exists.

Returns a DataRobot DataSource ID.

Required config params:
Parameter | Type | Description |
---|---|---|
`data_store_id` | `str` | The DataRobot datastore ID. |
#### CreateProjectOperator

Creates a DataRobot project.

Returns a project ID.

Several source dataset options are supported:
##### Local file or pre-signed S3 URL

Create a project directly from a local file or a pre-signed S3 URL.

Required config params:

Parameter | Type | Description |
---|---|---|
`training_data` | `str` | The pre-signed S3 URL or the local path to the training dataset. |
`project_name` | `str` | The project name. |

Note: In the case of S3 input, the `training_data` value must be a pre-signed AWS S3 URL.
##### AI Catalog dataset from config file

Create a project from an existing dataset in the DataRobot AI Catalog using a dataset ID defined in the config file.

Required config params:

Parameter | Type | Description |
---|---|---|
`training_dataset_id` | `str` | The dataset ID corresponding to an existing dataset in the DataRobot AI Catalog. |
`project_name` | `str` | The project name. |
##### AI Catalog dataset from previous operator

Create a project from an existing dataset in the DataRobot AI Catalog using a dataset ID from the previous operator. In this case, your previous operator must return a valid dataset ID (for example, `UploadDatasetOperator`), and you should pass this output value as the `dataset_id` argument when creating the `CreateProjectOperator` object, as in the sketch below.

Required config params:

Parameter | Type | Description |
---|---|---|
`project_name` | `str` | The project name. |
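A minimal sketch of this wiring (import paths assumed; `.output` is Airflow's XComArg shorthand for a task's return value, which assumes `dataset_id` is a templated field, as in the provider's example DAGs):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import (  # assumed import path
    CreateProjectOperator,
    UploadDatasetOperator,
)

with DAG(dag_id="project_from_upload_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # Returns an AI Catalog dataset ID.
    upload_dataset = UploadDatasetOperator(task_id="upload_dataset")

    # Consumes the dataset ID returned by the previous task.
    create_project = CreateProjectOperator(
        task_id="create_project",
        dataset_id=upload_dataset.output,
    )

    upload_dataset >> create_project
```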
For more project settings, see the DataRobot documentation.
#### TrainModelsOperator

Runs DataRobot Autopilot to train models.

Returns `None`.
Parameters:

Parameter | Type | Description |
---|---|---|
`project_id` | `str` | The DataRobot project ID. |

Required config params:

Parameter | Type | Description |
---|---|---|
`target` | `str` | The name of the column defining the modeling target. |

```
"autopilot_settings": {
    "target": "readmitted"
}
```
For more autopilot settings, see the DataRobot documentation.
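A minimal sketch chaining project creation, Autopilot, and the completion sensor (import paths assumed; the `target` comes from `autopilot_settings` in the DAG run config):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import (  # assumed import path
    CreateProjectOperator,
    TrainModelsOperator,
)
from datarobot_provider.sensors.datarobot import AutopilotCompleteSensor  # assumed import path

with DAG(dag_id="train_models_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    create_project = CreateProjectOperator(task_id="create_project")

    # Starts Autopilot on the newly created project.
    train_models = TrainModelsOperator(
        task_id="train_models",
        project_id=create_project.output,
    )

    # Blocks downstream tasks until Autopilot finishes.
    autopilot_complete = AutopilotCompleteSensor(
        task_id="autopilot_complete",
        project_id=create_project.output,
    )

    create_project >> train_models >> autopilot_complete
```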
#### DeployModelOperator

Deploys a specified model.

Returns a deployment ID.
Parameters:

Parameter | Type | Description |
---|---|---|
`model_id` | `str` | The DataRobot model ID. |

Required config params:

Parameter | Type | Description |
---|---|---|
`deployment_label` | `str` | The deployment label name. |
For more deployment settings, see the DataRobot documentation.
#### DeployRecommendedModelOperator

Deploys a recommended model.

Returns a deployment ID.
Parameters:

Parameter | Type | Description |
---|---|---|
`project_id` | `str` | The DataRobot project ID. |

Required config params:

Parameter | Type | Description |
---|---|---|
`deployment_label` | `str` | The deployment label name. |
For more deployment settings, see the DataRobot documentation.
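For example, a minimal sketch (import path assumed; the placeholder project ID would normally come from an upstream task's output):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import DeployRecommendedModelOperator  # assumed import path

with DAG(dag_id="deploy_recommended_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # Deploys the model recommended by Autopilot; "deployment_label" is read
    # from the DAG run config, and the deployment ID is pushed to XCom.
    deploy_model = DeployRecommendedModelOperator(
        task_id="deploy_model",
        project_id="<project_id>",  # illustrative placeholder
    )
```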
#### ScorePredictionsOperator

Scores batch predictions against the deployment.

Returns a batch prediction job ID.
Prerequisites:

- Use `GetOrCreateCredentialOperator` to pass a `credential_id` from the preconfigured DataRobot credentials (Airflow connections), or manually set the `credential_id` parameter in the config. Note: You can add S3 credentials to DataRobot via the Python API client.
- Alternatively, use a dataset ID from the DataRobot AI Catalog.
- Alternatively, use a DataStore ID for a JDBC source connection; you can use `GetOrCreateDataStoreOperator` to pass a `datastore_id` from a preconfigured Airflow connection.
Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |
`intake_datastore_id` | `str` | The DataRobot datastore ID for the JDBC source connection. |
`output_datastore_id` | `str` | The DataRobot datastore ID for the JDBC destination connection. |
`intake_credential_id` | `str` | The DataRobot credentials ID for the source connection. |
`output_credential_id` | `str` | The DataRobot credentials ID for the destination connection. |
Sample config: Pre-signed S3 URL

```
"score_settings": {
    "intake_settings": {
        "type": "s3",
        "url": "s3://my-bucket/Diabetes10k.csv"
    },
    "output_settings": {
        "type": "s3",
        "url": "s3://my-bucket/Diabetes10k_predictions.csv"
    }
}
```
Sample config: Pre-signed S3 URL with a manually set credential ID

```
"score_settings": {
    "intake_settings": {
        "type": "s3",
        "url": "s3://my-bucket/Diabetes10k.csv",
        "credential_id": "<credential_id>"
    },
    "output_settings": {
        "type": "s3",
        "url": "s3://my-bucket/Diabetes10k_predictions.csv",
        "credential_id": "<credential_id>"
    }
}
```
Sample config: Scoring dataset in the AI Catalog

```
"score_settings": {
    "intake_settings": {
        "type": "dataset",
        "dataset_id": "<datasetId>"
    },
    "output_settings": { }
}
```
For more batch prediction settings, see the DataRobot documentation.
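A minimal sketch of scoring against an existing deployment and waiting for the job to finish (import paths assumed; `score_settings` come from the DAG run config):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import ScorePredictionsOperator  # assumed import path
from datarobot_provider.sensors.datarobot import ScoringCompleteSensor  # assumed import path

with DAG(dag_id="score_predictions_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    score_predictions = ScorePredictionsOperator(
        task_id="score_predictions",
        deployment_id="<deployment_id>",  # illustrative placeholder
    )

    # Polls the batch prediction job until it completes.
    scoring_complete = ScoringCompleteSensor(
        task_id="scoring_complete",
        job_id=score_predictions.output,
    )

    score_predictions >> scoring_complete
```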
#### GetTargetDriftOperator

Gets the target drift from a deployment.

Returns a dict with the target drift data.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required; however, the optional params may be passed in the config as follows:

```
"target_drift": { }
```
#### GetFeatureDriftOperator

Gets the feature drift from a deployment.

Returns a dict with the feature drift data.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required; however, the optional params may be passed in the config as follows:

```
"feature_drift": { }
```
#### GetServiceStatsOperator

Gets service stats measurements from a deployment.

Returns a dict with the service stats measurements data.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required; however, the optional params may be passed in the config as follows:

```
"service_stats": { }
```
#### GetAccuracyOperator

Gets the accuracy of a deployment’s predictions.

Returns a dict with the accuracy for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required; however, the optional params may be passed in the config as follows:

```
"accuracy": { }
```
#### GetBiasAndFairnessSettingsOperator

Gets the bias and fairness settings for a deployment.

Returns a dict with the bias and fairness settings for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required.
#### UpdateBiasAndFairnessSettingsOperator

Updates the bias and fairness settings for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

Sample config params:

```
"protected_features": ["attribute1"],
"preferable_target_value": "True",
"fairness_metrics_set": "equalParity",
"fairness_threshold": 0.1
```
#### GetSegmentAnalysisSettingsOperator

Gets the segment analysis settings for a deployment.

Returns a dict with the segment analysis settings for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required.
#### UpdateSegmentAnalysisSettingsOperator

Updates the segment analysis settings for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

Sample config params:

```
"segment_analysis_enabled": true,
"segment_analysis_attributes": ["attribute1", "attribute2"]
```
#### GetMonitoringSettingsOperator

Gets the monitoring settings for a deployment.

Returns a dict with the config params for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

No config params are required.

Sample monitoring settings:

```json
{
    "drift_tracking_settings": { },
    "association_id_settings": { },
    "predictions_data_collection_settings": { }
}
```
Dictionary | Description |
---|---|
`drift_tracking_settings` | The drift tracking settings for this deployment. |
`association_id_settings` | The association ID settings for this deployment. |
`predictions_data_collection_settings` | The predictions data collection settings of this deployment. |
#### UpdateMonitoringSettingsOperator

Updates monitoring settings for a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |

Sample config params:

```
"target_drift_enabled": true,
"feature_drift_enabled": true,
"association_id_column": ["id"],
"required_association_id": false,
"predictions_data_collection_enabled": false
```
#### BatchMonitoringOperator

Creates a batch monitoring job for the deployment.

Returns a batch monitoring job ID.
Prerequisites:

- Use `GetOrCreateCredentialOperator` to pass a `credential_id` from the preconfigured DataRobot credentials (Airflow connections), or manually set the `credential_id` parameter in the config. Note: You can add S3 credentials to DataRobot via the Python API client.
- Alternatively, use a dataset ID from the DataRobot AI Catalog.
- Alternatively, use a DataStore ID for a JDBC source connection; you can use `GetOrCreateDataStoreOperator` to pass a `datastore_id` from a preconfigured Airflow connection.
Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |
`datastore_id` | `str` | The DataRobot datastore ID. |
`credential_id` | `str` | The DataRobot credentials ID. |
Sample config:

```
"deployment_id": "61150a2fadb5586af4118980",
"monitoring_settings": {
    "intake_settings": {
        "type": "bigquery",
        "dataset": "integration_example_demo",
        "table": "actuals_demo",
        "bucket": "datarobot_demo_airflow"
    },
    "monitoring_columns": {
        "predictions_columns": [
            {"class_name": "True", "column_name": "target_True_PREDICTION"},
            {"class_name": "False", "column_name": "target_False_PREDICTION"}
        ],
        "association_id_column": "id",
        "actuals_value_column": "ACTUAL"
    }
}
```
Sample config: Manually set credential ID

```
"deployment_id": "61150a2fadb5586af4118980",
"monitoring_settings": {
    "intake_settings": {
        "type": "bigquery",
        "dataset": "integration_example_demo",
        "table": "actuals_demo",
        "bucket": "datarobot_demo_airflow",
        "credential_id": "<credential_id>"
    },
    "monitoring_columns": {
        "predictions_columns": [
            {"class_name": "True", "column_name": "target_True_PREDICTION"},
            {"class_name": "False", "column_name": "target_False_PREDICTION"}
        ],
        "association_id_column": "id",
        "actuals_value_column": "ACTUAL"
    }
}
```
For more batch monitoring settings, see the DataRobot documentation.
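A minimal sketch of running the monitoring job and waiting for completion (import paths assumed; `monitoring_settings` come from the DAG run config samples above):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import BatchMonitoringOperator  # assumed import path
from datarobot_provider.sensors.datarobot import MonitoringJobCompleteSensor  # assumed import path

with DAG(dag_id="batch_monitoring_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    batch_monitoring = BatchMonitoringOperator(
        task_id="batch_monitoring",
        deployment_id="<deployment_id>",  # illustrative placeholder
    )

    # Polls the batch monitoring job until it completes.
    monitoring_complete = MonitoringJobCompleteSensor(
        task_id="monitoring_complete",
        job_id=batch_monitoring.output,
    )

    batch_monitoring >> monitoring_complete
```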
#### DownloadModelScoringCodeOperator

Downloads the scoring code artifact from a model.

Parameters:

Parameter | Type | Description |
---|---|---|
`project_id` | `str` | The DataRobot project ID. |
`model_id` | `str` | The DataRobot model ID. |
`base_path` | `str` | The base path for storing the downloaded model artifact. |

Sample config params:

```
"source_code": false
```
For more scoring code download parameters, see the DataRobot documentation.
#### DownloadDeploymentScoringCodeOperator

Downloads the scoring code artifact from a deployment.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |
`base_path` | `str` | The base path for storing the downloaded model artifact. |

Sample config params:

```
"source_code": false,
"include_agent": false,
"include_prediction_explanations": false,
"include_prediction_intervals": false
```
For more scoring code download parameters, see the DataRobot documentation.
#### SubmitActualsFromCatalogOperator

Submits actuals from the DataRobot AI Catalog to a deployment.

Returns an actuals upload job ID.

Parameters:

Parameter | Type | Description |
---|---|---|
`deployment_id` | `str` | The DataRobot deployment ID. |
`dataset_id` | `str` | The DataRobot AI Catalog dataset ID. |
`dataset_version_id` | `str` | The DataRobot AI Catalog dataset version ID. |

Sample config params:

```
"association_id_column": "id",
"actual_value_column": "ACTUAL",
"timestamp_column": "timestamp"
```
### Sensors

#### AutopilotCompleteSensor

Checks whether Autopilot has completed.

Parameters:

Parameter | Type | Description |
---|---|---|
`project_id` | `str` | The DataRobot project ID. |
#### ScoringCompleteSensor

Checks whether batch scoring is complete.

Parameters:

Parameter | Type | Description |
---|---|---|
`job_id` | `str` | The batch prediction job ID. |
#### MonitoringJobCompleteSensor

Checks whether a monitoring job is complete.

Parameters:

Parameter | Type | Description |
---|---|---|
`job_id` | `str` | The batch monitoring job ID. |
#### BaseAsyncResolutionSensor

Checks whether a DataRobot async API call is complete.

Parameters:

Parameter | Type | Description |
---|---|---|
`job_id` | `str` | The DataRobot async API call status check ID. |
### Hooks

#### DataRobotHook

A hook to initialize the DataRobot Public API client.
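A sketch of using the hook from a custom operator. This is a minimal sketch, assuming the hook exposes a `run()` method that authenticates the global `datarobot` client from the Airflow connection (the pattern the provider's own operators follow); the operator class itself is hypothetical:

```python
from typing import Any, Dict, List

import datarobot as dr
from airflow.models import BaseOperator
from datarobot_provider.hooks.datarobot import DataRobotHook  # assumed import path

class ListProjectsOperator(BaseOperator):  # hypothetical custom operator
    def execute(self, context: Dict[str, Any]) -> List[str]:
        # Initialize the DataRobot Public API client from the Airflow connection.
        DataRobotHook(datarobot_conn_id="datarobot_default").run()  # assumed method
        # Once initialized, the regular DataRobot Python client can be used.
        return [project.id for project in dr.Project.list()]
```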
## Pipeline

The modules described above allow you to construct a standard DataRobot pipeline in an Airflow DAG:

```python
create_project_op >> train_models_op >> autopilot_complete_sensor >> deploy_model_op >> score_predictions_op >> scoring_complete_sensor
```
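Put together, a minimal end-to-end DAG might look like the sketch below (import paths assumed; run-time values such as the training data, target, and deployment label come from the DAG run config shown earlier):

```python
from datetime import datetime

from airflow import DAG
from datarobot_provider.operators.datarobot import (  # assumed import path
    CreateProjectOperator,
    DeployRecommendedModelOperator,
    ScorePredictionsOperator,
    TrainModelsOperator,
)
from datarobot_provider.sensors.datarobot import (  # assumed import path
    AutopilotCompleteSensor,
    ScoringCompleteSensor,
)

with DAG(dag_id="datarobot_pipeline_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    create_project_op = CreateProjectOperator(task_id="create_project")
    train_models_op = TrainModelsOperator(
        task_id="train_models", project_id=create_project_op.output
    )
    autopilot_complete_sensor = AutopilotCompleteSensor(
        task_id="autopilot_complete", project_id=create_project_op.output
    )
    deploy_model_op = DeployRecommendedModelOperator(
        task_id="deploy_model", project_id=create_project_op.output
    )
    score_predictions_op = ScorePredictionsOperator(
        task_id="score_predictions", deployment_id=deploy_model_op.output
    )
    scoring_complete_sensor = ScoringCompleteSensor(
        task_id="scoring_complete", job_id=score_predictions_op.output
    )

    (
        create_project_op
        >> train_models_op
        >> autopilot_complete_sensor
        >> deploy_model_op
        >> score_predictions_op
        >> scoring_complete_sensor
    )
```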
## Example DAGs

The following example DAGs, which use a preconfigured connection, are available in the `datarobot_provider/example_dags` directory:
Example DAG | Description |
---|---|
`datarobot_pipeline_dag.py` | Run the basic end-to-end workflow in DataRobot. |
`datarobot_score_dag.py` | Perform DataRobot batch scoring. |
`datarobot_jdbc_batch_scoring_dag.py` | Perform DataRobot batch scoring with a JDBC data source. |
`datarobot_aws_s3_batch_scoring_dag.py` | Use DataRobot AWS Credentials with `ScorePredictionsOperator`. |
`datarobot_gcp_storage_batch_scoring_dag.py` | Use DataRobot GCP Credentials with `ScorePredictionsOperator`. |
`datarobot_bigquery_batch_scoring_dag.py` | Use DataRobot GCP Credentials with `ScorePredictionsOperator`. |
`datarobot_azure_storage_batch_scoring_dag.py` | Use DataRobot Azure Storage Credentials with `ScorePredictionsOperator`. |
`datarobot_jdbc_dataset_dag.py` | Upload a dataset to the AI Catalog through a JDBC connection. |
`datarobot_batch_monitoring_job_dag.py` | Run a batch monitoring job. |
`datarobot_create_project_from_ai_catalog_dag.py` | Create a DataRobot project from a DataRobot AI Catalog dataset. |
`datarobot_create_project_from_dataset_version_dag.py` | Create a DataRobot project from a specific dataset version in the DataRobot AI Catalog. |
`datarobot_dataset_new_version_dag.py` | Create a new version of an existing dataset in the DataRobot AI Catalog. |
`datarobot_dataset_upload_dag.py` | Upload a local file to the DataRobot AI Catalog. |
`datarobot_get_datastore_dag.py` | Create a DataRobot DataStore with `GetOrCreateDataStoreOperator`. |
`datarobot_jdbc_dataset_dag.py` | Create a DataRobot project from a JDBC data source. |
`datarobot_jdbc_dynamic_dataset_dag.py` | Create a DataRobot project from a JDBC dynamic data source. |
`datarobot_upload_actuals_catalog_dag.py` | Upload actuals from the DataRobot AI Catalog. |
`deployment_service_stats_dag.py` | Get a deployment's service statistics with `GetServiceStatsOperator`. |
`deployment_stat_and_accuracy_dag.py` | Get a deployment's service statistics and accuracy. |
`deployment_update_monitoring_settings_dag.py` | Update a deployment's monitoring settings. |
`deployment_update_segment_analysis_settings_dag.py` | Update a deployment's segment analysis settings. |
`download_scoring_code_from_deployment_dag.py` | Download scoring code (JAR file) from a DataRobot deployment. |
`advanced_datarobot_pipeline_jdbc_dag.py` | Run the advanced end-to-end workflow in DataRobot. |
The advanced end-to-end workflow in DataRobot (`advanced_datarobot_pipeline_jdbc_dag.py`) contains the following steps:

- Ingest a dataset into the AI Catalog from a JDBC data source.
- Create a DataRobot project.
- Train models using Autopilot.
- Deploy the recommended model.
- Change deployment settings (enable monitoring settings, segment analysis, and bias and fairness).
- Run batch scoring using a JDBC data source.
- Upload actuals from a JDBC data source.
- Collect deployment metrics (service statistics, feature drift, target drift, and accuracy) and process them with a custom Python operator.
## Issues
Please submit issues and pull requests in our official repo: https://github.com/datarobot/airflow-provider-datarobot
We are happy to hear from you. Please email any feedback to the authors at support@datarobot.com.
## Copyright Notice
Copyright 2023 DataRobot, Inc. and its affiliates.
All rights reserved.
This is proprietary source code of DataRobot, Inc. and its affiliates.
Released under the terms of DataRobot Tool and Utility Agreement.