Package for Fabric Engineers

FabricEngineer Package

Description

...

Quickstart

Run Silver Ingestion

SilverIngestionInsertOnly

from pyspark.sql import DataFrame, functions as F

from fabricengineer.transform import (
    BaseSilverIngestion,
    SilverIngestionInsertOnly,
    BronzeTransformation
)
from fabricengineer.logging import TimeLogger


def transform_projects(
    df: DataFrame,
    etl: BaseSilverIngestion
) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


timer = TimeLogger()

transformations = [
    BronzeTransformation(table="projects", fn=transform_projects)
]

# `spark` is the SparkSession provided by the notebook; the UPPER_CASE
# names are placeholders that you define for your own tables.
etl = SilverIngestionInsertOnly(
    spark=spark,
    df_bronze=None,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,  # the list defined above
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    create_history_mlv=CREATE_HISTORY_MLV
)



timer.start().log()

etl.run()

timer.end().log()

SilverIngestionSCD2

from pyspark.sql import DataFrame, functions as F

from fabricengineer.transform import (
    BaseSilverIngestion,
    SilverIngestionSCD2,
    BronzeTransformation
)
from fabricengineer.logging import TimeLogger


def transform_projects(
    df: DataFrame,
    etl: BaseSilverIngestion
) -> DataFrame:
    # Parse the raw "dtime" column into a proper timestamp
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


timer = TimeLogger()

transformations = [
    BronzeTransformation(table="projects", fn=transform_projects)
]

# `spark` is the SparkSession provided by the notebook; the UPPER_CASE
# names are placeholders that you define for your own tables.
etl = SilverIngestionSCD2(
    spark=spark,
    df_bronze=None,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,  # the list defined above
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS
)


timer.start().print()

etl.run()

timer.end().print()
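For readers new to the term, the following is a minimal pure-Python sketch of what SCD2 (slowly changing dimension, type 2) historization generally means. It is not the package's implementation: the `Row` class, the `valid_from`/`valid_to` column names, and the `apply_scd2` helper are all hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class Row:
    """Hypothetical history row; the field names are illustrative only."""
    nk: str                               # natural key
    value: str                            # tracked attribute
    valid_from: datetime
    valid_to: Optional[datetime] = None   # None marks the current version


def apply_scd2(history: list[Row], incoming: dict[str, str], now: datetime) -> list[Row]:
    """Close current rows whose value changed and append the new versions."""
    current = {row.nk: row for row in history if row.valid_to is None}

    result = []
    for row in history:
        changed = (
            row.valid_to is None
            and row.nk in incoming
            and incoming[row.nk] != row.value
        )
        # A changed current row is closed by setting valid_to
        result.append(replace(row, valid_to=now) if changed else row)

    for nk, value in incoming.items():
        # New keys and changed values both produce a new open row
        if nk not in current or current[nk].value != value:
            result.append(Row(nk, value, valid_from=now))

    return result
```

In this sketch an unchanged record produces no new row, while a changed record ends up with one closed row and one open row. An insert-only load, by contrast, would typically append the new version without updating the older row.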



Own package: fabric-utils-py (SilverIngestionInsertOnly, MaterializedLakeView(lakehouse, schema, table_name, mode=CREATE | DROP_CREATE).execute(); ...

Manage Materialized Lake Views

Create once

# Assumption: MLVMode is exported from the same module as MaterializeLakeView
from fabricengineer.mlv import MaterializeLakeView, MLVMode


sql = """
SELECT
    p.id
    ,p.projectname
    ,p.budget
    ,u.name AS projectlead
FROM dbo.projects p
LEFT JOIN users u
ON p.projectlead_id = u.id
"""

mlv = MaterializeLakeView(sql, spark=spark)
mlv.create(mode=MLVMode.CREATE)  # Creates the MLV once

Recreate MLV

mlv = MaterializeLakeView(sql, spark=spark)
mlv.recreate(mode=MLVMode.CREATE)  # Drops and recreates the MLV
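To make the difference between creating once and recreating more concrete, here is a rough sketch of how the two modes could map onto Spark SQL statements. The `Mode` enum, the `build_mlv_statements` helper, and the view name are hypothetical and are not part of fabricengineer. The statement syntax follows the Fabric materialized lake view DDL as commonly documented and should be checked against the current Fabric reference.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for the package's MLVMode."""
    CREATE = "CREATE"
    DROP_CREATE = "DROP_CREATE"


def build_mlv_statements(view_name: str, select_sql: str, mode: Mode) -> list[str]:
    """Return the SQL statements a given mode might translate to."""
    statements = []
    if mode is Mode.DROP_CREATE:
        # Recreate: drop the existing view first, then create it again
        statements.append(f"DROP MATERIALIZED LAKE VIEW {view_name}")
        create = f"CREATE MATERIALIZED LAKE VIEW {view_name}"
    else:
        # Create once: do nothing if the view already exists
        create = f"CREATE MATERIALIZED LAKE VIEW IF NOT EXISTS {view_name}"
    statements.append(f"{create} AS\n{select_sql.strip()}")
    return statements


if __name__ == "__main__":
    for stmt in build_mlv_statements("dbo.mlv_projects", "SELECT 1", Mode.DROP_CREATE):
        print(stmt)
```

In a Fabric notebook each returned statement could then be passed to `spark.sql(...)`. How the package itself derives the view name and executes the statements is not shown in this README.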

Download files

Source Distribution

fabricengineer_py-0.0.4.tar.gz (29.9 kB)

Uploaded Source

Built Distribution

fabricengineer_py-0.0.4-py3-none-any.whl (13.6 kB)

Uploaded Python 3

File details

Details for the file fabricengineer_py-0.0.4.tar.gz.

File metadata

  • Download URL: fabricengineer_py-0.0.4.tar.gz
  • Upload date:
  • Size: 29.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for fabricengineer_py-0.0.4.tar.gz
Algorithm    Hash digest
SHA256       2483a14e8d168e49f6d3b78155c5e3947d28bdf3148ab0357ff302a1a2718146
MD5          3cd6f308307ff46e21b1a5e1616825c9
BLAKE2b-256  a19ea8daa83516116bfaed478fae53e0f177755d28e7ac4e102b8127a220338b
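
A downloaded file can be checked against the published SHA256 digest with the standard library. The sketch below assumes the source distribution has been saved locally. The helper names and the local path argument are illustrative, and the expected digest is the SHA256 value listed above for fabricengineer_py-0.0.4.tar.gz.

```python
import hashlib
from typing import BinaryIO

# SHA256 digest published for fabricengineer_py-0.0.4.tar.gz
EXPECTED_SHA256 = "2483a14e8d168e49f6d3b78155c5e3947d28bdf3148ab0357ff302a1a2718146"


def sha256_of(stream: BinaryIO, chunk_size: int = 1 << 20) -> str:
    """Hash a binary stream in chunks so large files are not loaded at once."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


def matches_published_hash(path: str, expected: str = EXPECTED_SHA256) -> bool:
    """Return True if the file at `path` has the expected SHA256 digest."""
    with open(path, "rb") as fh:
        return sha256_of(fh) == expected.lower()
```

Calling `matches_published_hash("fabricengineer_py-0.0.4.tar.gz")` on the downloaded archive returns `True` only if the file is byte-for-byte identical to the one the digest was computed from.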

Provenance

The following attestation bundles were made for fabricengineer_py-0.0.4.tar.gz:

Publisher: release.yml on enricogoerlitz/fabricengineer-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fabricengineer_py-0.0.4-py3-none-any.whl.

File hashes

Hashes for fabricengineer_py-0.0.4-py3-none-any.whl
Algorithm    Hash digest
SHA256       c5aada353a9f1484e27b48bed6a20ec8b96126825d02995310d57e7bb6629634
MD5          5b31bd01bb2c64863ba34ab9f2b3ccf2
BLAKE2b-256  d87969d4f4a88194b3f07b2846385946db9646164ecd838514f1dbdfbc98ec2f

Provenance

The following attestation bundles were made for fabricengineer_py-0.0.4-py3-none-any.whl:

Publisher: release.yml on enricogoerlitz/fabricengineer-py
