A library that helps you build dlt pipelines by sidestepping the dlt library and making your code interactive.
Project description
Installation
Run pip install in your Databricks notebook:
%pip install dlt_sidestep
Example Usage
Note: You must define a `pipeline_id` variable as `spark.conf.get("pipelines.id", None)`.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dlt_sidestep import SideStep
# Read the pipeline id from the Spark conf; None is the fallback when the
# "pipelines.id" key is not set.
pipeline_id = spark.conf.get("pipelines.id", None)

# dlt is imported only when a pipeline id is present.
# NOTE(review): presumably the dlt module is only importable inside a
# pipeline run -- confirm.
if pipeline_id:
    import dlt

# Location of the raw Wikipedia clickstream JSON used by the example steps.
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
# Bronze step: the table definition is kept as Python source in a string and
# handed to SideStep together with the pipeline id.
step = """
@dlt.create_table(
    comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
    table_properties={
        "quality": "bronze"
    }
)
def clickstream_raw():
    return (
        spark.read.option("inferSchema", "true").json(json_path)
    )
"""
SideStep(step, pipeline_id)

# clickstream_raw is called directly below, so SideStep is expected to have
# made it available in the notebook scope.
df = clickstream_raw()
df.display()
# Silver step: casts the raw id/count columns to INT, renames the title
# columns, and attaches two expectations. As with the bronze step, the
# definition is kept as Python source in a string and handed to SideStep.
step = """
@dlt.create_table(
    comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
    table_properties={
        "quality": "silver"
    }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
    return (
        dlt.read("clickstream_raw")
            .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
    )
"""
SideStep(step, pipeline_id)

# clickstream_clean is called directly below, so SideStep is expected to have
# made it available in the notebook scope.
df = clickstream_clean()
df.display()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dlt_sidestep-0.0.6.tar.gz
(4.2 kB
view hashes)
Built Distribution
Close
Hashes for dlt_sidestep-0.0.6-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 0171bcd3a08fd0ed2925b1e0eb7f9c038c2cc5c7d8420614c1c0d809f461e204 |
MD5 | 99f6dfcc04a5a99be1ea089c1af66ca6 |
BLAKE2b-256 | 48a8b77edd80c4ab793de90fab8d034bd3f71c967deeefcc2f848ccda2763747 |