dbt-deltastream
A dbt adapter for DeltaStream - a stream processing engine based on Apache Flink.
Features
- Seamless integration with DeltaStream's streaming capabilities
- Support for DeltaStream core concepts through dbt materialization types:
  - table: Traditional batch table materialization
  - materialized_view: Continuously updated view
  - stream: Pure streaming transformation
  - changelog: Change data capture (CDC) stream
  - store: External system connection (Kafka, PostgreSQL, etc.)
  - entity: Entity definition in a store
  - database: Database definition
  - compute_pool: Compute pool definition
Installation
```shell
pip install dbt-deltastream
```
Requirements:
- Python >= 3.11
- dbt-core >= 1.8.0
Configuration
Add to your profiles.yml:
```yaml
your_profile_name:
  target: dev
  outputs:
    dev:
      type: deltastream
      # Required parameters
      token: your-api-token                # Authentication token
      database: your-database              # Target database name
      schema: your-schema                  # Target schema name
      organization_id: your-org-id         # Organization identifier
      # Optional parameters
      url: https://api.deltastream.io/v2   # DeltaStream API URL
      timezone: UTC                        # Timezone for operations
      session_id: your-session-id          # Custom session identifier for debugging
      role: your-role                      # User role
      store: your-store                    # Target store name
      compute_pool: your-compute-pool      # Compute pool name
```
The following parameters are supported in the profile configuration:
Required Parameters
- token: Authentication token for DeltaStream API
- database: Target default database name
- schema: Target default schema name
- organization_id: Organization identifier
Optional Parameters
- url: DeltaStream API URL (default: https://api.deltastream.io/v2)
- timezone: Timezone for operations (default: UTC)
- session_id: Custom session identifier for debugging
- compute_pool: Compute pool name used by models that require one (defaults to the default compute pool)
- role: User role
- store: Target default store name
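Once the profile is filled in, dbt's standard connection check can confirm that the adapter loads and the credentials work (this uses only built-in dbt CLI commands, nothing DeltaStream-specific):

```shell
# Validate the profile, adapter installation, and connection (standard dbt command)
dbt debug
```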
Best practices
When configuring your project for production, it is recommended to use environment variables to store sensitive information such as the token:
your_profile_name:
target: prod
outputs:
prod:
type: deltastream
token: "{{ env_var('DELTASTREAM_API_TOKEN') }}"
...
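With that configuration, the token is resolved from the environment at runtime. The variable name matches the env_var() call above; the export line below is illustrative:

```shell
# Export the token so env_var('DELTASTREAM_API_TOKEN') resolves when dbt runs
export DELTASTREAM_API_TOKEN="your-api-token"
dbt run --target prod
```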
Materializations
DeltaStream supports two types of model definitions:
- YAML-only resources for defining infrastructure components
- SQL models for data transformations
YAML-Only Resources
These models don't contain SQL SELECT statements; instead they define infrastructure components through YAML configuration, such as streams, changelogs, and stores. They can be either managed or unmanaged by the dbt DAG.
Managed
When a YAML-only resource is managed by the dbt DAG, it is automatically included in the DAG by defining it as a model, for instance:
```yaml
version: 2
models:
  - name: my_kafka_stream
    config:
      materialized: stream
      parameters:
        topic: 'user_events'
        value.format: 'json'
        store: 'my_kafka_store'
```
In this case we run into a dbt limitation: a placeholder .sql file must be created for the model to be detected. That .sql file can contain any content as long as it does not include a SELECT statement. We expect this limitation to be lifted in a future dbt version, but it is still under discussion.
The resource can then be referenced in downstream models using the regular ref function:
```sql
SELECT * FROM {{ ref('my_kafka_stream') }}
```
Unmanaged
When a YAML-only resource is not managed by the dbt DAG, it must be defined as a source, for instance:
```yaml
version: 2
sources:
  - name: kafka
    schema: public
    tables:
      - name: pageviews
        description: "Pageviews stream"
        config:
          materialized: stream
          parameters:
            topic: pageviews
            store: 'my_kafka_store'
            'value.format': JSON
        columns:
          - name: viewtime
            type: BIGINT
          - name: userid
            type: VARCHAR
          - name: pageid
            type: VARCHAR
```
Specific macros must then be executed to create the resources on demand. To create all sources, run:
```shell
dbt run-operation create_sources
```
To create a specific source, run:
```shell
dbt run-operation create_source_by_name --args '{source_name: user_events}'
```
The resource can then be referenced in downstream models using the regular source function:
```sql
SELECT * FROM {{ source('kafka', 'pageviews') }}
```
YAML-Only Resources Examples
The following examples can be defined either as managed resources (models) or as unmanaged resources (sources).
Managed example
```yaml
version: 2
models:
  - name: my_kafka_store
    config:
      materialized: store
      parameters:
        type: KAFKA # required
        access_region: "AWS us-east-1"
        uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
        tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
  - name: ps_store
    config:
      materialized: store
      parameters:
        type: POSTGRESQL # required
        access_region: "AWS us-east-1"
        uris: "postgresql://mystore.com:5432/demo"
        postgres.username: "user"
        postgres.password: "password"
  - name: user_events_stream
    config:
      materialized: stream
      columns:
        event_time:
          type: TIMESTAMP
          not_null: true
        user_id:
          type: VARCHAR
        action:
          type: VARCHAR
      parameters:
        topic: 'user_events'
        value.format: 'json'
        key.format: 'primitive'
        key.type: 'VARCHAR'
        timestamp: 'event_time'
  - name: order_changes
    config:
      materialized: changelog
      columns:
        order_id:
          type: VARCHAR
          not_null: true
        status:
          type: VARCHAR
        updated_at:
          type: TIMESTAMP
      primary_key:
        - order_id
      parameters:
        topic: 'order_updates'
        value.format: 'json'
  - name: pv_kinesis
    config:
      materialized: entity
      store: kinesis_store
      parameters:
        'kinesis.shards': 3
  - name: my_compute_pool
    config:
      materialized: compute_pool
      parameters:
        'compute_pool.size': 'small'
        'compute_pool.timeout_min': 5
```
Unmanaged example
```yaml
version: 2
sources:
  - name: example # source name, not used by DeltaStream but required by dbt for {{ source('example', '...') }}
    tables:
      - name: my_kafka_store
        config:
          materialized: store
          parameters:
            type: KAFKA # required
            access_region: "AWS us-east-1"
            uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
            tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
      - name: ps_store
        config:
          materialized: store
          parameters:
            type: POSTGRESQL # required
            access_region: "AWS us-east-1"
            uris: "postgresql://mystore.com:5432/demo"
            postgres.username: "user"
            postgres.password: "password"
      - name: user_events_stream
        config:
          materialized: stream
          columns:
            event_time:
              type: TIMESTAMP
              not_null: true
            user_id:
              type: VARCHAR
            action:
              type: VARCHAR
          parameters:
            topic: 'user_events'
            value.format: 'json'
            key.format: 'primitive'
            key.type: 'VARCHAR'
            timestamp: 'event_time'
      - name: order_changes
        config:
          materialized: changelog
          columns:
            order_id:
              type: VARCHAR
              not_null: true
            status:
              type: VARCHAR
            updated_at:
              type: TIMESTAMP
          primary_key:
            - order_id
          parameters:
            topic: 'order_updates'
            value.format: 'json'
      - name: pv_kinesis
        config:
          materialized: entity
          store: kinesis_store
          parameters:
            'kinesis.shards': 3
      - name: my_compute_pool
        config:
          materialized: compute_pool
          parameters:
            'compute_pool.size': 'small'
            'compute_pool.timeout_min': 5
```
SQL Models
These models contain SQL SELECT statements for data transformations.
Stream (SQL)
Creates a continuous streaming transformation:
```sql
{{ config(
    materialized='stream',
    parameters={
        'topic': 'purchase_events',
        'value.format': 'json'
    }
) }}

SELECT
    event_time,
    user_id,
    action
FROM {{ ref('source_stream') }}
WHERE action = 'purchase'
```
Changelog (SQL)
Captures changes in the data stream:
```sql
{{ config(
    materialized='changelog',
    parameters={
        'topic': 'order_updates',
        'value.format': 'json'
    }
) }}

SELECT
    order_id,
    status,
    updated_at
FROM {{ ref('orders_stream') }}
```
Table
Creates a traditional batch table:
```sql
{{ config(materialized='table') }}

SELECT
    date,
    SUM(amount) AS daily_total
FROM {{ ref('transactions') }}
GROUP BY date
```
Materialized View
Creates a continuously updated view:
```sql
{{ config(materialized='materialized_view') }}

SELECT
    product_id,
    COUNT(*) AS purchase_count
FROM {{ ref('purchase_events') }}
GROUP BY product_id
```
Query Termination Macros
The DeltaStream dbt adapter provides macros to help you manage and terminate running queries directly from dbt.
Terminate a Specific Query
Use the terminate_query macro to terminate a query by its ID:
```shell
dbt run-operation terminate_query --args '{query_id: "<QUERY_ID>"}'
```
Terminate All Running Queries
Use the terminate_all_queries macro to terminate all currently running queries:
```shell
dbt run-operation terminate_all_queries
```
These macros leverage DeltaStream's LIST QUERIES; and TERMINATE QUERY <query_id>; SQL commands to identify and terminate running queries, which is useful for cleaning up long-running or stuck jobs during development or operations.
Note that terminate_all_queries is not recommended in production environments, as it stops all running queries, including those that were not created by the current user or by dbt.
Query Listing Macro
List All Queries
The list_all_queries macro displays all queries currently known to DeltaStream, including their state, owner, and SQL. It prints a formatted table to the dbt logs for easy inspection.
Usage:
```shell
dbt run-operation list_all_queries
```
Example Output:
```text
ID | Name | Version | IntendedState | ActualState | Query | Owner | CreatedAt | UpdatedAt
-----------------------------------------------------------------------------------------
<query row 1>
<query row 2>
...
```
This macro is useful for debugging, monitoring, and operational tasks. It leverages DeltaStream's LIST QUERIES; SQL command and prints the results in a readable table format.
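A typical cleanup session combines the listing and termination macros: list the queries first, then terminate the stuck one by ID. The <QUERY_ID> placeholder stands for an ID copied from the listing output:

```shell
# Inspect running queries and note the ID of the stuck one from the printed table
dbt run-operation list_all_queries
# Terminate just that query, leaving the others running
dbt run-operation terminate_query --args '{query_id: "<QUERY_ID>"}'
```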
Contributing
We welcome contributions! Please feel free to submit a Pull Request.
License