Utility for running workflows that use Delta Live Tables (DLT) from interactive notebooks.
Project description
Installation
Install with pip in your Databricks notebook:
%pip install dlt_with_debug
Example Usage
Note: You must define a `pipeline_id` variable as `spark.conf.get("pipelines.id", None)`.
Note: You must define a `g` variable as `globals()`.
Example notebook code:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dlt_with_debug import *
# Read the DLT pipeline id from the Spark conf; this is None when the
# "pipelines.id" key is not set (presumably when the notebook is run
# interactively rather than inside a DLT pipeline — confirm).
pipeline_id = spark.conf.get("pipelines.id", None)

# This notebook's global namespace is handed to dltwithdebug() so that the
# function defined in each `step` string becomes callable here afterwards
# (e.g. clickstream_raw() right after the first dltwithdebug call).
g = globals()

# Only import the real `dlt` module when a pipeline id is present.
if pipeline_id:
    import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

# Bronze step. The step is passed as source text (presumably executed by
# dltwithdebug), so the function body inside the string must be indented
# like ordinary Python or it will not parse.
step = """
@dlt.create_table(
    comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
    table_properties={
        "quality": "bronze"
    }
)
def clickstream_raw():
    return (
        spark.read.option("inferSchema", "true").json(json_path)
    )
"""
dltwithdebug(step, pipeline_id, g)

df = clickstream_raw()
df.display()

# Silver step: casts/renames columns of the raw table and attaches the
# "valid_current_page" and "valid_count" expectations.
step = """
@dlt.create_table(
    comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
    table_properties={
        "quality": "silver"
    }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
    return (
        dlt.read("clickstream_raw")
        .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
    )
"""
dltwithdebug(step, pipeline_id, g)

df = clickstream_clean()
df.display()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dlt_with_debug-1.0.tar.gz (4.0 kB, view hashes)
Built Distribution
dlt_with_debug-1.0-py3-none-any.whl
Hashes for dlt_with_debug-1.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 88aa81a5274c197c186f55c3b98d169ef7e4404908be7b029299c5a55ae92387 |
| MD5 | 74678e5836864afcb442144ad282f7ac |
| BLAKE2b-256 | 7f78e3fa75f98b3b0e2f0b291d5ee24a2ba92d6e60c19ed0edf829025a9c6cdc |