# yetl

yet (another spark) etl framework

Website: https://www.yetl.io/
## Introduction

### Install

```bash
pip install yetl-framework
```
yetl is a configuration framework for Databricks pipelines: define your configuration and table dependencies in YAML, then retrieve the table mapping configuration model in your pipeline code.

### Define your tables
```yaml
version: 1.3.0

audit_control:
  delta_lake:
    raw_dbx_patterns_control:
      header_footer:
        sql: ../sql/{{database}}/{{table}}.sql
        depends_on:
          - raw.raw_dbx_patterns.*
      raw_audit:
        sql: ../sql/{{database}}/{{table}}.sql
        depends_on:
          - raw.raw_dbx_patterns.*
          - audit_control.raw_dbx_patterns_control.header_footer

landing:
  read:
    landing_dbx_patterns:
      customer_details_1: null
      customer_details_2: null

raw:
  delta_lake:
    raw_dbx_patterns:
      customers:
        ids: id
        depends_on:
          - landing.landing_dbx_patterns.customer_details_1
          - landing.landing_dbx_patterns.customer_details_2
        warning_thresholds:
          invalid_ratio: 0.1
          invalid_rows: 0
          max_rows: 100
          min_rows: 5
        exception_thresholds:
          invalid_ratio: 0.2
          invalid_rows: 2
          max_rows: 1000
          min_rows: 0
        custom_properties:
          process_group: 1
```
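The `depends_on` entries above declare a dependency graph between tables. As a minimal, hypothetical sketch (using a plain dict in place of the parsed YAML, not yetl's internal model), the declared upstream dependencies of a table can be read off like this:

```python
# A fragment of the parsed tables.yaml, represented as a plain dict.
# This stands in for whatever model yetl builds internally.
tables = {
    "raw": {
        "raw_dbx_patterns": {
            "customers": {
                "ids": "id",
                "depends_on": [
                    "landing.landing_dbx_patterns.customer_details_1",
                    "landing.landing_dbx_patterns.customer_details_2",
                ],
            }
        }
    }
}


def dependencies(stage: str, database: str, table: str) -> list[str]:
    """Return the upstream dependencies declared for a table."""
    return tables[stage][database][table].get("depends_on", [])


print(dependencies("raw", "raw_dbx_patterns", "customers"))
```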
### Define your load configuration
```yaml
version: 1.3.0
tables: ./tables.yaml

audit_control:
  delta_lake:
    # delta table properties can be set at stage level or table level
    delta_properties:
      delta.appendOnly: true
      delta.autoOptimize.autoCompact: true
      delta.autoOptimize.optimizeWrite: true
    managed: false
    create_table: true
    container: datalake
    location: /mnt/{{container}}/data/raw
    checkpoint_location: "/mnt/{{container}}/checkpoint/{{checkpoint}}"
    path: "{{database}}/{{table}}"
    options:
      checkpointLocation: default

landing:
  read:
    trigger: customerdetailscomplete-{{filename_date_format}}*.flg
    trigger_type: file
    container: datalake
    location: "/mnt/{{container}}/data/landing/dbx_patterns/{{table}}/{{path_date_format}}"
    filename: "{{table}}-{{filename_date_format}}*.csv"
    filename_date_format: "%Y%m%d"
    path_date_format: "%Y%m%d"
    format: cloudFiles
    spark_schema: ../schema/{{table.lower()}}.yaml
    options:
      # autoloader
      cloudFiles.format: csv
      cloudFiles.schemaLocation: /mnt/{{container}}/checkpoint/{{checkpoint}}
      cloudFiles.useIncrementalListing: auto
      # schema
      inferSchema: false
      enforceSchema: true
      columnNameOfCorruptRecord: _corrupt_record
      # csv
      header: false
      mode: PERMISSIVE
      encoding: windows-1252
      delimiter: ","
      escape: '"'
      nullValue: ""
      quote: '"'
      emptyValue: ""

raw:
  delta_lake:
    # delta table properties can be set at stage level or table level
    delta_properties:
      delta.appendOnly: true
      delta.autoOptimize.autoCompact: true
      delta.autoOptimize.optimizeWrite: true
      delta.enableChangeDataFeed: false
    managed: false
    create_table: true
    container: datalake
    location: /mnt/{{container}}/data/raw
    path: "{{database}}/{{table}}"
    checkpoint_location: "/mnt/{{container}}/checkpoint/{{checkpoint}}"
    options:
      mergeSchema: true

base:
  delta_lake:
    container: datalake
    location: /mnt/{{container}}/data/base
    path: "{{database}}/{{table}}"
    options: null
```
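Placeholders such as `{{container}}`, `{{database}}` and `{{table}}` are substituted by yetl when the configuration is loaded. As an illustration only (this is not yetl's implementation), a Jinja-style substitution for simple `{{name}}` placeholders can be sketched with a small regex render function:

```python
import re


def render(template: str, **values: str) -> str:
    """Replace {{name}} placeholders with the supplied values."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: values[m.group(1)], template)


location = render("/mnt/{{container}}/data/raw", container="datalake")
path = render("{{database}}/{{table}}", database="raw_dbx_patterns", table="customers")
print(location)  # /mnt/datalake/data/raw
print(path)      # raw_dbx_patterns/customers
```

Note that expressions like `{{table.lower()}}` need a real template engine; this sketch only handles bare names.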
### Import the config objects into your pipeline
```python
from yetl import Config, StageType

pipeline = "auto_load_schema"
project = "test_project"

config = Config(project=project, pipeline=pipeline)

table_mapping = config.get_table_mapping(
    stage=StageType.raw, table="customers"
)

print(table_mapping)
```
Or use even less code with the decorator pattern:
```python
from yetl import yetl_flow, StageType, TableMapping


@yetl_flow(project="test_project", stage=StageType.raw)
def auto_load_schema(table_mapping: TableMapping):
    # << ADD YOUR PIPELINE LOGIC HERE - USING TABLE MAPPING CONFIG >>
    return table_mapping  # return whatever you want here


result = auto_load_schema(table="customers")
```
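To show what the decorator buys you, here is a minimal, hypothetical stand-in for `yetl_flow` (not the real implementation): it resolves the table mapping for the configured project and stage, then injects it into the wrapped function as `table_mapping`, so your pipeline function only receives ready-made config.

```python
import functools


# Hypothetical stand-in for the config lookup; yetl resolves this from YAML.
def get_table_mapping(project: str, stage: str, table: str) -> dict:
    return {"project": project, "stage": stage, "table": table}


def yetl_flow(project: str, stage: str):
    """Illustrative decorator: inject the table mapping into the pipeline function."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(table: str):
            mapping = get_table_mapping(project, stage, table)
            return func(table_mapping=mapping)
        return wrapper
    return decorator


@yetl_flow(project="test_project", stage="raw")
def auto_load_schema(table_mapping: dict):
    return table_mapping


result = auto_load_schema(table="customers")
print(result)
```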
## Development Setup

```bash
pip install -r requirements.txt
```
## Unit Tests

To run the unit tests with a coverage report:

```bash
pip install -e .
pytest test/unit --junitxml=junit/test-results.xml --cov=yetl --cov-report=xml --cov-report=html
```
## Integration Tests

To run the integration tests with a coverage report:

```bash
pip install -e .
pytest test/integration --junitxml=junit/test-results.xml --cov=yetl --cov-report=xml --cov-report=html
```
## Build

```bash
python setup.py sdist bdist_wheel
```

## Publish

```bash
twine upload dist/*
```