A framework for building and managing enterprise Lakeflow Declarative Pipelines
Lakehouse Plumber
Because every Lakehouse needs a good plumber to keep the flows running smoothly!

- A metadata-driven framework that does not want to become your next enterprise black box.
- The YAML-driven metadata framework for Databricks Lakeflow Declarative Pipelines (formerly Delta Live Tables).
- The only metadata framework that generates production-ready PySpark code for Lakeflow Declarative Pipelines.

Why Lakehouse Plumber?
Core principles of a good metadata framework:

It should…

- Eliminate repetitive boilerplate so you spend time on business logic, not plumbing.
- Standardise the Lakehouse platform and quality: for instance, all tables in the Bronze layer should have the same Delta table properties.
- Remain transparent: the generated Python files are readable, version-controlled, and exactly what runs in production.
- Fit naturally into DataOps workflows (CI/CD, automated testing, environments).
- Be easy to debug: the generated Python code is readable and can be debugged in the Databricks IDE for faster troubleshooting and development cycles.
- Provide a pathway to data democratisation: allow power users and non-technical teams to create artifacts while upholding platform standards.

And it should NOT…

- Introduce runtime overhead (no compiling configurations at runtime to generate pipelines).
- Obscure or wrap Databricks features: you still work with notebooks, Unity Catalog, and the DLT UI.
- Make debugging harder: any failure can be debugged using the Databricks IDE and AI (Databricks Assistant).
- Lock you in: the output is plain Python and SQL that you control.
Transform repetitive data pipeline development with action-based configurations. Generate production-ready Python code from simple YAML definitions while maintaining full transparency and Databricks native features.
Real-World Example
Instead of repeating load code like this 50 times for 50 tables:
# Generated by LakehousePlumber
from pyspark.sql import functions as F
import dlt
@dlt.view()
def v_customer_raw():
    """Load customer table from raw schema"""
    df = spark.readStream \
        .table("acmi_edw_dev.edw_raw.customer")
    df = df.withColumn('_processing_timestamp', F.current_timestamp())
    return df

@dlt.view(comment="SQL transform: customer_bronze_cleanse")
def v_customer_bronze_cleaned():
    """SQL transform: customer_bronze_cleanse"""
    return spark.sql("""SELECT
        c_custkey as customer_id,
        c_name as name,
        c_address as address,
        -- ... 50+ more lines of transformations
    FROM stream(v_customer_raw)""")

# ... 50+ more lines of data quality, table creation, etc.
We create one reusable template:
name: csv_ingestion_template
description: "Standard template for ingesting CSV files"

parameters:
  - name: table_name
    required: true
  - name: landing_folder
    required: true

actions:
  - name: load_{{ table_name }}_csv
    type: load
    source:
      type: cloudfiles
      path: "{landing_volume}/{{ landing_folder }}/*.csv"
      format: csv
      options:
        header: True
        delimiter: "|"
        cloudFiles.schemaEvolutionMode: "addNewColumns"
    target: v_{{ table_name }}_cloudfiles

  - name: write_{{ table_name }}_cloudfiles
    type: write
    source: v_{{ table_name }}_cloudfiles
    write_target:
      type: streaming_table
      database: "{catalog}.{raw_schema}"
      table: "{{ table_name }}"
And each table's pipeline definition shrinks to a short configuration:
pipeline: raw_ingestions
flowgroup: customer_ingestion
use_template: csv_ingestion_template
template_parameters:
  table_name: customer
  landing_folder: customer
Result: 4,300 lines of repetitive Python → 250 lines total (1 template + 50 simple configs)
Core Workflow
The execution model is deliberately simple:
graph LR
A[Load] --> B{0..N Transform}
B --> C[Write]
- Load – ingest raw data from CloudFiles, Delta, JDBC, SQL, or custom Python.
- Transform – apply zero or many transforms (SQL, Python, schema, data quality, temp tables, …).
- Write – persist results as Streaming Tables, Materialized Views, or Snapshots.
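To make the shape concrete, here is a minimal flowgroup sketch that strings the three stages together. The action and view names are purely illustrative, and only fields that appear in the examples further down this page are used; the Get Started section below shows a complete, working configuration.

pipeline: example_pipeline
flowgroup: example_flowgroup
actions:
  - name: load_example                 # Load: ingest from a source
    type: load
    source:
      type: delta
      database: "samples.tpch"
      table: customer
    target: v_example_raw

  - name: transform_example            # Transform: zero or many of these
    type: transform
    transform_type: sql
    source: v_example_raw
    target: v_example_cleaned
    sql: |
      SELECT * FROM stream(v_example_raw)

  - name: write_example                # Write: persist the result
    type: write
    source: v_example_cleaned
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: "example"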
Features at a Glance
- Actions – Load | Transform | Write with many sub-types.
- Presets & Templates – reuse patterns without copy-paste (a preset sketch follows this list).
- Substitutions – environment-aware tokens & secret references.
- CDC & SCD – change data capture with SCD types 1 and 2, plus snapshot ingestion.
- Append Flows – append data to existing streaming tables (multi-source writes to a single target table).
- Data Quality – declarative expectations integrated into transforms.
- Operational Metadata – custom audit columns and metadata.
- Smart State Management – regenerate only what changed and clean up orphaned code (Terraform-like state management).
- IntelliSense – VS Code schema hints & YAML completion.
- Seeding – seed data from existing tables using Lakeflow native features.
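As an illustration of how a preset can standardise a layer, a Bronze preset might pin the Delta table properties that every Bronze table should share. The snippet below is a hypothetical sketch only: the preset field names are illustrative assumptions, not LakehousePlumber's documented preset schema, though the table properties match those in the generated code shown later on this page.

# presets/bronze_layer.yaml - hypothetical sketch; field names are illustrative assumptions
name: bronze_layer
defaults:
  write_target:
    table_properties:
      delta.autoOptimize.optimizeWrite: "true"
      delta.enableChangeDataFeed: "true"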
What is the output of Lakehouse Plumber?
LHP is designed as a transparent "white box" system, ensuring that all generated code and logic are fully accessible and easy to understand. This approach lets users take advantage of Databricks Lakeflow Declarative Pipelines (LDP) while upholding enterprise-level code quality and avoiding repetitive boilerplate code.
As such, the output of LHP is a set of Python files that can be used to create Databricks Lakeflow Declarative Pipelines.
# Generated by LakehousePlumber
# Pipeline: raw_ingestions
# FlowGroup: customer_ingestion
from pyspark.sql import functions as F
import dlt
# Pipeline Configuration
PIPELINE_ID = "raw_ingestions"
FLOWGROUP_ID = "customer_ingestion"
# ============================================================================
# SOURCE VIEWS
# ============================================================================
# Schema hints for customer_cloudfiles table
customer_cloudfiles_schema_hints = """
c_custkey BIGINT,
c_name STRING,
c_address STRING,
c_nationkey BIGINT,
c_phone STRING,
c_acctbal DECIMAL(18,2),
c_mktsegment STRING,
c_comment STRING
""".strip().replace("\n", " ")
@dlt.view()
def v_customer_cloudfiles():
    """Load customer CSV files from landing volume"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "csv") \
        .option("header", True) \
        .option("delimiter", "|") \
        .option("cloudFiles.maxFilesPerTrigger", 11) \
        .option("cloudFiles.inferColumnTypes", False) \
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
        .option("cloudFiles.rescuedDataColumn", "_rescued_data") \
        .option("cloudFiles.schemaHints", customer_cloudfiles_schema_hints) \
        .load("/Volumes/acmi_edw_dev/edw_raw/landing_volume/customer/*.csv")

    # Add operational metadata columns
    df = df.withColumn('_record_hash', F.xxhash64(*[F.col(c) for c in df.columns]))
    df = df.withColumn('_source_file_size', F.col('_metadata.file_size'))
    df = df.withColumn('_source_file_modification_time', F.col('_metadata.file_modification_time'))
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
    return df
# ============================================================================
# TARGET TABLES
# ============================================================================
# Create the streaming table
dlt.create_streaming_table(
    name="acmi_edw_dev.edw_raw.customer",
    comment="Streaming table: customer",
    table_properties={"delta.autoOptimize.optimizeWrite": "true", "delta.enableChangeDataFeed": "true"})

# Define append flow(s)
@dlt.append_flow(
    target="acmi_edw_dev.edw_raw.customer",
    name="f_customer_cloudfiles",
    comment="Append flow to acmi_edw_dev.edw_raw.customer"
)
def f_customer_cloudfiles():
    """Append flow to acmi_edw_dev.edw_raw.customer"""
    # Streaming flow
    df = spark.readStream.table("v_customer_cloudfiles")
    return df
Core Features
Action-Based Architecture
LHP has 100% coverage of Databricks Lakeflow Declarative Pipelines (ETL) through Load, Transform, and Write actions:
Load Actions
- CloudFiles: Auto Loader for streaming files (JSON, Parquet, CSV, Avro, ORC, XML)
- Delta: Read from existing Delta tables with CDC support
- JDBC: Connect to external databases
- SQL: Execute custom SQL queries
- Python: Custom Python data sources
Transform Actions
- SQL: Standard SQL transformations
- Python: Custom Python transformations
- Data Quality: Apply expectations and validation rules
- Schema: Column mapping and type casting
- Temp Table: Create reusable temporary streaming tables
Write Actions
- Streaming Table: Live tables with change data capture
- Materialized View: Batch-computed analytics views
- Append Flow: Append data to streaming tables from multiple sources
- CDC: Change Data Capture with SCD types 1 and 2, including from snapshots
Template & Preset System
- Templates: Parameterized action patterns for common use cases
- Presets: Reusable configuration snippets for standardization
- Environment Management: Multi-environment support with variable substitution
Advanced Capabilities
- Append Flow API: Efficient multi-stream ingestion with automatic table creation management
- Smart Generation: Content-based file writing - only regenerate what actually changed
- Databricks Asset Bundles: Full integration for enterprise deployment workflows
- VS Code IntelliSense: Complete auto-completion and validation for YAML configurations
- Secret Management: Secure credential handling with scope-based organization and dbutils.secrets
- Operational Metadata: Flexible metadata column creation
Get started in 5 minutes
Installation & Setup
# Install Lakehouse Plumber
pip install lakehouse-plumber
# Initialize new project
lhp init my_lakehouse_project --bundle
cd my_lakehouse_project
Create Your First Pipeline
# pipelines/bronze_ingestion/customers.yaml
pipeline: tpch_sample_ingestion              # Grouping of generated Python files in the same folder
flowgroup: customer_ingestion                # Logical grouping for the generated Python file

actions:
  - name: customer_sample_load               # Unique action identifier
    type: load                                # Action type: Load
    readMode: stream                          # Read using streaming CDF
    source:
      type: delta                             # Source format: Delta Lake table
      database: "samples.tpch"                # Source database and schema in Unity Catalog
      table: customer_sample                  # Source table name
    target: v_customer_sample_raw             # Target view name (temporary in-memory)
    description: "Load customer sample table from Databricks samples catalog"

  - name: transform_customer_sample           # Unique action identifier
    type: transform                           # Action type: Transform
    transform_type: sql                       # Transform using SQL query
    source: v_customer_sample_raw             # Input view from previous action
    target: v_customer_sample_cleaned         # Output view name
    sql: |                                    # SQL transformation logic
      SELECT
        c_custkey as customer_id,             -- Rename key field for clarity
        c_name as name,                       -- Simplify column name
        c_address as address,                 -- Keep address as-is
        c_nationkey as nation_id,             -- Rename for consistency
        c_phone as phone,                     -- Simplify column name
        c_acctbal as account_balance,         -- More descriptive name
        c_mktsegment as market_segment,       -- Readable column name
        c_comment as comment                  -- Keep comment as-is
      FROM stream(v_customer_sample_raw)      -- Stream from source view
    description: "Transform customer sample table"

  - name: write_customer_sample_bronze        # Unique action identifier
    type: write                               # Action type: Write
    source: v_customer_sample_cleaned         # Input view from previous action
    write_target:
      type: streaming_table                   # Output as streaming table
      database: "{catalog}.{bronze_schema}"   # Target database.schema with substitutions
      table: "tpch_sample_customer"           # Final table name
    description: "Write customer sample table to bronze schema"
Configure & Generate
# Configure environment variables
# Edit substitutions/dev.yaml with your settings (an illustrative example follows below)
# Validate configuration
lhp validate --env dev
# Generate production-ready Python code
lhp generate --env dev --cleanup
# Deploy with Databricks Bundles (optional)
databricks bundle deploy --target dev
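The substitution file referenced above is where environment-specific tokens such as {catalog}, {bronze_schema} and {landing_volume} (used throughout the YAML examples on this page) get their per-environment values. The sketch below is illustrative only; the exact layout of a substitutions file may differ in your LakehousePlumber version, and the catalog, schema and volume names are placeholders.

# substitutions/dev.yaml - illustrative sketch; keys and values are placeholders
catalog: acmi_edw_dev
bronze_schema: edw_bronze
raw_schema: edw_raw
landing_volume: "/Volumes/acmi_edw_dev/edw_raw/landing_volume"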
Enterprise Features
Databricks Asset Bundles Integration
Full integration with Databricks Asset Bundles for enterprise-grade deployment:
# databricks.yml - automatically detected
targets:
  dev:
    mode: development
    workspace:
      host: https://your-workspace.cloud.databricks.com
  prod:
    mode: production
    workspace:
      host: https://your-prod-workspace.cloud.databricks.com

resources:
  pipelines:
    bronze_ingestion:
      name: "bronze_ingestion_${bundle.target}"
      libraries:
        - file:
            path: ./generated/bronze_load
Multi-Environment DataOps Workflow
# Development
lhp generate --env dev
databricks bundle deploy --target dev
# Staging
lhp generate --env staging
databricks bundle deploy --target staging
# Production
lhp generate --env prod
databricks bundle deploy --target prod
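These same commands drop straight into CI. The job below is a minimal GitHub Actions sketch for the production target; the workflow layout, secret names, and the databricks/setup-cli step are assumptions about your CI setup rather than anything LakehousePlumber prescribes.

# .github/workflows/deploy.yml - illustrative CI sketch
name: deploy-pipelines
on:
  push:
    branches: [main]

jobs:
  generate-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - uses: databricks/setup-cli@main        # installs the Databricks CLI used for bundle deploys
      - name: Install LakehousePlumber
        run: pip install lakehouse-plumber
      - name: Validate and generate pipeline code
        run: |
          lhp validate --env prod
          lhp generate --env prod
      - name: Deploy with Databricks Asset Bundles
        run: databricks bundle deploy --target prod
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}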
Project Structure
LakehousePlumber organizes your data pipeline code for maximum reusability and maintainability:
my_lakehouse_project/
├── lhp.yaml                          # Project configuration
├── presets/                          # Reusable configuration standards
│   ├── bronze_layer.yaml             # Bronze layer defaults
│   ├── silver_layer.yaml             # Silver layer standards
│   └── gold_layer.yaml               # Analytics layer patterns
├── templates/                        # Parameterized pipeline patterns
│   ├── standard_ingestion.yaml       # Common ingestion template
│   └── scd_type2.yaml                # Slowly changing dimension template
├── pipelines/                        # Your pipeline definitions
│   ├── bronze_ingestion/             # Raw data ingestion
│   │   ├── customers.yaml            # Customer data flow
│   │   └── orders.yaml               # Order data flow
│   ├── silver_transforms/            # Business logic transformation
│   │   └── customer_dimension.yaml   # Customer dimensional model
│   └── gold_analytics/               # Analytics and reporting
│       └── customer_metrics.yaml     # Customer analytics
├── substitutions/                    # Environment-specific values
│   ├── dev.yaml                      # Development settings
│   ├── staging.yaml                  # Staging settings
│   └── prod.yaml                     # Production settings
├── expectations/                     # Data quality rules
│   └── customer_quality.yaml         # Customer data validation
└── generated/                        # Generated Python code (auto-managed)
    ├── bronze_load/                  # Generated bronze pipelines
    ├── silver_load/                  # Generated silver pipelines
    └── gold_load/                    # Generated analytics pipelines
VS Code IntelliSense Support
Get powerful auto-completion, validation, and documentation for all YAML files:
What You Get
- Smart Auto-completion - Context-aware suggestions for all fields
- Real-time Validation - Immediate error detection and feedback
- Inline Documentation - Hover descriptions for every configuration option
- Schema Validation - Ensures correct YAML structure
Supported Files
- Pipeline configurations (pipelines/**/*.yaml)
- Templates (templates/**/*.yaml)
- Presets (presets/**/*.yaml)
- Environment settings (substitutions/**/*.yaml)
- Project configuration (lhp.yaml)
Generated Optimized Code
# Single table creation
dlt.create_streaming_table(name="orders", ...)

# Multiple append flows (high performance)
@dlt.append_flow(target="orders", name="f_orders_primary")
def f_orders_primary():
    return spark.readStream.table("v_orders_primary")

@dlt.append_flow(target="orders", name="f_orders_secondary")
def f_orders_secondary():
    return spark.readStream.table("v_orders_secondary")
Benefits
- Conflict Prevention – automatic validation ensures exactly one table creator
- High Performance – native Databricks Append Flow API
- Smart Generation – only regenerate files that actually changed
- Clear Error Messages – actionable validation feedback
Real-World Examples
Bronze: Raw Data Ingestion
pipeline: bronze_ingestion
flowgroup: orders
presets: [bronze_layer]

actions:
  - name: load_orders_autoloader
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/orders/*.parquet"
      schema_evolution_mode: addNewColumns
    target: v_orders_raw

  - name: write_orders_bronze
    type: write
    source: v_orders_raw
    write_target:
      type: streaming_table
      table: "orders"
      cluster_columns: ["order_date"]
Silver: Business Logic & Transformations
pipeline: silver_transforms
flowgroup: customer_dimension

actions:
  - name: cleanse_customers
    type: transform
    transform_type: sql
    source: "{{ bronze_schema }}.customers"
    target: v_customers_cleansed
    sql: |
      SELECT
        customer_key,
        TRIM(UPPER(customer_name)) as customer_name,
        REGEXP_REPLACE(phone, '[^0-9]', '') as phone_clean,
        market_segment,
        account_balance
      FROM STREAM(LIVE.customers)
      WHERE customer_key IS NOT NULL

  - name: write_customer_dimension
    type: write
    source: v_customers_cleansed
    write_target:
      type: streaming_table
      table: "dim_customers"
      table_properties:
        quality: "silver"
Gold: Analytics & Reporting
pipeline: gold_analytics
flowgroup: customer_metrics

actions:
  - name: customer_lifetime_value
    type: transform
    transform_type: sql
    source:
      - "{{ silver_schema }}.dim_customers"
      - "{{ silver_schema }}.fact_orders"
    target: v_customer_ltv
    sql: |
      SELECT
        c.customer_key,
        c.customer_name,
        COUNT(o.order_key) as total_orders,
        SUM(o.total_price) as lifetime_value
      FROM LIVE.dim_customers c
      LEFT JOIN LIVE.fact_orders o USING (customer_key)
      WHERE c.__is_current = true
      GROUP BY c.customer_key, c.customer_name

  - name: write_customer_metrics
    type: write
    source: v_customer_ltv
    write_target:
      type: materialized_view
      table: "customer_metrics"
      refresh_schedule: "0 2 * * *"   # Daily at 2 AM
Community & Support
- Documentation – complete guides and API reference
- Issues – bug reports and feature requests
- Discussions – community Q&A and best practices
Contributing
We welcome contributions from the community! See our development guide for:
- Setting up local development environment
- Running the comprehensive test suite
- Contributing documentation and examples
- Submitting features and improvements
License & Acknowledgments
Apache 2.0 License - See LICENSE for details
Built with ❤️ for Databricks Lakeflow Declarative Pipelines (LDP).