Skip to main content

User-friendly PySpark helpers for Microsoft Fabric Lakehouses and Warehouses

Project description

fabrictools

User-friendly PySpark helpers for Microsoft Fabric — read, write, and merge Lakehouses and Warehouses with a single function call.


Features

  • Auto-resolved paths — pass a Lakehouse or Warehouse name, no ABFS URL configuration required
  • Auto-detected SparkSession — uses SparkSession.builder.getOrCreate(), works seamlessly inside Fabric notebooks
  • Auto-detected format on read — tries Delta → Parquet → CSV automatically
  • Auto-corrected Lakehouse read paths — supports bare or partial paths (e.g. customers, dbo/customers) with fallback to Tables/dbo/... then Files/...
  • Auto-corrected Lakehouse write paths — partial paths are normalized to Tables/dbo/... while explicit Files/... paths are preserved
  • Delta merge (upsert) — one-liner upsert into any Lakehouse Delta table
  • Generic data cleaning — standard cleaning with one helper function
  • Silver metadata enrichment — add ingestion/source metadata + year/month/day partitions
  • Data quality scan — detect nulls, blank strings, duplicates, and naming collisions
  • Bulk lakehouse cleaning — iterate all tables and copy cleaned outputs to another Lakehouse
  • Dimension generators — build dimension_date, dimension_country, and dimension_city with Lakehouse/Warehouse writes
  • Built-in logging — every operation logs its resolved path, detected format, and row/column count

Requirements

  • Microsoft Fabric Spark runtime (provides notebookutils, pyspark, and delta-spark)
  • Python >= 3.9

Local development: install the spark extras to get PySpark and delta-spark. notebookutils is only available inside Fabric — functions that resolve paths will raise a clear ValueError outside Fabric.


Installation

# Inside a Fabric notebook or pipeline
pip install fabrictools

# Local development (includes PySpark + delta-spark)
pip install "fabrictools[spark]"

Quick start

import fabrictools as ft

Read a Lakehouse dataset

# Auto-detects Delta → Parquet → CSV
df = ft.read_lakehouse("BronzeLakehouse", "sales/2024")

# Also accepts partial paths:
# - "customers"      -> tries Tables/dbo/customers then Files/customers
# - "dbo/customers"  -> tries Tables/dbo/customers

Write to a Lakehouse

ft.write_lakehouse(
    df,
    lakehouse_name="SilverLakehouse",
    relative_path="sales_clean",
    mode="overwrite",
    partition_by=["year", "month"],   # optional
)

# Write path normalization:
# - "sales_clean"      -> Tables/dbo/sales_clean
# - "dbo/sales_clean"  -> Tables/dbo/sales_clean
# - "Tables/sales_clean" -> Tables/dbo/sales_clean
# - "Files/archive/sales_clean" stays unchanged

Merge (upsert) into a Delta table

ft.merge_lakehouse(
    source_df=new_df,
    lakehouse_name="SilverLakehouse",
    relative_path="sales_clean",
    merge_condition="src.id = tgt.id",
    # update_set and insert_set are optional:
    # omit them to update/insert all columns automatically
)

Clean data (generic)

clean_df = ft.clean_data(df)

By default it:

  • normalizes columns to unique snake_case
  • trims string values
  • converts blank strings to null
  • removes exact duplicates
  • drops rows where all fields are null

Scan data quality issues

scan_output = ft.scan_data_errors(df, include_samples=True)

# The function displays the summary DataFrame and chart automatically by default.
# You can disable rendering with display_results=False.
scan_output = ft.scan_data_errors(df, include_samples=True, display_results=False)

# Spark DataFrame with all scan information (dataset metrics, per-column details, collisions)
scan_output["summary_df"].show(truncate=False)

# Plotly figure (auto bar/pie depending on issue categories)
scan_output["figure"].show()

# Optional helpers
print(scan_output["issue_totals"])
print(scan_output["collisions"])

scan_data_errors now returns a user-friendly bundle with a tabular summary DataFrame and a Plotly chart.

Add Silver metadata + date partitions

silver_df = ft.add_silver_metadata(
    df,
    source_lakehouse_name="RawLakehouse",
    source_relative_path="sales/raw",
    source_layer="bronze",  # optional
)

By default this adds:

  • ingestion_timestamp
  • source_layer
  • source_path (resolved candidate path actually used for source read)
  • year, month, day

Read -> clean -> write in one call

clean_df = ft.clean_and_write_data(
    source_lakehouse_name="RawLakehouse",
    source_relative_path="sales/raw",
    target_lakehouse_name="CuratedLakehouse",
    target_relative_path="sales/clean",
    mode="overwrite",
    partition_by=["year"],  # optional override
)

When partition_by is omitted, the helper writes with default partitions: ["year", "month", "day"].

Bulk clean all Lakehouse tables

bulk_result = ft.clean_and_write_all_tables(
    source_lakehouse_name="RawLakehouse",
    target_lakehouse_name="CuratedLakehouse",
    mode="overwrite",
    include_schemas=["dbo", "sales"],   # optional
    exclude_tables=["dbo.audit_log"],   # optional: accepts "table" or "schema.table"
    continue_on_error=True,             # optional
)

print(bulk_result["successful_tables"], bulk_result["failed_tables"])

The helper scans Tables/<schema>/<table> in the source Lakehouse and writes to the same relative path in the target Lakehouse.

You can also drive the orchestration with an explicit TABLES_CONFIG:

TABLES_CONFIG = [
    {
        "bronze_path": "Tables/dbo/fact_sale",
        "silver_table": "Tables/dbo/fact_sale",
        "partition_by": [],
        "mode": "overwrite",  # overwrite | append | merge
    },
    {
        "bronze_path": "Tables/dbo/dimension_customer",
        "silver_table": "Tables/dbo/dimension_customer",
        "partition_by": [],
        "mode": "append",
    },
    {
        "bronze_path": "Tables/dbo/fact_sale_updates",
        "silver_table": "Tables/dbo/fact_sale",
        "partition_by": [],
        "mode": "merge",
        "merge_condition": "src.sale_id = tgt.sale_id",  # required when mode='merge'
    },
]

bulk_result = ft.clean_and_write_all_tables(
    source_lakehouse_name="RawLakehouse",
    target_lakehouse_name="CuratedLakehouse",
    tables_config=TABLES_CONFIG,
    continue_on_error=True,
)

tables_config keys:

  • bronze_path (required): source relative path in Lakehouse.
  • silver_table (required): target relative path in Lakehouse.
  • partition_by (optional): partition columns used by overwrite/append writes.
  • mode (required): overwrite, append, or merge.
  • merge_condition (required when mode="merge"): join condition used by Delta merge.

With explicit column mappings:

ft.merge_lakehouse(
    source_df=new_df,
    lakehouse_name="SilverLakehouse",
    relative_path="sales_clean",
    merge_condition="src.id = tgt.id",
    update_set={"amount": "src.amount", "updated_at": "src.updated_at"},
    insert_set={"id": "src.id", "amount": "src.amount", "updated_at": "src.updated_at"},
)

Read from a Warehouse

df = ft.read_warehouse("MyWarehouse", "SELECT * FROM dbo.sales WHERE year = 2024")

Write to a Warehouse

ft.write_warehouse(
    df,
    warehouse_name="MyWarehouse",
    table="dbo.sales_clean",
    mode="overwrite",       # or "append"
    batch_size=10_000,      # optional, default 10 000
)

Generate dimension tables

dims = ft.generate_dimensions(
    lakehouse_name="CuratedLakehouse",
    warehouse_name="MyWarehouse",
    include_date=True,
    include_country=True,
    include_city=True,
    # Optional range for dimension_date; defaults to rolling -10y / +2y
    start_date="2015-01-01",
    end_date="2030-12-31",
    # Optional controls for countrystatecity-countries source
    countries_limit=None,
    include_states_metadata=True,
    fail_on_source_error=True,
)

# Access generated DataFrames
dims["dimension_date"].show(5)
dims["dimension_country"].show(5)
dims["dimension_city"].show(5)

API reference

Lakehouse

Function Description
read_lakehouse(lakehouse_name, relative_path, spark=None) Read a dataset — auto-detects Delta / Parquet / CSV
write_lakehouse(df, lakehouse_name, relative_path, mode, partition_by, format, spark=None) Write a DataFrame (default: Delta, overwrite)
merge_lakehouse(source_df, lakehouse_name, relative_path, merge_condition, update_set, insert_set, spark=None) Upsert via Delta merge
clean_data(df, drop_duplicates, drop_all_null_rows) Apply standard generic cleaning to a DataFrame
add_silver_metadata(df, source_lakehouse_name, source_relative_path, source_layer, ingestion_timestamp_col, source_layer_col, source_path_col, year_col, month_col, day_col, spark=None) Add Silver metadata and date partition columns, with resolved source path
scan_data_errors(df, include_samples) Report common data-quality issues
clean_and_write_data(source_lakehouse_name, source_relative_path, target_lakehouse_name, target_relative_path, mode, partition_by, spark=None) Read, clean, add Silver metadata, and write in one helper
clean_and_write_all_tables(source_lakehouse_name, target_lakehouse_name, mode, partition_by, tables_config, include_schemas, exclude_tables, continue_on_error, spark=None) Discover tables or use config entries, then clean/write or merge per table

Warehouse

Function Description
read_warehouse(warehouse_name, query, spark=None) Run a SQL query, return a DataFrame
write_warehouse(df, warehouse_name, table, mode, batch_size, spark=None) Write to a Warehouse table via JDBC

Dimensions

Function Description
build_dimension_date(start_date=None, end_date=None, spark=None) Build a date dimension DataFrame
build_dimension_country(countries_limit=None, fail_on_source_error=True, spark=None) Build country dimension from countrystatecity-countries
build_dimension_city(countries_limit=None, include_states_metadata=True, fail_on_source_error=True, spark=None) Build city dimension from countrystatecity-countries
generate_dimensions(lakehouse_name, warehouse_name, include_date, include_country, include_city, ...) Build and write selected dimensions to Lakehouse and Warehouse

How path resolution works

lakehouse_name="BronzeLakehouse"
       │
       ▼
notebookutils.lakehouse.get("BronzeLakehouse")
       │
       ▼
lh.properties.abfsPath
= "abfss://bronze@<account>.dfs.core.windows.net"
       │
       ▼
full_path = abfsPath + "/" + relative_path

Running the tests

pip install "fabrictools[dev]"
pytest

Publishing to PyPI

See docs/PYPI_PUBLISH.md for a step-by-step guide.


License

MIT

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fabrictools-0.3.1.tar.gz (31.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fabrictools-0.3.1-py3-none-any.whl (23.8 kB view details)

Uploaded Python 3

File details

Details for the file fabrictools-0.3.1.tar.gz.

File metadata

  • Download URL: fabrictools-0.3.1.tar.gz
  • Upload date:
  • Size: 31.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for fabrictools-0.3.1.tar.gz
Algorithm Hash digest
SHA256 b4d489894dede17d8804cc60888bab5cc5282ab52d8669a9feadd3f189e60db1
MD5 30b1b5b74d7825147423e42833c3d727
BLAKE2b-256 dc5c6150e15355a2384d817bcff83d415cfa5341df6ea733f9e9e870ea75d708

See more details on using hashes here.

Provenance

The following attestation bundles were made for fabrictools-0.3.1.tar.gz:

Publisher: publish.yml on willykinfoussia/FabricPackage

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fabrictools-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: fabrictools-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 23.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for fabrictools-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 15bc9c441a07faa1d46673e32e3173ec1c467f4140a8fa99afcc9cf4bde6bdbc
MD5 2d940cedb3b052b1014ca53482ca34c2
BLAKE2b-256 0e132e39770320c21e9fc956cb1af57733df5341523366460df0c622ef3dd8b6

See more details on using hashes here.

Provenance

The following attestation bundles were made for fabrictools-0.3.1-py3-none-any.whl:

Publisher: publish.yml on willykinfoussia/FabricPackage

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page