yet (another spark) etl framework

Project description

YETL

pip install yetl-framework

Website & Docs: Yet (another Apache Spark) ETL Framework

Example:

Define a dataflow

from yetl_flow import (
    yetl_flow, 
    IDataflow, 
    Context, 
    Timeslice, 
    TimesliceUtcNow, 
    OverwriteSave, 
    Save
)
from pyspark.sql.functions import *
from typing import Type

@yetl_flow(log_level="ERROR")
def customer_landing_to_rawdb_csv(
    context: Context, 
    dataflow: IDataflow, 
    timeslice: Timeslice = TimesliceUtcNow(), 
    save_type: Type[Save] = None
) -> dict:
    """Load the demo customer data as is into a raw delta hive registered table."""

    # The config for this dataflow has 2 landing sources that are joined
    # and written to a delta table. Delta tables are created automatically
    # and, if configured, schema exceptions are siphoned into a schema
    # exception table.
    df_cust = dataflow.source_df("landing.customer")
    df_prefs = dataflow.source_df("landing.customer_preferences")

    context.log.info("Joining customers with customer_preferences")
    df = df_cust.join(df_prefs, "id", "inner")

    dataflow.destination_df("raw.customer", df)

Run an incremental load:

timeslice = Timeslice(2022, 7, 12)
results = customer_landing_to_rawdb_csv(timeslice=timeslice)
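
The Timeslice arguments are year, month and day, so this scopes the incremental run to 2022-07-12; as the full load below shows, '*' acts as a wildcard for a component.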

Run a full load:

results = customer_landing_to_rawdb_csv(
    timeslice = Timeslice(2022, '*', '*'),
    save_type = OverwriteSave
)
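
Because a Timeslice takes plain year, month and day values, a backfill can be scripted as a loop over days. A minimal sketch, reusing the dataflow defined above (the loop itself is illustrative, not part of the yetl API):

from datetime import date, timedelta

# hypothetical backfill: replay one incremental load per day
day = date(2022, 7, 1)
while day <= date(2022, 7, 12):
    customer_landing_to_rawdb_csv(
        timeslice=Timeslice(day.year, day.month, day.day)
    )
    day += timedelta(days=1)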

Dependencies & Setup

This is a Spark application using Delta Lake; the following dependencies must be installed in order to run it locally:

Ensure that SPARK_HOME is set and that the Spark binaries are on your path, e.g.:

export SPARK_HOME="$HOME/opt/spark-3.2.2-bin-hadoop3.3"
export PATH="$SPARK_HOME/bin:$PATH"

Enable Delta Lake by creating a spark-defaults.conf from the template:

cp $SPARK_HOME/conf/spark-defaults.conf.template  $SPARK_HOME/conf/spark-defaults.conf

Add the following to spark-defaults.conf:

spark.jars.packages               io.delta:delta-core_2.12:2.0.0
spark.sql.extensions              io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog   org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.catalogImplementation   hive
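
To verify the configuration, a quick sanity check (a minimal sketch; it assumes pyspark picks up the spark-defaults.conf above) is to round-trip a small Delta table:

from pyspark.sql import SparkSession

# build a session using the Delta settings from spark-defaults.conf
spark = SparkSession.builder.appName("delta-check").getOrCreate()

# write and read back a tiny hive-registered delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("default.delta_check")
spark.table("default.delta_check").show()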

Python Project Setup

Create a virtual environment and install dependencies for local development:

python -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
pip install --editable .
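
To confirm the editable install resolves, a quick smoke test from a Python shell (yetl_flow is the module name used in the example above):

import yetl_flow
print(yetl_flow.__file__)  # should point into this repo, not site-packages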

Build

Build the Python wheel and source distribution:

python setup.py sdist bdist_wheel
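
The artifacts are written to the dist/ directory, e.g. yetl-framework-0.0.9.tar.gz (source distribution) and yetl_framework-0.0.9-py3-none-any.whl (wheel) for this release.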

There is a CI build configured for this repo that builds from the main branch and publishes to PyPI.

Change Log

Version: 0.0.6

Version: 0.0.7

Version: 0.0.8

  • Including all packages in distribution.

Version: 0.0.9

  • Clean up BadRecords JSON files: automatically remove the JSON schema exception files created by the badRecordsPath exception handler after they are loaded into a delta table.
