
A framework for building and managing enterprise Lakeflow Declarative Pipelines

Project description

Lakehouse Plumber

Plumbing the future of data engineering, one pipeline at a time 🚀

Because every data lake needs a good plumber to keep the flows running smoothly! 🚰


Action-based Lakeflow Declarative Pipelines (formerly DLT) code generator for Databricks

LakehousePlumber is a powerful CLI tool that generates Lakeflow Declarative Pipelines from YAML configurations, enabling data engineers to build robust, scalable data pipelines using a declarative approach.

🎯 Key Features

  • Action-Based Architecture: Define pipelines using composable load, transform, and write actions
  • Append Flow API: Efficient multi-stream ingestion with automatic table creation management
  • Template System: Reusable pipeline templates with parameterization
  • Environment Management: Multi-environment support with token substitution
  • Data Quality Integration: Built-in expectations and validation
  • Smart Generation: Only regenerate changed files with state management and content-based file writing
  • Pipeline Validation: Comprehensive validation rules prevent configuration conflicts
  • Code Formatting: Automatic Python code formatting with Black
  • Secret Management: Secure handling of credentials and API keys
  • Operational Metadata: Automatic lineage tracking and data provenance

🏗️ Architecture

Action Types

LakehousePlumber supports three main action types; a minimal flowgroup combining them is sketched after the lists below:

🔄 Load Actions

  • CloudFiles: Structured streaming from cloud storage (JSON, Parquet, CSV)
  • Delta: Read from existing Delta tables
  • SQL: Execute SQL queries as data sources
  • JDBC: Connect to external databases
  • Python: Custom Python-based data loading

⚡ Transform Actions

  • SQL: Standard SQL transformations
  • Python: Custom Python transformations
  • Data Quality: Apply expectations
  • Schema: Column mapping and type casting
  • Temp Table: Create temporary views

💾 Write Actions

  • Streaming Table: Live tables with change data capture
  • Materialized View: Batch-computed views for analytics
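
A minimal flowgroup combining the three action types, using only fields that appear in the Quick Start and layer examples later in this README (an illustrative sketch, not a complete reference):

# pipelines/bronze_ingestion/events.yaml (illustrative)
pipeline: bronze_ingestion
flowgroup: events

actions:
  - name: load_events_raw
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/events"
      format: json
    target: v_events_raw

  - name: cleanse_events
    type: transform
    transform_type: sql
    source: v_events_raw
    target: v_events_cleansed
    sql: |
      SELECT * FROM STREAM(LIVE.v_events_raw)
      WHERE event_id IS NOT NULL

  - name: write_events_bronze
    type: write
    source: v_events_cleansed
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "events"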

Project Structure

my_lakehouse_project/
├── lhp.yaml                   # Project configuration
├── presets/                   # Reusable configurations
│   ├── bronze_layer.yaml      # Bronze layer defaults
│   ├── silver_layer.yaml      # Silver layer defaults
│   └── gold_layer.yaml        # Gold layer defaults
├── templates/                 # Pipeline templates
│   ├── standard_ingestion.yaml
│   └── scd_type2.yaml
├── pipelines/                 # Pipeline definitions
│   ├── bronze_ingestion/
│   │   ├── customers.yaml
│   │   └── orders.yaml
│   ├── silver_transforms/
│   │   └── customer_dimension.yaml
│   └── gold_analytics/
│       └── customer_metrics.yaml
├── substitutions/             # Environment-specific values
│   ├── dev.yaml
│   ├── staging.yaml
│   └── prod.yaml
├── expectations/              # Data quality rules
└── generated/                 # Generated code

🚀 Quick Start

Installation

pip install lakehouse-plumber

Initialize a Project

lhp init my_lakehouse_project
cd my_lakehouse_project

Create Your First Pipeline

Create a simple ingestion pipeline:

# pipelines/bronze_ingestion/customers.yaml
pipeline: bronze_ingestion
flowgroup: customers
presets:
  - bronze_layer

actions:
  - name: load_customers_raw
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/customers"
      format: json
      schema_evolution_mode: addNewColumns
    target: v_customers_raw
    description: "Load raw customer data from landing zone"

  - name: write_customers_bronze
    type: write
    source: v_customers_raw
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "customers"
      table_properties:
        delta.enableChangeDataFeed: "true"
        quality: "bronze"
    description: "Write customers to bronze layer"

Configure Environment

# substitutions/dev.yaml
catalog: dev_catalog
bronze_schema: bronze
silver_schema: silver
gold_schema: gold
landing_path: /mnt/dev/landing
checkpoint_path: /mnt/dev/checkpoints

secrets:
  default_scope: dev-secrets
  scopes:
    database: dev-db-secrets
    storage: dev-storage-secrets
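
With these substitutions in place, {{ token }} references in flowgroup YAML resolve per environment at generation time. For example, the Quick Start actions above would resolve roughly as follows for --env dev (illustrative):

# Resolved values for --env dev
source:
  type: cloudfiles
  path: "/mnt/dev/landing/customers"     # from "{{ landing_path }}/customers"
  format: json

write_target:
  type: streaming_table
  database: "dev_catalog.bronze"         # from "{{ catalog }}.{{ bronze_schema }}"
  table: "customers"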

Validate and Generate

# Validate configuration
lhp validate --env dev

# Generate pipeline code
lhp generate --env dev

# View generated code
ls generated/

📋 CLI Commands

Project Management

  • lhp init <project_name> - Initialize new project
  • lhp validate --env <env> - Validate pipeline configurations
  • lhp generate --env <env> - Generate pipeline code
  • lhp info - Show project information and statistics

Discovery and Inspection

  • lhp list-presets - List available presets
  • lhp list-templates - List available templates
  • lhp show <flowgroup> --env <env> - Show resolved configuration
  • lhp stats - Show project statistics

State Management

  • lhp generate --cleanup - Clean up orphaned generated files
  • lhp state --env <env> - Show generation state
  • lhp state --cleanup --env <env> - Clean up orphaned files

IntelliSense Setup

  • lhp setup-intellisense - Set up VS Code IntelliSense support
  • lhp setup-intellisense --check - Check prerequisites
  • lhp setup-intellisense --status - Show current setup status
  • lhp setup-intellisense --verify - Verify setup is working
  • lhp setup-intellisense --conflicts - Show extension conflict analysis
  • lhp setup-intellisense --cleanup - Remove IntelliSense setup

🧠 VS Code IntelliSense Support

LakehousePlumber provides comprehensive VS Code IntelliSense support with auto-completion, validation, and documentation for all YAML configuration files.

✨ Features

  • Smart Auto-completion: Context-aware suggestions for all configuration options
  • Real-time Validation: Immediate feedback on configuration errors
  • Inline Documentation: Hover hints and descriptions for all fields
  • Schema Validation: Ensures your YAML files follow the correct structure
  • Error Detection: Highlights syntax and semantic errors as you type

🔧 Setup

Prerequisites

  • VS Code installed and accessible via command line
  • LakehousePlumber installed (pip install lakehouse-plumber)

Quick Setup

# Check if your system is ready
lhp setup-intellisense --check

# Set up IntelliSense (one-time setup)
lhp setup-intellisense

# Restart VS Code to activate schema associations

Verify Setup

# Check if setup is working
lhp setup-intellisense --verify

# View current status
lhp setup-intellisense --status

🎯 What Gets IntelliSense Support

  • Pipeline Configurations (pipelines/**/*.yaml) - Full pipeline schema with actions, sources, and targets
  • Templates (templates/**/*.yaml) - Template definitions with parameter validation
  • Presets (presets/**/*.yaml) - Preset configuration schema
  • Substitutions (substitutions/**/*.yaml) - Environment-specific value validation
  • Project Configuration (lhp.yaml) - Main project settings

📝 Usage

Once set up, open any Lakehouse Plumber YAML file in VS Code and enjoy:

  1. Auto-completion: Press Ctrl+Space to see available options
  2. Documentation: Hover over any field to see descriptions
  3. Validation: Red underlines indicate errors with helpful messages
  4. Structure: IntelliSense guides you through the correct YAML structure

Example of IntelliSense in action:

# Type "actions:" and get auto-completion for action types
actions:
  - name: load_data
    type: # ← IntelliSense suggests: load, transform, write
    source:
      type: # ← IntelliSense suggests: cloudfiles, delta, sql, jdbc, python
      path: # ← Documentation shows path requirements

🛠️ Troubleshooting

Extension Conflicts

Some YAML extensions may conflict with LakehousePlumber schemas:

# Check for extension conflicts and view the analysis
lhp setup-intellisense --conflicts

Setup Issues

# Force setup even if prerequisites aren't met
lhp setup-intellisense --force

# Clean up and start fresh
lhp setup-intellisense --cleanup
lhp setup-intellisense

Common Issues

IntelliSense not working after setup:

  1. Restart VS Code completely
  2. Verify setup: lhp setup-intellisense --verify
  3. Check for extension conflicts: lhp setup-intellisense --conflicts

Schema associations missing:

  1. Check status: lhp setup-intellisense --status
  2. Re-run setup: lhp setup-intellisense --force

Red Hat YAML extension conflicts:

  • The Red Hat YAML extension is detected but usually works well alongside LakehousePlumber schemas
  • If issues persist, you can temporarily disable it or adjust its settings

🎨 Advanced Features

Presets

Create reusable configurations:

# presets/bronze_layer.yaml
name: bronze_layer
version: "1.0"
description: "Standard bronze layer configuration"

defaults:
  operational_metadata: true
  load_actions:
    cloudfiles:
      schema_evolution_mode: addNewColumns
      rescue_data_column: "_rescued_data"
  write_actions:
    streaming_table:
      table_properties:
        delta.enableChangeDataFeed: "true"
        delta.autoOptimize.optimizeWrite: "true"
        quality: "bronze"

Templates

Create parameterized pipeline templates:

# templates/standard_ingestion.yaml
name: standard_ingestion
version: "1.0"
description: "Standard data ingestion template"

parameters:
  - name: source_path
    type: string
    required: true
  - name: table_name
    type: string
    required: true
  - name: file_format
    type: string
    default: "json"

actions:
  - name: "load_{{ table_name }}_raw"
    type: load
    source:
      type: cloudfiles
      path: "{{ source_path }}"
      format: "{{ file_format }}"
    target: "v_{{ table_name }}_raw"
    
  - name: "write_{{ table_name }}_bronze"
    type: write
    source: "v_{{ table_name }}_raw"
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "{{ table_name }}"

Data Quality

Integrate expectations:

# expectations/customer_quality.yaml
expectations:
  - name: valid_customer_key
    constraint: "customer_key IS NOT NULL"
    on_violation: "fail"
  - name: valid_email
    constraint: "email RLIKE '^[A-Za-z0-9+_.-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
    on_violation: "drop"
# Pipeline with data quality
- name: validate_customers
  type: transform
  transform_type: data_quality
  source: v_customers_raw
  target: v_customers_validated
  expectations_file: "expectations/customer_quality.yaml"

SCD Type 2

Implement Slowly Changing Dimensions:

- name: customer_dimension_scd2
  type: transform
  transform_type: python
  source: v_customers_validated
  target: v_customers_scd2
  python_source: |
    from pyspark.sql.functions import current_date, lit

    def scd2_merge(df):
        return df.withColumn("__start_date", current_date()) \
                 .withColumn("__end_date", lit(None)) \
                 .withColumn("__is_current", lit(True))

🏗️ Table Creation and Append Flow API

LakehousePlumber uses the Databricks Append Flow API to efficiently handle multiple data streams writing to the same streaming table. This approach prevents table recreation conflicts and enables high-performance, concurrent data ingestion.

Core Concepts

Table Creation Control

Every write action must specify whether it creates the table or appends to an existing one:

- name: write_orders_primary
  type: write
  source: v_orders_cleaned_primary
  write_target:
    type: streaming_table
    database: "{catalog}.{bronze_schema}"
    table: orders
    create_table: true  # ← This action creates the table
    table_properties:
      delta.enableChangeDataFeed: "true"
      quality: "bronze"

- name: write_orders_secondary  
  type: write
  source: v_orders_cleaned_secondary
  write_target:
    type: streaming_table
    database: "{catalog}.{bronze_schema}"
    table: orders
    create_table: false  # ← This action appends to existing table

Generated DLT Code

The above configuration generates optimized DLT code:

# Table is created once
dlt.create_streaming_table(
    name="catalog.bronze.orders",
    comment="Streaming table: orders",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

# Multiple append flows target the same table
@dlt.append_flow(
    target="catalog.bronze.orders",
    name="f_orders_primary",
    comment="Append flow to catalog.bronze.orders from v_orders_cleaned_primary"
)
def f_orders_primary():
    return spark.readStream.table("v_orders_cleaned_primary")

@dlt.append_flow(
    target="catalog.bronze.orders", 
    name="f_orders_secondary",
    comment="Append flow to catalog.bronze.orders from v_orders_cleaned_secondary"
)
def f_orders_secondary():
    return spark.readStream.table("v_orders_cleaned_secondary")

Validation Rules

LakehousePlumber enforces strict validation rules to prevent conflicts:

Rule 1: Exactly One Creator Per Table

Each streaming table must have exactly one action with create_table: true across the entire pipeline.

# ✅ VALID: One creator, multiple appenders
- name: write_lineitem_au
  write_target:
    table: lineitem
    create_table: true   # ← Creates table

- name: write_lineitem_nz
  write_target:
    table: lineitem
    create_table: false  # ← Appends to table

- name: write_lineitem_us
  write_target:
    table: lineitem
    create_table: false  # ← Appends to table

# ❌ INVALID: Multiple creators
- name: action1
  write_target:
    table: lineitem
    create_table: true   # ← Error: Multiple creators

- name: action2
  write_target:
    table: lineitem
    create_table: true   # ← Error: Multiple creators

# ❌ INVALID: No creator
- name: action1
  write_target:
    table: lineitem
    create_table: false  # ← Error: No creator for table

- name: action2
  write_target:
    table: lineitem
    create_table: false  # ← Error: No creator for table

Rule 2: Explicit Configuration Required for Multiple Writes to the Same Table

The create_table field defaults to true, so you only need to set it explicitly when an action should append to a table created elsewhere:

# ❌ Implicit (defaults to true - may cause validation errors if you want to append to an existing table)
write_target:
  table: my_table
  # create_table not specified (defaults to true)

# ✅ Explicit (recommended for a single write, required when multiple actions write to the same table)
write_target:
  table: my_existing_table
  create_table: false

Error Handling

LakehousePlumber provides clear, actionable error messages:

# No table creator
Table creation validation failed:
  - Table 'catalog.bronze.orders' has no creator. 
    One action must have 'create_table: true'. 
    Used by: orders_ingestion.write_orders_bronze

# Multiple table creators  
Table creation validation failed:
  - Table 'catalog.bronze.orders' has multiple creators: 
    orders_ingestion.write_orders_primary, orders_ingestion.write_orders_secondary. 
    Only one action can have 'create_table: true'.

Advanced Use Cases

Multi-Region Data Ingestion

# Pipeline ingesting from multiple regions
actions:
  - name: write_events_us_east
    type: write
    source: v_events_us_east_cleaned
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: events
      create_table: true  # Primary region creates table
      partition_columns: ["event_date", "region"]
      
  - name: write_events_us_west
    type: write  
    source: v_events_us_west_cleaned
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: events
      create_table: false  # Secondary regions append
      
  - name: write_events_eu
    type: write
    source: v_events_eu_cleaned  
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: events
      create_table: false  # Secondary regions append

Cross-Flowgroup Table Sharing

Tables can be shared across multiple flowgroups within the same pipeline:

# flowgroup1.yaml
pipeline: bronze_facts
flowgroup: orders_processing
actions:
  - name: write_orders_online
    write_target:
      table: all_orders
      create_table: true  # This flowgroup creates the table

# flowgroup2.yaml  
pipeline: bronze_facts
flowgroup: legacy_orders
actions:
  - name: write_orders_legacy
    write_target:
      table: all_orders
      create_table: false  # This flowgroup appends to existing table

Smart File Generation

LakehousePlumber includes intelligent file writing that reduces unnecessary file churn:

Content-Based File Writing

  • Only writes files when content actually changes
  • Normalizes whitespace and formatting for accurate comparison
  • Reduces Git noise and CI/CD overhead

# Generation output shows statistics
✅ Generation complete: 2 files written, 8 files skipped (no changes)

Benefits

  • Faster CI/CD: Fewer file changes mean faster builds
  • Cleaner Git History: No unnecessary commits for unchanged files
  • Reduced Resource Usage: Less file I/O and processing
  • Better Developer Experience: Clear indication of actual changes

Migration Guide

From Legacy DLT Code

If you have existing DLT code with multiple dlt.create_streaming_table() calls:

# ❌ Legacy: Multiple table creations
dlt.create_streaming_table(name="catalog.bronze.orders", ...)
dlt.create_streaming_table(name="catalog.bronze.orders", ...)  # Conflict!

@dlt.table(name="catalog.bronze.orders")
def orders_flow1():
    return spark.readStream.table("source1")
    
@dlt.table(name="catalog.bronze.orders")  
def orders_flow2():
    return spark.readStream.table("source2")

Update your YAML configuration:

# ✅ New: Explicit table creation control
- name: write_orders_primary
  source: source1
  write_target:
    table: orders
    create_table: true   # Only this action creates

- name: write_orders_secondary
  source: source2  
  write_target:
    table: orders
    create_table: false  # This action appends

Backward Compatibility

Existing configurations without create_table flags will work but may trigger validation warnings. Update configurations gradually by adding explicit create_table flags.

🔧 Development

Prerequisites

  • Python 3.8+
  • Databricks workspace with Lakeflow Declarative Pipelines (DLT) enabled
  • Access to cloud storage (S3, ADLS, GCS)

Local Development

# Clone the repository
git clone https://github.com/yourusername/lakehouse-plumber.git
cd lakehouse-plumber

# Install in development mode
pip install -e .

# Run tests
pytest tests/

# Run CLI
lhp --help

Testing

LakehousePlumber includes comprehensive test coverage:

# Run all tests
pytest

# Run specific test categories
pytest tests/test_integration.py      # Integration tests
pytest tests/test_cli.py             # CLI tests
pytest tests/test_advanced_features.py  # Advanced features
pytest tests/test_performance.py     # Performance tests

📚 Examples

Bronze Layer Ingestion

pipeline: bronze_ingestion
flowgroup: orders
presets:
  - bronze_layer

actions:
  - name: load_orders_cloudfiles
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/orders"
      format: parquet
      schema_evolution_mode: addNewColumns
    target: v_orders_raw
    operational_metadata: true
    
  - name: write_orders_bronze
    type: write
    source: v_orders_raw
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "orders"
      partition_columns: ["order_date"]

Silver Layer Transformation

pipeline: silver_transforms
flowgroup: customer_dimension

actions:
  - name: cleanse_customers
    type: transform
    transform_type: sql
    source: "{{ catalog }}.{{ bronze_schema }}.customers"
    target: v_customers_cleansed
    sql: |
      SELECT 
        customer_key,
        TRIM(UPPER(customer_name)) as customer_name,
        REGEXP_REPLACE(phone, '[^0-9]', '') as phone_clean,
        address,
        nation_key,
        market_segment,
        account_balance
      FROM STREAM(LIVE.customers)
      WHERE customer_key IS NOT NULL
      
  - name: apply_scd2
    type: transform
    transform_type: python
    source: v_customers_cleansed
    target: v_customers_scd2
    python_source: |
      @dlt.view
      def scd2_logic():
          return spark.readStream.table("LIVE.v_customers_cleansed")
          
  - name: write_customer_dimension
    type: write
    source: v_customers_scd2
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ silver_schema }}"
      table: "dim_customers"
      table_properties:
        delta.enableChangeDataFeed: "true"
        quality: "silver"

Gold Layer Analytics

pipeline: gold_analytics
flowgroup: customer_metrics

actions:
  - name: customer_lifetime_value
    type: transform
    transform_type: sql
    source: 
      - "{{ catalog }}.{{ silver_schema }}.dim_customers"
      - "{{ catalog }}.{{ silver_schema }}.fact_orders"
    target: v_customer_ltv
    sql: |
      SELECT 
        c.customer_key,
        c.customer_name,
        c.market_segment,
        COUNT(o.order_key) as total_orders,
        SUM(o.total_price) as lifetime_value,
        AVG(o.total_price) as avg_order_value,
        MAX(o.order_date) as last_order_date
      FROM LIVE.dim_customers c
      LEFT JOIN LIVE.fact_orders o ON c.customer_key = o.customer_key
      WHERE c.__is_current = true
      GROUP BY c.customer_key, c.customer_name, c.market_segment
      
  - name: write_customer_metrics
    type: write
    source: v_customer_ltv
    write_target:
      type: materialized_view
      database: "{{ catalog }}.{{ gold_schema }}"
      table: "customer_metrics"
      refresh_schedule: "0 2 * * *"  # Daily at 2 AM

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Ensure all tests pass
  6. Submit a pull request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🆘 Support

🙏 Acknowledgments

  • Built for the Databricks ecosystem
  • Inspired by modern data engineering practices
  • Designed for the medallion architecture pattern

Made with ❤️ for Databricks and Lakeflow Declarative Pipelines

Download files

Download the file for your platform.

Source Distribution

lakehouse_plumber-0.2.14.tar.gz (274.0 kB)

Uploaded Source

Built Distribution


lakehouse_plumber-0.2.14-py3-none-any.whl (180.7 kB)

Uploaded Python 3

File details

Details for the file lakehouse_plumber-0.2.14.tar.gz.

File metadata

  • Download URL: lakehouse_plumber-0.2.14.tar.gz
  • Upload date:
  • Size: 274.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for lakehouse_plumber-0.2.14.tar.gz:

  • SHA256: 80caf737822681461b8dd2b33192e8d2807154a6e91a5db2e0bb6aa3899acb81
  • MD5: b1d05969c93e1359f3d3d7fca7312bfd
  • BLAKE2b-256: c4debf59988cdb841a1b659b4a8efd7e98013f6ee96e81e3c9aef4d224227dd7


Provenance

The following attestation bundles were made for lakehouse_plumber-0.2.14.tar.gz:

Publisher: publish.yml on Mmodarre/Lakehouse_Plumber

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file lakehouse_plumber-0.2.14-py3-none-any.whl.

File metadata

File hashes

Hashes for lakehouse_plumber-0.2.14-py3-none-any.whl:

  • SHA256: c80b597b6c34a17bfa158154ca1cd45e0877b4fc33dd3e7f30accdb60e03e33c
  • MD5: 76444135d16452c2435da409cb87cef8
  • BLAKE2b-256: e33c88d5d6732519b836f5e00ea69d6ba7f6d6fc131611da59153c1b14b6dec4


Provenance

The following attestation bundles were made for lakehouse_plumber-0.2.14-py3-none-any.whl:

Publisher: publish.yml on Mmodarre/Lakehouse_Plumber

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
