Databricks Configuration Framework
Project description
dbxconfig
Configuration framework for databricks pipelines. Define configuration and table dependencies in yaml config then get the table mappings config model:
Define your tables.
landing:
landing_dbx_patterns:
customer_details_1: null
customer_details_2: null
raw:
raw_dbx_patterns:
customers:
ids: id
depends_on:
- landing.landing_dbx_patterns.customer_details_1
- landing.landing_dbx_patterns.customer_details_2
warning_thresholds:
invalid_ratio: 0.1
invalid_rows: 0
max_rows: 100
min_rows: 5
exception_thresholds:
invalid_ratio: 0.2
invalid_rows: 2
max_rows: 1000
min_rows: 0
base:
base_dbx_patterns:
customer_details_1:
ids: id
depends_on:
- raw.raw_dbx_patterns.customers
customer_details_2:
ids: id
depends_on:
- raw.raw_dbx_patterns.customers
Define you load configuration:
tables: ./tables.yaml
landing:
trigger: customerdetailscomplete-{{filename_date_format}}*.flg
trigger_type: file
database: landing_dbx_patterns
table: "{{table}}"
container: datalake
root: "/mnt/{{container}}/data/landing/dbx_patterns/{{table}}/{{path_date_format}}"
filename: "{{table}}-{{filename_date_format}}*.csv"
filename_date_format: "%Y%m%d"
path_date_format: "%Y%m%d"
format: cloudFiles
spark_schema: ../Schema/{{table.lower()}}.yaml
options:
# autoloader
cloudFiles.format: csv
cloudFiles.schemaLocation: "/mnt/{{container}}/checkpoint/{{checkpoint}}"
cloudFiles.useIncrementalListing: auto
# schema
inferSchema: false
enforceSchema: true
columnNameOfCorruptRecord: _corrupt_record
# csv
header: false
mode: PERMISSIVE
encoding: windows-1252
delimiter: ","
escape: '"'
nullValue: ""
quote: '"'
emptyValue: ""
raw:
database: raw_dbx_patterns
table: "{{table}}"
container: datalake
root: /mnt/{{container}}/data/raw
path: "{{database}}/{{table}}"
checkpoint_location: /mnt/{{container}}/checkpoint/{{checkpoint}}
options:
mergeSchema: true
Import the config objects into you pipeline:
from dbxconfig import Config, Timeslice, StageType
# build path to configuration file
pattern = "auto_load_schema"
config_path = f"../Config"
# create a timeslice object for slice loading. Use * for all time (supports hrs, mins, seconds and sub-second).
timeslice = Timeslice(day="*", month="*", year="*")
# parse and create a config objects
config = Config(config_path=config_path, pattern=pattern)
# get the configuration for a table mapping to load.
table_mapping = config.get_table_mapping(
timeslice=timeslice,
stage=StageType.raw,
table="customers"
)
print(table_mapping)
Development Setup
pip install -r requirements.txt
Unit Tests
To run the unit tests with a coverage report.
pip install -e .
pytest test/unit --junitxml=junit/test-results.xml --cov=dbxconfig --cov-report=xml --cov-report=html
Build
python setup.py sdist bdist_wheel
Publish
twine upload dist/*
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dbxconfig-2.3.0.tar.gz
(12.0 kB
view hashes)
Built Distribution
dbxconfig-2.3.0-py3-none-any.whl
(15.4 kB
view hashes)
Close
Hashes for dbxconfig-2.3.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 99e284427728c39023e8f46767fa9b586abec1cd1a659e2a1075150db4669e0e |
|
MD5 | c222d5f32c746e3adf4a2efbf522a7fd |
|
BLAKE2b-256 | 87986ddd105fee11a31fe7533247a01e9c49c860e1e25d07a3ad4af20c9f1bd9 |