Greenplum adapter for dbt with exchange_partition incremental strategy
Project description
dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
dbt-gp-delta
The dbt-gp-delta package contains the code enabling dbt to work with Greenplum. It is based on dbt-gp - a Greenplum adapter built on top of the postgres-adapter - and extends it with the exchange_partition incremental strategy without modifying any of the original functionality.
Installation
The easiest way to start using dbt-gp-delta is to install it with pip:
pip install dbt-gp-delta==<version>
where <version> matches your dbt version.
Available versions:
- 1.2.0
exchange_partition incremental strategy
The exchange_partition strategy implements atomic, partition-level incremental loads using Greenplum's EXCHANGE PARTITION DDL operation. Instead of inserting rows into the live table, it builds a separate swap table for each affected partition period and atomically replaces the partition via a metadata-only operation - no physical data movement occurs.
How it works
First run. On the very first run (target table does not exist), dbt creates the partitioned table and immediately loads all data via
INSERT INTO. The raw_partition config must define a range wide enough to cover all rows returned by the model SQL on the first run - otherwise Greenplum raises no partition for partitioning key. On subsequent runs the strategy takes over and loads data partition-by-partition via EXCHANGE PARTITION.
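To make the first-run requirement concrete: if raw_partition covers only January 2024 but the model returns a March row, the initial load fails. A minimal sketch (marts.orders and its columns are placeholder assumptions, not part of the adapter):
-- raw_partition defines leaf partitions for 2024-01-01 .. 2024-01-31 only.
-- A row outside that range has no leaf partition to land in:
INSERT INTO marts.orders (id, event_date) VALUES (1, DATE '2024-03-05');
-- ERROR:  no partition for partitioning key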
Overwrite mode (default, exchange_merge_partitions=false)
Run 1 — target does not exist:
└── CREATE TABLE target (columns from model contract in schema.yml,
partition DDL from raw_partition config)
INSERT INTO target ( <model SQL> ) ← all data loaded immediately
raw_partition must cover all rows returned on this run
Run 2+ — target exists:
│
├── if target exists but is NOT partitioned → compile-time error
│
▼
dbt creates a temporary staging table with the model SQL result
│ (standard dbt incremental mechanism, heap table in the session temp schema)
▼
for each period in DISTINCT(staging.partition_key): ← only periods with data
├── ADD PARTITION (if missing)
├── CREATE swap table (schema: exchange_swap_schema, same dist + storage as target)
│ name: <exchange_swap_schema>.__swap_<model>_<YYYYMMDD> (day)
│ name: <exchange_swap_schema>.__swap_<model>_<YYYYMM> (month)
│ columns cast to exact types from model contract (schema.yml)
├── EXCHANGE PARTITION (atomic leaf replacement)
└── DROP swap table
│
▼
dbt drops the temporary staging table automatically (end of session)
ANALYZE target (if exchange_analyze=true)
The target partition is fully replaced with data from the delta.
Merge mode (exchange_merge_partitions=true)
Run 1 — target does not exist:
└── CREATE TABLE target (columns from model contract in schema.yml,
partition DDL from raw_partition config)
INSERT INTO target ( <model SQL> ) ← all data loaded immediately
raw_partition must cover all rows returned on this run
Run 2+ — target exists:
│
├── if target exists but is NOT partitioned → compile-time error
│
▼
dbt creates a temporary staging table with the model SQL result
│ (standard dbt incremental mechanism, heap table in the session temp schema)
▼
for each period in DISTINCT(staging.partition_key): ← only periods with data
├── ADD PARTITION (if missing)
├── CREATE swap table AS (schema: exchange_swap_schema)
│ SELECT ... FROM delta WHERE period = this_period ← all delta rows (cast to types from model contract)
│ UNION ALL
│ SELECT ... FROM target WHERE period = this_period
│ AND NOT EXISTS (SELECT 1 FROM delta WHERE merge_keys match) ← target rows not in delta
├── EXCHANGE PARTITION
└── DROP swap table
│
▼
dbt drops the temporary staging table automatically (end of session)
ANALYZE target (if exchange_analyze=true)
When merge_keys match, delta rows take priority - target rows whose key matches a delta row are dropped via NOT EXISTS. Target rows that are not present in the delta by key are preserved. Duplicates within the delta itself are not deduplicated automatically - if the delta contains multiple rows with the same merge_key, all of them will appear in the result. Deduplicate the delta in the model SQL if needed.
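If the source can deliver several versions of the same key in one run, dedupe inside the model SQL before the strategy sees the delta. A minimal sketch, assuming the merge key is (tenant_id, event_id) and a loaded_at column marks the newest version (loaded_at is an assumption, not something the strategy requires):
select tenant_id, event_id, event_date, payload
from (
    select
        *,
        -- keep only the newest row per merge key
        row_number() over (
            partition by tenant_id, event_id
            order by loaded_at desc
        ) as rn
    from {{ source('raw', 'events') }}
) deduped
where rn = 1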
Required model config
| Parameter | Type | Description |
|---|---|---|
| partition_column | string | Column used as the partition key. Must be castable to timestamptz. |
| raw_partition | string | Full PARTITION BY RANGE (...) (...) DDL clause. Written explicitly in config() - no auto-generation. |
| exchange_swap_schema | string | Schema where swap tables are created. Must exist in the database. |
| exchange_merge_partitions | bool | true - merge new rows with existing partition data (requires unique_key). false - fully overwrite affected partitions with staging data. |
| contract: enforced: true | — | Declared in schema.yml (not in config()). Column types are required because the target table is created via explicit DDL, not CTAS. The strategy raises a compile-time error if the contract is not enforced. |
Optional model config
| Parameter | Type | Default | Description |
|---|---|---|---|
| unique_key | string or list | — | Column(s) that uniquely identify a row. Required when exchange_merge_partitions=true. |
| exchange_partition_granularity | string | 'day' | Partition period: 'day' or 'month'. |
| exchange_create_missing_partitions | bool | true | Automatically add missing partitions for new periods found in staging data. |
| exchange_allow_with_validation | bool | true | When true, Greenplum validates that the swap table data satisfies the partition constraints during EXCHANGE PARTITION. Set to false to skip validation for a faster exchange; use only when data correctness is guaranteed upstream. |
| exchange_analyze | bool | true | Run ANALYZE on the target table after all partitions are exchanged. |
| distributed_by | string | RANDOMLY | Distribution key(s) for the target table, e.g. 'id' or 'tenant_id, id'. |
| appendoptimized | bool | — | Create an append-optimised (AO) table. |
| orientation | string | — | 'column' or 'row'. Applies only when appendoptimized=true. |
| compresstype | string | — | Compression algorithm, e.g. 'ZSTD', 'ZLIB'. Applies only when appendoptimized=true. |
| compresslevel | int | — | Compression level (0–9). Applies only when appendoptimized=true. |
| blocksize | int | — | Block size in bytes. Applies only when appendoptimized=true. |
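For reference, exchange_allow_with_validation toggles Greenplum's WITHOUT VALIDATION clause on the exchange statement. A sketch of both forms (table names follow the examples later in this README):
-- Default (validated): Greenplum scans the swap table and checks every row
-- against the partition's range constraint before swapping.
ALTER TABLE marts.orders
EXCHANGE PARTITION FOR (DATE '2026-01-01')
WITH TABLE stage.__swap_orders_20260101;

-- exchange_allow_with_validation=false: skip the scan. Faster, but rows
-- outside the range would silently end up in the wrong leaf.
ALTER TABLE marts.orders
EXCHANGE PARTITION FOR (DATE '2026-01-01')
WITH TABLE stage.__swap_orders_20260101
WITHOUT VALIDATION;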
Overwrite mode example
Every run fully replaces all partitions that appear in the new data. Useful when the source already delivers a clean, complete snapshot for each period.
{{
config(
materialized='incremental',
incremental_strategy='exchange_partition',
partition_column='transaction_date',
raw_partition="""PARTITION BY RANGE (transaction_date) (
START (DATE '2024-01-01') INCLUSIVE
END (DATE '2024-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day')
)""",
exchange_partition_granularity='day',
exchange_swap_schema='stage',
exchange_merge_partitions=false,
distributed_by='id',
appendoptimized=true,
orientation='column',
compresstype='ZSTD',
compresslevel=1
)
}}
select
id,
transaction_date,
amount
from {{ source('raw', 'transactions') }}
{% if is_incremental() %}
where transaction_date >= current_date - interval '3 days'
{% endif %}
schema.yml for this model must declare column types:
models:
- name: transactions
config:
contract:
enforced: true
columns:
- name: id
data_type: bigint
- name: transaction_date
data_type: date
- name: amount
data_type: numeric(18,2)
Merge mode example
New rows are merged with existing partition data. Rows matched by unique_key are replaced by the incoming version; unmatched existing rows are preserved.
{{
config(
materialized='incremental',
incremental_strategy='exchange_partition',
partition_column='event_date',
raw_partition="""PARTITION BY RANGE (event_date) (
START (DATE '2024-01-01') INCLUSIVE
END (DATE '2024-02-01') EXCLUSIVE
EVERY (INTERVAL '1 month')
)""",
exchange_partition_granularity='month',
exchange_swap_schema='stage',
exchange_merge_partitions=true,
unique_key=['tenant_id', 'event_id'],
distributed_by='tenant_id',
appendoptimized=true,
orientation='column',
compresstype='ZSTD',
compresslevel=1
)
}}
select
tenant_id,
event_id,
event_date,
payload
from {{ source('raw', 'events') }}
{% if is_incremental() %}
where event_date >= date_trunc('month', current_date - interval '1 month')
{% endif %}
Important notes
- The target table must be a range-partitioned table. If a non-partitioned table with the same name already exists, the strategy raises a compile-time error. Drop it manually before the first run.
- exchange_swap_schema must exist in the database before the first run. The strategy does not create it automatically (see the sketch after this list).
- raw_partition must be provided explicitly in config(). There is no auto-generation of the partition DDL.
- Swap tables are named __swap_{model_name}_{YYYYMMDD} (day) or __swap_{model_name}_{YYYYMM} (month) and are always dropped after a successful exchange. If a run is interrupted they will be cleaned up on the next run. The model name must not exceed 47 characters (day granularity) or 49 characters (month granularity): Greenplum enforces a 63-character limit on identifiers, and the prefix __swap_ (7 chars) plus the date suffix _YYYYMMDD (9 chars) or _YYYYMM (7 chars) consumes the rest.
- The staging table is a standard dbt temporary table created by the incremental materialization before the strategy is called. It lives in the session temp schema and is dropped automatically at the end of the session. It is not created in exchange_swap_schema.
- WITHOUT VALIDATION (exchange_allow_with_validation=false) skips Greenplum's constraint check during the exchange. Use it only when you are certain the swap table data satisfies the partition constraints.
- The strategy processes only partition periods that are actually present in the staging data. Periods with no new data are never touched, so existing partition data for those periods is preserved as-is.
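One-time setup for the swap schema, as mentioned above - a sketch assuming the schema is called stage and the dbt connection uses a role named dbt_user (both placeholders):
-- The strategy will not create exchange_swap_schema; do it once up front.
CREATE SCHEMA IF NOT EXISTS stage;
-- The dbt role must be able to create and drop swap tables there.
GRANT USAGE, CREATE ON SCHEMA stage TO dbt_user;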
Table naming
| Table | Schema | Example name |
|---|---|---|
| target (partitioned) | model schema (profiles.yml / dbt_project.yml) | marts.orders |
| staging | session temp schema (managed by dbt) | orders__dbt_tmp |
| swap (day) | exchange_swap_schema | stage.__swap_orders_20240101 |
| swap (month) | exchange_swap_schema | stage.__swap_orders_202401 |
The staging table is a standard dbt temp table — created automatically before the strategy runs and dropped at the end of the session. Swap tables live in exchange_swap_schema and are dropped immediately after each partition exchange. The exchange_swap_schema schema must exist in the database before the first run.
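Should a run die between the CREATE and the DROP of a swap table, the leftover is cleaned up on the next run; to list orphans manually, something like this works (a sketch - the regex anchors the __swap_ prefix literally, whereas LIKE would treat the underscores as wildcards):
-- 'stage' is the example exchange_swap_schema used throughout this README.
SELECT schemaname, tablename
FROM pg_tables
WHERE schemaname = 'stage'
  AND tablename ~ '^__swap_';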
Auto-creation of the partitioned table
On the first run dbt creates the target table and immediately loads all data into it — in one step:
- Column definitions are taken from the model contract (schema.yml with contract: enforced: true and data_type on each column).
- The partition DDL is taken from the raw_partition config parameter, written explicitly in config(). This gives full control over the initial partition range, step, and any Greenplum-specific partition options.
- Data is inserted via INSERT INTO target (model SQL). The raw_partition range must be wide enough to cover all rows returned by the model SQL on the first run. If any row falls outside the defined partitions, Greenplum raises no partition for partitioning key.
- Further partitions are added on demand via ALTER TABLE ... ADD PARTITION as new periods appear in the staging data (controlled by exchange_create_missing_partitions); see the sketch after this section.
- Distribution: DISTRIBUTED BY (<distributed_by>), or DISTRIBUTED RANDOMLY if distributed_by is not set.
On subsequent runs the check is idempotent — if the table already exists and is partitioned, creation is skipped.
If the table exists but is not partitioned, the strategy raises a compile-time error with a hint to drop it manually before re-running.
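For reference, the on-demand partition DDL is roughly the following (dates are placeholders; the exact statement the strategy emits may differ), and the pg_partitions view shows what already exists:
-- What exchange_create_missing_partitions=true does for a new day:
ALTER TABLE marts.orders
ADD PARTITION START (DATE '2026-01-02') INCLUSIVE
              END (DATE '2026-01-03') EXCLUSIVE;

-- Inspect the leaf partitions Greenplum knows about:
SELECT partitiontablename, partitionrangestart, partitionrangeend
FROM pg_partitions
WHERE schemaname = 'marts' AND tablename = 'orders';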
Usage examples
Schema name. By default dbt constructs the schema name as <target_schema>_<custom_schema> (e.g. public_marts). This adapter overrides that behaviour via the built-in generate_schema_name macro: the table is created exactly in the specified schema (schema='marts' → schema marts).
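The override follows dbt's standard generate_schema_name pattern. A sketch of that pattern (the adapter ships its own macro; this is not its exact source):
{% macro generate_schema_name(custom_schema_name, node) -%}
    {#- use the custom schema verbatim instead of <target_schema>_<custom_schema> -#}
    {%- if custom_schema_name is none -%}
        {{ target.schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}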
A model contract in schema.yml is required for all examples:
models:
- name: orders
config:
contract:
enforced: true
columns:
- name: id
data_type: bigint
- name: event_date
data_type: date
- name: user_id
data_type: integer
- name: amount
data_type: numeric(18,4)
- name: loaded_at
data_type: timestamp
1. Append-optimised (AO) table
1.1. Overwrite mode
{{
config(
schema='marts',
materialized='incremental',
incremental_strategy='exchange_partition',
partition_column='event_date',
raw_partition="""PARTITION BY RANGE (event_date) (
START (DATE '2026-01-01') INCLUSIVE
END (DATE '2026-01-02') EXCLUSIVE
EVERY (INTERVAL '1 day')
)""",
exchange_swap_schema='stage',
exchange_merge_partitions=false,
exchange_partition_granularity='day',
distributed_by='id',
appendoptimized=true,
orientation='column',
compresstype='zstd',
compresslevel=2
)
}}
SELECT
id,
event_date::date AS event_date,
user_id,
amount,
current_timestamp::timestamp AS loaded_at
FROM {{ source("ods", "orders") }}
{% if is_incremental() %}
WHERE event_date::date BETWEEN '{{ var("start_dttm") }}'::date
AND '{{ var("end_dttm") }}'::date
{% endif %}
The target partition is fully replaced with data from the delta.
Generated SQL (first run + one partition period):
-- Create target (first run)
CREATE TABLE marts.orders (
"id" bigint,
"event_date" date,
"user_id" integer,
"amount" numeric(18,4),
"loaded_at" timestamp
)
WITH (
appendoptimized=true,
orientation=column,
compresstype=zstd,
compresslevel=2
)
DISTRIBUTED BY (id)
PARTITION BY RANGE (event_date) (
START (DATE '2026-01-01') INCLUSIVE
END (DATE '2026-01-02') EXCLUSIVE
EVERY (INTERVAL '1 day')
);
-- dbt creates a temp staging table automatically (heap, session temp schema):
-- CREATE TEMP TABLE orders__dbt_tmp AS ( <model SQL> ) DISTRIBUTED BY (id);
-- For each period in delta:
-- Column types are taken from model contract (schema.yml) — no DB round-trip needed
CREATE TABLE stage.__swap_orders_20260101
WITH (appendoptimized=true, orientation=column, compresstype=zstd, compresslevel=2)
AS
SELECT
"id"::bigint AS "id",
"event_date"::date AS "event_date",
"user_id"::integer AS "user_id",
"amount"::numeric(18,4) AS "amount",
"loaded_at"::timestamp AS "loaded_at"
FROM orders__dbt_tmp
WHERE date_trunc('day', event_date::timestamptz) = DATE '2026-01-01'
DISTRIBUTED BY (id);
ALTER TABLE marts.orders
EXCHANGE PARTITION FOR (DATE '2026-01-01')
WITH TABLE stage.__swap_orders_20260101;
DROP TABLE stage.__swap_orders_20260101;
-- dbt drops the temp staging table automatically at end of session
ANALYZE marts.orders;
1.2. Merge mode
{{
config(
schema='marts',
materialized='incremental',
incremental_strategy='exchange_partition',
partition_column='event_date',
raw_partition="""PARTITION BY RANGE (event_date) (
START (DATE '2026-01-01') INCLUSIVE
END (DATE '2026-01-02') EXCLUSIVE
EVERY (INTERVAL '1 day')
)""",
exchange_swap_schema='stage',
exchange_merge_partitions=true,
unique_key=['id'],
exchange_partition_granularity='day',
distributed_by='id',
appendoptimized=true,
orientation='column',
compresstype='zstd',
compresslevel=2
)
}}
-- Delta only — not a full source snapshot
SELECT
id,
event_date::date AS event_date,
user_id,
amount,
current_timestamp::timestamp AS loaded_at
FROM {{ source("ods", "orders") }}
{% if is_incremental() %}
WHERE updated_at BETWEEN '{{ var("start_dttm") }}'::timestamp
AND '{{ var("end_dttm") }}'::timestamp
{% endif %}
When id matches, delta rows win. Target rows not present in the delta are preserved.
Generated SQL (incremental run, one partition period):
-- dbt creates a temp staging table automatically (heap, session temp schema):
-- CREATE TEMP TABLE orders__dbt_tmp AS ( <model SQL> ) DISTRIBUTED BY (id);
-- For each period in delta:
CREATE TABLE stage.__swap_orders_20260101
WITH (appendoptimized=true, orientation=column, compresstype=zstd, compresslevel=2)
AS
-- delta rows (always included, win on key conflict); cast to exact types from model contract (schema.yml)
SELECT
"id"::bigint AS "id",
"event_date"::date AS "event_date",
"user_id"::integer AS "user_id",
"amount"::numeric(18,4) AS "amount",
"loaded_at"::timestamp AS "loaded_at"
FROM orders__dbt_tmp
WHERE date_trunc('day', event_date::timestamptz) = DATE '2026-01-01'
UNION ALL
-- target rows not present in delta by merge key (preserved)
SELECT "id", "event_date", "user_id", "amount", "loaded_at"
FROM marts.orders __trg
WHERE date_trunc('day', __trg.event_date::timestamptz) = DATE '2026-01-01'
AND NOT EXISTS (
SELECT 1
FROM orders__dbt_tmp __delta
WHERE date_trunc('day', __delta.event_date::timestamptz) = DATE '2026-01-01'
AND __trg."id" = __delta."id"
)
DISTRIBUTED BY (id);
ALTER TABLE marts.orders
EXCHANGE PARTITION FOR (DATE '2026-01-01')
WITH TABLE stage.__swap_orders_20260101;
DROP TABLE stage.__swap_orders_20260101;
-- dbt drops the temp staging table automatically at end of session
ANALYZE marts.orders;
2. Heap table
Distribution: if distributed_by is not set, the table is created with DISTRIBUTED RANDOMLY. DISTRIBUTED REPLICATED is not supported: partitioned tables in Greenplum 6 are incompatible with replicated distribution.
2.1. Overwrite mode
{{
config(
schema='marts',
materialized='incremental',
incremental_strategy='exchange_partition',
partition_column='event_date',
raw_partition="""PARTITION BY RANGE (event_date) (
START (DATE '2026-01-01') INCLUSIVE
END (DATE '2026-01-02') EXCLUSIVE
EVERY (INTERVAL '1 day')
)""",
exchange_swap_schema='stage',
exchange_merge_partitions=false,
exchange_partition_granularity='day',
distributed_by='id'
)
}}
SELECT
id,
event_date::date AS event_date,
user_id,
amount,
current_timestamp::timestamp AS loaded_at
FROM {{ source("ods", "orders") }}
{% if is_incremental() %}
WHERE event_date::date BETWEEN '{{ var("start_dttm") }}'::date
AND '{{ var("end_dttm") }}'::date
{% endif %}
Generated SQL (first run + one partition period):
-- Create target (first run)
CREATE TABLE marts.orders (
"id" bigint,
"event_date" date,
"user_id" integer,
"amount" numeric(18,4),
"loaded_at" timestamp
)
DISTRIBUTED BY (id)
PARTITION BY RANGE (event_date) (
START (DATE '2026-01-01') INCLUSIVE
END (DATE '2026-01-02') EXCLUSIVE
EVERY (INTERVAL '1 day')
);
-- dbt creates a temp staging table automatically (heap, session temp schema):
-- CREATE TEMP TABLE orders__dbt_tmp AS ( <model SQL> ) DISTRIBUTED BY (id);
CREATE TABLE stage.__swap_orders_20260101
AS
SELECT
"id"::bigint AS "id",
"event_date"::date AS "event_date",
"user_id"::integer AS "user_id",
"amount"::numeric(18,4) AS "amount",
"loaded_at"::timestamp AS "loaded_at"
FROM orders__dbt_tmp
WHERE date_trunc('day', event_date::timestamptz) = DATE '2026-01-01'
DISTRIBUTED BY (id);
ALTER TABLE marts.orders
EXCHANGE PARTITION FOR (DATE '2026-01-01')
WITH TABLE stage.__swap_orders_20260101;
DROP TABLE stage.__swap_orders_20260101;
-- dbt drops the temp staging table automatically at end of session
ANALYZE marts.orders;
2.2. Merge mode
{{
config(
schema='marts',
materialized='incremental',
incremental_strategy='exchange_partition',
partition_column='event_date',
raw_partition="""PARTITION BY RANGE (event_date) (
START (DATE '2026-01-01') INCLUSIVE
END (DATE '2026-01-02') EXCLUSIVE
EVERY (INTERVAL '1 day')
)""",
exchange_swap_schema='stage',
exchange_merge_partitions=true,
unique_key=['id'],
exchange_partition_granularity='day',
distributed_by='id'
)
}}
-- Delta only — not a full source snapshot
SELECT
id,
event_date::date AS event_date,
user_id,
amount,
current_timestamp::timestamp AS loaded_at
FROM {{ source("ods", "orders") }}
{% if is_incremental() %}
WHERE updated_at BETWEEN '{{ var("start_dttm") }}'::timestamp
AND '{{ var("end_dttm") }}'::timestamp
{% endif %}
Generated SQL (incremental run, one partition period):
-- dbt creates a temp staging table automatically (heap, session temp schema):
-- CREATE TEMP TABLE orders__dbt_tmp AS ( <model SQL> ) DISTRIBUTED BY (id);
CREATE TABLE stage.__swap_orders_20260101
AS
-- delta rows (always included, win on key conflict); cast to exact types from model contract (schema.yml)
SELECT
"id"::bigint AS "id",
"event_date"::date AS "event_date",
"user_id"::integer AS "user_id",
"amount"::numeric(18,4) AS "amount",
"loaded_at"::timestamp AS "loaded_at"
FROM orders__dbt_tmp
WHERE date_trunc('day', event_date::timestamptz) = DATE '2026-01-01'
UNION ALL
-- target rows not present in delta by merge key (preserved)
SELECT "id", "event_date", "user_id", "amount", "loaded_at"
FROM marts.orders __trg
WHERE date_trunc('day', __trg.event_date::timestamptz) = DATE '2026-01-01'
AND NOT EXISTS (
SELECT 1
FROM orders__dbt_tmp __delta
WHERE date_trunc('day', __delta.event_date::timestamptz) = DATE '2026-01-01'
AND __trg."id" = __delta."id"
)
DISTRIBUTED BY (id);
ALTER TABLE marts.orders
EXCHANGE PARTITION FOR (DATE '2026-01-01')
WITH TABLE stage.__swap_orders_20260101;
DROP TABLE stage.__swap_orders_20260101;
-- dbt drops the temp staging table automatically at end of session
ANALYZE marts.orders;
Getting started
- Install dbt
- Read the introduction and viewpoint
Join the dbt Community
- Be part of the conversation in the dbt Community Slack
- Read more on the dbt Community Discourse
Reporting bugs and contributing code
- Want to report a bug or request a feature? Let us know on Slack, or open an issue
- Want to help us build dbt? Check out the Contributing Guide
Code of Conduct
Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the dbt Code of Conduct.
File details
Details for the file dbt_gp_delta-1.2.0.tar.gz.
File metadata
- Download URL: dbt_gp_delta-1.2.0.tar.gz
- Size: 29.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c1b3bdea065c40600081b95208dcadeab42bbd60770fba3dc9796518faa121de |
| MD5 | 9711045416e948912703dd8da4b794a8 |
| BLAKE2b-256 | a760a7645efcf6435180d39e039a9875e3f088d6d383b7858b06a6c437ac8fc7 |
File details
Details for the file dbt_gp_delta-1.2.0-py3-none-any.whl.
File metadata
- Download URL: dbt_gp_delta-1.2.0-py3-none-any.whl
- Size: 26.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a84e538b059baa3a1a536cb6fdea0fc3a216c58d5d35483e4a5d29912252a5f6 |
| MD5 | b53f52fb5a8d63354e95b8feff5256f7 |
| BLAKE2b-256 | 0e7dd971967409dca26345713a6b9b733ede664a6b76a04a99b96a84a4e9010e |