A dbt-native Reverse ETL tool powered by dlt to move data between databases and APIs.
dbt-bridge
A dbt-native data movement layer powered by dlt — for cross-database sync, API ingestion, and (yes) Reverse ETL. Do everything inside dbt Python models, with full lineage in your DAG.
dbt-bridge lets you extract, transform, and load between any sources and destinations—all inside dbt. It uses dlt for schema-aware loading and uses dbt “Ghost Sources” to keep your lineage complete.
It’s basically: Move data anywhere → keep everything in one DAG.
🚀 Features
- Cross-Database Movement: Move data from Postgres → Snowflake, MySQL → BigQuery, DuckDB → S3, etc.
- Reverse ETL (optional, but supported): Push your modeled dbt tables into operational systems or external databases.
- API Ingestion: Pull data from REST APIs, transform using Pandas, and load it to your warehouse.
- The “Bridge Pattern”: Extract → Model locally (DuckDB) → Push to another destination.
- Lineage Support: Registers “Ghost Sources” so all upstream dependencies appear in dbt docs.
- dbt Native: Runs as part of dbt run, not a separate process.
📦 Installation
Install the package with only the connectors you need:
pip install "dbt-bridge[snowflake,postgres]"
Or install everything:
pip install "dbt-bridge[all]"
Supported Extras
Warehouses: snowflake, bigquery, redshift, databricks, synapse, fabric
Databases: postgres, mssql, duckdb, trino, athena
Storage / Filesystems: s3, gcs, azure, filesystem
🧪 Usage Examples
- Database → Database Transfer (Postgres → Snowflake)

    import dbt_bridge
    import dlt
    from dlt.sources.sql_database import sql_database

    def model(dbt, session):
        dbt.config(materialized='table')

        source = sql_database(schema="public", table_names=["users"])
        dbt.source("postgres_prod", "users")  # lineage

        destination = dlt.destinations.snowflake()
        return dbt_bridge.transfer(
            dbt=dbt,
            source_data=source,
            target_destination=destination,
            dataset_name="raw_postgres",
            table_name="users_synced",
        )
- API → Warehouse (with Pandas Transform)

    import dbt_bridge
    import dlt  # needed for dlt.destinations below
    from dlt.sources.helpers.rest_client import RESTClient

    def model(dbt, session):
        dbt.config(materialized='table')

        # Fetch all pages from the API and collect them into a DataFrame
        client = RESTClient(base_url="https://api.example.com")
        raw = client.paginate("/users")
        df = dbt_bridge.api_to_df(raw)

        # Transform with Pandas before loading
        df["email"] = df["email"].str.lower()

        destination = dlt.destinations.snowflake()
        return dbt_bridge.transfer(
            dbt=dbt,
            source_data=df,
            target_destination=destination,
            dataset_name="raw_api",
            table_name="users",
        )
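
The transform step in this example works on a flat table of records. As a rough, dependency-free illustration of what happens between `client.paginate(...)` and the `email` normalization, the sketch below flattens an iterable of pages into one list of records and lowercases the `email` field. The `flatten_pages` and `normalize_emails` helpers and the sample pages are hypothetical; how `dbt_bridge.api_to_df` builds its DataFrame internally is not described in this README, so treat the flattening as an assumption about its behavior.

```python
from itertools import chain
from typing import Any, Iterable

Record = dict[str, Any]


def flatten_pages(pages: Iterable[list[Record]]) -> list[Record]:
    """Concatenate paginated API results into one flat list of records."""
    return list(chain.from_iterable(pages))


def normalize_emails(records: list[Record]) -> list[Record]:
    """Return copies of the records with string 'email' values lowercased."""
    return [
        {**r, "email": r["email"].lower()} if isinstance(r.get("email"), str) else dict(r)
        for r in records
    ]


# Hypothetical sample data standing in for the pages yielded by paginate("/users").
sample_pages = [
    [{"id": 1, "email": "Ada@Example.com"}, {"id": 2, "email": "BOB@example.com"}],
    [{"id": 3, "email": None}],
]

users = normalize_emails(flatten_pages(sample_pages))
```

Records with a missing or non-string `email` are copied unchanged, which mirrors how a Pandas `.str.lower()` call leaves null values as nulls.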
- The Bridge Pattern (Extract → SQL Transform → Push)
Step 1: Ingest (Python Model) Saved locally via DuckDB.
Step 2: Transform (SQL Model) Standard dbt SQL logic.
Step 3: Push (Python Model) Send the final result to another destination.

    import dbt_bridge
    import dlt

    def model(dbt, session):
        dbt.config(materialized='table')

        final_df = dbt.ref("int_active_users").arrow()

        destination = dlt.destinations.snowflake()
        return dbt_bridge.transfer(
            dbt=dbt,
            source_data=final_df,
            target_destination=destination,
            dataset_name="analytics_prod",
            table_name="active_users",
        )
🔧 Configuration
dlt reads credentials from .dlt/secrets.toml in your dbt project root:

    [destination.snowflake.credentials]
    username = "user"
    password = "password"
    database = "ANALYTICS"
    host = "account_id"
    warehouse = "COMPUTE_WH"

    [sources.sql_database.credentials]
    drivername = "postgresql"
    host = "localhost"
    port = 5432
    database = "source_db"
    username = "user"
    password = "password"
Incremental Loading
dbt-bridge supports three loading strategies:
1. Replace (Full Refresh)
    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=df,
        target_destination=destination,
        dataset_name="raw_api",
        table_name="users",
        write_disposition="replace",  # Default
    )
2. Append (Add Only)
    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=df,
        target_destination=destination,
        dataset_name="raw_api",
        table_name="users",
        write_disposition="append",  # Never deletes
    )
3. Merge (Upsert)
    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=df,
        target_destination=destination,
        dataset_name="raw_api",
        table_name="users",
        write_disposition="merge",
        primary_key="id",  # Required for merge
    )
For composite keys, pass a list:
primary_key=["user_id", "date"]
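
To make the difference between the three strategies concrete, here is a toy in-memory model of their semantics on lists of dictionaries. It is only an illustration: the `apply_load` function and the sample rows are hypothetical, and the real loading is performed by dlt inside the destination, not by code like this.

```python
from typing import Any, Optional, Sequence, Union

Row = dict[str, Any]


def apply_load(
    existing: list[Row],
    incoming: list[Row],
    write_disposition: str = "replace",
    primary_key: Optional[Union[str, Sequence[str]]] = None,
) -> list[Row]:
    """Return the table contents after loading `incoming` into `existing`."""
    if write_disposition == "replace":
        # Full refresh: previous rows are discarded.
        return list(incoming)
    if write_disposition == "append":
        # Add only: previous rows are kept, duplicates are possible.
        return existing + incoming
    if write_disposition == "merge":
        if primary_key is None:
            raise ValueError("merge requires a primary_key")
        keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
        # Index rows by their (possibly composite) key; incoming rows win.
        merged = {tuple(row[k] for k in keys): row for row in existing}
        for row in incoming:
            merged[tuple(row[k] for k in keys)] = row
        return list(merged.values())
    raise ValueError(f"unknown write_disposition: {write_disposition!r}")


existing = [{"id": 1, "name": "ada"}, {"id": 2, "name": "bob"}]
incoming = [{"id": 2, "name": "bobby"}, {"id": 3, "name": "cy"}]

replaced = apply_load(existing, incoming, "replace")
appended = apply_load(existing, incoming, "append")
upserted = apply_load(existing, incoming, "merge", primary_key="id")
```

With these sample rows, `replace` leaves two rows, `append` leaves four (including two rows with `id` 2), and `merge` leaves three, with the row for `id` 2 updated in place.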
📘 License
MIT