Skip to main content

A Python package for Azure Datalake Storage and Microsoft Fabric Lakehouse, enables format detection and schema retrieval for Iceberg, Delta, and Parquet formats.It also helps identify paritioned columns for parquet datasets.

Project description

DataLakeSurfer

A unified toolkit for detecting, exploring, and validating data lake formats across Azure Data Lake Storage Gen2 and Microsoft Fabric Lakehouse.

DataLakeSurfer lets you:

  • Detect whether a directory is Iceberg, Delta, or Parquet.
  • Retrieve normalized schema for supported formats.
  • Detect partition columns for Parquet datasets.
  • Validate configuration inputs with Pydantic models.
  • Work seamlessly with both ADLS Gen2 and Fabric Lakehouse.

📑 Table of Contents

  1. Installation
  2. Quick Start
  3. Core Concepts
  4. Usage Examples
  5. Supported Formats
  6. Development & Testing
  7. License

🚀 Installation

pip install datalakesurfer

Requirements:

  • Python 3.9+
  • pyarrowfs-adlgen2==0.2.5
  • adlfs==2024.12.0
  • azure-identity==1.17.1
  • azure-storage-file-datalake==12.21.0
  • numpy==1.26.4
  • pandas==1.5.3
  • cryptography==43.0.1
  • duckdb==1.3.2
  • deltalake==1.1.4
  • pydantic==2.11.7
  • gcsfs==2025.7.0
  • s3fs==2025.7.0

âš¡ Quick Start

Generate Token

from azure.identity import DefaultAzureCredential, ClientSecretCredential
from azure.core.credentials import AccessToken

class CustomTokenCredential():
    def __init__(self):
        self.credential = DefaultAzureCredential()

    def get_token(self):
        token_response = self.credential.get_token("https://storage.azure.com/.default")

        return (token_response.token, token_response.expires_on)

class CustomTokenCredentialSeconday(object):
    def __init__(self,token,expires_on):
        self.token = token
        self.expires_on = expires_on

    def get_token(self, *scopes, **kwargs):
        return AccessToken(self.token, self.expires_on)

token,expires_on = CustomTokenCredential().get_token()
print(token)
print(expires_on)
from datalakesurfer.format_detector import FormatDetector
f = FormatDetector(account_name="adlssynapseeus01",
    file_system_name="datacontainer",
    directory_path="salesorder",
    token=token,
    expires_on=expires_on).detect_format()
print(f)

# → {'status': 'success', 'format': 'delta'}

🧠 Core Concepts

  • Format Detectors: Identify dataset format (iceberg, delta, parquet).
  • Schema Retrievers: Extract a normalized schema regardless of source format.
  • Partition Detectors: Identify partitioning and list partition columns.
  • Models: All incoming parameters validated by Pydantic for correctness.

📚 Usage Examples

1. Detect Format (ADLS)

from datalakesurfer.format_detector import FormatDetector
f = FormatDetector(account_name="adlssynapseeus01",
    file_system_name="iceberg-container",
    directory_path="customerdw/mydb/product",
    token=token,
    expires_on=expires_on).detect_format()
print(f)
# {'status': 'success', 'format': 'iceberg'}

from datalakesurfer.format_detector import FormatDetector
f = FormatDetector(account_name="adlssynapseeus01",
    file_system_name="datacontainer",
    directory_path="SaleParquetData",
    token=token,
    expires_on=expires_on).detect_format()
print(f)
# {'status': 'success', 'format': 'parquet'}

2. Detect Format (Fabric)

from datalakesurfer.fabric_format_detector import FabricFormatDetector
f = FabricFormatDetector(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/salesorder",
    token=token,
    expires_on=expires_on).detect_format()
print(f)

#abfss://devworkspace@onelake.dfs.fabric.microsoft.com/devlakehouse.Lakehouse/Files/salesorder
# {'status': 'success', 'format': 'delta'}

3. Retrieve Schema

For Parquet/Delta/Iceberg in ADLS:

from datalakesurfer.schemas.delta_schema import DeltaSchemaRetriever
s = DeltaSchemaRetriever(account_name="adlssynapseeus01",
    file_system_name="datacontainer",
    directory_path="salesorder",
    token=token,
    expires_on=expires_on).get_schema()
print(s)
#{'status': 'success', 'schema': [{'column_name': 'SalesId', 'dtype': 'string'}, {'column_name': 'ProductName', 'dtype': 'string'}, {'column_name': 'SalesDateTime', 'dtype': 'timestamp'}, {'column_name': 'SalesAmount', 'dtype': 'int'}, {'column_name': 'EventProcessingTime', 'dtype': 'timestamp'}]}

from datalakesurfer.schemas.iceberg_schema import IcebergSchemaRetriever
s = IcebergSchemaRetriever(account_name="adlssynapseeus01",
    file_system_name="iceberg-container",
    directory_path="customerdw/mydb/product",
    token=token,
    expires_on=expires_on).get_schema()
print(s)
#{'status': 'success', 'schema': [{'column_name': 'productId', 'dtype': 'string'}, {'column_name': 'productName', 'dtype': 'string'}, {'column_name': 'productDescription', 'dtype': 'string'}, {'column_name': 'productPrice', 'dtype': 'double'}, {'column_name': 'dataModified', 'dtype': 'timestamp'}]}

from datalakesurfer.schemas.parquet_schema import ParquetSchemaRetriever
s = ParquetSchemaRetriever(account_name="adlssynapseeus01",
    file_system_name="datacontainer",
    directory_path="SaleParquetData",
    token=token,
    expires_on=expires_on).get_schema()
print(s)

#{'status': 'success', 'schema': [{'column_name': 'SalesId', 'dtype': 'string'}, {'column_name': 'ProductName', 'dtype': 'string'}, {'column_name': 'SalesDateTime', 'dtype': 'timestamp'}, {'column_name': 'SalesAmount', 'dtype': 'int'}, {'column_name': 'EventProcessingTime', 'dtype': 'timestamp'}]}

4. Detect Partitions

ADLS:

from datalakesurfer.schemas.parquet_schema import ParquetSchemaRetriever
f = ParquetSchemaRetriever(account_name="adlssynapseeus01",
    file_system_name="datacontainer",
    directory_path="LargeSaleParquetData2Billion",
    token=token,
    expires_on=expires_on).detect_partitions()
print(f)
#{'status': 'success', 'isPartitioned': True, 'partition_columns': [{'column_name': 'ProductName', 'dtype': 'string'}, {'column_name': 'SalesAmount', 'dtype': 'int32'}]}

from datalakesurfer.schemas.parquet_schema import ParquetSchemaRetriever
f = ParquetSchemaRetriever(account_name="adlssynapseeus01",
    file_system_name="datacontainer",
    directory_path="SaleParquetData",
    token=token,
    expires_on=expires_on).detect_partitions()
print(f)
#{'status': 'success', 'isPartitioned': False, 'partition_columns': []}

Fabric:

from datalakesurfer.fabric_format_detector import FabricFormatDetector
f = FabricFormatDetector(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/Product",
    token=token,
    expires_on=expires_on).detect_format()
print(f)
#{'status': 'success', 'format': 'parquet'}

from datalakesurfer.fabric_format_detector import FabricFormatDetector
f = FabricFormatDetector(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/FlightTripData",
    token=token,
    expires_on=expires_on).detect_format()
print(f)
#{'status': 'success', 'format': 'iceberg'}

from datalakesurfer.schemas.fabric_delta_schema import FabricDeltaSchemaRetriever
s = FabricDeltaSchemaRetriever(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/salesorder",
    token=token,
    expires_on=expires_on).get_schema()
print(s)
#{'status': 'success', 'schema': [{'column_name': 'SalesId', 'dtype': 'string'}, {'column_name': 'ProductName', 'dtype': 'string'}, {'column_name': 'SalesDateTime', 'dtype': 'timestamp'}, {'column_name': 'SalesAmount', 'dtype': 'int'}, {'column_name': 'EventProcessingTime', 'dtype': 'timestamp'}]}

from datalakesurfer.schemas.fabric_iceberg_schema import FabricIcebergSchemaRetriever
s = FabricIcebergSchemaRetriever(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/FlightTripData",
    token=token,
    expires_on=expires_on).get_schema()
print(s)
#{'status': 'success', 'schema': [{'column_name': 'flight_id', 'dtype': 'int'}, {'column_name': 'airline_code', 'dtype': 'int'}, {'column_name': 'passenger_count', 'dtype': 'int'}, {'column_name': 'flight_duration_minutes', 'dtype': 'bigint'}, {'column_name': 'baggage_weight', 'dtype': 'float'}, {'column_name': 'ticket_price', 'dtype': 'double'}, {'column_name': 'tax_amount', 'dtype': 'decimal(38,18)', 'typeproperties': {'precision': 10, 'scale': 2}}, {'column_name': 'destination', 'dtype': 'string'}, {'column_name': 'flight_data', 'dtype': 'string'}, {'column_name': 'is_international', 'dtype': 'boolean'}, {'column_name': 'departure_timestamp', 'dtype': 'timestamp'}, {'column_name': 'arrival_timestamp_ntz', 'dtype': 'timestamp'}, {'column_name': 'flight_date', 'dtype': 'date'}]}

from datalakesurfer.schemas.fabric_parquet_schema import FabricParquetSchemaRetriever
s = FabricParquetSchemaRetriever(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/Product",
    token=token,
    expires_on=expires_on).get_schema()
print(s)

# {'status': 'success', 'schema': [{'column_name': 'name', 'dtype': 'string'}, {'column_name': 'age', 'dtype': 'long'}]}

from datalakesurfer.schemas.fabric_parquet_schema import FabricParquetSchemaRetriever
s = FabricParquetSchemaRetriever(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/Product",
    token=token,
    expires_on=expires_on).detect_partitions()
print(s)

#{'status': 'success', 'isPartitioned': False, 'partition_columns': []}

from datalakesurfer.schemas.fabric_parquet_schema import FabricParquetSchemaRetriever
f = FabricParquetSchemaRetriever(account_name="onelake",
    file_system_name="devworkspace",
    directory_path="devlakehouse.Lakehouse/Files/Customer",
    token=token,
    expires_on=expires_on).detect_partitions()
print(f)

#{'status': 'success', 'isPartitioned': True, 'partition_columns': [{'column_name': 'city', 'dtype': 'string'}]}

📦 Supported Formats

Format ADLS Gen2 (Azure) Fabric OneLake (Azure) GCS (GCP) S3 (AWS)
Parquet . . . .
Delta . . . .
Iceberg . . . .

🛠 Development & Testing

Run tests:

pytest

📄 License

MIT License – See LICENSE for details.


Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

datalakesurfer-0.1.1-py3-none-any.whl (44.5 kB view details)

Uploaded Python 3

File details

Details for the file datalakesurfer-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: datalakesurfer-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 44.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.5

File hashes

Hashes for datalakesurfer-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 96929f6e4dd74b3281fa036db15fab24dec2e3af6ba682f4ca25408561bdd821
MD5 b67481533385f170e22dd383125911fa
BLAKE2b-256 89a023e3d0486dd94fab220540b66d35900a6cd33266f950d51a5b566f84296e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page