Lakehouse Plumber
A framework for building and managing enterprise Lakeflow Declarative Pipelines.
Turn 4,300 lines of repetitive Python/SQL code into 50 lines of reusable YAML.
Because every data lake needs a good plumber to keep the flows running smoothly!
The YAML-driven metadata framework for Databricks Lakeflow Declarative Pipelines (formerly Delta Live Tables), and the only metadata framework that generates production-ready PySpark code for Lakeflow Declarative Pipelines.
Transform repetitive data pipeline development with action-based configurations. Generate production-ready Python code from simple YAML definitions while maintaining full transparency and Databricks-native features.
Why Lakehouse Plumber?
Before: Repetitive Boilerplate Hell
# customer_bronze.py (86 lines × 50 tables = 4,300 lines!)
from pyspark.sql import functions as F
import dlt

@dlt.view()
def v_customer_raw():
    """Load customer table from raw schema"""
    df = spark.readStream.table("acmi_edw_dev.edw_raw.customer")
    df = df.withColumn('_processing_timestamp', F.current_timestamp())
    return df

@dlt.view(comment="SQL transform: customer_bronze_cleanse")
def v_customer_bronze_cleaned():
    """SQL transform: customer_bronze_cleanse"""
    return spark.sql("""SELECT
        c_custkey as customer_id,
        c_name as name,
        c_address as address,
        -- ... 50+ more lines of transformations
    FROM stream(v_customer_raw)""")

# ... 50+ more lines of data quality, table creation, etc.
After: One Reusable Template
# templates/csv_ingestion_template.yaml (50 lines for ALL tables!)
name: csv_ingestion_template
description: "Standard template for ingesting CSV files"
parameters:
  - name: table_name
    required: true
  - name: landing_folder
    required: true

actions:
  - name: load_{{ table_name }}_csv
    type: load
    source:
      type: cloudfiles
      path: "{landing_volume}/{{ landing_folder }}/*.csv"
      format: csv
      schema_evolution_mode: addNewColumns
    target: v_{{ table_name }}_raw

  - name: write_{{ table_name }}_bronze
    type: write
    source: v_{{ table_name }}_raw
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "{{ table_name }}"
Plus 5 lines for each table:
pipeline: raw_ingestions
flowgroup: customer_ingestion
use_template: csv_ingestion_template
template_parameters:
  table_name: customer
  landing_folder: customer
  table_properties:
    PII: "true"
Key Benefits
✅ Eliminate 95% of Repetitive Code - One template replaces hundreds of nearly-identical files
✅ Standardize Data Platform Quality - Enforce consistent table properties, schemas, and patterns
✅ Maintain Full Transparency - Generated Python is readable, version-controlled, and debuggable
✅ DataOps-Ready - Built for CI/CD pipelines, automated testing, and multi-environment deployment
✅ Zero Runtime Overhead - Pure Python generation, no compilation at runtime
✅ Databricks Native - Works with Unity Catalog, Lakeflow UI, and Databricks Assistant
✅ No Vendor Lock-in - Output is standard Python & SQL you control
Core Features
Action-Based Architecture
Build pipelines using composable Load, Transform, and Write actions:
Load Actions
- CloudFiles: Auto Loader for streaming files (JSON, Parquet, CSV, Avro, ORC, XML)
- Delta: Read from existing Delta tables with CDC support
- JDBC: Connect to external databases
- SQL: Execute custom SQL queries
- Python: Custom Python data sources
Transform Actions
- SQL: Standard SQL transformations
- Python: Custom Python transformations
- Data Quality: Apply expectations and validation rules
- Schema: Column mapping and type casting
- Temp Table: Create reusable temporary streaming tables
Write Actions
- Streaming Table: Live tables with change data capture
- Materialized View: Batch-computed analytics views
Template & Preset System
- Templates: Parameterized action patterns for common use cases
- Presets: Reusable configuration snippets for standardization
- Environment Management: Multi-environment support with variable substitution
Advanced Capabilities
- Append Flow API: Efficient multi-stream ingestion with automatic table creation management
- Smart Generation: Content-based file writing - only regenerate what actually changed
- Databricks Asset Bundles: Full integration for enterprise deployment workflows
- VS Code IntelliSense: Complete auto-completion and validation for YAML configurations
- Secret Management: Secure credential handling with scope-based organization and dbutils.secrets (see the sketch after this list)
- Operational Metadata: Flexible metadata column creation
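Secret references resolve to the standard Databricks dbutils.secrets API in the generated code. A minimal sketch of that pattern is below; the scope, key, and JDBC connection details are placeholders for illustration, not output of the tool:

# Minimal sketch: scope/key names and JDBC connection details are placeholders.
jdbc_password = dbutils.secrets.get(scope="edw_secrets", key="jdbc_password")

df = (spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://example-host:5432/edw")  # placeholder URL
      .option("dbtable", "public.customer")
      .option("user", "etl_user")                                # placeholder user
      .option("password", jdbc_password)
      .load())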
Quick Start
Installation & Setup
# Install Lakehouse Plumber
pip install lakehouse-plumber
# Initialize new project
lhp init my_lakehouse_project --bundle
cd my_lakehouse_project
# Set up VS Code IntelliSense (optional)
lhp setup-intellisense
Create Your First Pipeline
# pipelines/bronze_ingestion/customers.yaml
pipeline: tpch_sample_ingestion          # Groups generated Python files in the same folder
flowgroup: customer_ingestion            # Logical grouping for the generated Python file

actions:
  - name: customer_sample_load           # Unique action identifier
    type: load                           # Action type: Load
    readMode: stream                     # Read using streaming CDF
    source:
      type: delta                        # Source format: Delta Lake table
      database: "samples.tpch"           # Source catalog and schema in Unity Catalog
      table: customer_sample             # Source table name
    target: v_customer_sample_raw        # Target view name (temporary, in-memory)
    description: "Load customer sample table from Databricks samples catalog"

  - name: transform_customer_sample      # Unique action identifier
    type: transform                      # Action type: Transform
    transform_type: sql                  # Transform using a SQL query
    source: v_customer_sample_raw        # Input view from the previous action
    target: v_customer_sample_cleaned    # Output view name
    sql: |                               # SQL transformation logic
      SELECT
        c_custkey as customer_id,        -- Rename key field for clarity
        c_name as name,                  -- Simplify column name
        c_address as address,            -- Keep address as-is
        c_nationkey as nation_id,        -- Rename for consistency
        c_phone as phone,                -- Simplify column name
        c_acctbal as account_balance,    -- More descriptive name
        c_mktsegment as market_segment,  -- Readable column name
        c_comment as comment             -- Keep comment as-is
      FROM stream(v_customer_sample_raw) -- Stream from the source view
    description: "Transform customer sample table"

  - name: write_customer_sample_bronze   # Unique action identifier
    type: write                          # Action type: Write
    source: v_customer_sample_cleaned    # Input view from the previous action
    write_target:
      type: streaming_table              # Output as a streaming table
      database: "{catalog}.{bronze_schema}"  # Target database.schema with substitutions
      table: "tpch_sample_customer"      # Final table name
    description: "Write customer sample table to the bronze schema"
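To make the mapping concrete, here is a rough, hand-written sketch of the kind of Python this flowgroup generates, following the same @dlt.view and append-flow patterns shown elsewhere on this page; function and flow names are illustrative, not the exact generated output:

import dlt

@dlt.view()
def v_customer_sample_raw():
    """Load action: streaming read of the Delta source table."""
    return spark.readStream.table("samples.tpch.customer_sample")

@dlt.view()
def v_customer_sample_cleaned():
    """Transform action: the SQL from the YAML, run against the raw view."""
    return spark.sql("""
        SELECT c_custkey as customer_id, c_name as name, c_address as address
        FROM stream(v_customer_sample_raw)
    """)

# Write action: streaming table in the bronze schema, fed by an append flow
dlt.create_streaming_table(name="tpch_sample_customer")

@dlt.append_flow(target="tpch_sample_customer", name="f_customer_sample_bronze")
def f_customer_sample_bronze():
    return spark.readStream.table("v_customer_sample_cleaned")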
Configure & Generate
# Configure environment variables
# Edit substitutions/dev.yaml with your settings
# Validate configuration
lhp validate --env dev
# Generate production-ready Python code
lhp generate --env dev --cleanup
# Deploy with Databricks Bundles (optional)
databricks bundle deploy --target dev
Enterprise Features
Databricks Asset Bundles Integration
Full integration with Databricks Asset Bundles for enterprise-grade deployment:
# databricks.yml - automatically detected
targets:
  dev:
    mode: development
    workspace:
      host: https://your-workspace.cloud.databricks.com
  prod:
    mode: production
    workspace:
      host: https://your-prod-workspace.cloud.databricks.com

resources:
  pipelines:
    bronze_ingestion:
      name: "bronze_ingestion_${bundle.target}"
      libraries:
        - file:
            path: ./generated/bronze_load
Multi-Environment DataOps Workflow
# Development
lhp generate --env dev
databricks bundle deploy --target dev
# Staging
lhp generate --env staging
databricks bundle deploy --target staging
# Production
lhp generate --env prod
databricks bundle deploy --target prod
Complete CLI Reference
Project Lifecycle
lhp init <project_name> # Initialize new project with best practices
lhp validate --env <env> # Validate all configurations before generation
lhp generate --env <env> # Generate production-ready Python code
lhp info # Show project statistics and health metrics
Discovery & Debugging
lhp list-presets # Browse available configuration presets
lhp list-templates # Browse available pipeline templates
lhp show <flowgroup> --env <env> # Debug resolved configuration with substitutions
lhp stats # Analyze project complexity and code metrics
State & Maintenance
lhp generate --cleanup # Clean up orphaned generated files
lhp state --env <env> # View generation state and dependencies
lhp state --cleanup --env <env> # Remove stale state and orphaned files
VS Code Integration
lhp setup-intellisense # Enable full YAML auto-completion
lhp setup-intellisense --check # Verify system prerequisites
lhp setup-intellisense --status # Show current IntelliSense status
lhp setup-intellisense --verify # Test IntelliSense functionality
lhp setup-intellisense --cleanup # Remove IntelliSense configuration
Project Structure
LakehousePlumber organizes your data pipeline code for maximum reusability and maintainability:
my_lakehouse_project/
├── lhp.yaml                        # Project configuration
├── presets/                        # Reusable configuration standards
│   ├── bronze_layer.yaml           # Bronze layer defaults
│   ├── silver_layer.yaml           # Silver layer standards
│   └── gold_layer.yaml             # Analytics layer patterns
├── templates/                      # Parameterized pipeline patterns
│   ├── standard_ingestion.yaml     # Common ingestion template
│   └── scd_type2.yaml              # Slowly changing dimension template
├── pipelines/                      # Your pipeline definitions
│   ├── bronze_ingestion/           # Raw data ingestion
│   │   ├── customers.yaml          # Customer data flow
│   │   └── orders.yaml             # Order data flow
│   ├── silver_transforms/          # Business logic transformations
│   │   └── customer_dimension.yaml # Customer dimensional model
│   └── gold_analytics/             # Analytics and reporting
│       └── customer_metrics.yaml   # Customer analytics
├── substitutions/                  # Environment-specific values
│   ├── dev.yaml                    # Development settings
│   ├── staging.yaml                # Staging settings
│   └── prod.yaml                   # Production settings
├── expectations/                   # Data quality rules
│   └── customer_quality.yaml       # Customer data validation
└── generated/                      # Generated Python code (auto-managed)
    ├── bronze_load/                # Generated bronze pipelines
    ├── silver_load/                # Generated silver pipelines
    └── gold_load/                  # Generated analytics pipelines
VS Code IntelliSense Support
Get powerful auto-completion, validation, and documentation for all YAML files:
Quick Setup
lhp setup-intellisense # One-time setup
# Restart VS Code
What You Get
- Smart Auto-completion - Context-aware suggestions for all fields
- Real-time Validation - Immediate error detection and feedback
- Inline Documentation - Hover descriptions for every configuration option
- Schema Validation - Ensures correct YAML structure
Supported Files
- Pipeline configurations (pipelines/**/*.yaml)
- Templates (templates/**/*.yaml)
- Presets (presets/**/*.yaml)
- Environment settings (substitutions/**/*.yaml)
- Project configuration (lhp.yaml)
Who Should Use Lakehouse Plumber?
Data Platform Teams
- Standardize data engineering practices across the organization
- Enforce consistent quality, security, and operational patterns
- Reduce onboarding time for new data engineers
Data Engineers
- Eliminate repetitive boilerplate code and focus on business logic
- Accelerate development with templates and presets
- Maintain code quality with automated generation and validation
DevOps/Platform Engineers
- Implement Infrastructure as Code for data pipelines
- Enable GitOps workflows with transparent Python generation
- Integrate with existing CI/CD and Databricks Asset Bundles
Analytics Engineers
- Build complex medallion architecture pipelines with simple YAML
- Implement advanced patterns like SCD Type 2 without coding complexity
- Focus on data modeling instead of infrastructure concerns
Advanced Patterns Made Simple
Multi-Stream Table Creation
# Handle multiple data sources writing to the same table
- name: write_orders_primary
  write_target:
    table: orders
    create_table: true    # Primary stream creates table

- name: write_orders_secondary
  write_target:
    table: orders
    create_table: false   # Secondary streams append
Data Quality Integration
# Built-in data quality with expectations
- name: validate_customers
  type: transform
  transform_type: data_quality
  source: v_customers_raw
  expectations_file: "customer_quality.yaml"
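In Lakeflow/DLT, such rules are ultimately expressed as expectation decorators on the dataset. A minimal sketch of what the generated view could resemble, assuming hypothetical rule names and conditions standing in for whatever customer_quality.yaml defines:

import dlt

# Sketch only: rule names and conditions are illustrative stand-ins for the
# rules defined in customer_quality.yaml.
@dlt.view()
@dlt.expect_or_drop("valid_customer_key", "customer_id IS NOT NULL")
@dlt.expect("has_market_segment", "market_segment IS NOT NULL")
def v_customers_validated():
    return spark.readStream.table("v_customers_raw")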
Smart Multi-Stream Table Management
LakehousePlumber uses the Databricks Append Flow API for efficient multi-stream ingestion:
Simple Multi-Stream Setup
# Multiple streams → single table
- name: write_orders_primary
  write_target:
    table: orders
    create_table: true    # ✅ One stream creates table

- name: write_orders_secondary
  write_target:
    table: orders
    create_table: false   # ✅ Other streams append
Generated Optimized Code
# Single table creation
dlt.create_streaming_table(name="orders", ...)

# Multiple append flows (high performance)
@dlt.append_flow(target="orders", name="f_orders_primary")
def f_orders_primary():
    return spark.readStream.table("v_orders_primary")

@dlt.append_flow(target="orders", name="f_orders_secondary")
def f_orders_secondary():
    return spark.readStream.table("v_orders_secondary")
Benefits
✅ Conflict Prevention - Automatic validation ensures exactly one table creator
✅ High Performance - Native Databricks Append Flow API
✅ Smart Generation - Only regenerate files that actually changed
✅ Clear Error Messages - Actionable validation feedback
Real-World Examples
Bronze: Raw Data Ingestion
pipeline: bronze_ingestion
flowgroup: orders
presets: [bronze_layer]

actions:
  - name: load_orders_autoloader
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/orders/*.parquet"
      schema_evolution_mode: addNewColumns
    target: v_orders_raw

  - name: write_orders_bronze
    type: write
    source: v_orders_raw
    write_target:
      type: streaming_table
      table: "orders"
      cluster_columns: ["order_date"]
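For reference, a cloudfiles load action corresponds to a standard Auto Loader read. A hedged sketch of the equivalent Spark code follows; the landing path is a placeholder for whatever {{ landing_path }} resolves to, and the option names are the usual cloudFiles options rather than the exact generated file:

import dlt

@dlt.view()
def v_orders_raw():
    """Auto Loader (cloudFiles) read of the orders landing folder."""
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .load("/Volumes/landing/orders/"))  # placeholder path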
Silver: Business Logic & Transformations
pipeline: silver_transforms
flowgroup: customer_dimension

actions:
  - name: cleanse_customers
    type: transform
    transform_type: sql
    source: "{{ bronze_schema }}.customers"
    target: v_customers_cleansed
    sql: |
      SELECT
        customer_key,
        TRIM(UPPER(customer_name)) as customer_name,
        REGEXP_REPLACE(phone, '[^0-9]', '') as phone_clean,
        market_segment,
        account_balance
      FROM STREAM(LIVE.customers)
      WHERE customer_key IS NOT NULL

  - name: write_customer_dimension
    type: write
    source: v_customers_cleansed
    write_target:
      type: streaming_table
      table: "dim_customers"
      table_properties:
        quality: "silver"
Gold: Analytics & Reporting
pipeline: gold_analytics
flowgroup: customer_metrics

actions:
  - name: customer_lifetime_value
    type: transform
    transform_type: sql
    source:
      - "{{ silver_schema }}.dim_customers"
      - "{{ silver_schema }}.fact_orders"
    target: v_customer_ltv
    sql: |
      SELECT
        c.customer_key,
        c.customer_name,
        COUNT(o.order_key) as total_orders,
        SUM(o.total_price) as lifetime_value
      FROM LIVE.dim_customers c
      LEFT JOIN LIVE.fact_orders o USING (customer_key)
      WHERE c.__is_current = true
      GROUP BY c.customer_key, c.customer_name

  - name: write_customer_metrics
    type: write
    source: v_customer_ltv
    write_target:
      type: materialized_view
      table: "customer_metrics"
      refresh_schedule: "0 2 * * *"   # Daily at 2 AM
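A materialized_view write target typically corresponds to a batch @dlt.table definition rather than a streaming table. A rough sketch of what that could look like, with illustrative names:

import dlt

# Sketch: a materialized view is batch-computed, so the generated function
# reads the upstream view with a batch read instead of readStream.
@dlt.table(name="customer_metrics", comment="Customer lifetime value metrics")
def customer_metrics():
    return spark.read.table("v_customer_ltv")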
Get Started Today
Ready to eliminate repetitive data pipeline code? Choose your path:
New to Lakehouse Plumber
pip install lakehouse-plumber
lhp init my-first-project --bundle
cd my-first-project
lhp generate --env dev --cleanup
Enterprise Evaluation
- Read the complete documentation
- Explore the ACME demo project
- Join our community discussions
Production Deployment
Community & Support
Get Help
- Documentation - Complete guides and API reference
- Issues - Bug reports and feature requests
- Discussions - Community Q&A and best practices
Contributing
We welcome contributions from the community! See our development guide for:
- Setting up a local development environment
- Running the comprehensive test suite
- Contributing documentation and examples
- Submitting features and improvements
License & Acknowledgments
Apache 2.0 License - See LICENSE for details
Built with ❤️ for the Databricks ecosystem and modern data engineering practices. Special thanks to the Databricks team for Lakeflow Declarative Pipelines and the open-source community for continuous inspiration.
Transform your data pipelines today → Get Started
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file lakehouse_plumber-0.3.4.tar.gz.
File metadata
- Download URL: lakehouse_plumber-0.3.4.tar.gz
- Upload date:
- Size: 312.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 563b3f27af32957fc9186b2b190847d161b94f5d8701a063befbd223d9ae23a9 |
| MD5 | 101c8ed4f3e613cb28332f4a76fe85df |
| BLAKE2b-256 | bb78cf6400d3cc9791482cea7d41a7098011cd63fe554423a169a748ad71e347 |
Provenance
The following attestation bundles were made for lakehouse_plumber-0.3.4.tar.gz:
Publisher: publish.yml on Mmodarre/Lakehouse_Plumber
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: lakehouse_plumber-0.3.4.tar.gz
- Subject digest: 563b3f27af32957fc9186b2b190847d161b94f5d8701a063befbd223d9ae23a9
- Sigstore transparency entry: 298473773
- Sigstore integration time:
- Permalink: Mmodarre/Lakehouse_Plumber@020a9c08d140b26147910b4283dc8b6cd9aa985c
- Branch / Tag: refs/heads/main
- Owner: https://github.com/Mmodarre
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@020a9c08d140b26147910b4283dc8b6cd9aa985c
- Trigger Event: workflow_run
File details
Details for the file lakehouse_plumber-0.3.4-py3-none-any.whl.
File metadata
- Download URL: lakehouse_plumber-0.3.4-py3-none-any.whl
- Upload date:
- Size: 198.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fc74cc4f71473c960a7568602d9b35434061ed6c79fb39d750632d1c84fd68c3 |
| MD5 | c6da150cc2be35682b55a87272e3ec66 |
| BLAKE2b-256 | ac894f1744efd528c5fff9dc631a2d490574ceabfa8566e69a9f7edcc13ef4a9 |
Provenance
The following attestation bundles were made for lakehouse_plumber-0.3.4-py3-none-any.whl:
Publisher: publish.yml on Mmodarre/Lakehouse_Plumber
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: lakehouse_plumber-0.3.4-py3-none-any.whl
- Subject digest: fc74cc4f71473c960a7568602d9b35434061ed6c79fb39d750632d1c84fd68c3
- Sigstore transparency entry: 298473778
- Sigstore integration time:
- Permalink: Mmodarre/Lakehouse_Plumber@020a9c08d140b26147910b4283dc8b6cd9aa985c
- Branch / Tag: refs/heads/main
- Owner: https://github.com/Mmodarre
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@020a9c08d140b26147910b4283dc8b6cd9aa985c
- Trigger Event: workflow_run