The Confluent Cloud adapter plugin for dbt
dbt-confluent
The dbt adapter for Confluent Cloud Flink SQL.
Build, test, and manage streaming data transformations on Confluent Cloud using dbt's familiar development workflow.
Overview
dbt-confluent lets you use dbt to define and run SQL transformations on Confluent Cloud's fully managed Apache Flink service. It supports both batch-style and streaming materializations, enabling continuous data pipelines defined as dbt models.
Features:
- Standard dbt materializations (`table`, `view`, `ephemeral`) adapted for Flink SQL
- Streaming-native materializations (`streaming_table`, `streaming_source`) for continuous data pipelines
- Integration with Confluent Cloud connectors (e.g., Datagen/Faker) via `streaming_source`
See Materializations for the full list and details.
Installation
pip install dbt-confluent
or with uv:
uv add dbt-confluent
Requires Python 3.10+.
Configuration
After installing, scaffold a new project with:
dbt init my_project
Select confluent as the adapter and fill in the prompts for your Confluent Cloud credentials (API key, compute pool, environment, etc.).
Concept mapping
Confluent Cloud Flink uses different terminology than traditional databases. Here's how dbt concepts map to Flink and Confluent Cloud:
| dbt concept | Flink concept | Confluent Cloud entity |
|---|---|---|
| database | Catalog | Environment |
| schema | Database | Kafka cluster |
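To make the mapping concrete, here is a minimal sketch of how a dbt relation's three parts could line up with a Flink three-part name. The helper `flink_relation_name` is hypothetical and is not part of dbt-confluent; it assumes standard Flink SQL backtick quoting, where a backtick inside an identifier is escaped by doubling it.

```python
def flink_relation_name(database: str, schema: str, identifier: str) -> str:
    """Render a dbt relation as a backtick-quoted Flink three-part name.

    database   -> Flink catalog  (Confluent Cloud environment)
    schema     -> Flink database (Kafka cluster)
    identifier -> Flink table
    """
    def quote(part: str) -> str:
        # Assumption: a backtick inside an identifier is escaped by doubling it.
        return "`" + part.replace("`", "``") + "`"

    return ".".join(quote(part) for part in (database, schema, identifier))


# Hypothetical environment and cluster names, for illustration only.
print(flink_relation_name("my-environment", "my-kafka-cluster", "pageviews_enriched"))
# `my-environment`.`my-kafka-cluster`.`pageviews_enriched`
```

Quoting matters here because environment and cluster names often contain hyphens, which are not valid in unquoted SQL identifiers.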
Schema configuration
Unlike most dbt adapters, dbt-confluent cannot create or drop schemas — a dbt schema maps to a Flink database (Kafka cluster) in Confluent Cloud, which is managed externally. Both the dbname in your profiles.yml and any model-level schema config must reference an existing Flink database by name:
# dbt_project.yml
models:
  my_project:
    +schema: my-kafka-cluster
Usage
Streaming table
A streaming table creates a table and runs a continuous INSERT query against it:
-- models/pageviews_enriched.sql
{{
  config(
    materialized='streaming_table',
    with={'changelog.mode': 'append'}
  )
}}
SELECT
  p.user_id,
  p.page_url,
  u.username
FROM {{ ref('pageviews') }} p
JOIN {{ ref('users') }} u ON p.user_id = u.user_id
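The `with` config above is a dictionary of table options. As a rough illustration of how such a dictionary corresponds to a Flink SQL `WITH (...)` clause, the sketch below renders one by hand. The function `render_with_clause` is hypothetical; it reflects standard Flink SQL syntax and makes no claim about how the adapter's macros render the clause internally.

```python
def render_with_clause(options: dict[str, str]) -> str:
    """Render a dict of table options as a Flink SQL WITH clause."""
    def literal(value: str) -> str:
        # SQL string literals escape a single quote by doubling it.
        return "'" + str(value).replace("'", "''") + "'"

    pairs = ", ".join(f"{literal(k)} = {literal(v)}" for k, v in options.items())
    return f"WITH ({pairs})"


print(render_with_clause({"changelog.mode": "append"}))
# WITH ('changelog.mode' = 'append')
```

Both keys and values are written as quoted strings, which is why the model config passes values such as `'10'` as strings rather than numbers.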
Streaming source
A streaming source creates a connector-backed source table. The model body contains only the column definitions, not a SELECT query:
-- models/datagen_users.sql
{{
  config(
    materialized='streaming_source',
    connector='faker',
    with={'rows-per-second': '10'}
  )
}}
`user_id` INT,
`username` STRING,
`email` STRING
See Materializations for the full list and details.
Known Limitations
- No schema management: Flink databases (Kafka clusters) cannot be created or dropped — they are managed in Confluent Cloud.
- No table renames: `ALTER TABLE RENAME` is not supported. To rename a model you must drop and recreate the underlying table, which for `table`, `streaming_table`, and `streaming_source` materializations requires running with `--full-refresh`.
- No transactions: Flink SQL is non-transactional.
- No snapshots: Flink SQL lacks the batch operations (MERGE, UPDATE) required by dbt snapshots.
- No incremental: dbt's batch-incremental semantics do not map to Flink's continuous processing model. Use `streaming_table` instead.
- Drift detection for WITH options: Schema drift detection only verifies that user-specified `WITH` options exist with correct values. It cannot detect when options are removed from the config, because connectors may add default options that cannot be distinguished from user-specified ones. Use `--full-refresh` to change or remove WITH options. Drift detection can be disabled per-model with `config(on_schema_drift='ignore')`. See Materializations for details.
Development
git clone https://github.com/confluentinc/dbt-confluent
cd dbt-confluent
uv sync --dev
See CONTRIBUTING.md for changelog and contribution guidelines.
Code quality
uv run ruff check dbt/ tests/
uv run ruff format --check dbt/ tests/
Running tests
Tests require a Confluent Cloud environment. Set the following environment variables (or add them to a test.env file):
export CONFLUENT_ENV_ID=env-xxxxxx
export CONFLUENT_ORG_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
export CONFLUENT_COMPUTE_POOL_ID=lfcp-xxxxx
export CONFLUENT_CLOUD_PROVIDER=aws
export CONFLUENT_CLOUD_REGION=us-west-2
export CONFLUENT_TEST_DBNAME=dbname
export CONFLUENT_FLINK_API_KEY=xxx
export CONFLUENT_FLINK_API_SECRET=xxx
uv run pytest
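Before running the suite it can help to confirm that every variable listed above is set. The following is a small standalone sketch and is not part of the project's test tooling; the helper name `missing_test_vars` is hypothetical.

```python
import os

# Variable names taken from the list above.
REQUIRED_VARS = [
    "CONFLUENT_ENV_ID",
    "CONFLUENT_ORG_ID",
    "CONFLUENT_COMPUTE_POOL_ID",
    "CONFLUENT_CLOUD_PROVIDER",
    "CONFLUENT_CLOUD_REGION",
    "CONFLUENT_TEST_DBNAME",
    "CONFLUENT_FLINK_API_KEY",
    "CONFLUENT_FLINK_API_SECRET",
]


def missing_test_vars(environ=os.environ) -> list[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_test_vars()
if missing:
    print("Missing variables:", ", ".join(missing))
else:
    print("All required variables are set.")
```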
Versioning
This adapter follows semantic versioning and is versioned independently from dbt Core. Compatibility with dbt Core is declared via dependencies (currently requires dbt-core~=1.11).
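The `~=1.11` specifier is a PEP 440 compatible-release clause, equivalent to `>=1.11, ==1.*`. The sketch below spells that rule out for plain numeric versions. It is a simplified illustration (the function name is hypothetical, and pre-release or post-release suffixes are not handled); real tooling should rely on pip or the `packaging` library instead.

```python
def satisfies_compatible_release(version: str, spec: str = "1.11") -> bool:
    """Check a numeric version string against the clause `~=spec`."""
    want = tuple(int(part) for part in spec.split("."))
    have = tuple(int(part) for part in version.split("."))
    # ~=X.Y means: at least X.Y, and every component except the last
    # one in the spec must match exactly.
    prefix = want[:-1]
    return have >= want and have[: len(prefix)] == prefix


for candidate in ("1.10.9", "1.11.0", "1.12.3", "2.0.0"):
    print(candidate, satisfies_compatible_release(candidate))
# 1.10.9 False
# 1.11.0 True
# 1.12.3 True
# 2.0.0 False
```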
License
Apache-2.0 — see LICENSE for details.