
A dbt-native Reverse ETL tool powered by dlt to move data between databases and APIs.


# dbt-bridge

A dbt-native data movement layer powered by dlt — for cross-database sync, API ingestion, and (yes) Reverse ETL. Do everything inside dbt Python models, with full lineage in your DAG.

dbt-bridge lets you extract, transform, and load data between any source and destination, all inside dbt. It uses dlt for schema-aware loading and dbt "Ghost Sources" to keep your lineage complete.

It’s basically: Move data anywhere → keep everything in one DAG.

## 🚀 Features

- **Cross-Database Movement**: Move data from Postgres → Snowflake, MySQL → BigQuery, DuckDB → S3, etc.
- **Reverse ETL** (optional, but supported): Push your modeled dbt tables into operational systems or external databases.
- **API Ingestion**: Pull data from REST APIs, transform it with Pandas, and load it into your warehouse.
- **The "Bridge Pattern"**: Extract → model locally (DuckDB) → push to another destination.
- **Lineage Support**: Registers "Ghost Sources" so all upstream dependencies appear in dbt docs.
- **dbt Native**: Runs as part of `dbt run`, not as a separate process.

## 📦 Installation

Install the package with only the connectors you need:

pip install "dbt-bridge[snowflake,postgres]"

Or install everything:

pip install "dbt-bridge[all]"

### Supported Extras

- **Warehouses**: `snowflake`, `bigquery`, `redshift`, `databricks`, `synapse`, `fabric`
- **Databases**: `postgres`, `mssql`, `duckdb`, `trino`, `athena`
- **Storage / Filesystems**: `s3`, `gcs`, `azure`, `filesystem`

## 🧪 Usage Examples

### 1. Database → Database Transfer (Postgres → Snowflake)

```python
import dbt_bridge
import dlt
from dlt.sources.sql_database import sql_database

def model(dbt, session):
    dbt.config(materialized='table')

    source = sql_database(schema="public", table_names=["users"])
    dbt.source("postgres_prod", "users")  # lineage

    destination = dlt.destinations.snowflake()

    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=source,
        target_destination=destination,
        dataset_name="raw_postgres",
        table_name="users_synced",
    )
```
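Like any Python model, this runs as part of `dbt run` (e.g. `dbt run --select users_synced`, assuming the model file is named `users_synced.py`), and the `dbt.source()` call is what makes `postgres_prod.users` show up as an upstream node in dbt docs.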
### 2. API → Warehouse (with Pandas Transform)

```python
import dbt_bridge
import dlt
from dlt.sources.helpers.rest_client import RESTClient

def model(dbt, session):
    dbt.config(materialized='table')

    client = RESTClient(base_url="https://api.example.com")
    raw = client.paginate("/users")

    df = dbt_bridge.api_to_df(raw)
    df["email"] = df["email"].str.lower()

    destination = dlt.destinations.snowflake()

    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=df,
        target_destination=destination,
        dataset_name="raw_api",
        table_name="users",
    )
```
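If you prefer not to rely on the `api_to_df` helper, the same flattening can be done by hand. A minimal sketch, assuming `paginate()` yields one list of record dicts per page (how dlt's `RESTClient` behaves):

```python
import pandas as pd
from dlt.sources.helpers.rest_client import RESTClient

client = RESTClient(base_url="https://api.example.com")

# Flatten the per-page lists of records into one DataFrame:
# a hand-rolled stand-in for dbt_bridge.api_to_df.
records = [row for page in client.paginate("/users") for row in page]
df = pd.DataFrame(records)
```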
### 3. The Bridge Pattern (Extract → SQL Transform → Push)

**Step 1: Ingest (Python model).** Raw data is saved locally via DuckDB.
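A minimal sketch of the ingest model (hypothetical file `models/stg_users.py`; assumes a DuckDB dbt target, an example endpoint, and the `api_to_df` helper from example 2):

```python
import dbt_bridge
from dlt.sources.helpers.rest_client import RESTClient

def model(dbt, session):
    dbt.config(materialized='table')

    # Land the raw records in the local DuckDB target, where
    # downstream SQL models can ref() them.
    client = RESTClient(base_url="https://api.example.com")
    return dbt_bridge.api_to_df(client.paginate("/users"))
```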

**Step 2: Transform (SQL model).** Standard dbt SQL logic, e.g. an `int_active_users` model built on the staged table.

**Step 3: Push (Python model).** Send the final result to another destination:

```python
import dbt_bridge
import dlt

def model(dbt, session):
    dbt.config(materialized='table')

    # Read the finished dbt model as an Arrow table
    final_df = dbt.ref("int_active_users").arrow()

    destination = dlt.destinations.snowflake()

    return dbt_bridge.transfer(
        dbt=dbt,
        source_data=final_df,
        target_destination=destination,
        dataset_name="analytics_prod",
        table_name="active_users",
    )
```

## 🔧 Configuration

dlt reads credentials from `.dlt/secrets.toml` in your dbt project root:

```toml
[destination.snowflake.credentials]
username = "user"
password = "password"
database = "ANALYTICS"
host = "account_id"
warehouse = "COMPUTE_WH"

[sources.sql_database.credentials]
drivername = "postgresql"
host = "localhost"
port = 5432
database = "source_db"
username = "user"
password = "password"
```
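The same values can also come from environment variables via dlt's double-underscore convention (handy for CI), e.g. `DESTINATION__SNOWFLAKE__CREDENTIALS__PASSWORD`.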

## 🔄 Incremental Loading

dbt-bridge supports three loading strategies:

### 1. Replace (Full Refresh)

```python
return dbt_bridge.transfer(
    dbt=dbt,
    source_data=df,
    target_destination=destination,
    dataset_name="raw_api",
    table_name="users",
    write_disposition="replace"  # Default
)
```

### 2. Append (Add Only)

```python
return dbt_bridge.transfer(
    dbt=dbt,
    source_data=df,
    target_destination=destination,
    dataset_name="raw_api",
    table_name="users",
    write_disposition="append"  # Never deletes
)
```

### 3. Merge (Upsert)

```python
return dbt_bridge.transfer(
    dbt=dbt,
    source_data=df,
    target_destination=destination,
    dataset_name="raw_api",
    table_name="users",
    write_disposition="merge",
    primary_key="id"  # Required for merge
)
```

For composite keys, pass a list:

primary_key=["user_id", "date"]

## 📘 License

MIT
