User-friendly PySpark helpers for Microsoft Fabric Lakehouses and Warehouses

fabrictools

User-friendly PySpark helpers for Microsoft Fabric — read, write, and merge Lakehouses and Warehouses with a single function call.


Features

  • Auto-resolved paths — pass a Lakehouse or Warehouse name, no ABFS URL configuration required
  • Auto-detected SparkSession — uses SparkSession.builder.getOrCreate(), works seamlessly inside Fabric notebooks
  • Auto-detected format on read — tries Delta → Parquet → CSV automatically
  • Auto-corrected Lakehouse read paths — supports bare or partial paths (e.g. customers, dbo/customers) with fallback to Tables/dbo/... then Files/...
  • Auto-corrected Lakehouse write paths — partial paths are normalized to Tables/dbo/... while explicit Files/... paths are preserved
  • Delta merge (upsert) — one-liner upsert into any Lakehouse Delta table
  • Generic data cleaning — standard cleaning with one helper function
  • Silver metadata enrichment — add ingestion/source metadata + year/month/day partitions
  • Data quality scan — detect nulls, blank strings, duplicates, and naming collisions
  • Built-in logging — every operation logs its resolved path, detected format, and row/column count

Requirements

  • Microsoft Fabric Spark runtime (provides notebookutils, pyspark, and delta-spark)
  • Python >= 3.9

Local development: install the spark extras to get PySpark and delta-spark. notebookutils is only available inside Fabric — functions that resolve paths will raise a clear ValueError outside Fabric.


Installation

# Inside a Fabric notebook or pipeline
pip install fabrictools

# Local development (includes PySpark + delta-spark)
pip install "fabrictools[spark]"

Quick start

import fabrictools as ft

Read a Lakehouse dataset

# Auto-detects Delta → Parquet → CSV
df = ft.read_lakehouse("BronzeLakehouse", "sales/2024")

# Also accepts partial paths:
# - "customers"      -> tries Tables/dbo/customers then Files/customers
# - "dbo/customers"  -> tries Tables/dbo/customers

Write to a Lakehouse

ft.write_lakehouse(
    df,
    lakehouse_name="SilverLakehouse",
    relative_path="sales_clean",
    mode="overwrite",
    partition_by=["year", "month"],   # optional
)

# Write path normalization:
# - "sales_clean"      -> Tables/dbo/sales_clean
# - "dbo/sales_clean"  -> Tables/dbo/sales_clean
# - "Tables/sales_clean" -> Tables/dbo/sales_clean
# - "Files/archive/sales_clean" stays unchanged
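The normalization rules listed above can be expressed as a small pure function. This is a sketch of the documented behavior only: `normalize_write_path` is a hypothetical name, and the handling of an already complete `Tables/dbo/...` path (left unchanged) is an assumption rather than something the README states.

```python
def normalize_write_path(relative_path: str, schema: str = "dbo") -> str:
    """Map a partial Lakehouse path to Tables/<schema>/..., leaving Files/... as-is."""
    parts = [p for p in relative_path.strip("/").split("/") if p]
    if not parts:
        raise ValueError("relative_path must not be empty")
    if parts[0] == "Files":
        return "/".join(parts)      # explicit Files/... paths are preserved
    if parts[0] == "Tables":
        parts = parts[1:]           # drop the prefix, it is re-added below
    if parts and parts[0] == schema:
        parts = parts[1:]           # drop the schema, it is re-added below
    if not parts:
        raise ValueError(f"no table name in {relative_path!r}")
    return "/".join(["Tables", schema, *parts])


for example in ("sales_clean", "dbo/sales_clean", "Tables/sales_clean",
                "Files/archive/sales_clean"):
    print(example, "->", normalize_write_path(example))
```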

Merge (upsert) into a Delta table

ft.merge_lakehouse(
    source_df=new_df,
    lakehouse_name="SilverLakehouse",
    relative_path="sales_clean",
    merge_condition="src.id = tgt.id",
    # update_set and insert_set are optional:
    # omit them to update/insert all columns automatically
)
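For readers new to merges, the effect of an upsert on `src.id = tgt.id` with no explicit `update_set` or `insert_set` can be mimicked on plain Python dicts: matched rows take the source values, unmatched source rows are inserted. This only illustrates the semantics; it is not how fabrictools or Delta Lake execute a merge, and `upsert_rows` is a hypothetical name.

```python
from typing import Dict, List


def upsert_rows(target: List[Dict], source: List[Dict], key: str = "id") -> List[Dict]:
    """Return target rows after upserting source rows on a single key column."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)   # matched: update all source columns
        else:
            merged[row[key]] = dict(row)   # not matched: insert the new row
    return list(merged.values())


target = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
source = [{"id": 2, "amount": 25}, {"id": 3, "amount": 30}]
result = upsert_rows(target, source)
print(result)
```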

Clean data (generic)

clean_df = ft.clean_data(df)

By default it:

  • normalizes columns to unique snake_case
  • trims string values
  • converts blank strings to null
  • removes exact duplicates
  • drops rows where all fields are null
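To make "unique snake_case" concrete, here is one possible way to normalize a list of column names. It is a sketch under assumptions: the function names are hypothetical, and the numeric suffix used to break ties (`_2`, `_3`, ...) is an illustrative choice, not necessarily the scheme clean_data uses.

```python
import re
from typing import List


def to_snake_case(name: str) -> str:
    """Convert a column name such as 'Customer ID' or 'customerId' to snake_case."""
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name.strip())  # split camelCase
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)                   # collapse separators
    return name.strip("_").lower()


def unique_snake_case(columns: List[str]) -> List[str]:
    """Normalize every name and append a numeric suffix when two names collide."""
    used = set()
    result = []
    for col in columns:
        base = to_snake_case(col) or "col"
        candidate, n = base, 1
        while candidate in used:
            n += 1
            candidate = f"{base}_{n}"
        used.add(candidate)
        result.append(candidate)
    return result


print(unique_snake_case(["Customer ID", "customerId", "Order-Date"]))
```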

Scan data quality issues

scan_output = ft.scan_data_errors(df, include_samples=True)

# The function displays the summary DataFrame and chart automatically by default.
# You can disable rendering with display_results=False.
scan_output = ft.scan_data_errors(df, include_samples=True, display_results=False)

# Spark DataFrame with all scan information (dataset metrics, per-column details, collisions)
scan_output["summary_df"].show(truncate=False)

# Plotly figure (auto bar/pie depending on issue categories)
scan_output["figure"].show()

# Optional helpers
print(scan_output["issue_totals"])
print(scan_output["collisions"])

scan_data_errors returns a bundle keyed by name: summary_df (a tabular Spark DataFrame), figure (a Plotly chart), issue_totals, and collisions.
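As a rough picture of what a scan counts, the following plain-Python sketch tallies nulls, blank strings, and duplicate rows over a list of dicts. It is an illustration only: `scan_rows` and its output keys are hypothetical, naming collisions are not covered, and the real helper works on Spark DataFrames.

```python
from collections import Counter
from typing import Any, Dict, List


def scan_rows(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count nulls and blank strings per column, plus exact duplicate rows."""
    nulls: Counter = Counter()
    blanks: Counter = Counter()
    for row in rows:
        for col, value in row.items():
            if value is None:
                nulls[col] += 1
            elif isinstance(value, str) and value.strip() == "":
                blanks[col] += 1
    # Two rows are duplicates when every column/value pair is identical.
    row_counts = Counter(tuple(sorted(row.items())) for row in rows)
    duplicates = sum(n - 1 for n in row_counts.values() if n > 1)
    return {"nulls": dict(nulls), "blanks": dict(blanks), "duplicate_rows": duplicates}


rows = [
    {"id": 1, "name": "Ada", "city": None},
    {"id": 2, "name": "  ", "city": "Paris"},
    {"id": 1, "name": "Ada", "city": None},
]
report = scan_rows(rows)
print(report)
```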

Add Silver metadata + date partitions

silver_df = ft.add_silver_metadata(
    df,
    source_lakehouse_name="RawLakehouse",
    source_relative_path="sales/raw",
    source_layer="bronze",  # optional
)

By default this adds:

  • ingestion_timestamp
  • source_layer
  • source_path (resolved candidate path actually used for source read)
  • year, month, day
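The added columns can be pictured as a set of values computed once per ingestion. The sketch below builds them in plain Python; `silver_metadata` is a hypothetical name, and deriving year, month, and day from the ingestion timestamp is an assumption made for illustration.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def silver_metadata(
    source_path: str,
    source_layer: str,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Build the metadata values that would be attached to every row."""
    ts = now or datetime.now(timezone.utc)
    return {
        "ingestion_timestamp": ts,
        "source_layer": source_layer,
        "source_path": source_path,
        # Assumption: partition values come from the ingestion timestamp.
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
    }


fixed = datetime(2024, 3, 5, 12, 0, tzinfo=timezone.utc)
meta = silver_metadata("Files/sales/raw", "bronze", now=fixed)
print(meta["year"], meta["month"], meta["day"])
```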

Read -> clean -> write in one call

clean_df = ft.clean_and_write_data(
    source_lakehouse_name="RawLakehouse",
    source_relative_path="sales/raw",
    target_lakehouse_name="CuratedLakehouse",
    target_relative_path="sales/clean",
    mode="overwrite",
    partition_by=["year"],  # optional override
)

When partition_by is omitted, the helper writes with default partitions: ["year", "month", "day"].
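Conceptually, the one-call helper chains the individual functions shown earlier. The sketch below spells out that chain using only the documented function names and arguments; `clean_and_write` and `RecordingStub` are hypothetical, and the stub stands in for the fabrictools module so the example runs outside Fabric. The real helper may differ in details.

```python
DEFAULT_PARTITIONS = ["year", "month", "day"]


def clean_and_write(ft, source_lakehouse_name, source_relative_path,
                    target_lakehouse_name, target_relative_path,
                    mode="overwrite", partition_by=None):
    """Read, clean, add Silver metadata, then write, using the module passed as ft."""
    df = ft.read_lakehouse(source_lakehouse_name, source_relative_path)
    df = ft.clean_data(df)
    df = ft.add_silver_metadata(
        df,
        source_lakehouse_name=source_lakehouse_name,
        source_relative_path=source_relative_path,
    )
    ft.write_lakehouse(
        df,
        lakehouse_name=target_lakehouse_name,
        relative_path=target_relative_path,
        mode=mode,
        partition_by=partition_by or DEFAULT_PARTITIONS,
    )
    return df


class RecordingStub:
    """Stand-in for fabrictools that records the order of calls."""

    def __init__(self):
        self.calls = []
        self.partition_by = None

    def read_lakehouse(self, lakehouse_name, relative_path):
        self.calls.append("read_lakehouse")
        return ["raw"]

    def clean_data(self, df):
        self.calls.append("clean_data")
        return df + ["cleaned"]

    def add_silver_metadata(self, df, source_lakehouse_name, source_relative_path):
        self.calls.append("add_silver_metadata")
        return df + ["metadata"]

    def write_lakehouse(self, df, lakehouse_name, relative_path, mode, partition_by):
        self.calls.append("write_lakehouse")
        self.partition_by = partition_by


stub = RecordingStub()
result = clean_and_write(stub, "RawLakehouse", "sales/raw",
                         "CuratedLakehouse", "sales/clean")
print(stub.calls)
```

Inside a Fabric notebook the same function could be called with the imported fabrictools module in place of the stub.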

merge_lakehouse also accepts explicit column mappings through update_set and insert_set:

ft.merge_lakehouse(
    source_df=new_df,
    lakehouse_name="SilverLakehouse",
    relative_path="sales_clean",
    merge_condition="src.id = tgt.id",
    update_set={"amount": "src.amount", "updated_at": "src.updated_at"},
    insert_set={"id": "src.id", "amount": "src.amount", "updated_at": "src.updated_at"},
)

Read from a Warehouse

df = ft.read_warehouse("MyWarehouse", "SELECT * FROM dbo.sales WHERE year = 2024")

Write to a Warehouse

ft.write_warehouse(
    df,
    warehouse_name="MyWarehouse",
    table="dbo.sales_clean",
    mode="overwrite",       # or "append"
    batch_size=10_000,      # optional, default 10_000
)

API reference

Lakehouse

Function | Description
read_lakehouse(lakehouse_name, relative_path, spark=None) | Read a dataset; auto-detects Delta / Parquet / CSV
write_lakehouse(df, lakehouse_name, relative_path, mode, partition_by, format, spark=None) | Write a DataFrame (default: Delta, overwrite)
merge_lakehouse(source_df, lakehouse_name, relative_path, merge_condition, update_set, insert_set, spark=None) | Upsert via Delta merge
clean_data(df, drop_duplicates, drop_all_null_rows) | Apply standard generic cleaning to a DataFrame
add_silver_metadata(df, source_lakehouse_name, source_relative_path, source_layer, ingestion_timestamp_col, source_layer_col, source_path_col, year_col, month_col, day_col, spark=None) | Add Silver metadata and date partition columns, with the resolved source path
scan_data_errors(df, include_samples, display_results) | Report common data-quality issues
clean_and_write_data(source_lakehouse_name, source_relative_path, target_lakehouse_name, target_relative_path, mode, partition_by, spark=None) | Read, clean, add Silver metadata, and write in one helper

Warehouse

Function | Description
read_warehouse(warehouse_name, query, spark=None) | Run a SQL query, return a DataFrame
write_warehouse(df, warehouse_name, table, mode, batch_size, spark=None) | Write to a Warehouse table via JDBC

How path resolution works

lakehouse_name="BronzeLakehouse"
       │
       ▼
notebookutils.lakehouse.get("BronzeLakehouse")
       │
       ▼
lh.properties.abfsPath
= "abfss://bronze@<account>.dfs.core.windows.net"
       │
       ▼
full_path = abfsPath + "/" + relative_path
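The diagram above translates into a lookup followed by a string join. The sketch below follows the names in the diagram (`notebookutils.lakehouse.get`, `properties.abfsPath`) without verifying them against a live Fabric runtime; `get_abfs_path` and `build_full_path` are hypothetical names, and trimming stray slashes is an added assumption.

```python
def get_abfs_path(lakehouse_name: str) -> str:
    """Look up the ABFS root of a Lakehouse; only works inside Microsoft Fabric."""
    try:
        import notebookutils  # provided by the Fabric Spark runtime
    except ImportError as exc:
        raise ValueError(
            "notebookutils is only available inside Microsoft Fabric"
        ) from exc
    lh = notebookutils.lakehouse.get(lakehouse_name)
    return lh.properties.abfsPath  # attribute access as shown in the diagram


def build_full_path(abfs_path: str, relative_path: str) -> str:
    """Join the ABFS root and a relative path with exactly one slash."""
    return abfs_path.rstrip("/") + "/" + relative_path.lstrip("/")


full_path = build_full_path(
    "abfss://bronze@<account>.dfs.core.windows.net",
    "Tables/dbo/customers",
)
print(full_path)
```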

Running the tests

pip install "fabrictools[dev]"
pytest

Publishing to PyPI

See docs/PYPI_PUBLISH.md for a step-by-step guide.


License

MIT

Download files

fabrictools-0.2.5.tar.gz (source distribution, 21.6 kB)

  • SHA256: 5bc2c28222700bb2d53cbf437ccc750ffd1d950a04f200ada1ec0fc7e7c66273
  • MD5: 3555e1fa17cb827706283bc05c7d0c56
  • BLAKE2b-256: d1210193d8dc913d637d3b8669d1c8b49cc95babc9224eec316edf889c5e1fa9

fabrictools-0.2.5-py3-none-any.whl (built distribution, Python 3, 17.5 kB)

  • SHA256: 9cf8dc58ae2262123e24124943c0f8a1276eadda8c707a33758f4559c968d5a5
  • MD5: cb11c14db7ea21eb9c212a69c356ecea
  • BLAKE2b-256: b1f5bb5e936bd5a4493821ae823794e11131ddc717d35d015cb72687972cc5db

Both files were uploaded via twine/6.1.0 (CPython/3.13.7) using Trusted Publishing. Publisher: publish.yml on willykinfoussia/FabricPackage.
