The SparkSQL plugin for dbt (data build tool)

dbt-spark

This plugin ports dbt functionality to Spark. It supports running dbt against Spark clusters that are hosted via Databricks (AWS + Azure), Amazon EMR, or Docker.

We have not tested extensively against older versions of Apache Spark. The plugin uses syntax that requires version 2.2.0 or newer.

Documentation

For more information on using Spark with dbt, consult the dbt documentation (https://docs.getdbt.com/).

Installation

This plugin can be installed via pip:

# Install dbt-spark from PyPI:
$ pip install dbt-spark

dbt-spark also supports connecting via an ODBC driver, which requires pyodbc. You can install pyodbc separately, or pull it in via pip using the ODBC extra:

# Install dbt-spark with pyodbc from PyPI:
$ pip install "dbt-spark[ODBC]"

See https://github.com/mkleehammer/pyodbc/wiki/Install for more info about installing pyodbc.

Configuring your profile

Connection Method

Connections can be made to Spark in three different modes. The http mode is used when connecting to a managed service such as Databricks, which provides an HTTP endpoint; the thrift mode is used to connect directly to the master node of a cluster (either on-premise or in the cloud); and the odbc mode connects through an ODBC driver, either to a cluster or to a SQL endpoint.

A dbt profile can be configured to run against Spark using the following configuration:

| Option | Description | Required? | Example |
|--------|-------------|-----------|---------|
| method | The connection method to use (thrift, http, or odbc) | Required | http |
| schema | The schema (database) to build models into | Required | analytics |
| host | The hostname to connect to | Required | yourorg.sparkhost.com |
| port | The port to connect to the host on | Optional (default: 443 for http and odbc, 10001 for thrift) | 443 |
| token | The token to use for authenticating to the cluster | Required for http and odbc | abc123 |
| organization | The ID of the Azure Databricks workspace being used; Azure Databricks only | See Databricks Note | 1234567891234567 |
| cluster | The name of the cluster to connect to | Required for http and odbc if connecting to a specific cluster | 01234-23423-coffeetime |
| endpoint | The ID of the SQL endpoint to connect to | Required for odbc if connecting to a SQL endpoint | 1234567891234a |
| driver | Path of the installed ODBC driver, or name of a configured ODBC DSN | Required for odbc | /opt/simba/spark/lib/64/libsparkodbc_sb64.so |
| user | The username to use to connect to the cluster | Optional | hadoop |
| connect_timeout | The number of seconds to wait before retrying to connect to a Pending Spark cluster | Optional (default: 10) | 60 |
| connect_retries | The number of times to try connecting to a Pending Spark cluster before giving up | Optional (default: 0) | 5 |

Databricks Note

AWS and Azure Databricks configure their connections differently, likely because the two services generate their workspace URLs differently.

Organization: To connect to an Azure Databricks cluster, you will need to obtain your organization ID, which is a unique ID Azure Databricks generates for each customer workspace. To find the organization ID, see https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect#step-2-configure-connection-properties. This is a string field; if there is a leading zero, be sure to include it.

Port: Please ignore all references to port 15001 in the databricks-connect docs; that port is specific to that tool. dbt-spark's http and odbc connections to Databricks use port 443.

Host: The host field for Databricks can be found at the start of your workspace or cluster url: region.azuredatabricks.net for Azure, or account.cloud.databricks.com for AWS. Do not include https://.

Usage with Amazon EMR

To connect to Spark running on an Amazon EMR cluster, you will need to run sudo /usr/lib/spark/sbin/start-thriftserver.sh on the master node of the cluster to start the Thrift server (see https://aws.amazon.com/premiumsupport/knowledge-center/jdbc-connection-emr/ for further context). You will also need to connect to port 10001, which will connect to the Spark backend Thrift server; port 10000 will instead connect to a Hive backend, which will not work correctly with dbt.

Example profiles.yml entries:

HTTP connection (e.g. Databricks)

your_profile_name:
  target: dev
  outputs:
    dev:
      method: http
      type: spark
      schema: analytics
      host: yourorg.sparkhost.com
      organization: 1234567891234567    # Azure Databricks ONLY
      port: 443
      token: abc123
      cluster: 01234-23423-coffeetime
      connect_retries: 5
      connect_timeout: 60

Thrift connection

your_profile_name:
  target: dev
  outputs:
    dev:
      method: thrift
      type: spark
      schema: analytics
      host: 127.0.0.1
      port: 10001
      user: hadoop
      connect_retries: 5
      connect_timeout: 60

ODBC connection

your_profile_name:
  target: dev
  outputs:
    dev:
      method: odbc
      type: spark
      schema: analytics
      host: yourorg.sparkhost.com
      organization: 1234567891234567    # Azure Databricks ONLY
      port: 443
      token: abc123

      # one of:
      cluster: 01234-23423-coffeetime
      endpoint: coffee01234time

      driver: path/to/driver
      connect_retries: 5    # cluster only
      connect_timeout: 60   # cluster only

Usage Notes

Model Configuration

The following configurations can be supplied to models run with the dbt-spark plugin:

| Option | Description | Required? | Example |
|--------|-------------|-----------|---------|
| file_format | The file format to use when creating tables (parquet, delta, csv, json, text, jdbc, orc, hive, or libsvm) | Optional | parquet |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | /mnt/root |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | partition_1 |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | cluster_1 |
| buckets | The number of buckets to create while clustering | Required if clustered_by is specified | 8 |
| incremental_strategy | The strategy to use for incremental models (insert_overwrite or merge). Note: merge requires file_format = delta and a unique_key to be specified. | Optional (default: insert_overwrite) | merge |
| persist_docs | Whether dbt should include the model description as a table comment | Optional | {'relation': true} |
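
These configs are supplied in a model's config block. The following is a minimal sketch, not taken from the plugin itself: it reuses the events model from the examples below, and user_id is a placeholder column. It writes a partitioned, clustered Parquet table under location_root and persists the model description as a table comment:

{{ config(
    materialized='table',
    file_format='parquet',
    location_root='/mnt/root',
    partition_by=['date_day'],
    clustered_by=['user_id'],
    buckets=8,
    persist_docs={'relation': true}
) }}

/*
  date_day and user_id are placeholder column names; swap in the
  columns you actually want to partition and cluster by
*/

select
    date_day,
    user_id,
    count(*) as events

from {{ ref('events') }}
group by 1, 2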

Incremental Models

To use incremental models, specify a partition_by clause in your model config. The default incremental strategy used is insert_overwrite, which will overwrite the partitions included in your query. Be sure to re-select all of the relevant data for a partition when using the insert_overwrite strategy. If a partition_by config is not specified, dbt will overwrite the entire table as an atomic operation, replacing it with new data of the same schema. This is analogous to truncate + insert.

{{ config(
    materialized='incremental',
    partition_by=['date_day'],
    file_format='parquet'
) }}

/*
  Every partition returned by this query will be overwritten
  when this model runs
*/

select
    date_day,
    count(*) as users

from {{ ref('events') }}
where cast(date_day as date) >= '2019-01-01'
group by 1
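
As noted above, if no partition_by is configured, each incremental run instead replaces the entire table atomically. A minimal sketch of that case, reusing the same events model:

{{ config(
    materialized='incremental',
    file_format='parquet'
) }}

/*
  With no partition_by configured, every run of this model
  overwrites the whole table with the results of this query
*/

select
    date_day,
    count(*) as users

from {{ ref('events') }}
group by 1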

The merge strategy is only supported when using file_format delta (supported in Databricks). It also requires you to specify a unique_key to match existing records; in the example below, event_id is a placeholder for your model's unique key column.

{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    partition_by=['date_day'],
    unique_key='event_id',
    file_format='delta'
) }}
-- event_id is a placeholder for your model's unique key column

select *
from {{ ref('events') }}
{% if is_incremental() %}
  where date_day > (select max(date_day) from {{ this }})
{% endif %}

Running locally

A docker-compose environment starts a Spark Thrift server and a Postgres database as a Hive Metastore backend.

docker-compose up

Your profile should look like this:

your_profile_name:
  target: local
  outputs:
    local:
      method: thrift
      type: spark
      schema: analytics
      host: 127.0.0.1
      port: 10000
      user: dbt
      connect_retries: 5
      connect_timeout: 60

Connecting to the local Spark instance:

  • The Spark UI should be available at http://localhost:4040/sqlserver/
  • The endpoint for SQL-based testing is localhost:10000 and can be referenced with the Hive or Spark JDBC drivers using the connection string jdbc:hive2://localhost:10000 and default credentials dbt:dbt

Note that the Hive metastore data is persisted under ./.hive-metastore/, and the Spark-produced data under ./.spark-warehouse/. To completely reset your environment, run the following:

docker-compose down
rm -rf ./.hive-metastore/
rm -rf ./.spark-warehouse/

Reporting bugs and contributing code

  • Want to report a bug or request a feature? Let us know on Slack, or open an issue.

Code of Conduct

Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the PyPA Code of Conduct.
