
A framework for building and managing enterprise Lakeflow Declarative Pipelines

Project description

Lakehouse Plumber

Turn 4,300 lines of repetitive Python/SQL code into 50 lines of reusable YAML 🚀

Because every data lake needs a good plumber to keep the flows running smoothly! 🚰

The YAML-driven metadata framework for Databricks Lakeflow Declarative Pipelines (formerly Delta Live Tables)

The only metadata framework that generates production-ready PySpark code for Lakeflow Declarative Pipelines

Transform repetitive data pipeline development with action-based configurations. Generate production-ready Python code from simple YAML definitions while maintaining full transparency and Databricks native features.

⚡ Why Lakehouse Plumber?

Before: Repetitive Boilerplate Hell

# customer_bronze.py (86 lines × 50 tables = 4,300 lines!)
from pyspark.sql import functions as F
import dlt

@dlt.view()
def v_customer_raw():
    """Load customer table from raw schema"""
    df = spark.readStream.table("acmi_edw_dev.edw_raw.customer")
    df = df.withColumn('_processing_timestamp', F.current_timestamp())
    return df

@dlt.view(comment="SQL transform: customer_bronze_cleanse")
def v_customer_bronze_cleaned():
    """SQL transform: customer_bronze_cleanse"""
    return spark.sql("""SELECT
      c_custkey as customer_id,
      c_name as name,
      c_address as address,
      -- ... 50+ more lines of transformations
    FROM stream(v_customer_raw)""")

# ... 50+ more lines of data quality, table creation, etc.

After: One Reusable Template

# templates/csv_ingestion_template.yaml (50 lines for ALL tables!)
name: csv_ingestion_template
description: "Standard template for ingesting CSV files"

parameters:
  - name: table_name
    required: true
  - name: landing_folder
    required: true

actions:
  - name: load_{{ table_name }}_csv
    type: load
    source:
      type: cloudfiles
      path: "{landing_volume}/{{ landing_folder }}/*.csv"
      format: csv
      schema_evolution_mode: addNewColumns
    target: v_{{ table_name }}_raw
    
  - name: write_{{ table_name }}_bronze
    type: write
    source: v_{{ table_name }}_raw
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "{{ table_name }}"

Plus a few lines of configuration for each table:

pipeline: raw_ingestions
flowgroup: customer_ingestion

use_template: csv_ingestion_template
template_parameters:
  table_name: customer
  landing_folder: customer
  table_properties:
    PII: "true"

🎯 Key Benefits

✅ Eliminate 95% of Repetitive Code - One template replaces hundreds of nearly-identical files
✅ Standardize Data Platform Quality - Enforce consistent table properties, schemas, and patterns
✅ Maintain Full Transparency - Generated Python is readable, version-controlled, and debuggable
✅ DataOps-Ready - Built for CI/CD pipelines, automated testing, and multi-environment deployment
✅ Zero Runtime Overhead - Pure Python generation, no compilation at runtime
✅ Databricks Native - Works with Unity Catalog, Lakeflow UI, and Databricks Assistant
✅ No Vendor Lock-in - Output is standard Python & SQL you control

๐Ÿ—๏ธ Core Features

🔄 Action-Based Architecture

Build pipelines using composable Load, Transform, and Write actions:

Load Actions

  • CloudFiles: Auto Loader for streaming files (JSON, Parquet, CSV, Avro, ORC, XML)
  • Delta: Read from existing Delta tables with CDC support
  • JDBC: Connect to external databases
  • SQL: Execute custom SQL queries
  • Python: Custom Python data sources

Transform Actions

  • SQL: Standard SQL transformations
  • Python: Custom Python transformations
  • Data Quality: Apply expectations and validation rules
  • Schema: Column mapping and type casting
  • Temp Table: Create reusable temporary streaming tables

Write Actions

  • Streaming Table: Live tables with change data capture
  • Materialized View: Batch-computed analytics views

🎨 Template & Preset System

  • Templates: Parameterized action patterns for common use cases
  • Presets: Reusable configuration snippets for standardization (see the sketch after this list)
  • Environment Management: Multi-environment support with variable substitution
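
A preset is just a YAML file of shared defaults that a flowgroup opts into with presets: [bronze_layer] (as in the bronze example later on this page). A minimal sketch, assuming a preset simply carries default settings such as table properties; the field names below are illustrative, not the exact preset schema:

# presets/bronze_layer.yaml (illustrative sketch - field names are assumptions)
name: bronze_layer
defaults:
  write_target:
    type: streaming_table
    table_properties:
      quality: "bronze"
      delta.enableChangeDataFeed: "true"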

🚀 Advanced Capabilities

  • Append Flow API: Efficient multi-stream ingestion with automatic table creation management
  • Smart Generation: Content-based file writing - only regenerate what actually changed
  • Databricks Asset Bundles: Full integration for enterprise deployment workflows
  • VS Code IntelliSense: Complete auto-completion and validation for YAML configurations
  • Secret Management: Secure credential handling with scope-based organization and dbutils.secrets
  • Operational Metadata: Flexible metadata column creation

🚀 Quick Start

Installation & Setup

# Install Lakehouse Plumber
pip install lakehouse-plumber

# Initialize new project
lhp init my_lakehouse_project --bundle
cd my_lakehouse_project

# Set up VS Code IntelliSense (optional)
lhp setup-intellisense

Create Your First Pipeline

# pipelines/bronze_ingestion/customers.yaml
pipeline: tpch_sample_ingestion    # Grouping of generated Python files in the same folder
flowgroup: customer_ingestion      # Logical grouping for the generated Python file

actions:
  - name: customer_sample_load          # Unique action identifier
    type: load                          # Action type: Load
    readMode: stream                    # Read as a stream (Change Data Feed)
    source:
      type: delta                       # Source format: Delta Lake table
      database: "samples.tpch"          # Source catalog and schema in Unity Catalog
      table: customer_sample            # Source table name
    target: v_customer_sample_raw       # Target view name (temporary, in-memory)
    description: "Load customer sample table from Databricks samples catalog"

  - name: transform_customer_sample     # Unique action identifier
    type: transform                     # Action type: Transform
    transform_type: sql                 # Transform using a SQL query
    source: v_customer_sample_raw       # Input view from the previous action
    target: v_customer_sample_cleaned   # Output view name
    sql: |                              # SQL transformation logic
      SELECT
        c_custkey as customer_id,           -- Rename key field for clarity
        c_name as name,                     -- Simplify column name
        c_address as address,               -- Keep address as-is
        c_nationkey as nation_id,           -- Rename for consistency
        c_phone as phone,                   -- Simplify column name
        c_acctbal as account_balance,       -- More descriptive name
        c_mktsegment as market_segment,     -- Readable column name
        c_comment as comment                -- Keep comment as-is
      FROM stream(v_customer_sample_raw)    -- Stream from the source view
    description: "Transform customer sample table"

  - name: write_customer_sample_bronze  # Unique action identifier
    type: write                         # Action type: Write
    source: v_customer_sample_cleaned   # Input view from the previous action
    write_target:
      type: streaming_table             # Output as a streaming table
      database: "{catalog}.{bronze_schema}"   # Target database.schema with substitutions
      table: "tpch_sample_customer"     # Final table name
    description: "Write customer sample table to bronze schema"

Configure & Generate

# Configure environment variables
# Edit substitutions/dev.yaml with your settings

# Validate configuration
lhp validate --env dev

# Generate production-ready Python code
lhp generate --env dev --cleanup

# Deploy with Databricks Bundles (optional)
databricks bundle deploy --target dev
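
The {catalog}, {bronze_schema}, and {landing_volume} tokens used in the examples above are resolved from the substitution file for the selected environment. A minimal sketch of what substitutions/dev.yaml might contain, assuming a flat token-to-value mapping (values and layout are illustrative; adjust to your workspace):

# substitutions/dev.yaml (illustrative sketch - values are assumptions)
catalog: acme_edw_dev
bronze_schema: edw_bronze
silver_schema: edw_silver
landing_volume: /Volumes/acme_edw_dev/raw/landing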

๐Ÿข Enterprise Features

Databricks Asset Bundles Integration

Full integration with Databricks Asset Bundles for enterprise-grade deployment:

# databricks.yml - automatically detected
targets:
  dev:
    mode: development
    workspace:
      host: https://your-workspace.cloud.databricks.com
  prod:
    mode: production
    workspace:
      host: https://your-prod-workspace.cloud.databricks.com

resources:
  pipelines:
    bronze_ingestion:
      name: "bronze_ingestion_${bundle.target}"
      libraries:
        - file:
            path: ./generated/bronze_load

Multi-Environment DataOps Workflow

# Development
lhp generate --env dev
databricks bundle deploy --target dev

# Staging  
lhp generate --env staging
databricks bundle deploy --target staging

# Production
lhp generate --env prod
databricks bundle deploy --target prod
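
The same commands slot into a standard CI job. A minimal sketch, assuming GitHub Actions with Databricks credentials stored as repository secrets (the workflow name and steps are illustrative, not part of Lakehouse Plumber itself):

# .github/workflows/deploy_pipelines.yml (illustrative sketch)
name: deploy-pipelines
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install lakehouse-plumber
      - run: lhp validate --env prod             # fail fast on configuration errors
      - run: lhp generate --env prod --cleanup   # regenerate Python before deployment
      - uses: databricks/setup-cli@main          # installs the Databricks CLI
      - run: databricks bundle deploy --target prod
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}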

📋 Complete CLI Reference

🚀 Project Lifecycle

lhp init <project_name>           # Initialize new project with best practices
lhp validate --env <env>          # Validate all configurations before generation  
lhp generate --env <env>          # Generate production-ready Python code
lhp info                          # Show project statistics and health metrics

๐Ÿ” Discovery & Debugging

lhp list-presets                  # Browse available configuration presets
lhp list-templates                # Browse available pipeline templates
lhp show <flowgroup> --env <env>  # Debug resolved configuration with substitutions
lhp stats                         # Analyze project complexity and code metrics

๐Ÿ› ๏ธ State & Maintenance

lhp generate --cleanup            # Clean up orphaned generated files
lhp state --env <env>             # View generation state and dependencies  
lhp state --cleanup --env <env>   # Remove stale state and orphaned files

🧠 VS Code Integration

lhp setup-intellisense           # Enable full YAML auto-completion
lhp setup-intellisense --check   # Verify system prerequisites
lhp setup-intellisense --status  # Show current IntelliSense status
lhp setup-intellisense --verify  # Test IntelliSense functionality
lhp setup-intellisense --cleanup # Remove IntelliSense configuration

๐Ÿ“ Project Structure

LakehousePlumber organizes your data pipeline code for maximum reusability and maintainability:

my_lakehouse_project/
├── 📋 lhp.yaml                    # Project configuration
├── 🎨 presets/                    # Reusable configuration standards
│   ├── bronze_layer.yaml          #   Bronze layer defaults
│   ├── silver_layer.yaml          #   Silver layer standards
│   └── gold_layer.yaml            #   Analytics layer patterns
├── 📁 templates/                  # Parameterized pipeline patterns
│   ├── standard_ingestion.yaml    #   Common ingestion template
│   └── scd_type2.yaml             #   Slowly changing dimension template
├── 🔄 pipelines/                  # Your pipeline definitions
│   ├── bronze_ingestion/          #   Raw data ingestion
│   │   ├── customers.yaml         #     Customer data flow
│   │   └── orders.yaml            #     Order data flow
│   ├── silver_transforms/         #   Business logic transformation
│   │   └── customer_dimension.yaml #    Customer dimensional model
│   └── gold_analytics/            #   Analytics and reporting
│       └── customer_metrics.yaml  #     Customer analytics
├── 🌍 substitutions/              # Environment-specific values
│   ├── dev.yaml                   #   Development settings
│   ├── staging.yaml               #   Staging settings
│   └── prod.yaml                  #   Production settings
├── ✅ expectations/               # Data quality rules
│   └── customer_quality.yaml      #   Customer data validation
└── 🐍 generated/                  # Generated Python code (auto-managed)
    ├── bronze_load/               #   Generated bronze pipelines
    ├── silver_load/               #   Generated silver pipelines
    └── gold_load/                 #   Generated analytics pipelines

🧠 VS Code IntelliSense Support

Get powerful auto-completion, validation, and documentation for all YAML files:

Quick Setup

lhp setup-intellisense    # One-time setup
# Restart VS Code

What You Get

  • ⚡ Smart Auto-completion - Context-aware suggestions for all fields
  • 🔍 Real-time Validation - Immediate error detection and feedback
  • 📖 Inline Documentation - Hover descriptions for every configuration option
  • 🎯 Schema Validation - Ensures correct YAML structure

Supported Files

  • Pipeline configurations (pipelines/**/*.yaml)
  • Templates (templates/**/*.yaml)
  • Presets (presets/**/*.yaml)
  • Environment settings (substitutions/**/*.yaml)
  • Project configuration (lhp.yaml)

👥 Who Should Use Lakehouse Plumber?

🏢 Data Platform Teams

  • Standardize data engineering practices across the organization
  • Enforce consistent quality, security, and operational patterns
  • Reduce onboarding time for new data engineers

๐Ÿ‘จโ€๐Ÿ’ป Data Engineers

  • Eliminate repetitive boilerplate code and focus on business logic
  • Accelerate development with templates and presets
  • Maintain code quality with automated generation and validation

🚀 DevOps/Platform Engineers

  • Implement Infrastructure as Code for data pipelines
  • Enable GitOps workflows with transparent Python generation
  • Integrate with existing CI/CD and Databricks Asset Bundles

📊 Analytics Engineers

  • Build complex medallion architecture pipelines with simple YAML
  • Implement advanced patterns like SCD Type 2 without coding complexity
  • Focus on data modeling instead of infrastructure concerns

🎨 Advanced Patterns Made Simple

Multi-Stream Table Creation

# Handle multiple data sources writing to the same table
- name: write_orders_primary
  write_target:
    table: orders
    create_table: true    # Primary stream creates table
    
- name: write_orders_secondary  
  write_target:
    table: orders
    create_table: false   # Secondary streams append

Data Quality Integration

# Built-in data quality with expectations
- name: validate_customers
  type: transform
  transform_type: data_quality
  source: v_customers_raw
  expectations_file: "customer_quality.yaml"
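
The referenced expectations file lives under expectations/ (see Project Structure below). One plausible shape, assuming each rule pairs a name with a SQL constraint and a failure action in the spirit of Lakeflow/DLT expectations; the exact schema is defined by the tool, so treat this purely as a sketch:

# expectations/customer_quality.yaml (illustrative sketch - schema is an assumption)
- name: valid_customer_key
  constraint: "customer_id IS NOT NULL"
  action: fail    # assumed options: warn, drop, or fail
- name: valid_account_balance
  constraint: "account_balance IS NOT NULL"
  action: drop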

🔧 Smart Multi-Stream Table Management

LakehousePlumber uses the Databricks Append Flow API for efficient multi-stream ingestion:

Simple Multi-Stream Setup

# Multiple streams → Single table
- name: write_orders_primary
  write_target:
    table: orders
    create_table: true    # ✅ One stream creates table

- name: write_orders_secondary  
  write_target:
    table: orders
    create_table: false   # ✅ Other streams append

Generated Optimized Code

# Single table creation
dlt.create_streaming_table(name="orders", ...)

# Multiple append flows (high performance)
@dlt.append_flow(target="orders", name="f_orders_primary")
def f_orders_primary():
    return spark.readStream.table("v_orders_primary")

@dlt.append_flow(target="orders", name="f_orders_secondary") 
def f_orders_secondary():
    return spark.readStream.table("v_orders_secondary")

Benefits

✅ Conflict Prevention - Automatic validation ensures exactly one table creator
✅ High Performance - Native Databricks Append Flow API
✅ Smart Generation - Only regenerate files that actually changed
✅ Clear Error Messages - Actionable validation feedback

📚 Real-World Examples

🥉 Bronze: Raw Data Ingestion

pipeline: bronze_ingestion
flowgroup: orders
presets: [bronze_layer]

actions:
  - name: load_orders_autoloader
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/orders/*.parquet"
      schema_evolution_mode: addNewColumns
    target: v_orders_raw
    
  - name: write_orders_bronze
    type: write
    source: v_orders_raw
    write_target:
      type: streaming_table
      table: "orders"
      cluster_columns: ["order_date"]

🥈 Silver: Business Logic & Transformations

pipeline: silver_transforms
flowgroup: customer_dimension

actions:
  - name: cleanse_customers
    type: transform
    transform_type: sql
    source: "{{ bronze_schema }}.customers"
    sql: |
      SELECT 
        customer_key,
        TRIM(UPPER(customer_name)) as customer_name,
        REGEXP_REPLACE(phone, '[^0-9]', '') as phone_clean,
        market_segment,
        account_balance
      FROM STREAM(LIVE.customers)
      WHERE customer_key IS NOT NULL
          
  - name: write_customer_dimension
    type: write
    source: v_customers_cleansed
    write_target:
      type: streaming_table
      table: "dim_customers"
      table_properties:
        quality: "silver"

🥇 Gold: Analytics & Reporting

pipeline: gold_analytics  
flowgroup: customer_metrics

actions:
  - name: customer_lifetime_value
    type: transform
    transform_type: sql
    source: 
      - "{{ silver_schema }}.dim_customers"
      - "{{ silver_schema }}.fact_orders"
    sql: |
      SELECT 
        c.customer_key,
        c.customer_name,
        COUNT(o.order_key) as total_orders,
        SUM(o.total_price) as lifetime_value
      FROM LIVE.dim_customers c
      LEFT JOIN LIVE.fact_orders o USING (customer_key)
      WHERE c.__is_current = true
      GROUP BY c.customer_key, c.customer_name
      
  - name: write_customer_metrics
    type: write
    source: v_customer_ltv
    write_target:
      type: materialized_view
      table: "customer_metrics"
      refresh_schedule: "0 2 * * *"  # Daily at 2 AM

🚀 Get Started Today

Ready to eliminate repetitive data pipeline code? Choose your path:

🆕 New to Lakehouse Plumber

pip install lakehouse-plumber
lhp init my-first-project --bundle
cd my-first-project
lhp generate --env dev --cleanup

๐Ÿข Enterprise Evaluation

๐Ÿ‘ฅ Production Deployment

๐Ÿค Community & Support

๐Ÿ’ฌ Get Help

๐Ÿ”ง Contributing

We welcome contributions from the community! See our development guide for:

  • ๐Ÿ› ๏ธ Setting up local development environment
  • โœ… Running the comprehensive test suite
  • ๐Ÿ“ Contributing documentation and examples
  • ๐Ÿš€ Submitting features and improvements

📄 License & Acknowledgments

Apache 2.0 License - See LICENSE for details

Built with โค๏ธ for the Databricks ecosystem and modern data engineering practices. Special thanks to the Databricks team for Lakeflow Declarative Pipelines and the open-source community for continuous inspiration.


Transform your data pipelines today – Get Started 🚀



Download files

Download the file for your platform.

Source Distribution

lakehouse_plumber-0.3.4.tar.gz (312.1 kB)

Uploaded Source

Built Distribution

lakehouse_plumber-0.3.4-py3-none-any.whl (198.9 kB)

Uploaded Python 3

File details

Details for the file lakehouse_plumber-0.3.4.tar.gz.

File metadata

  • Download URL: lakehouse_plumber-0.3.4.tar.gz
  • Upload date:
  • Size: 312.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for lakehouse_plumber-0.3.4.tar.gz
  • SHA256: 563b3f27af32957fc9186b2b190847d161b94f5d8701a063befbd223d9ae23a9
  • MD5: 101c8ed4f3e613cb28332f4a76fe85df
  • BLAKE2b-256: bb78cf6400d3cc9791482cea7d41a7098011cd63fe554423a169a748ad71e347

Provenance

The following attestation bundles were made for lakehouse_plumber-0.3.4.tar.gz:

Publisher: publish.yml on Mmodarre/Lakehouse_Plumber

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file lakehouse_plumber-0.3.4-py3-none-any.whl.

File metadata

File hashes

Hashes for lakehouse_plumber-0.3.4-py3-none-any.whl
  • SHA256: fc74cc4f71473c960a7568602d9b35434061ed6c79fb39d750632d1c84fd68c3
  • MD5: c6da150cc2be35682b55a87272e3ec66
  • BLAKE2b-256: ac894f1744efd528c5fff9dc631a2d490574ceabfa8566e69a9f7edcc13ef4a9

Provenance

The following attestation bundles were made for lakehouse_plumber-0.3.4-py3-none-any.whl:

Publisher: publish.yml on Mmodarre/Lakehouse_Plumber

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
