fabrictools
User-friendly PySpark helpers for Microsoft Fabric — read, write, and merge Lakehouses and Warehouses with a single function call.
Features
- Auto-resolved paths — pass a Lakehouse or Warehouse name, no ABFS URL configuration required
- Auto-detected SparkSession — uses `SparkSession.builder.getOrCreate()`, works seamlessly inside Fabric notebooks
- Auto-detected format on read — tries Delta → Parquet → CSV automatically
- Auto-corrected Lakehouse read paths — supports bare or partial paths (e.g. `customers`, `dbo/customers`) with fallback to `Tables/dbo/...` then `Files/...`
- Auto-corrected Lakehouse write paths — partial paths are normalized to `Tables/dbo/...` while explicit `Files/...` paths are preserved
- Delta merge (upsert) — one-liner upsert into any Lakehouse Delta table
- Generic data cleaning — standard cleaning with one helper function
- Silver metadata enrichment — add ingestion/source metadata + `year`/`month`/`day` partitions
- Data quality scan — detect nulls, blank strings, duplicates, and naming collisions
- Bulk Lakehouse cleaning — iterate all tables and copy cleaned outputs to another Lakehouse
- Source → Prepared pipeline — profile schema, resolve semantics, transform, optimize, and publish BI assets
- Dimension generators — build `dimension_date`, `dimension_country`, and `dimension_city` with Lakehouse/Warehouse writes
- Built-in logging — every operation logs its resolved path, detected format, and row/column count
Requirements
- Microsoft Fabric Spark runtime (provides `notebookutils`, `pyspark`, and `delta-spark`)
- Python >= 3.9

Local development: install the `spark` extras to get PySpark and delta-spark. `notebookutils` is only available inside Fabric — functions that resolve paths will raise a clear `ValueError` outside Fabric.
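A minimal sketch of a local-development guard, assuming you want a notebook that runs both inside and outside Fabric (the local Parquet path below is purely illustrative):

from pyspark.sql import SparkSession

import fabrictools as ft

spark = SparkSession.builder.getOrCreate()

try:
    # Inside Fabric, the Lakehouse path is resolved via notebookutils.
    df = ft.read_lakehouse("BronzeLakehouse", "sales/2024")
except ValueError:
    # Outside Fabric, path resolution raises ValueError (see the note above);
    # fall back to a local file so the rest of the notebook still runs.
    df = spark.read.parquet("./local_data/sales_2024.parquet")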
Installation
# Inside a Fabric notebook or pipeline
pip install fabrictools
# Local development (includes PySpark + delta-spark)
pip install "fabrictools[spark]"
Quick start
import fabrictools as ft
Read a Lakehouse dataset
# Auto-detects Delta → Parquet → CSV
df = ft.read_lakehouse("BronzeLakehouse", "sales/2024")
# Also accepts partial paths:
# - "customers" -> tries Tables/dbo/customers then Files/customers
# - "dbo/customers" -> tries Tables/dbo/customers
Write to a Lakehouse
ft.write_lakehouse(
df,
lakehouse_name="SilverLakehouse",
relative_path="sales_clean",
mode="overwrite",
partition_by=["year", "month"], # optional
)
# Write path normalization:
# - "sales_clean" -> Tables/dbo/sales_clean
# - "dbo/sales_clean" -> Tables/dbo/sales_clean
# - "Tables/sales_clean" -> Tables/dbo/sales_clean
# - "Files/archive/sales_clean" stays unchanged
Merge (upsert) into a Delta table
ft.merge_lakehouse(
source_df=new_df,
lakehouse_name="SilverLakehouse",
relative_path="sales_clean",
merge_condition="src.id = tgt.id",
# update_set and insert_set are optional:
# omit them to update/insert all columns automatically
)
Clean data (generic)
clean_df = ft.clean_data(df)
By default it:
- normalizes columns to unique `snake_case`
- trims string values
- converts blank strings to `null`
- removes exact duplicates
- drops rows where all fields are `null`
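For example, on a toy DataFrame the defaults listed above behave roughly like this (a sketch only; the column names and values are illustrative):

from pyspark.sql import SparkSession

import fabrictools as ft

spark = SparkSession.builder.getOrCreate()

raw_df = spark.createDataFrame(
    [("  Alice ", "NYC"), ("  Alice ", "NYC"), ("", None), (None, None)],
    ["Customer Name", "City"],
)

clean_df = ft.clean_data(raw_df)
# Expected, given the documented defaults:
# - columns renamed to customer_name, city
# - "  Alice " trimmed to "Alice", and the exact duplicate row removed
# - "" converted to null, and the all-null rows dropped
clean_df.show()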
Scan data quality issues
scan_output = ft.scan_data_errors(df, include_samples=True)
# The function displays the summary DataFrame and chart automatically by default.
# You can disable rendering with display_results=False.
scan_output = ft.scan_data_errors(df, include_samples=True, display_results=False)
# Spark DataFrame with all scan information (dataset metrics, per-column details, collisions)
scan_output["summary_df"].show(truncate=False)
# Plotly figure (auto bar/pie depending on issue categories)
scan_output["figure"].show()
# Optional helpers
print(scan_output["issue_totals"])
print(scan_output["collisions"])
`scan_data_errors` returns a user-friendly bundle with a tabular summary DataFrame and a Plotly chart.
Add Silver metadata + date partitions
silver_df = ft.add_silver_metadata(
df,
source_lakehouse_name="RawLakehouse",
source_relative_path="sales/raw",
source_layer="bronze", # optional
)
By default this adds:
- `ingestion_timestamp`
- `source_layer`
- `source_path` (the resolved candidate path actually used for the source read)
- `year`, `month`, `day`
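A quick, illustrative way to confirm the added columns, using the default column names listed above:

silver_df.select(
    "ingestion_timestamp", "source_layer", "source_path", "year", "month", "day"
).show(3, truncate=False)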
Read -> clean -> write in one call
clean_df = ft.clean_and_write_data(
source_lakehouse_name="RawLakehouse",
source_relative_path="sales/raw",
target_lakehouse_name="CuratedLakehouse",
target_relative_path="sales/clean",
mode="overwrite",
partition_by=["year"], # optional override
)
When `partition_by` is omitted, the helper writes with the default partitions `["year", "month", "day"]`.
Bulk clean all Lakehouse tables
bulk_result = ft.clean_and_write_all_tables(
source_lakehouse_name="RawLakehouse",
target_lakehouse_name="CuratedLakehouse",
mode="overwrite",
include_schemas=["dbo", "sales"], # optional
exclude_tables=["dbo.audit_log"], # optional: accepts "table" or "schema.table"
continue_on_error=True, # optional
)
print(bulk_result["successful_tables"], bulk_result["failed_tables"])
The helper scans Tables/<schema>/<table> in the source Lakehouse and writes to
the same relative path in the target Lakehouse.
Source -> Prepared pipeline (independent steps)
Each phase can be called independently, or orchestrated by helpers.
# 1) Discovery
schema_hash = ft.snapshot_source_schema(
source_lakehouse_name="CleanLakehouse",
source_relative_path="Tables/dbo/orders_clean",
)
# 2) Resolver (3 layers + unresolved audit)
source_df = ft.read_lakehouse("CleanLakehouse", "Tables/dbo/orders_clean")
mappings = ft.resolve_columns(
df=source_df,
source_lakehouse_name="CleanLakehouse",
schema_hash=schema_hash,
sample_size=500,
profiling_confidence_threshold=0.80,
)
# 3) Transformation
prepared_df = ft.transform_to_prepared(
df=source_df,
resolved_mappings=mappings,
source_lakehouse_name="CleanLakehouse",
)
# 4) Write + optimization
ft.write_prepared_table(
df=prepared_df,
resolved_mappings=mappings,
target_lakehouse_name="BusinessReadyLakehouse",
target_relative_path="Tables/dbo/orders_prepared",
mode="overwrite",
)
# 5) Optional BI outputs
agg_tables = ft.generate_prepared_aggregations(
source_lakehouse_name="CleanLakehouse",
target_lakehouse_name="BusinessReadyLakehouse",
target_relative_path="Tables/dbo/orders_prepared",
resolved_mappings=mappings,
)
Config tables for this pipeline are stored in the source Lakehouse under `Tables/config/`:
- `source_schema_snapshot`
- `prefix_rules`
- `profiling_cache`
- `mapping_rules`
- `code_labels`
- `pipeline_audit_log`
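Since these config tables are ordinary Delta tables, you can inspect them with the same read helper; for example, the audit log (reusing the Lakehouse name from the examples above):

audit_df = ft.read_lakehouse("CleanLakehouse", "Tables/config/pipeline_audit_log")
audit_df.show(truncate=False)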
Orchestration helpers:
# Single table
prepared_df = ft.prepare_and_write_data(
source_lakehouse_name="CleanLakehouse",
source_relative_path="Tables/dbo/orders_clean",
target_lakehouse_name="BusinessReadyLakehouse",
target_relative_path="Tables/dbo/orders_prepared",
mode="overwrite",
)
# Bulk mode (same pattern as clean_and_write_all_tables)
result = ft.prepare_and_write_all_tables(
source_lakehouse_name="CleanLakehouse",
target_lakehouse_name="BusinessReadyLakehouse",
mode="overwrite",
include_schemas=["dbo"],
continue_on_error=True,
)
print(result["successful_tables"], result["failed_tables"])
You can also drive the orchestration with an explicit TABLES_CONFIG:
TABLES_CONFIG = [
{
"bronze_path": "Tables/dbo/fact_sale",
"silver_table": "Tables/dbo/fact_sale",
"partition_by": [],
"mode": "overwrite", # overwrite | append | merge
},
{
"bronze_path": "Tables/dbo/dimension_customer",
"silver_table": "Tables/dbo/dimension_customer",
"partition_by": [],
"mode": "append",
},
{
"bronze_path": "Tables/dbo/fact_sale_updates",
"silver_table": "Tables/dbo/fact_sale",
"partition_by": [],
"mode": "merge",
"merge_condition": "src.sale_id = tgt.sale_id", # required when mode='merge'
},
]
bulk_result = ft.clean_and_write_all_tables(
source_lakehouse_name="RawLakehouse",
target_lakehouse_name="CuratedLakehouse",
tables_config=TABLES_CONFIG,
continue_on_error=True,
)
`tables_config` keys:
- `bronze_path` (required): source relative path in the Lakehouse.
- `silver_table` (required): target relative path in the Lakehouse.
- `partition_by` (optional): partition columns used by overwrite/append writes.
- `mode` (required): `overwrite`, `append`, or `merge`.
- `merge_condition` (required when `mode="merge"`): join condition used by the Delta merge.
For merges, you can also pass explicit column mappings:
ft.merge_lakehouse(
source_df=new_df,
lakehouse_name="SilverLakehouse",
relative_path="sales_clean",
merge_condition="src.id = tgt.id",
update_set={"amount": "src.amount", "updated_at": "src.updated_at"},
insert_set={"id": "src.id", "amount": "src.amount", "updated_at": "src.updated_at"},
)
Read from a Warehouse
df = ft.read_warehouse("MyWarehouse", "SELECT * FROM dbo.sales WHERE year = 2024")
Write to a Warehouse
ft.write_warehouse(
df,
warehouse_name="MyWarehouse",
table="dbo.sales_clean",
mode="overwrite", # or "append"
batch_size=10_000, # optional, default 10_000
)
Generate dimension tables
dims = ft.generate_dimensions(
lakehouse_name="CuratedLakehouse",
warehouse_name="MyWarehouse",
include_date=True,
include_country=True,
include_city=True,
# Optional range for dimension_date; defaults to rolling -10y / +2y
start_date="2015-01-01",
end_date="2030-12-31",
# Optional fiscal year start month for dimension_date (1=Jan ... 12=Dec)
fiscal_year_start_month=1,
# Optional controls for countrystatecity-countries source
countries_limit=None,
include_states_metadata=True,
fail_on_source_error=True,
# Optional city filters (narrow down which cities are generated)
city_regions=["Europe"], # filter by region
city_subregions=["Western Europe"], # filter by subregion
city_countries=["FR", "DEU", "Belgium"], # filter by code2/code3/name
)
# Access generated DataFrames
dims["dimension_date"].show(5)
dims["dimension_country"].show(5)
dims["dimension_city"].show(5)
`dimension_city` includes country attributes: `country_code_3`, `country_key`, `region`, `subregion`, and `population`.
`population` is enriched from the GeoDB Cities API using:
- `GEODB_API_KEY` (preferred), or
- `FABRICTOOLS_GEODB_API_KEY` (fallback)
When multiple cities share the same name in a country, matching uses country + city, then narrows with state/province metadata when available. If GeoDB is unavailable or no match is found, population remains NULL and generation continues.
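A minimal sketch of supplying the key in a notebook, assuming you already hold a valid GeoDB Cities API key (the placeholder value is illustrative):

import os

# fabrictools reads either of the environment variables documented above;
# GEODB_API_KEY takes precedence when both are set.
os.environ["GEODB_API_KEY"] = "<your-geodb-api-key>"

# Subsequent dimension_city builds (generate_dimensions or build_dimension_city)
# will use the key to enrich the population column.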
You can also filter cities directly via build_dimension_city:
city_df = ft.build_dimension_city(
regions=["Americas"],
countries=["US", "CAN"],
)
Filters accept case-insensitive values and are combined with AND logic.
The countries parameter accepts any mix of country_code_2, country_code_3, or country_name.
dimension_date also includes calendar/fiscal attributes and labels:
- `short_month`, `calendar_year`, `calendar_month`
- `fiscal_year`, `fiscal_month` (derived from `fiscal_year_start_month`, default `1`)
- `calendar_year_label`, `calendar_month_label` (format: `CYyyyy-MMM`)
- `fiscal_year_label`, `fiscal_month_label` (format: `FYyyyy-MMM`)
- `iso_week_number`
API reference
Lakehouse
| Function | Description |
|---|---|
| `read_lakehouse(lakehouse_name, relative_path, spark=None)` | Read a dataset — auto-detects Delta / Parquet / CSV |
| `write_lakehouse(df, lakehouse_name, relative_path, mode, partition_by, format, spark=None)` | Write a DataFrame (default: Delta, overwrite) |
| `merge_lakehouse(source_df, lakehouse_name, relative_path, merge_condition, update_set, insert_set, spark=None)` | Upsert via Delta merge |
| `clean_data(df, drop_duplicates, drop_all_null_rows)` | Apply standard generic cleaning to a DataFrame |
| `add_silver_metadata(df, source_lakehouse_name, source_relative_path, source_layer, ingestion_timestamp_col, source_layer_col, source_path_col, year_col, month_col, day_col, spark=None)` | Add Silver metadata and date partition columns, with the resolved source path |
| `scan_data_errors(df, include_samples)` | Report common data-quality issues |
| `clean_and_write_data(source_lakehouse_name, source_relative_path, target_lakehouse_name, target_relative_path, mode, partition_by, spark=None)` | Read, clean, add Silver metadata, and write in one helper |
| `clean_and_write_all_tables(source_lakehouse_name, target_lakehouse_name, mode, partition_by, tables_config, include_schemas, exclude_tables, continue_on_error, spark=None)` | Discover tables or use config entries, then clean/write or merge per table |
Warehouse
| Function | Description |
|---|---|
| `read_warehouse(warehouse_name, query, spark=None)` | Run a SQL query and return a DataFrame |
| `write_warehouse(df, warehouse_name, table, mode, batch_size, spark=None)` | Write to a Warehouse table via JDBC |
Dimensions
| Function | Description |
|---|---|
| `build_dimension_date(start_date=None, end_date=None, fiscal_year_start_month=1, lakehouse_name=None, lakehouse_relative_path=None, mode="overwrite", spark=None)` | Build a date dimension DataFrame (with calendar/fiscal columns, labels, and ISO week number) and optionally write it to a Lakehouse |
| `build_dimension_country(countries_limit=None, fail_on_source_error=True, lakehouse_name=None, lakehouse_relative_path=None, mode="overwrite", spark=None)` | Build the country dimension and optionally write it to a Lakehouse |
| `build_dimension_city(countries_limit=None, include_states_metadata=True, fail_on_source_error=True, regions=None, subregions=None, countries=None, lakehouse_name=None, lakehouse_relative_path=None, mode="overwrite", spark=None)` | Build the city dimension (with country attributes and population from GeoDB, plus optional region/subregion/country filters) and optionally write it to a Lakehouse |
| `generate_dimensions(lakehouse_name, warehouse_name, ..., city_regions=None, city_subregions=None, city_countries=None, ...)` | Build and write selected dimensions to a Lakehouse and Warehouse |
Source to Prepared
| Function | Description |
|---|---|
| `snapshot_source_schema(source_lakehouse_name, source_relative_path, spark=None)` | Passive profiling of the source table plus schema hash generation |
| `resolve_columns(df, source_lakehouse_name, schema_hash=None, sample_size=500, profiling_confidence_threshold=0.80, unresolved_webhook_url=None, spark=None)` | 3-layer semantic resolver with confidence scores and an unresolved fallback audit |
| `transform_to_prepared(df, resolved_mappings, source_lakehouse_name, spark=None)` | Apply semantic casts, code labels, and date-derived columns in one select pass |
| `write_prepared_table(df, resolved_mappings, target_lakehouse_name, target_relative_path, mode="overwrite", max_partitions_guard=500, vacuum_retention_hours=168, spark=None)` | Write the prepared table with a partition strategy and conditional maintenance |
| `generate_prepared_aggregations(source_lakehouse_name, target_lakehouse_name, target_relative_path, resolved_mappings, spark=None)` | Build the default prepared aggregations (`prepared_agg_jour`, `prepared_agg_semaine`, `prepared_agg_region`) |
| `publish_semantic_model(target_lakehouse_name, agg_tables, resolved_mappings, power_bi_workspace_id, power_bi_token, spark=None)` | Publish or update the semantic model via the Power BI / Fabric REST API |
| `prepare_and_write_data(source_lakehouse_name, source_relative_path, target_lakehouse_name, target_relative_path, ...)` | End-to-end orchestration for one table |
| `prepare_and_write_all_tables(source_lakehouse_name, target_lakehouse_name, ...)` | Bulk orchestration with discovery or an explicit table config |
How path resolution works
lakehouse_name="BronzeLakehouse"
│
▼
notebookutils.lakehouse.get("BronzeLakehouse")
│
▼
lh.properties.abfsPath
= "abfss://bronze@<account>.dfs.core.windows.net"
│
▼
full_path = abfsPath + "/" + relative_path
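A rough Python sketch of the same logic (the helper name `resolve_lakehouse_path` is hypothetical; fabrictools performs the equivalent steps internally):

import notebookutils  # only available inside the Fabric Spark runtime

def resolve_lakehouse_path(lakehouse_name: str, relative_path: str) -> str:
    lh = notebookutils.lakehouse.get(lakehouse_name)  # look up the Lakehouse by name
    abfs_path = lh.properties.abfsPath                # e.g. "abfss://bronze@<account>.dfs.core.windows.net"
    return f"{abfs_path}/{relative_path}"             # full path used for the read/write

full_path = resolve_lakehouse_path("BronzeLakehouse", "Tables/dbo/customers")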
Running the tests
pip install "fabrictools[dev]"
pytest
Publishing to PyPI
See docs/PYPI_PUBLISH.md for a step-by-step guide.
License
MIT