Skip to main content

Package for Fabric Engineers

Project description

FabricEngineer Package

CI CD

Description

...

Quickstart

Run Silver Ingestion

SilverIngesationInsertOnly

from pyspark.sql import DataFrame, functions as F

from fabricengineer.transform import (
    BaseSilverIngestion,
    SilverIngesationInsertOnly,
    BronzeTransformation
)
from fabricengineer.logging import TimeLogger


def transform_projects(
    df: DataFrame,
    etl: BaseSilverIngestion
) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


timer = TimeLogger()

transformations = [
    BronzeTransformation(table="projects", fn=transform_projects)
]

etl = SilverIngestionInsertOnly(
    spark=spark,
    df_bronze=None,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=TRANSFORMATIONS,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    create_history_mlv=CREATE_HISTORY_MLV
)



timer.start().log()

etl.run()

timer.end().log()

SilverIngesationSCD2

from pyspark.sql import DataFrame, functions as F

from fabricengineer.transform import (
    BaseSilverIngestion,
    SilverIngesationSCD2,
    BronzeTransformation
)
from fabricengineer.logging import TimeLogger


def transform_projects(
    df: DataFrame,
    etl: BaseSilverIngestion
) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


timer = TimeLogger()

transformations = [
    BronzeTransformation(table="projects", fn=transform_projects)
]

etl = SilverIngesationSCD2(
    spark=spark,
    df_bronze=None,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=TRANSFORMATIONS,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS
)


timer.start().print()

etl.run()

timer.end().print()



Eigenes Package: fabric-utils-py (SilverIngestionInsertOnly, MaterializedLakeView(lakehouse, schema, table_name, mode=CREATE | DROP_CREATE).execute(); ...

Manage MaterializeLakeViews

Create once

from fabricengineer.mlv import MaterializeLakeView


sql = """
SELECT
    p.id
    ,p.projectname
    ,p.budget
    ,u.name AS projectlead
FROM dbo.projects p
LEFT JOIN users u
ON p.projectlead_id = u.id
"""

mlv = MaterializeLakeView(sql, spark=spark)
mlv.create(mode=MLVMode.CREATE)  # Creates the MLV once

Recreate MLV

mlv = MaterializeLakeView(sql, spark=spark)
mlv.recreate(mode=MLVMode.CREATE)  # Drops and Creates the MLV

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fabricengineer_py-0.0.7.tar.gz (37.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fabricengineer_py-0.0.7-py3-none-any.whl (14.7 kB view details)

Uploaded Python 3

File details

Details for the file fabricengineer_py-0.0.7.tar.gz.

File metadata

  • Download URL: fabricengineer_py-0.0.7.tar.gz
  • Upload date:
  • Size: 37.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for fabricengineer_py-0.0.7.tar.gz
Algorithm Hash digest
SHA256 3b027a042df3ee337f49badf53d7dc063b718bc6bffeb743b62218bc97f827a5
MD5 2855cce467b72bcf5f94c322d7fdc35f
BLAKE2b-256 154efa506ca5700fc485e36d6a5f13164f953e710fbfd79a334c640da6c52ef8

See more details on using hashes here.

Provenance

The following attestation bundles were made for fabricengineer_py-0.0.7.tar.gz:

Publisher: release.yml on enricogoerlitz/fabricengineer-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fabricengineer_py-0.0.7-py3-none-any.whl.

File metadata

File hashes

Hashes for fabricengineer_py-0.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 13de2fe94c34cdf17bf1ccde286200fa2f3024ad5f6fd36515aef6fb7474bb02
MD5 a76a2c66d3b58c9ab9d94b0c3b62d776
BLAKE2b-256 d1ad607344891cd4619755639dbb73528b28a790de38a531147eafe3abc56b97

See more details on using hashes here.

Provenance

The following attestation bundles were made for fabricengineer_py-0.0.7-py3-none-any.whl:

Publisher: release.yml on enricogoerlitz/fabricengineer-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page