
yet (another spark) etl framework


YETL

pip install yetl-framework

Website & Docs: Yet (another Apache Spark) ETL Framework

Example:

Define a dataflow

from yetl.flow import (
    yetl_flow, 
    IDataflow, 
    IContext, 
    Timeslice, 
    TimesliceUtcNow, 
    OverwriteSave, 
    Save
)
from pyspark.sql.functions import date_format
from typing import Type

@yetl_flow(project="demo")
def landing_to_raw(
    context: IContext,
    dataflow: IDataflow,
    timeslice: Timeslice = TimesliceUtcNow(),
    save: Type[Save] = None,
) -> dict:
    """Load the demo customer data as is into a raw delta hive registered table.

        the config for this dataflow has 2 landing sources that are joined
        and written to delta table
        delta tables are automatically created and if configured schema exceptions
        are loaded syphened into a schema exception table
    """

    df_cust = dataflow.source_df(f"{context.project}_landing.customer")
    df_prefs = dataflow.source_df(f"{context.project}_landing.customer_preferences")

    df = df_cust.join(df_prefs, "id", "inner")
    df = df.withColumn(
        "_partition_key", date_format("_timeslice", "yyyyMMdd").cast("integer")
    )

    dataflow.destination_df(f"{context.project}_raw.customer", df, save=save)

Run an incremental load:

timeslice = Timeslice(year=2022, month=7, day=12)
results = landing_to_raw(timeslice=timeslice)

Run a full load for Year 2022:

results = landing_to_raw(
    timeslice=Timeslice(year=2022, month='*', day='*'),
    save=OverwriteSave
)

Dependencies & Setup

This is a Spark application with Delta Lake. It requires the following dependencies installed in order to run locally:

Ensure that SPARK_HOME is set and added to your path, e.g.:

export SPARK_HOME="$HOME/opt/spark-3.3.2-bin-hadoop3"

Enable Delta Lake by copying the default configuration template:

cp $SPARK_HOME/conf/spark-defaults.conf.template  $SPARK_HOME/conf/spark-defaults.conf

Add the following to spark-defaults.conf:

spark.jars.packages               io.delta:delta-core_2.12:2.1.1
spark.sql.extensions              io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog   org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.catalogImplementation   hive
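
With those defaults in place, a quick way to check the local setup is to start a PySpark session and round-trip a small Delta table. This is only a sanity check, not part of YETL, and the path below is arbitrary:

from pyspark.sql import SparkSession

# Picks up spark-defaults.conf from $SPARK_HOME/conf, including the Delta extensions.
spark = SparkSession.builder.appName("delta-smoke-test").getOrCreate()

# Write and read back a tiny Delta table to confirm the delta-core package resolved.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta_smoke_test")
spark.read.format("delta").load("/tmp/delta_smoke_test").show()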

Python Project Setup

Create a virtual environment and install the dependencies for local development:

python -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
pip install --editable .

Build

Build python wheel:

python setup.py sdist bdist_wheel

There is a CI build configured for this repo that builds from the main branch and publishes to PyPI.

Releases

Version: 0.0.27

  • Bumped Spark version to 3.3.2
  • Bumped Delta Lake version to 2.1.1

Version: 0.0.26

  • Fixed a bug where loads failed when schema exceptions were not configured
  • Refactored the table node out of the dataset configuration in order to simplify it
  • Standardized yetl properties to a capitalised naming convention
  • Removed a reader bug that added the context_id by default
  • Removed reader lineage columns from schema creation
  • Extended the parallel process to take the save type injection

Version: 0.0.25

  • Added typer dependency

Version: 0.0.24

  • Added a CLI init command to initialise a yetl project directory
  • Added a maxparallel parameter to the prototype for multithreaded loading
  • Fixed a partition bug on initial load that caused a failure when the schema exists but there is no data
  • Refactored SQLReader SQL files into the project pipeline directory

Version: 0.0.23

  • Added metadata lineage configuration into sources and destinations for context, dataflow and dataset ids
  • Moved the spark logging argument from the decorator to the config, since this allows the context to be more abstract and is less confusing
  • Added a workflow module and prototype for multithreaded loading (see the sketch below)
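
The workflow module itself isn't shown here; the sketch below only illustrates the general pattern of fanning dataflow calls out over a thread pool, assuming the landing_to_raw dataflow from the example above and a made-up list of timeslices:

from concurrent.futures import ThreadPoolExecutor, as_completed

from yetl.flow import Timeslice

# Hypothetical timeslices to load in parallel; adjust to the data you actually have.
timeslices = [Timeslice(year=2022, month=7, day=d) for d in range(1, 8)]

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(landing_to_raw, timeslice=ts): ts for ts in timeslices}
    for future in as_completed(futures):
        print(f"timeslice {futures[future]} finished: {future.result()}")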

Version: 0.0.22

  • Introduced a YETL optimize table property, since there are still reasons to optimise on Databricks
  • Regression tested and fixed the SQL Reader
  • Fixed a bug where lineage columns were missing from automatic schema table creation
  • Adjusted table creation on the delta writer so that, when the schema is inferred, the table is created afterwards to avoid schema partition synchronisation errors
  • Added configuration for putting the file origin into the source dataframes
  • Added configuration to add _corrupt_record on schema creation
  • Auto-generate SQL schemas on schema creation
  • Added options for dynamic template loading from a single function so that it can be re-used across tables
  • Deprecated custom timeslice parsing features in favour of Jinja templating
  • Added a new CLI lib for building out templates and maintenance tasks using typer

Version: 0.0.21

  • Added Jinja for variable replacement as a more robust solution than simple string replacement (see the sketch below)
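
How YETL wires Jinja into its configuration isn't shown here, but the difference from naive string replacement is easy to illustrate with jinja2 directly (the placeholder names below are made up):

from jinja2 import Template

# {{ }} placeholders are rendered by Jinja rather than patched with string.replace,
# so missing variables and escaping are handled consistently.
sql = Template(
    "select * from {{ project }}_landing.customer where _timeslice = '{{ timeslice }}'"
).render(project="demo", timeslice="20220712")
print(sql)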

Version: 0.0.20

  • Fixed missing packages in build

Version: 0.0.19

  • Fixed missing packages in build

Version: 0.0.18

  • Major cleanup and refactor of datasets for the future road map
  • Sources and destinations have the same auto_io lifecycle in the dataflow; auto is called on retrieval from the dataflow collections
  • Added a SQLReader dataset type so we can define SQL sources from any hive table in dataflows that write to destinations (e.g. delta lake tables)
  • Fixed audit error trapping

Version: 0.0.17

  • Integration testing with Databricks
  • Refactored configuration so that there is more re-use across environments
  • Dataset types are now specifically declared in the configuration to reduce complexity when adding more types of datasets

Version: 0.0.16

  • Refactored the context into an interface to allow future expansion into engines other than Spark

Version: 0.0.15

  • Raise errors and warnings from threshold configurations
  • Refactored audit logging and added comprehensive data flow auditing

Version: 0.0.14

  • Started building in integration tests
  • Refactored Destination save using class composition
  • Refactored save dependency injection down to the dataset level
  • Added support for Merge save using Delta Lake (see the sketch below)
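
The Merge save is injected the same way as OverwriteSave in the examples above. Under the hood, a Delta Lake merge looks roughly like the following; this is a minimal sketch using the delta-spark API directly rather than YETL's wrapper, and it assumes an existing SparkSession (spark), a source dataframe (df) and the demo_raw.customer table from the example:

from delta.tables import DeltaTable

# Upsert the incoming rows into the registered delta table on the id key.
target = DeltaTable.forName(spark, "demo_raw.customer")
(
    target.alias("t")
    .merge(df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)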

Version: 0.0.13

  • Added support for default schema creation via etl.schema.createIfNotExists
  • Refactored and cleaned up the basic reader
  • Added consistent validation and consistent property settings to the basic reader
  • Added reader skipping features based on configuration settings

Version: 0.0.12

  • Added support for multi-column Z-ordering (see the sketch below)
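
Z-ordering is a Delta Lake operation; expressed directly against a table it looks like the line below. This is only a sketch, the table and columns are illustrative, and OPTIMIZE ... ZORDER BY needs Delta Lake 2.0+ or Databricks:

# Co-locate the data for both columns in the same files to speed up filtered reads.
spark.sql("OPTIMIZE demo_raw.customer ZORDER BY (id, _partition_key)")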

Version: 0.0.11

  • Upgraded development to Spark 3.3 and Delta Lake 2.1
  • Added _timeslice metadata column parsing into the destination dataset so that it can be used for partitioning; this works even if the read path is wildcarded with '*'
  • Added support for partition-based optimization on writes
  • Added support for multi-column partitioning

Version: 0.0.10

  • Fixed a YAML schema format error when dataflow retries are set to 0 (dictionary extraction bug when setting retries and retry_wait to zero)
  • Added overwrite schema save
  • Added partition SQL support
  • Fixed constraints synchronisation to drop and create more efficiently
  • Refined, refactored and fixed lineage
  • Added file lineage logging
  • Detect Spark and Databricks versions to determine whether to auto-optimise and compact

Version: 0.0.9

  • Clean up bad-records JSON files: automatically remove the JSON schema exception files created by the badRecordsPath exception handler after they are loaded into a delta table (see the sketch below)
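
badRecordsPath is the Spark reader option (a Databricks feature) that this exception handling builds on. Used directly it looks like the sketch below; the schema and paths are illustrative, not YETL configuration:

# Rows that fail to parse against the schema are written out as JSON files under
# badRecordsPath instead of failing the read; YETL loads and then cleans these up.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("badRecordsPath", "/mnt/landing/_bad_records/customer")
    .schema(customer_schema)  # hypothetical StructType for the customer feed
    .load("/mnt/landing/customer/*.csv")
)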

Version: 0.0.8

  • Including all packages in distribution.

Version: 0.0.7

Version: 0.0.6


