A library that helps you build dlt pipelines by sidestepping the dlt library and making your code interactive.
Project description
Installation
Install with pip in your Databricks notebook:
%pip install dlt_sidestep
Example Usage
Note: You must define a pipeline_id variable as spark.conf.get("pipelines.id", None)
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dlt_sidestep import SideStep
# Look up the pipeline id from the Spark configuration; the default of None is
# returned when the "pipelines.id" key is not set.
pipeline_id = spark.conf.get("pipelines.id", None)

# Only import dlt when a pipeline id is present.
# NOTE(review): presumably dlt is only importable inside a pipeline run - confirm.
if pipeline_id:
    import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

# Each step is written as Python source in a string and handed to SideStep
# together with the pipeline id. The example calls the function by name right
# afterwards, so SideStep is expected to make it available in the notebook.
step = """
@dlt.create_table(
    comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
    table_properties={
        "quality": "bronze"
    }
)
def clickstream_raw():
    return (
        spark.read.option("inferSchema", "true").json(json_path)
    )
"""
SideStep(step, pipeline_id)
df = clickstream_raw()
df.display()

# Second step: reads the table produced by the first step, casts the id and
# count columns to INT, renames the title columns and keeps five columns.
step = """
@dlt.create_table(
    comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
    table_properties={
        "quality": "silver"
    }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
    return (
        dlt.read("clickstream_raw")
        .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
    )
"""
SideStep(step, pipeline_id)
df = clickstream_clean()
df.display()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dlt_sidestep-0.0.4.tar.gz
(4.2 kB, view hashes)
Built Distribution
Close
Hashes for dlt_sidestep-0.0.4-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | de0c220fedab39fae303c6e74c9a0e7b19c9db26cbb6ab04b30e800009258b51 |
MD5 | 8e248a121668b0b76a5fe1699c5258cc |
BLAKE2b-256 | 397c4619e43c0cca287a6ad51153a5616ab73823889fff9acf2b6c5d98718209 |