
gdmo


GDMO native classes for standardized interaction with data objects within Azure Databricks

This custom library allows our engineering team to use standardized packages that strip away much of the administrative and repetitive work from their daily object interactions. The classes currently supported (V0.1.0) are:

  • Forecast - TimeSeriesForecast
  • API - APIRequest
  • Tables - Landing
  • Tables - Delta
  • Tables - Gold_Consolidation
  • Dbx - DbxWidget
  • Dbx - DbxClearCachedTables

Installation

Install this library using pip:

pip install gdmo

Usage

Forecast - TimeSeriesForecast

A standardized way of forecasting a dataset. Input a dataframe with a Series, a Time, and a Value column, and the class automatically selects an appropriate forecasting model and generates the forecast output.

Example usage:

from gdmo import TimeSeriesForecast
forecaster = TimeSeriesForecast(spark, 'Invoiced Revenue')\
                    .set_columns('InvoiceDate', 'ProductCategory', 'RevenueUSD')\
                    .set_forecast_length(forecast_length)\
                    .set_last_data_point(lastdatamonth)\
                    .set_input(df)\
                    .set_growth_cap(0.02)\
                    .set_use_cap_growth(True)\
                    .set_modelselection_breakpoints(12, 24)\
                    .set_track_outcome(False)\
                    .build_forecast()

forecaster.inspect_forecast()
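
The input is a Spark dataframe holding a time, a series, and a value column. A minimal sketch of how such an input could be assembled, purely for illustration (the column names mirror the example above; forecast_length, lastdatamonth, and the sample rows are placeholder values, not part of the library):

from datetime import date

#Illustrative input data only: one row per series per period.
df = spark.createDataFrame(
    [
        (date(2024, 1, 1), 'Hardware', 120000.0),
        (date(2024, 2, 1), 'Hardware', 125500.0),
        (date(2024, 1, 1), 'Services',  98000.0),
        (date(2024, 2, 1), 'Services', 101250.0),
    ],
    schema='InvoiceDate date, ProductCategory string, RevenueUSD double'
)

forecast_length = 12           #Number of periods to forecast (placeholder)
lastdatamonth   = '2024-02-01' #Last complete period in the input data (placeholder)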

API - APIRequest

Class to perform a standard API request using the requests library. A user only needs to supply the endpoint, authentication, and method details, and the data is returned without having to write error handling or understand how to properly build a request.

Example usage:

from gdmo import APIRequest

request = APIRequest(uri)\
            .set_content_type('application/json') \
            .set_header('bearer xxxxx') \
            .set_method('GET') \
            .set_parameters({"Month": "2024-01-01"})\
            .make_request()

response = request.get_json_response()
display(response)
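
The JSON response can then be handed to downstream steps. For example, a quick way to turn a list-of-records response into a Spark dataframe using plain PySpark (illustrative only, assuming the endpoint returns a flat list of records):

from pyspark.sql import Row

#Assumes the response is a list of flat JSON records; adjust for nested payloads.
if isinstance(response, list) and response:
    df = spark.createDataFrame([Row(**record) for record in response])
    df.display()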

Tables - Landing

A class for landing API ingests and other data into Azure Data Lake Storage (ADLS). It can currently ingest SharePoint (Excel) data and JSON (API-sourced) data.

Example usage to ingest files from a SharePoint folder:

environment     = 'xxxxx' #Databricks catalog

Sharepointsite  = 'xxxxx'
UserName        = 'xxxxx'
Password        = 'xxxxx'
Client_ID       = 'xxxxx'
adls_temp       = 'xxxxx'

from gdmo import Landing

sharepoint = Landing(spark, dbutils, database="xxx", bronze_table="xxx", catalog=environment, container='xxx')\
                  .set_tmp_file_location(adls_temp)\
                  .set_sharepoint_location(Sharepointsite)\
                  .set_sharepoint_auth(UserName, Password, Client_ID)\
                  .set_auto_archive(False)\
                  .get_all_sharepoint_files()

If you need to capture logging on top of these ingests, follow up the code with the get_log() function:

try:
  log = sharepoint.get_log()
  # Construct the SQL query to insert logging information into the logtable
  sql_query = f"""
    INSERT INTO {logtable} 
    SELECT now() DbxCreated, '{log['database']}', '{log['bronze_table']}', '{log['catalog']}', '{log['file_path']}', {log['records_influenced']}, '{log['start_time']}', '{log['end_time']}'
    """

  # Execute the SQL query using Spark SQL
  spark.sql(sql_query)
except Exception as e:
  raise e
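
The snippet above assumes a log table already exists. If it does not, one way to create it is with the Delta class from this library; the table name, container, location, and column layout below are illustrative assumptions derived from the insert statement above, not a prescribed schema:

from gdmo import Delta
import os

env = os.environ.get('Environment')

#Placeholder names: adjust the database, table, container, and ADLS path to your environment.
logtable          = 'admin.bronze__landing_log'
adls_log_location = 'xxxxx' #Placeholder ADLS path for the log table

log_table = Delta('admin', 'bronze__landing_log', spark, env, 'admin')\
              .set_columns([
                  {"name": "DbxCreated",        "data_type": "timestamp", "comment": "Timestamp the log record was written"},
                  {"name": "Database",          "data_type": "string",    "comment": "Target database of the ingest"},
                  {"name": "BronzeTable",       "data_type": "string",    "comment": "Target bronze table"},
                  {"name": "Catalog",           "data_type": "string",    "comment": "Databricks catalog"},
                  {"name": "FilePath",          "data_type": "string",    "comment": "Landed file path in ADLS"},
                  {"name": "RecordsInfluenced", "data_type": "int",       "comment": "Number of records ingested"},
                  {"name": "StartTime",         "data_type": "string",    "comment": "Ingest start time"},
                  {"name": "EndTime",           "data_type": "string",    "comment": "Ingest end time"},
              ])\
              .set_comment('Landing ingest log (illustrative example).')\
              .set_location(adls_log_location)\
              .create_table()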

Example usage to ingest JSON content from an API:

#Sample API request using the APIRequest class
uri = 'xxxxx'
request  = APIRequest(uri).make_request()
response = request.get_json_response()

#Initiate the class and tell it where the bronze table is located, load the configuration data for
#that table (required for the delta merge), add the JSON to the landing area in ADLS, then put the
#landed data into a bronze delta table in the Databricks catalog.
landing = Landing(spark, dbutils, database="xxx", bronze_table="xxx", target_folder=location, filename=filename, catalog=environment, container='xxx')\
                .set_bronze(bronze)\
                .set_config(config)\
                .put_json_content(response)\
                .put_bronze()

Tables - Delta

Class for creating a Delta table on Azure Databricks. Supports assigning a primary key, foreign key constraints, table properties, partitioning, an identity column, and more.

Example usage to create a delta table with a primary key and a foreign key:

from gdmo import Delta

table = Delta(db_name,table_name, spark, env, container)\
            .set_columns([
                {"name": "DbxCreated",   "data_type": "timestamp",  "comment": "Date when data was last loaded into source tables"},
                {"name": "DbxUpdated",   "data_type": "timestamp",  "comment": "Last modified timestamp"},
                {"name": "IsDeleted",    "data_type": "int",        "comment": "Deleted Flag"},
                {"name": "DataSource",   "data_type": "string",     "comment": "Identifies the source of the record."},
                {"name": "PK_ValueCol1", "data_type": "int",        "comment": "Primary Key column. Numeric value"},
                {"name": "ValueCol2",    "data_type": "string",     "comment": "Value column one"},
                {"name": "FK_ValueCol3", "data_type": "date",       "comment": "Value column two, which is a foreign key"},
              ])\
            .set_comment(table_comment)\
            .set_location(table_location)\
            .set_partitioning('DataSource')\
            .drop_existing()\
            .create_table()\
            .set_primary_key('NCC_PKValueCol1', 'PK_ValueCol1')

foreign_keys = [
    {'name': 'NCC_FKValueCol3', 'columns': 'FK_ValueCol3', 'table': 'schema.othertable'}
]

table.set_foreign_keys(foreign_keys)
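
Once created, the table can be loaded with standard Spark. A minimal sketch, assuming source_df is a placeholder dataframe matching the column layout defined above:

#source_df is a placeholder dataframe with the same columns as the table definition above.
source_df.write.format('delta')\
         .mode('append')\
         .saveAsTable(f'{db_name}.{table_name}')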

Tables - Gold_Consolidation

A class to perform gold layer consolidation of records (see the Databricks medallion architecture). Use it when a gold table should consist of unique records, or of a timeseries dataset, sourced from multiple silver sources that each offer roughly the same data. A typical example is a single gold customer table fed by two raw sources of customer data (multiple ERP systems, for instance) that both provide the same sort of records.

To use this class you need a control table containing all unique (SQL) source codes used to retrieve data in a standard format from the silver layer. By default, the class assumes there is a delta table located at "admin.bronze__gold_consolidation_sources". If that is not present, start by creating one like this:

from gdmo import Delta
import os
env = os.environ['Environment']

container = '{enter container name here}'
db_name = 'admin'
table_name = 'bronze__gold_consolidation_sources'

location = os.environ.get("ADLS").format(container=container, path=db_name)
table_location = f'{location}/{table_name.replace("__", "/").lower()}'

table = Delta(db_name,table_name, spark, env, container)\
            .set_columns([
                  {"name": "DbxCreated",      "data_type": "timestamp", "comment": "Creation date in databricks"},
                  {"name": "DbxUpdated",      "data_type": "timestamp", "comment": "Last updated date"},
                  {"name": "IsDeleted",       "data_type": "int",       "comment": "Deletion Flag"},
                  {"name": "TableName",       "data_type": "string",    "comment": "Complete gold table name. IE: sales__invoices"},
                  {"name": "SourceName",      "data_type": "string",    "comment": "silver table name consolidating into the gold layer"},
                  {"name": "SourceCode",      "data_type": "string",    "comment": "sql code required to pull data from silver"},
                  {"name": "InlineVar",       "data_type": "string",    "comment": "optional. the silver sql code can contain a var in the filtering clause when loading for example only partial data for performance. this contains the varname inline in the code as a python list of dicts. , so we can do a text.replace on it. "},
                  {"name": "SortOrder",       "data_type": "int",       "comment": "When consolidation uses unique records this order is used to order the sources by importance. Lower value = higher priority. 1 = highest priority"}
              ])\
            .set_comment('Holds the silver sql codes to retrieve data for gold layer consolidation tables. ')\
            .set_location(table_location)\
            .drop_existing()\
            .create_table()\
            .set_primary_key('TableName, SourceName')

When ready, add sources to this control table using the code below. In this example, each source code contains a catalog variable inline in the SQL, which needs to be replaced by a Python variable at runtime (the InlineVar):

from gdmo import Gold_Consolidation

db_name    = 'gold'
table_name = 'tablename'
inlinevar  = "['[catalog]']" #Stringified python list of inline variable names
SortOrder  = 1               #Numeric priority used when deduplicating unique records (lower value = higher priority)

gold = Gold_Consolidation(spark, dbutils, database=db_name,gold_table=table_name)

SourceName =  'SilverDataSourceOne'
SourceCode = f'''
    SELECT  columns
    FROM [catalog].silvertable
    -- ...your bespoke transformational layer to retrieve the records in the shape needed for gold.
    -- The column list should be identical to the gold table column list.
    '''

gold.set_source(SourceName = SourceName, SourceCode = SourceCode, InlineVar = inlinevar, SortOrder=SortOrder)
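
Additional sources for the same gold table are registered the same way; the SortOrder then determines which source wins when deduplicating unique records. The source name and SQL below are placeholders:

SourceName = 'SilverDataSourceTwo' #Placeholder second source
SourceCode = f'''
    SELECT  columns
    FROM [catalog].othersilvertable
    -- Bespoke transformation for the second source; same column list as the gold table.
    '''

gold.set_source(SourceName = SourceName, SourceCode = SourceCode, InlineVar = inlinevar, SortOrder = 2)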

Once source codes are added to the control table, a scheduled notebook (workflow) in Databricks can pull the data into the gold layer. The basic approach is shown in the example below:

from gdmo import DbxWidget, Gold_Consolidation, DbxClearCachedTables
import os

db_name    = 'gold'
table_name = 'tablename'

final_table = db_name+'.'+table_name
stage_table = final_table+'_stage'

catalog       = DbxWidget(dbutils, 'catalog', defaultValue = os.environ.get('Environment'))
verbose       = DbxWidget(dbutils, 'verbose', defaultValue = 'N')

#Prepare the metadata. These tables might be joined to within the source codes; they are cached here to decrease runtime and improve performance
MetaTables = ['metatable']
DbxClearCachedTables(spark, MetaTables)

meta = spark.sql(f'''CACHE TABLE metatable AS SELECT * FROM somewheremetadata''')

#Start a gold instance for this table
gold = Gold_Consolidation(spark, dbutils, database=db_name,gold_table=table_name)
gold.set_catalog(catalog)

#Set the basic configuration - how data is loaded into this gold table.
config = {
    'partitioncolumns': ['DataSource'],   #Required when loadtype is merge and the class is running in parallel (which is default)
    'loadType':         'merge',          #[Options: merge | append | overwrite]
    'loadmechanism':    'uniquevalues',   #[Options: timeseries | uniquevalues]
    'joincolumns':      ['PKColOne']      #Required when loadtype is merge
}
gold.set_config(config)

#If you want to see more comments and runtime execution, set verbose to Y
if verbose == 'Y':
  gold.set_verbose()

#Get the sourcecodes from the control table and clear staging if needed
gold.get_sourcecodes()
gold.clean_stage()

#Replace inline vars with the correct value if needed
gold.set_inlinevar({'[catalog]': catalog})

#Run the data into staging
gold.set_stage()

#Merge data into the actual table
gold.set_final()

#Drop the staging table again
gold.clean_stage()

#Clear cached tables
DbxClearCachedTables(spark, MetaTables)

#Provide visual output of the data
if verbose == 'Y':
  df = spark.sql(f'''
                  SELECT DataSource, COUNT(*) Recs
                  FROM {final_table}
                  group by DataSource
                  ''')
  df.display()

Dbx - DbxWidget

A class for generating and reading a Databricks notebook widget. It supports all four widget types ('text', 'dropdown', 'multiselect', 'combobox') and allows the response datatype to be set to one of 'text', 'int', 'double', 'float', or 'date'.

The default databricks method:

dbutils.widgets.dropdown("colour", "Red", "Enter Colour", ["Red", "Blue", "Yellow"])
colour = dbutils.widgets.read("colour")

Using this class, all the user needs to write is:

colour = DbxWidget(dbutils, "colour", 'dropdown', "Red", choices=["Red", "Blue", "Yellow"])

A simple text value parameter:

reloadData = DbxWidget(dbutils, "fullReload", 'N')

A simple date value parameter:

reloadData = DbxWidget(dbutils, "startDate", 'N', returntype='date')

Dbx - DbxClearCachedTables

A class that takes a list of cached table names and uncaches them all.

MetaTables = ['CachedTableOne','CachedTableTwo','CachedTableThree']

DbxClearCachedTables(spark, MetaTables)
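
A typical pattern, as in the gold consolidation example above, is to cache metadata tables at the start of a notebook and clear them all at the end. A minimal sketch, with placeholder table and source names:

from gdmo import DbxClearCachedTables

#Cache the tables at the start of the notebook (placeholder names)...
MetaTables = ['CachedTableOne', 'CachedTableTwo']
spark.sql('CACHE TABLE CachedTableOne AS SELECT * FROM somedb.sourcetableone')
spark.sql('CACHE TABLE CachedTableTwo AS SELECT * FROM somedb.sourcetabletwo')

#...and uncache them all in one call when done.
DbxClearCachedTables(spark, MetaTables)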
