The DeltaStream adapter plugin for dbt
dbt-deltastream
A dbt adapter for DeltaStream - a stream processing engine based on Apache Flink.
Features
- Seamless integration with DeltaStream's streaming capabilities
- Support for DeltaStream core concepts through dbt materialization types:
  - table: Traditional batch table materialization
  - materialized_view: Continuously updated view
  - stream: Pure streaming transformation
  - changelog: Change data capture (CDC) stream
  - store: External system connection (Kafka, PostgreSQL, etc.)
  - entity: Entity definition in a store
  - database: Database definition
Installation
pip install dbt-deltastream
Requirements:
- Python >= 3.11
- dbt-core >= 1.8.0
Configuration
Add to your profiles.yml:
your_profile_name:
  target: dev
  outputs:
    dev:
      type: deltastream
      # Required parameters
      token: your-api-token           # Authentication token
      database: your-database         # Target database name
      schema: your-schema             # Target schema name
      organization_id: your-org-id    # Organization identifier
      # Optional parameters
      url: https://api.deltastream.io/v2  # DeltaStream API URL
      timezone: UTC                   # Timezone for operations
      session_id: your-session-id     # Custom session identifier for debugging purposes
      role: your-role                 # User role
      store: your-store               # Target store name
The following parameters are supported in the profile configuration:
Required Parameters
- token: Authentication token for the DeltaStream API
- database: Target default database name
- schema: Target default schema name
- organization_id: Organization identifier
Optional Parameters
- url: DeltaStream API URL (default: https://api.deltastream.io/v2)
- timezone: Timezone for operations (default: UTC)
- session_id: Custom session identifier for debugging
- role: User role
- store: Target default store name
Best practices
When configuring your project for production, it is recommended to use environment variables to store sensitive information such as the token:
your_profile_name:
  target: prod
  outputs:
    prod:
      type: deltastream
      token: "{{ env_var('DELTASTREAM_API_TOKEN') }}"
      ...
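With this setup, the token is read from the environment when dbt parses the profile. A minimal sketch of supplying it before a run (the variable name comes from the profile above; the value is illustrative):

```shell
# Export the token (e.g. from CI secrets) so dbt can resolve
# env_var('DELTASTREAM_API_TOKEN') while rendering profiles.yml.
export DELTASTREAM_API_TOKEN="example-token"  # illustrative value only

# Then run against the prod target:
# dbt run --target prod
```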
Materializations
DeltaStream supports two types of model definitions:
- YAML-only resources for defining infrastructure components
- SQL models for data transformations
YAML-Only Resources
These models don't contain SQL SELECT statements; instead, they define infrastructure components through YAML configuration, such as external system connections (streams, changelogs, and stores). A YAML-only resource can be either managed or unmanaged by the dbt DAG.
Managed
When a YAML-only resource is managed by the dbt DAG, it is automatically included in the DAG by declaring it as a model, for instance:
version: 2
models:
  - name: my_kafka_stream
    config:
      materialized: stream
      parameters:
        topic: 'user_events'
        value.format: 'json'
        store: 'my_kafka_store'
In this case we run into a dbt limitation: a placeholder .sql file must exist for the model to be detected. The file may contain any content as long as it does not contain a SELECT statement. We expect this limitation to be lifted in a future dbt version, but it is still under discussion.
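For example, the placeholder can be a single comment line (the file path is assumed to match the model name from the YAML above):

```shell
# dbt only discovers the model if a matching .sql file exists; any content
# that does not include a SELECT statement works as the placeholder.
mkdir -p models
echo '-- placeholder: definition lives in the YAML config' > models/my_kafka_stream.sql
```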
The model can then be referenced in downstream models using the regular ref function:
SELECT * FROM {{ ref('my_kafka_stream') }}
Unmanaged
When a YAML-only resource is not managed by the dbt DAG, it has to be declared as a source, for instance:
version: 2
sources:
  - name: kafka
    schema: public
    tables:
      - name: pageviews
        description: "Pageviews stream"
        config:
          materialized: stream
          parameters:
            topic: pageviews
            store: 'my_kafka_store'
            'value.format': JSON
        columns:
          - name: viewtime
            type: BIGINT
          - name: userid
            type: VARCHAR
          - name: pageid
            type: VARCHAR
Creating these resources then requires running dedicated macros on demand. To create all sources, run:
dbt run-operation create_sources
To create a specific source, run:
dbt run-operation create_source_by_name --args '{source_name: user_events}'
The source can then be referenced in downstream models using the regular source function:
SELECT * FROM {{ source('kafka', 'pageviews') }}
YAML-Only Resources Examples
Store
Creates a connection to external systems:
version: 2
models:
  - name: my_kafka_store
    config:
      materialized: store
      parameters:
        type: KAFKA # required
        access_region: "AWS us-east-1"
        uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
        tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
PostgreSQL store example:
version: 2
models:
- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL # required
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"
Stream (YAML-only)
Defines a stream with explicit column definitions:
version: 2
models:
  - name: user_events_stream
    config:
      materialized: stream
      columns:
        event_time:
          type: TIMESTAMP
          not_null: true
        user_id:
          type: VARCHAR
        action:
          type: VARCHAR
      parameters:
        topic: 'user_events'
        value.format: 'json'
        key.format: 'primitive'
        key.type: 'VARCHAR'
        timestamp: 'event_time'
Changelog (YAML-only)
Defines a changelog with explicit column definitions and primary key:
version: 2
models:
  - name: order_changes
    config:
      materialized: changelog
      columns:
        order_id:
          type: VARCHAR
          not_null: true
        status:
          type: VARCHAR
        updated_at:
          type: TIMESTAMP
      primary_key:
        - order_id
      parameters:
        topic: 'order_updates'
        value.format: 'json'
Entity (YAML-only)
Defines an entity in a store:
version: 2
models:
  - name: pv_kinesis
    config:
      materialized: entity
      store: kinesis_store
      parameters:
        kinesis.shards: 3
SQL Models
These models contain SQL SELECT statements for data transformations.
Stream (SQL)
Creates a continuous streaming transformation:
{{ config(
materialized='stream',
parameters={
'topic': 'purchase_events',
'value.format': 'json'
}
) }}
SELECT
event_time,
user_id,
action
FROM {{ ref('source_stream') }}
WHERE action = 'purchase'
Changelog (SQL)
Captures changes in the data stream:
{{ config(
materialized='changelog',
parameters={
'topic': 'order_updates',
'value.format': 'json'
}
) }}
SELECT
order_id,
status,
updated_at
FROM {{ ref('orders_stream') }}
Table
Creates a traditional batch table:
{{ config(materialized='table') }}
SELECT
date,
SUM(amount) as daily_total
FROM {{ ref('transactions') }}
GROUP BY date
Materialized View
Creates a continuously updated view:
{{ config(materialized='materialized_view') }}
SELECT
product_id,
COUNT(*) as purchase_count
FROM {{ ref('purchase_events') }}
GROUP BY product_id
Contributing
We welcome contributions! Please feel free to submit a Pull Request.
License
Download files
Source Distribution
Built Distribution
File details
Details for the file dbt_deltastream-1.9.1.tar.gz.
File metadata
- Download URL: dbt_deltastream-1.9.1.tar.gz
- Upload date:
- Size: 19.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9209622c92de7a637ca134acd07251765516394208d8eb3ed810a6712b52c511 |
| MD5 | bdd3561ff73a65785f3f842d529a24ae |
| BLAKE2b-256 | 5dec0b38e1e03fbeac10b517cada7ebc950dfbffa56f62e6e64253760214b85a |
Provenance
The following attestation bundles were made for dbt_deltastream-1.9.1.tar.gz:
Publisher: release.yml on deltastreaminc/dbt-deltastream
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: dbt_deltastream-1.9.1.tar.gz
- Subject digest: 9209622c92de7a637ca134acd07251765516394208d8eb3ed810a6712b52c511
- Sigstore transparency entry: 179846222
- Permalink: deltastreaminc/dbt-deltastream@563bb487540a8a8c3b50ac433d9f62eac1e2f898
- Branch / Tag: refs/heads/main
- Owner: https://github.com/deltastreaminc
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@563bb487540a8a8c3b50ac433d9f62eac1e2f898
- Trigger Event: workflow_dispatch
File details
Details for the file dbt_deltastream-1.9.1-py3-none-any.whl.
File metadata
- Download URL: dbt_deltastream-1.9.1-py3-none-any.whl
- Upload date:
- Size: 28.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3c8c3be53d0a4db5f3a08c522ca0a0c989554006a5fa26689022cf398d37ef86 |
| MD5 | d64cf91ba14dc2e83fe2b3b39e4510ea |
| BLAKE2b-256 | de67f2b692d7d94ce6c50ff5328249db026664af626bcb5257a682cf53cf5c99 |
Provenance
The following attestation bundles were made for dbt_deltastream-1.9.1-py3-none-any.whl:
Publisher: release.yml on deltastreaminc/dbt-deltastream
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: dbt_deltastream-1.9.1-py3-none-any.whl
- Subject digest: 3c8c3be53d0a4db5f3a08c522ca0a0c989554006a5fa26689022cf398d37ef86
- Sigstore transparency entry: 179846224
- Permalink: deltastreaminc/dbt-deltastream@563bb487540a8a8c3b50ac433d9f62eac1e2f898
- Branch / Tag: refs/heads/main
- Owner: https://github.com/deltastreaminc
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@563bb487540a8a8c3b50ac433d9f62eac1e2f898
- Trigger Event: workflow_dispatch