dbt-bridge

A dbt-native data movement layer powered by dlt — for cross-database sync, API ingestion, and (yes) Reverse ETL.

Do everything inside dbt Python models, with full lineage in your DAG.

dbt-bridge lets you extract, transform, and load data between any sources and destinations, all inside dbt. It uses dlt for schema-aware loading and dbt “Ghost Sources” to keep your lineage complete.

It’s basically: Move data anywhere → keep everything in one DAG.

🚀 Features

  • Cross-Database Movement: Move data from Postgres → Snowflake, MySQL → BigQuery, DuckDB → S3, and more.

  • Reverse ETL (optional, but supported): Push your modeled dbt tables into operational systems or external databases.

  • API Ingestion: Pull data from REST APIs, transform it with Pandas, and load it into your warehouse.

  • The “Bridge Pattern”: Extract → Model locally (DuckDB) → Push to another destination.

  • Lineage Support: Registers “Ghost Sources” so all upstream dependencies appear in dbt docs.

  • dbt Native: Runs as part of dbt run, not a separate process.

📦 Installation

Install the package with only the connectors you need:

pip install "dbt-bridge[snowflake,postgres]"

Or install everything:

pip install "dbt-bridge[all]"

Supported Extras

  • Warehouses: snowflake, bigquery, redshift, databricks, synapse, fabric

  • Databases: postgres, mssql, duckdb, trino, athena

  • Storage / Filesystems: s3, gcs, azure, filesystem

🧪 Usage Examples

1. Database → Database Transfer (Postgres → Snowflake)

Move a table from a source database (e.g., Postgres) to your destination (e.g., Snowflake).

import dbt_bridge
import dlt
from dlt.sources.sql_database import sql_database

def model(dbt, session):
    dbt.config(materialized='table')

    source = sql_database(schema="public", table_names=["users"])
    dbt.source("postgres_prod", "users")  # register the ghost source so the lineage edge shows in the DAG

    destination = dlt.destinations.snowflake()

    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=source,
        target_destination=destination,
        dataset_name="raw_postgres",
        table_name="users_synced",
    )
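
A note on the lineage call: in stock dbt, dbt.source() inside a Python model expects a matching source definition (here, postgres_prod.users) declared in a .yml file; that declared-but-never-built entry is the “ghost source” that gives the DAG its upstream edge. Once wired up, the model runs as part of a normal dbt run.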

2. API → Warehouse (with Pandas Transform)

import dbt_bridge
import dlt
from dlt.sources.helpers.rest_client import RESTClient

def model(dbt, session):
    dbt.config(materialized='table')

    client = RESTClient(base_url="https://api.example.com")
    raw = client.paginate("/users")

    df = dbt_bridge.api_to_df(raw)
    df["email"] = df["email"].str.lower()

    destination = dlt.destinations.snowflake()

    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=df,
        target_destination=destination,
        dataset_name="raw_api",
        table_name="users",
    )
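
If your payload is a flat list of records, a rough plain-pandas equivalent of the api_to_df helper (whose exact behavior is package-specific) is to flatten the paginated response yourself; paginate() yields pages, i.e. lists of records:

import pandas as pd

# Rough plain-pandas equivalent of dbt_bridge.api_to_df for flat payloads:
# flatten pages of records into a single frame.
df = pd.DataFrame(record for page in raw for record in page)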

Incremental Loading

dbt-bridge supports incremental extract → load workflows via dlt’s write_disposition modes.

Supported Write Dispositions

  • replace – full refresh (default)

  • append – insert new rows

  • merge – upsert based on a primary key

Example: Incremental Append

return dbt_bridge.transfer(
    dbt=dbt,
    source_data=df,
    target_destination=destination,
    dataset_name="raw_api",
    table_name="users",
    write_disposition="append",
)

Example: Incremental Merge (Upsert)

return dbt_bridge.transfer(
    dbt=dbt,
    source_data=df,
    target_destination=destination,
    dataset_name="raw_api",
    table_name="users",
    write_disposition="merge",
    primary_key="user_id",
)

How It Works

  • dbt builds the model’s upstream dependencies as usual.
  • dbt-bridge converts the model’s data to an Arrow/Pandas-compatible structure.
  • dlt performs incremental loads using the configured disposition and primary key.
  • Lineage remains fully visible in the dbt DAG.
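
To build intuition, here is a minimal sketch of roughly what the merge example above amounts to in plain dlt. This is illustrative only, not dbt-bridge’s actual internals, and the pipeline name is hypothetical:

import dlt

# Illustrative sketch: a merge-disposition load expressed directly against
# dlt's standard pipeline API. dbt-bridge wires this up for you.
pipeline = dlt.pipeline(
    pipeline_name="bridge_users",  # hypothetical name
    destination="snowflake",
    dataset_name="raw_api",
)

# Wrap the model's DataFrame as a resource carrying the merge hints.
users = dlt.resource(
    df,
    name="users",
    write_disposition="merge",  # upsert semantics
    primary_key="user_id",      # merge/dedup key
)

load_info = pipeline.run(users)
print(load_info)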

3. The Bridge Pattern (Extract → SQL Transform → Push)

  1. Ingest (Python Model) – Fetch and stage data locally in DuckDB (a sketch of this step appears after the push example below).
  2. Transform (SQL Model) – Standard dbt SQL transformations.
  3. Push (Python Model) – Load the final result to another destination, as shown here:

import dbt_bridge
import dlt

def model(dbt, session):
    dbt.config(materialized='table')

    final_df = dbt.ref("int_active_users").arrow()

    destination = dlt.destinations.snowflake()

    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=final_df,
        target_destination=destination,
        dataset_name="analytics_prod",
        table_name="active_users",
    )
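
For completeness, step 1 (Ingest) can be an ordinary dbt Python model that returns a DataFrame, which dbt then materializes in the local DuckDB target. A minimal sketch, reusing the hypothetical API from example 2:

import pandas as pd
from dlt.sources.helpers.rest_client import RESTClient

def model(dbt, session):
    dbt.config(materialized='table')

    # paginate() yields pages (lists of records); flatten them into rows.
    client = RESTClient(base_url="https://api.example.com")
    rows = [record for page in client.paginate("/users") for record in page]

    # Returning the DataFrame stages the data in the local DuckDB target,
    # ready for the SQL transform in step 2 (e.g. int_active_users).
    return pd.DataFrame(rows)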

🔧 Configuration

dlt reads credentials from .dlt/secrets.toml in your dbt project root:

[destination.snowflake.credentials]
username = "user"
password = "password"
database = "ANALYTICS"
host = "account_id"  # your Snowflake account identifier
warehouse = "COMPUTE_WH"

[sources.sql_database.credentials]
drivername = "postgresql"
host = "localhost"
port = 5432
database = "source_db"
username = "user"
password = "password" 
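
If you’d rather not keep secrets in a file, dlt also resolves the same keys from environment variables, with TOML sections separated by double underscores, e.g. DESTINATION__SNOWFLAKE__CREDENTIALS__PASSWORD and SOURCES__SQL_DATABASE__CREDENTIALS__PASSWORD.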

License

MIT
