Dbt Core Interface
Project description
DBT-CORE-INTERFACE
Lightweight, thread-safe, multi-project Python interface to dbt-core
Built with the tools and technologies:
Overview
dbt-core-interface is a lightweight, high-performance Python interface for working directly with dbt-core (v1.8+). It allows developers to manage and run dbt projects entirely in memory using an intuitive Python API—enabling runtime SQL compilation, macro evaluation, SQLFluff linting/formatting, and more, all through FastAPI or local usage.
It supports dynamic multi-project environments, automatic re-parsing, file watchers, and asynchronous usage. It is the foundation for more complex interfaces such as dbt-fastapi and is designed to rapidly prototype ideas outside the constraints of the dbt-core repo itself.
Features
- 🧐 In-memory dbt-core 1.8+ interface with full
RuntimeConfighydration - ⚡ Fast, thread-safe SQL compilation and execution via FastAPI
- 🔬 Interactive linting and formatting with SQLFluff
- 🌐 Live REST API server via FastAPI
- 🌍 Supports multiple projects simultaneously using
DbtProjectContainer - 🚀 Dynamic macro parsing, Jinja rendering, manifest manipulation
- 🔄 Background file watching for auto-reparsing
- ⚖ Direct dbt command passthrough (e.g.
run,test,docs serve, etc.) - 🔍 Automated data quality monitoring and alerting
Requirements
- Python 3.9+
dbt-core >= 1.8.0
Install via PyPI:
pip install dbt-core-interface
Usage
Programmatic
from dbt_core_interface import DbtProject
# Load your project
project = DbtProject(project_dir="/path/to/dbt_project")
# Run a simple SQL query
res = project.execute_sql("SELECT current_date AS today")
print(res.table)
# Compile SQL (but don't run it)
compiled = project.compile_sql("SELECT * FROM {{ ref('my_model') }}")
print(compiled.compiled_code)
# Execute a ref() lookup
node = project.ref("my_model")
print(node.resource_type, node.name)
# Load a source node
source = project.source("my_source", "my_table")
print(source.description)
# Incrementally parse the project
project.parse_project(write_manifest=True)
# Re-parse a specific path
project.parse_paths("models/my_model.sql")
# Compile a node from path
node = project.get_node_by_path("models/my_model.sql")
compiled = project.compile_node(node)
print(compiled.compiled_code)
# Run a dbt command programmatically
project.run("-s +orders")
project.test()
# SQLFluff linting
lint_result = project.lint(sql="select 1 AS foo")
lint_result = project.lint(sql=Path("models/my_model.sql"))
print(lint_result)
# SQLFluff formatting
success, formatted_sql = project.format(sql="Select * FROM orders as o")
success, formatted_sql = project.format(sql=Path("models/my_model.sql"))
print(formatted_sql)
# Use the DbtProjectContainer to manage multiple projects
from dbt_core_interface import DbtProjectContainer
container = DbtProjectContainer()
container.create_project(project_dir="/path/to/dbt_project_1")
container.create_project(project_dir="/path/to/dbt_project_2")
print(container.registered_projects())
Server Mode (FastAPI)
Run:
python -m dbt_core_interface.server --host 0.0.0.0 --port 8581
Register a project:
curl -X POST 'http://localhost:8581/register?project_dir=/your/dbt_project'
Compile SQL:
curl -X POST 'http://localhost:8581/compile' \
-H 'X-dbt-Project: /your/dbt_project' \
-d 'select * from {{ ref("orders" }}'
Client Usage
Run the server and use the bundled client to interact with it:
from dbt_core_interface.client import DbtInterfaceClient, ServerError
client = DbtInterfaceClient(
project_dir="/path/to/project",
profiles_dir="/path/to/profiles.yml",
target="dev",
base_url="http://localhost:8581",
timeout=(5.0, 15.0)
)
# Health & heartbeat
print(client.health_check()) # {'status': 'ok', ...}
print(client.heartbeat()) # {'alive': True, 'uptime': ...}
# Run SQL with limit & path which allows resolving {{ this }}
result = client.run_sql("SELECT * FROM {{ this }} ORDER BY id", limit=500, path="models/my_model.sql")
print(result.table.rows)
# Compile without execution
comp = client.compile_sql("SELECT * FROM {{ ref('users') }}")
print(comp.compiled_code)
# Lint & format
lint = client.lint_sql(raw_sql="select * from {{ ref('users') }}")
print(lint.violations)
fmt = client.format_sql(raw_sql="select * from {{ ref('users') }}")
print(fmt.formatted_code)
# Arbitrary dbt command
docs = client.command("docs", "generate")
print(docs)
# On object deletion, project is unregistered automatically
del client
Data Quality Monitoring
dbt-core-interface includes automated data quality monitoring and alerting capabilities:
from dbt_core_interface import DbtProject, RowCountCheck, NullPercentageCheck
from dbt_core_interface import Severity, WebhookAlertChannel
# Load your project
project = DbtProject(project_dir="/path/to/dbt_project")
# Access the quality monitor
monitor = project.quality_monitor
# Add quality checks to your models
monitor.add_check(
"my_model",
RowCountCheck(
name="row_count_validation",
min_rows=1,
max_rows=1000000,
severity=Severity.ERROR,
)
)
monitor.add_check(
"my_model",
NullPercentageCheck(
name="null_id_check",
column_name="id",
max_null_percentage=0.0,
severity=Severity.CRITICAL,
)
)
# Add alert channels
monitor.add_alert_channel(
WebhookAlertChannel(url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL")
)
# Run all quality checks
results = monitor.run_checks(model_name="my_model")
for result in results:
print(f"{result.check_name}: {result.status} - {result.message}")
# Run checks for all models
all_results = monitor.run_checks()
Available Check Types
- RowCountCheck: Validate row counts are within min/max bounds
- NullPercentageCheck: Ensure null percentage in a column is acceptable
- DuplicateCheck: Detect duplicate rows based on key columns
- ValueRangeCheck: Verify numeric values are within expected range
- CustomSqlCheck: Define custom SQL-based validation logic
Server API Endpoints
When running the FastAPI server, use these endpoints for quality monitoring:
# Add a quality check
curl -X POST 'http://localhost:8581/api/v1/quality/checks' \
-H 'X-dbt-Project: /your/dbt_project' \
-d '{
"name": "row_count_check",
"check_type": "row_count",
"model_name": "my_model",
"severity": "warning",
"config": {"min_rows": 1, "max_rows": 1000000}
}'
# List all quality checks
curl 'http://localhost:8581/api/v1/quality/checks?project_dir=/your/dbt_project'
# Run quality checks
curl -X POST 'http://localhost:8581/api/v1/quality/run?project_dir=/your/dbt_project&model_name=my_model'
# Add an alert channel
curl -X POST 'http://localhost:8581/api/v1/quality/alerts' \
-H 'X-dbt-Project: /your/dbt_project' \
-d '{
"channel_type": "webhook",
"config": {"url": "https://hooks.slack.com/..."}
}'
Generic Test Library
dbt-core-interface includes a comprehensive library of reusable generic dbt tests that can be easily configured via YAML schema files:
from dbt_core_interface import DbtProject, GenericTestLibrary
# Load your project
project = DbtProject(project_dir="/path/to/dbt_project")
# Initialize the test library
library = GenericTestLibrary(project)
# List all available tests
for test in library.list_tests():
print(f"{test.name}: {test.description}")
# Generate schema.yml with suggested tests
columns = {"id": {}, "email": {}, "status": {}}
schema_yml = library.generate_schema_yml("users", columns)
print(schema_yml)
Available Generic Tests
- unique: Ensures a column has unique values (no duplicates)
- not_null: Ensures a column has no null values
- relationships: Ensures referential integrity between tables
- accepted_values: Ensures column values are from a specified list
- recency: Checks data freshness within a time window
- cardinality_equals: Ensures distinct count matches another table
Example YAML Configuration
version: 2
models:
- name: users
columns:
- name: id
tests:
- unique
- not_null
- name: email
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['active', 'inactive', 'pending']
Auto-Suggest Tests
The library can automatically suggest appropriate tests based on column naming patterns:
# Get suggestions for specific columns
for col_name in ["id", "user_id", "email", "status"]:
suggestions = library.suggest_tests_for_column(col_name)
for suggestion in suggestions:
print(f"{col_name}: {suggestion.test_type.value}")
For detailed documentation, see src/dbt_core_interface/generic_tests/docs.md.
License
This project is licensed under the MIT License. See the LICENSE file for more info.
Acknowledgments
Thanks to the dbt-core maintainers and contributors whose work makes this project possible.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dbt_core_interface-1.1.7.tar.gz.
File metadata
- Download URL: dbt_core_interface-1.1.7.tar.gz
- Upload date:
- Size: 80.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.18
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f61417665c3db28be3dcc9ca01d5e7d15d0508b0da7ac8e1fb52145a91b31a14
|
|
| MD5 |
c7766a17b7661ddc38727d0dcddb8196
|
|
| BLAKE2b-256 |
3efe75fbee7be8d58e310505cfe9a963051f16ba57a75bf2b1d6fca9129323f7
|
File details
Details for the file dbt_core_interface-1.1.7-py3-none-any.whl.
File metadata
- Download URL: dbt_core_interface-1.1.7-py3-none-any.whl
- Upload date:
- Size: 93.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.18
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1b9631f6e9aed0fead98b724e261f608e035474f7a229cbc2206f16e1a05238c
|
|
| MD5 |
4cf269154ecc2e9b70a01e92d7ce41c5
|
|
| BLAKE2b-256 |
ef54987736dc6ac05bf4d66fc6b8bf21d4f5da815ada2fe78a273889c7cc8692
|