The Confluent Cloud adapter plugin for dbt

dbt-confluent

The dbt adapter for Confluent Cloud Flink SQL.

Build, test, and manage streaming data transformations on Confluent Cloud using dbt's familiar development workflow.

Overview

dbt-confluent lets you use dbt to define and run SQL transformations on Confluent Cloud's fully managed Apache Flink service. It supports both batch-style and streaming materializations, enabling continuous data pipelines defined as dbt models.

Features:

  • Standard dbt materializations (table, view, ephemeral) adapted for Flink SQL
  • Streaming-native materializations (streaming_table, streaming_source) for continuous data pipelines
  • Materialized views powered by Flink's continuous query execution
  • Integration with Confluent Cloud connectors (e.g., Datagen/Faker) via streaming_source

See Materializations for the full list and details.

Installation

pip install dbt-confluent

or with uv:

uv add dbt-confluent

Requires Python 3.10+.

Configuration

After installing, scaffold a new project with:

dbt init my_project

Select confluent as the adapter and fill in the prompts for your Confluent Cloud credentials (API key, compute pool, environment, etc.).

Concept mapping

Confluent Cloud Flink uses different terminology than traditional databases. Here's how dbt concepts map to Flink and Confluent Cloud:

dbt concept | Flink concept | Confluent Cloud entity
database    | Catalog       | Environment
schema      | Database      | Kafka cluster
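
To make the mapping concrete, here is a minimal sketch of how a dbt relation's three parts could be combined into a fully qualified Flink SQL name. The function `flink_relation_name` and the example names are hypothetical and are not part of the adapter's API; the only assumption about Flink is that identifiers are quoted with backticks.

```python
def flink_relation_name(database: str, schema: str, identifier: str) -> str:
    """Build a three-part Flink SQL name from dbt's database/schema/identifier.

    Illustrative only: this is not the adapter's own implementation.
    """

    def quote(part: str) -> str:
        # Flink SQL quotes identifiers with backticks; a literal backtick is doubled.
        return "`" + part.replace("`", "``") + "`"

    # dbt database -> Flink catalog  (Confluent Cloud environment)
    # dbt schema   -> Flink database (Kafka cluster)
    return ".".join(quote(part) for part in (database, schema, identifier))


print(flink_relation_name("my-environment", "my-kafka-cluster", "pageviews_enriched"))
```

Quoting every part keeps names that contain hyphens, such as a Kafka cluster name, valid in the generated SQL.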

Schema configuration

Unlike most dbt adapters, dbt-confluent cannot create or drop schemas — a dbt schema maps to a Flink database (Kafka cluster) in Confluent Cloud, which is managed externally. Both the dbname in your profiles.yml and any model-level schema config must reference an existing Flink database by name:

# dbt_project.yml
models:
  my_project:
    +schema: my-kafka-cluster
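
Because a missing Flink database cannot be created on the fly, it can help to fail early with a clear message. The sketch below shows one way such a pre-flight check might look. The function name `check_schema_exists` is hypothetical, and the set of existing databases is assumed to be gathered separately (for example from the Confluent Cloud console); nothing here is provided by the adapter.

```python
def check_schema_exists(schema: str, existing_databases: set[str]) -> None:
    """Raise early if a configured dbt schema has no matching Flink database.

    Hypothetical helper: `existing_databases` must be supplied by the caller.
    """
    if schema not in existing_databases:
        known = ", ".join(sorted(existing_databases)) or "(none)"
        raise ValueError(
            f"Schema {schema!r} does not match an existing Flink database. "
            f"Known databases: {known}"
        )


# Passes silently because the configured schema names an existing database.
check_schema_exists("my-kafka-cluster", {"my-kafka-cluster", "other-cluster"})
```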

Usage

Streaming table

A streaming table creates a table and runs a continuous INSERT query against it:

-- models/pageviews_enriched.sql
{{
  config(
    materialized='streaming_table',
    with={'changelog.mode': 'append'}
  )
}}

SELECT
  p.user_id,
  p.page_url,
  u.username
FROM {{ ref('pageviews') }} p
JOIN {{ ref('users') }} u ON p.user_id = u.user_id
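
The `with` config above is a mapping of Flink table options. As a rough illustration of how such a mapping corresponds to a Flink SQL `WITH` clause, the sketch below renders one from a Python dict. The function `render_with_clause` is hypothetical, and the statement the adapter actually generates may differ.

```python
def render_with_clause(options: dict[str, str]) -> str:
    """Render a dict of table options as a Flink SQL WITH clause.

    Illustrative only: not the adapter's own rendering code.
    """

    def literal(value: str) -> str:
        # SQL string literals escape a single quote by doubling it.
        return "'" + str(value).replace("'", "''") + "'"

    pairs = ", ".join(f"{literal(key)} = {literal(value)}" for key, value in options.items())
    # An empty mapping produces no clause at all.
    return f"WITH ({pairs})" if pairs else ""


print(render_with_clause({"changelog.mode": "append"}))
```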

Streaming source

A streaming source creates a connector-backed source table. Instead of a SELECT query, the model body contains the table's column definitions:

-- models/datagen_users.sql
{{
  config(
    materialized='streaming_source',
    connector='faker',
    with={'rows-per-second': '10'}
  )
}}

`user_id` INT,
`username` STRING,
`email` STRING
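
To show how the pieces of this model fit together, the following sketch assembles a `CREATE TABLE` statement from the column definitions, the `connector` value, and the `with` options. The function `render_source_ddl` is hypothetical. The DDL the adapter really emits may be shaped differently, and the connector may need further options that are not shown here.

```python
def render_source_ddl(name: str, columns_sql: str, connector: str, options: dict[str, str]) -> str:
    """Assemble a CREATE TABLE statement for a connector-backed source table.

    Illustrative only: an assumption about the general shape of the DDL.
    """
    # Indent each column definition line by two spaces.
    columns = "\n".join("  " + line.strip() for line in columns_sql.strip().splitlines())
    # The connector is listed first, followed by the user-supplied options.
    all_options = {"connector": connector, **options}
    with_body = ",\n".join(f"  '{key}' = '{value}'" for key, value in all_options.items())
    return f"CREATE TABLE `{name}` (\n{columns}\n) WITH (\n{with_body}\n);"


columns = "`user_id` INT,\n`username` STRING,\n`email` STRING"
print(render_source_ddl("datagen_users", columns, "faker", {"rows-per-second": "10"}))
```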

See Materializations for the full list and details.

Known Limitations

  • No schema management: Flink databases (Kafka clusters) cannot be created or dropped — they are managed in Confluent Cloud.
  • No table renames: ALTER TABLE RENAME is not supported. To rename a model, you must drop and recreate the underlying table; for table, streaming_table, and streaming_source materializations this requires running with --full-refresh.
  • No transactions: Flink SQL is non-transactional.
  • No snapshots: Flink SQL lacks the batch operations (MERGE, UPDATE) required by dbt snapshots.
  • No incremental: dbt's batch-incremental semantics do not map to Flink's continuous processing model. Use streaming_table instead.
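
When porting an existing project, the incremental limitation can be caught before any run. The sketch below scans a mapping of model names to materializations and reports those that need changing. The function `unsupported_models` and its input format are hypothetical; how you collect the mapping from your project is left open.

```python
# Unsupported materialization -> replacement suggested in the limitations above.
SUGGESTIONS = {"incremental": "streaming_table"}


def unsupported_models(materializations: dict[str, str]) -> list[str]:
    """Return one message per model that uses an unsupported materialization.

    Hypothetical helper: `materializations` maps model name -> materialized value.
    """
    messages = []
    for model, kind in sorted(materializations.items()):
        if kind in SUGGESTIONS:
            messages.append(
                f"{model}: '{kind}' is not supported; consider '{SUGGESTIONS[kind]}'"
            )
    return messages


for message in unsupported_models({"orders": "incremental", "users": "view"}):
    print(message)
```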

Development

git clone https://github.com/confluentinc/dbt-confluent
cd dbt-confluent
uv sync --extra dev --extra test

Code quality

uv run ruff check dbt/ tests/
uv run ruff format --check dbt/ tests/

Running tests

Tests require a Confluent Cloud environment. Set the following environment variables (or add them to a test.env file):

export CONFLUENT_ENV_ID=env-xxxxxx
export CONFLUENT_ORG_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
export CONFLUENT_COMPUTE_POOL_ID=lfcp-xxxxx
export CONFLUENT_CLOUD_PROVIDER=aws
export CONFLUENT_CLOUD_REGION=us-west-2
export CONFLUENT_TEST_DBNAME=dbname
export CONFLUENT_FLINK_API_KEY=xxx
export CONFLUENT_FLINK_API_SECRET=xxx
uv run pytest
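
Before starting a test run, a quick check for unset variables can save a failed round trip to Confluent Cloud. The sketch below lists which of the variables above are missing from the environment. The function `missing_vars` is hypothetical and not part of the test suite, and it does not read a test.env file, so variables defined only there will be reported as missing.

```python
import os

# Variable names taken from the list above.
REQUIRED_VARS = (
    "CONFLUENT_ENV_ID",
    "CONFLUENT_ORG_ID",
    "CONFLUENT_COMPUTE_POOL_ID",
    "CONFLUENT_CLOUD_PROVIDER",
    "CONFLUENT_CLOUD_REGION",
    "CONFLUENT_TEST_DBNAME",
    "CONFLUENT_FLINK_API_KEY",
    "CONFLUENT_FLINK_API_SECRET",
)


def missing_vars(env=None) -> list[str]:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```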

Versioning

This adapter follows semantic versioning and is versioned independently from dbt Core. Compatibility with dbt Core is declared via dependencies (currently requires dbt-core~=1.11).

License

Apache-2.0 — see LICENSE for details.
