The Confluent Cloud adapter plugin for dbt

dbt-confluent

The dbt adapter for Confluent Cloud Flink SQL.

Build, test, and manage streaming data transformations on Confluent Cloud using dbt's familiar development workflow.

Overview

dbt-confluent lets you use dbt to define and run SQL transformations on Confluent Cloud's fully managed Apache Flink service. It supports both batch-style and streaming materializations, enabling continuous data pipelines defined as dbt models.

Features:

  • Standard dbt materializations (table, view, ephemeral) adapted for Flink SQL
  • Streaming-native materializations (streaming_table, streaming_source) for continuous data pipelines
  • Integration with Confluent Cloud connectors (e.g., Datagen/Faker) via streaming_source

See Materializations for the full list and details.

Installation

pip install dbt-confluent

or with uv:

uv add dbt-confluent

Requires Python 3.10+.

Configuration

After installing, scaffold a new project with:

dbt init my_project

Select confluent as the adapter and fill in the prompts for your Confluent Cloud credentials (API key, compute pool, environment, etc.).

Concept mapping

Confluent Cloud Flink uses different terminology than traditional databases. Here's how dbt concepts map to Flink and Confluent Cloud:

dbt concept    Flink concept    Confluent Cloud entity
database       Catalog          Environment
schema         Database         Kafka cluster
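
To make the mapping concrete, the sketch below builds a fully qualified Flink table name from the three parts of a dbt relation. The helper names `quote` and `flink_relation_name` are hypothetical and are not part of dbt-confluent. They only illustrate that dbt's database.schema.identifier lines up with Flink's catalog.database.table, that is, environment, Kafka cluster, and table.

```python
from __future__ import annotations


def quote(identifier: str) -> str:
    """Quote a Flink SQL identifier with backticks, doubling embedded backticks."""
    return "`" + identifier.replace("`", "``") + "`"


def flink_relation_name(database: str, schema: str, identifier: str) -> str:
    """Hypothetical helper: map dbt's three-part name onto Flink's.

    dbt database -> Flink catalog  (Confluent Cloud environment)
    dbt schema   -> Flink database (Kafka cluster)
    """
    return ".".join(quote(part) for part in (database, schema, identifier))


# Placeholder names; substitute your own environment and cluster.
print(flink_relation_name("my-environment", "my-kafka-cluster", "pageviews_enriched"))
```

Quoting every part matters here because environment and cluster names often contain hyphens, which are not valid in unquoted SQL identifiers.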

Schema configuration

Unlike most dbt adapters, dbt-confluent cannot create or drop schemas — a dbt schema maps to a Flink database (Kafka cluster) in Confluent Cloud, which is managed externally. Both the dbname in your profiles.yml and any model-level schema config must reference an existing Flink database by name:

# dbt_project.yml
models:
  my_project:
    +schema: my-kafka-cluster

Usage

Streaming table

A streaming table creates a table and runs a continuous INSERT query against it:

-- models/pageviews_enriched.sql
{{
  config(
    materialized='streaming_table',
    with={'changelog.mode': 'append'}
  )
}}

SELECT
  p.user_id,
  p.page_url,
  u.username
FROM {{ ref('pageviews') }} p
JOIN {{ ref('users') }} u ON p.user_id = u.user_id
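
The `with` config above is a plain dictionary of table options. As a rough illustration of how such a dictionary corresponds to a Flink SQL WITH clause, here is a minimal sketch. The function `render_with_clause` is hypothetical, and the adapter's own templating may differ.

```python
from __future__ import annotations


def sql_literal(value: str) -> str:
    """Render a value as a single-quoted SQL string literal."""
    return "'" + str(value).replace("'", "''") + "'"


def render_with_clause(options: dict[str, str]) -> str:
    """Hypothetical helper: turn a dict of options into a Flink WITH clause.

    Returns an empty string when there are no options.
    """
    if not options:
        return ""
    pairs = ", ".join(
        f"{sql_literal(key)} = {sql_literal(value)}" for key, value in options.items()
    )
    return f"WITH ({pairs})"


print(render_with_clause({"changelog.mode": "append"}))
```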

Streaming source

A streaming source creates a connector-backed source table. The body of the model supplies the column definitions rather than a SELECT statement:

-- models/datagen_users.sql
{{
  config(
    materialized='streaming_source',
    connector='faker',
    with={'rows-per-second': '10'}
  )
}}

`user_id` INT,
`username` STRING,
`email` STRING

See Materializations for the full list and details.

Known Limitations

  • No schema management: Flink databases (Kafka clusters) cannot be created or dropped — they are managed in Confluent Cloud.
  • No table renames: ALTER TABLE RENAME is not supported; to effectively rename a model you must drop and recreate the underlying table, which for table, streaming_table, and streaming_source materializations requires running with --full-refresh.
  • No transactions: Flink SQL is non-transactional.
  • No snapshots: Flink SQL lacks the batch operations (MERGE, UPDATE) required by dbt snapshots.
  • No incremental: dbt's batch-incremental semantics do not map to Flink's continuous processing model. Use streaming_table instead.
  • Drift detection for WITH options: Schema drift detection only verifies that user-specified WITH options exist with correct values. It cannot detect when options are removed from the config (because connectors may add default options that cannot be distinguished from user-specified ones). Use --full-refresh to change or remove WITH options. Drift detection can be disabled per-model with config(on_schema_drift='ignore'). See Materializations for details.
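
The WITH-option limitation is easier to see in code. The sketch below is not the adapter's implementation. It is a minimal model of the check described above, with a hypothetical function name and a placeholder option `example.option`. Only keys present in the model config are compared, so an option that was deleted from the config but still exists on the deployed table goes unnoticed.

```python
from __future__ import annotations


def find_with_option_drift(
    configured: dict[str, str], deployed: dict[str, str]
) -> dict[str, tuple[str, str | None]]:
    """Return {option: (expected, actual)} for configured options that differ.

    Options that exist only on the deployed table are ignored, because they
    cannot be told apart from defaults added by the connector.
    """
    drift = {}
    for key, expected in configured.items():
        actual = deployed.get(key)  # None when the option is missing
        if actual != expected:
            drift[key] = (expected, actual)
    return drift


deployed = {"changelog.mode": "append", "example.option": "1"}

# A changed value is detected.
print(find_with_option_drift({"changelog.mode": "upsert"}, deployed))

# An option dropped from the config is not detected: the result is empty.
print(find_with_option_drift({"changelog.mode": "append"}, deployed))
```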

Development

git clone https://github.com/confluentinc/dbt-confluent
cd dbt-confluent
uv sync --dev

See CONTRIBUTING.md for changelog and contribution guidelines.

Code quality

uv run ruff check dbt/ tests/
uv run ruff format --check dbt/ tests/

Running tests

Tests require a Confluent Cloud environment. Set the following environment variables (or add them to a test.env file):

export CONFLUENT_ENV_ID=env-xxxxxx
export CONFLUENT_ORG_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
export CONFLUENT_COMPUTE_POOL_ID=lfcp-xxxxx
export CONFLUENT_CLOUD_PROVIDER=aws
export CONFLUENT_CLOUD_REGION=us-west-2
export CONFLUENT_TEST_DBNAME=dbname
export CONFLUENT_FLINK_API_KEY=xxx
export CONFLUENT_FLINK_API_SECRET=xxx

Then run the test suite:

uv run pytest
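
Because the suite depends on all of these variables, a quick pre-flight check can save a failed run. The following is a small sketch, assuming only the variable names listed above. The helper `missing_test_vars` is hypothetical and is not shipped with the adapter.

```python
from __future__ import annotations

import os
from typing import Mapping

# Variable names taken from the list above.
REQUIRED_VARS = (
    "CONFLUENT_ENV_ID",
    "CONFLUENT_ORG_ID",
    "CONFLUENT_COMPUTE_POOL_ID",
    "CONFLUENT_CLOUD_PROVIDER",
    "CONFLUENT_CLOUD_REGION",
    "CONFLUENT_TEST_DBNAME",
    "CONFLUENT_FLINK_API_KEY",
    "CONFLUENT_FLINK_API_SECRET",
)


def missing_test_vars(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_test_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All required environment variables are set.")
```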

Versioning

This adapter follows semantic versioning and is versioned independently from dbt Core. Compatibility with dbt Core is declared via dependencies (currently requires dbt-core~=1.11).

License

Apache-2.0 — see LICENSE for details.
