Package for Fabric Engineers

FabricEngineer Package

Description

FabricEngineer is a comprehensive Python package designed specifically for Microsoft Fabric developers to streamline data transformation workflows and automate complex ETL processes. This package provides enterprise-grade solutions for building robust data pipelines with minimal boilerplate code.

Key Features

🚀 Silver Layer Data Ingestion Services

  • Insert-Only Pattern: Efficient data ingestion with support for schema evolution and historization
  • SCD Type 2 (Slowly Changing Dimensions): Complete implementation of Type 2 SCD with automatic history tracking
  • Delta Load Support: Optimized incremental data processing with broadcast join capabilities
  • Schema Evolution: Automatic handling of schema changes with backward compatibility (see the sketch after this list)
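
The package automates schema evolution internally; for orientation, here is a minimal sketch of how schema merging works in plain Delta Lake, independent of this package (the destination table name is hypothetical):

# A new column ("budget") appears in the source; mergeSchema lets the
# destination Delta table absorb it on append.
df_new = spark.createDataFrame(
    [(1, "Alpha", 100.0)],
    ["id", "projectname", "budget"]
)
(
    df_new.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("silver.projects")  # hypothetical destination table
)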

📊 Materialized Lake Views (MLV)

  • Automated MLV Generation: Create and manage materialized views with SQL generation
  • Schema-aware Operations: Intelligent handling of schema changes and column evolution
  • Lakehouse Integration: Seamless integration with Microsoft Fabric Lakehouse architecture

🔧 Advanced Data Engineering Features

  • Configurable Transformations: Flexible transformation pipelines with custom business logic
  • Data Quality Controls: Built-in validation and data quality checks
  • Performance Optimization: Broadcast joins, partition strategies, and optimized query patterns
  • Comprehensive Logging: Integrated logging and performance monitoring with TimeLogger

Installation

pip install fabricengineer-py

Quick Start Guide

Prerequisites

  • Microsoft Fabric workspace with Lakehouse
  • PySpark environment
  • Python 3.11+
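
A quick way to verify these prerequisites inside a notebook (a minimal sketch):

# Check the Python version and that PySpark is importable.
import sys

assert sys.version_info >= (3, 11), "fabricengineer-py requires Python 3.11+"

import pyspark
print(pyspark.__version__)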

Usage Examples

Silver Layer Data Ingestion

Insert-Only Pattern

The Insert-Only service is ideal for append-only scenarios where you need to track all changes while maintaining performance.

from pyspark.sql import DataFrame, functions as F
from fabricengineer.logging import TimeLogger
from fabricengineer.transform.lakehouse import LakehouseTable
from fabricengineer.transform import SilverIngestionInsertOnlyService


def transform_projects(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


def transform_all(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("data", F.lit("values"))
    return df


# Initialize performance monitoring
timer = TimeLogger()

# Define table-specific transformations
transformations = {
    "*": transform_all,             # Applied to all tables
    "projects": transform_projects  # Applied only to projects table
}

# Configure source and destination tables
source_table = LakehouseTable(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="projects"
)
destination_table = LakehouseTable(
    lakehouse="SilverLakehouse",
    schema=source_table.schema,
    table=source_table.table
)

# Example configuration (illustrative values; adjust to your tables)
NK_COLUMNS = ["id"]                    # natural-key columns
CONSTANT_COLUMNS = []
IS_DELTA_LOAD = False                  # True for incremental (delta) loads
DELTA_LOAD_USE_BROADCAST = True        # broadcast the delta set when joining
EXCLUDE_COLUMNS_FROM_COMPARING = []    # columns ignored during change detection
INCLUDE_COLUMNS_AT_COMPARING = []      # columns explicitly compared
HISTORIZE = True
PARTITION_BY_COLUMNS = []

# Initialize and configure the ETL service.
# `spark` and `notebookutils` are provided by the Fabric notebook runtime.
etl = SilverIngestionInsertOnlyService()
etl.init(
    spark_=spark,
    notebookutils_=notebookutils,
    source_table=source_table,
    destination_table=destination_table,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    df_bronze=None,
    create_historized_mlv=True
)


timer.start().log()
etl.run()
timer.stop().log()

SCD Type 2 (Slowly Changing Dimensions)

The SCD2 service implements Type 2 Slowly Changing Dimensions with automatic history tracking and current record management.

from pyspark.sql import DataFrame, functions as F
from fabricengineer.logging import TimeLogger
from fabricengineer.transform.lakehouse import LakehouseTable
from fabricengineer.transform import SilverIngestionSCD2Service


def transform_projects(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


def transform_all(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("data", F.lit("values"))
    return df


# Initialize performance monitoring
timer = TimeLogger()

# Define table-specific transformations
transformations = {
    "*": transform_all,             # Applied to all tables
    "projects": transform_projects  # Applied only to projects table
}

# Configure source and destination tables
source_table = LakehouseTable(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="projects"
)
destination_table = LakehouseTable(
    lakehouse="SilverLakehouse",
    schema=source_table.schema,
    table=source_table.table
)

# Initialize and configure the ETL service (uses the same example
# configuration constants as the Insert-Only example above).
etl = SilverIngestionSCD2Service()
etl.init(
    spark_=spark,
    notebookutils_=notebookutils,
    source_table=source_table,
    destination_table=destination_table,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    df_bronze=None
)


timer.start().log()
etl.run()
timer.stop().log()
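
For orientation, a hedged sketch of the row shape a Type 2 table ends up with after one change to a record; the audit column names below are illustrative placeholders, not necessarily the ones the service writes:

# Version 1 is closed off, version 2 becomes the current record.
spark.createDataFrame(
    [
        (1, "Alpha",    "2024-01-01", "2024-06-01", False),  # superseded
        (1, "Alpha v2", "2024-06-01", None,         True),   # current
    ],
    ["id", "projectname", "valid_from", "valid_to", "is_current"],
).show()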

Materialized Lake Views Management

Prerequisites

Configure a Utils Lakehouse as the notebook's default Lakehouse. The generated view SQL is saved there as /Files/mlv/{lakehouse}/{schema}/{table}.sql.txt.

from fabricengineer.mlv import MaterializedLakeView

# Initialize the Materialized Lake View manager
mlv = MaterializedLakeView(
    lakehouse="SilverBusinessLakehouse",
    schema="schema",
    table="projects"
)
print(mlv.to_dict())

# Define your custom SQL query
sql = """
SELECT
    p.id
    ,p.projectname
    ,p.budget
    ,u.name AS projectlead
FROM dbo.projects p
LEFT JOIN users u
ON p.projectlead_id = u.id
"""

# Create or replace the materialized view
result = mlv.create_or_replace(sql)
display(result)
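
Assuming the Utils Lakehouse setup described above, the generated SQL file can be inspected with the standard Fabric notebook file utilities (a sketch; adjust the path to your lakehouse and schema):

# List the generated .sql.txt files for this lakehouse/schema.
for f in notebookutils.fs.ls("Files/mlv/SilverBusinessLakehouse/schema/"):
    print(f.path)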

Remote Module Import for Fabric Notebooks

Import specific package modules directly into your Fabric notebooks from GitHub releases:

# Cell 1: download the import helper from the tagged GitHub release
import requests

VERSION = "1.0.0"
url = f"https://raw.githubusercontent.com/enricogoerlitz/fabricengineer-py/refs/tags/{VERSION}/src/fabricengineer/import_module/import_module.py"
resp = requests.get(url)
resp.raise_for_status()
code = resp.text

# Sanity-check the downloaded source before executing it
assert code.startswith("import requests")
assert "def import_module" in code

exec(code, globals())  # provides the 'import_module' function

# Cell 2
mlv_module = import_module("transform.mlv", VERSION)
scd2_module = import_module("transform.silver.scd2", VERSION)
insertonly_module = import_module("transform.silver.insertonly", VERSION)

# Cell 3 - Use mlv module
exec(mlv_module, globals())  # Provides MaterializedLakeView class and mlv instance

mlv.init(
    lakehouse="SilverBusinessLakehouse",
    schema="schema",
    table="projects"
)
print(mlv.to_dict())

# Cell 4 - Use scd2 module
exec(scd2_module, globals())  # Provides an instantiated etl object

etl.init(...)
print(str(etl))

# Cell 5 - Use insertonly module
exec(insertonly_module, globals())  # Provides an instantiated etl object

etl.init(...)
print(str(etl))

Advanced Features

Performance Optimization

  • Broadcast Joins: Automatically optimize joins against small tables (see the sketch after this list)
  • Partition Strategies: Intelligent partitioning for better query performance
  • Schema Evolution: Handle schema changes without breaking existing pipelines
  • Delta Load Processing: Efficient incremental data processing
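
These optimizations are applied inside the services; the broadcast mechanism itself is plain PySpark, sketched here independently of the package:

from pyspark.sql import functions as F

# Stand-ins for a large silver table and a small delta set.
large_df = spark.range(1_000_000).withColumnRenamed("id", "nk")
delta_df = spark.range(100).withColumnRenamed("id", "nk")

# Broadcasting the small side avoids shuffling the large table.
matched = large_df.join(F.broadcast(delta_df), on="nk", how="inner")
matched.explain()  # the plan should show a BroadcastHashJoin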

Data Quality & Validation

  • Automatic Validation: Built-in checks for data consistency and quality (see the sketch after this list)
  • Type Safety: Comprehensive type annotations for better development experience
  • Error Handling: Robust error handling and recovery mechanisms
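
The package's checks run internally; as an illustration of the kind of validation involved, here is a plain-PySpark null check on natural-key columns (a sketch, not the package's implementation):

from pyspark.sql import functions as F

# Stand-in frame; the second row deliberately trips the check.
df = spark.createDataFrame([(1, "Alpha"), (None, "Beta")], ["id", "projectname"])

nk_columns = ["id"]  # your table's natural-key columns
null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in nk_columns]
).first()
bad = {c: null_counts[c] for c in nk_columns if null_counts[c] > 0}
assert not bad, f"Null natural keys found: {bad}"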

Monitoring & Logging

from fabricengineer.logging import TimeLogger, logger

# Performance monitoring
timer = TimeLogger()
timer.start().log()

# Your ETL operations here
etl.run()

timer.stop().log()

# Custom fabricengineer logging
logger.info("Custom log message")
logger.error("Error occurred during processing")
