
Package for Fabric Engineers

Project description

FabricEngineer Package


Description

...

Quickstart

Run Silver Ingestion

SilverIngestionInsertOnly

from pyspark.sql import DataFrame, functions as F

from fabricengineer.transform import (
    BaseSilverIngestion,
    SilverIngestionInsertOnly,
    BronzeTransformation
)
from fabricengineer.logging import TimeLogger


def transform_projects(
    df: DataFrame,
    etl: BaseSilverIngestion
) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


timer = TimeLogger()

transformations = [
    BronzeTransformation(table="projects", fn=transform_projects)
]

etl = SilverIngestionInsertOnly(
    spark=spark,
    df_bronze=None,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,  # the list defined above
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    create_history_mlv=CREATE_HISTORY_MLV
)



timer.start().log()

etl.run()

timer.end().log()
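
The upper-case names passed to the constructor above are configuration constants that the quickstart does not define. The following is a minimal sketch of how they might be set in a notebook cell before the call. Every value and type here is a hypothetical placeholder (for example, that `NK_COLUMNS` is a list of natural-key column names), and `check_config` is an illustrative helper that is not part of the package.

```python
# Hypothetical placeholder values for the quickstart constants.
# None of these values come from the package; adjust names and types to your setup.
SOURCE_LAKEHOUSE = "BronzeLakehouse"
SOURCE_SCHEMA = "dbo"
SOURCE_TABLENAME = "projects"

DESTINATION_LAKEHOUSE = "SilverLakehouse"
DESTINATION_SCHEMA = "dbo"
DESTINATION_TABLENAME = "projects"

NK_COLUMNS = ["id"]                  # assumed: natural-key column names
CONSTANT_COLUMNS = []                # assumed: no constant columns
IS_DELTA_LOAD = False
DELTA_LOAD_USE_BROADCAST = True
EXCLUDE_COLUMNS_FROM_COMPARING = []
INCLUDE_COLUMNS_AT_COMPARING = []
HISTORIZE = True
PARTITION_BY_COLUMNS = []
CREATE_HISTORY_MLV = False


def check_config() -> list[str]:
    """Illustrative sanity checks; returns a list of problems found."""
    problems = []
    if not NK_COLUMNS:
        problems.append("NK_COLUMNS must name at least one column")
    overlap = set(EXCLUDE_COLUMNS_FROM_COMPARING) & set(INCLUDE_COLUMNS_AT_COMPARING)
    if overlap:
        problems.append(f"columns both included and excluded: {sorted(overlap)}")
    return problems
```

Calling `check_config()` before constructing the ingestion object gives an early, readable failure for the two mistakes it covers. The rules themselves are assumptions, not checks the package is known to perform.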

SilverIngestionSCD2

from pyspark.sql import DataFrame, functions as F

from fabricengineer.transform import (
    BaseSilverIngestion,
    SilverIngestionSCD2,
    BronzeTransformation
)
from fabricengineer.logging import TimeLogger


def transform_projects(
    df: DataFrame,
    etl: BaseSilverIngestion
) -> DataFrame:
    # Parse the "dtime" column into a timestamp before ingestion
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


timer = TimeLogger()

transformations = [
    BronzeTransformation(table="projects", fn=transform_projects)
]

etl = SilverIngestionSCD2(
    spark=spark,
    df_bronze=None,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,  # the list defined above
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS
)


timer.start().log()

etl.run()

timer.end().log()
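
For readers new to the term, SCD2 (Slowly Changing Dimension, Type 2) keeps history by closing the current version of a changed record and appending a new one, rather than overwriting it. The sketch below illustrates that general technique in plain Python. It is not the package's implementation, and the `Version` fields (`nk`, `attrs`, `valid_from`, `valid_to`) are hypothetical names chosen for the illustration.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class Version:
    nk: str                                # natural key
    attrs: tuple                           # attribute values that are compared
    valid_from: datetime
    valid_to: Optional[datetime] = None    # None marks the current version


def apply_scd2(history, incoming, load_time):
    """Return a new history list after applying incoming {nk: attrs} records."""
    current = {v.nk: v for v in history if v.valid_to is None}
    result = []
    for v in history:
        new_attrs = incoming.get(v.nk)
        if v.valid_to is None and new_attrs is not None and new_attrs != v.attrs:
            # Changed record: close the current version instead of overwriting it.
            result.append(replace(v, valid_to=load_time))
        else:
            result.append(v)
    for nk, attrs in incoming.items():
        old = current.get(nk)
        if old is None or old.attrs != attrs:
            # New key or changed attributes: append a fresh current version.
            result.append(Version(nk, attrs, valid_from=load_time))
    return result
```

Applying the same incoming records twice leaves the history unchanged, which is the property that makes repeated loads safe. Deleted source records are not handled in this sketch.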



Own package: fabric-utils-py (SilverIngestionInsertOnly, MaterializedLakeView(lakehouse, schema, table_name, mode=CREATE | DROP_CREATE).execute(); ...

Manage Materialized Lake Views

Create once

# MLVMode is used below; importing it from fabricengineer.mlv is an assumption
from fabricengineer.mlv import MaterializeLakeView, MLVMode


sql = """
SELECT
    p.id
    ,p.projectname
    ,p.budget
    ,u.name AS projectlead
FROM dbo.projects p
LEFT JOIN users u
ON p.projectlead_id = u.id
"""

mlv = MaterializeLakeView(sql, spark=spark)
mlv.create(mode=MLVMode.CREATE)  # Creates the MLV once

Recreate MLV

mlv = MaterializeLakeView(sql, spark=spark)
mlv.recreate(mode=MLVMode.CREATE)  # Drops and Creates the MLV
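
To make the difference between creating once and recreating concrete, the sketch below builds the Spark SQL statements that a create or drop-and-create cycle would correspond to, assuming the `CREATE MATERIALIZED LAKE VIEW` and `DROP MATERIALIZED LAKE VIEW` statements of Microsoft Fabric. Whether the package issues exactly these statements is not confirmed by this description. The helper `build_mlv_statements` and the view name used in the usage note are hypothetical.

```python
def build_mlv_statements(name: str, select_sql: str, drop_first: bool = False) -> list[str]:
    """Return the SQL statements for creating (or dropping and recreating) an MLV.

    name       -- qualified view name, e.g. "schema.view" (hypothetical here)
    select_sql -- the SELECT statement that defines the view
    drop_first -- if True, emit a DROP statement before the CREATE
    """
    statements = []
    if drop_first:
        # Recreate: remove the existing view before defining it again.
        statements.append(f"DROP MATERIALIZED LAKE VIEW {name}")
    statements.append(
        f"CREATE MATERIALIZED LAKE VIEW {name} AS\n{select_sql.strip()}"
    )
    return statements
```

In a notebook, each returned statement could be passed to `spark.sql(...)` in order. For example, `build_mlv_statements("dbo.project_overview", sql, drop_first=True)` yields a DROP followed by a CREATE.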



Download files


Source Distribution

fabricengineer_py-0.0.10.tar.gz (54.0 kB)

Uploaded Source

Built Distribution


fabricengineer_py-0.0.10-py3-none-any.whl (17.3 kB)

Uploaded Python 3

File details

Details for the file fabricengineer_py-0.0.10.tar.gz.

File metadata

  • Download URL: fabricengineer_py-0.0.10.tar.gz
  • Upload date:
  • Size: 54.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for fabricengineer_py-0.0.10.tar.gz
Algorithm Hash digest
SHA256 aeafbac0c61ff24ae4a59dc00c2b7f5ebb64af1a2e78eaa95871e0888439a578
MD5 2acfde6126507f8d89f330c155a4eb91
BLAKE2b-256 e364efc3ce1f966dfb7dff36ece25f94b216c7ad539515960997f6f3a2b3be17


Provenance

The following attestation bundles were made for fabricengineer_py-0.0.10.tar.gz:

Publisher: release.yml on enricogoerlitz/fabricengineer-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fabricengineer_py-0.0.10-py3-none-any.whl.


File hashes

Hashes for fabricengineer_py-0.0.10-py3-none-any.whl
Algorithm Hash digest
SHA256 1660f8a05d3abfd5c9c10f81d610bc4a6b544356e09ad0497c9fd493b1a6902a
MD5 13fcab67323c3048f90a6b12bd03365e
BLAKE2b-256 f3503eb0fe88bf219ac8496603f0ec1f8bf2df81c394690e41e843f5a069b87e


Provenance

The following attestation bundles were made for fabricengineer_py-0.0.10-py3-none-any.whl:

Publisher: release.yml on enricogoerlitz/fabricengineer-py

