delta_table_utils
Delta table utilities.
The basic use case for this library is working in Databricks and doing upserts into Delta tables with Auto Loader.
Basic usage:
from delta_table.delta_table_utils import DeltaTableColumn, DeltaTable
schema_name = 'my_schema'
table_name = 'my_table'
# Define the delta table schema
column_list = [
    DeltaTableColumn('id', data_type='STRING', nulls_allowed=False, is_unique_id=True),
    DeltaTableColumn('col1', data_type='STRING', nulls_allowed=False),
    DeltaTableColumn('col2', data_type='DOUBLE'),
    DeltaTableColumn('col3', data_type='DOUBLE'),
    DeltaTableColumn('col4', data_type='DOUBLE'),
    DeltaTableColumn('created_at', data_type='TIMESTAMP'),
    DeltaTableColumn('updated_at', data_type='TIMESTAMP')
]
# Create the DeltaTable object
delta_table = DeltaTable(
    schema_name=schema_name,
    table_name=table_name,
    upload_path="<location_of_data_in_s3>",
    column_list=column_list
)
# Create the table and start the stream
delta_table.create_if_not_exists(sqlContext)
delta_table.stream(spark, cloudFiles_format='csv')
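To make the schema definition above concrete, here is a minimal sketch of the kind of Delta DDL it implies. The `generate_ddl` helper and the tuple layout are hypothetical illustrations, not part of delta_table_utils; column names and types are copied from the example, and `NOT NULL` mirrors `nulls_allowed=False`.

```python
# Hypothetical helper (not the library's implementation): build the DDL
# implied by the column definitions in the example above.
columns = [
    ("id", "STRING", False),          # nulls_allowed=False -> NOT NULL
    ("col1", "STRING", False),
    ("col2", "DOUBLE", True),
    ("col3", "DOUBLE", True),
    ("col4", "DOUBLE", True),
    ("created_at", "TIMESTAMP", True),
    ("updated_at", "TIMESTAMP", True),
]

def generate_ddl(schema, table, cols):
    # One "name TYPE [NOT NULL]" entry per column, joined into a CREATE TABLE.
    body = ",\n  ".join(
        f"{name} {dtype}" + ("" if nullable else " NOT NULL")
        for name, dtype, nullable in cols
    )
    return f"CREATE TABLE IF NOT EXISTS {schema}.{table} (\n  {body}\n) USING DELTA"

print(generate_ddl("my_schema", "my_table", columns))
```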
Additional notes
By default, the stream method in this library stops as soon as no new data is detected. This is useful if you don't want a cluster running continuously and would rather update your delta tables on a schedule.
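The stop-on-idle behavior described above can be pictured as a loop that drains whatever data is currently available and then exits, rather than blocking for more. This toy sketch is only an analogy for the pattern, not the library's actual streaming code:

```python
from collections import deque

# Toy illustration of "process until no new data, then stop": each item in
# the deque stands in for a micro-batch of newly arrived files.
def run_until_drained(source: deque) -> int:
    processed = 0
    while source:                  # new data detected
        batch = source.popleft()   # consume one micro-batch
        processed += len(batch)
    return processed               # loop exits once the source is empty

print(run_until_drained(deque([[1, 2, 3], [4, 5]])))
```

On a schedule, each run picks up only what arrived since the last run and then releases the cluster.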
Hashes for delta_table_utils-0.0.14-py3-none-any.whl

Algorithm | Hash digest
---|---
SHA256 | 4463d900e0590b8703a3f23b29ecf1e99486735dd08b416bb50cbf30429f1388
MD5 | 9da245a46e461566ca8cf9218749f207
BLAKE2b-256 | 219720f6c6f7a0582e6ad9c55deecf84b3fb02c0391dc8deee82a1f8e2a2b748