
# Spark Safe Delta
A combination of tools that makes working with PySpark in the Azure Databricks environment more convenient.

## I. Package contents:
### 1. delta_write_safe
A tool that automatically updates the schema of a Databricks Delta table when the structure of the incoming data changes.
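A minimal usage sketch (the DataFrame, the Spark session variable and the table name are illustrative placeholders; the call signature follows the sample at the end of this page):

```python
# Minimal sketch; `spark`, the source path and the table name are placeholders.
from sparksafedelta import sparksafedelta

# Source data whose structure may have gained new columns since the last load.
incoming_df = spark.read.json("/mnt/raw/events/")

# delta_write_safe reconciles the Delta table schema with the DataFrame schema
# before writing, so new columns do not make the write fail.
sparksafedelta.delta_write_safe(incoming_df, spark, "analytics.events")
```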

### 2. write_data_mysql
Method that writes data into MySQL and takes care of repartitioning if necessary.

Dependencies:

1. MySQL Connector/J 8.0.13
dbfs:/FileStore/jars/7b863f06_67cf_4a51_8f3b_67d414d808b3-Barnymysql_connector_java_8_0_13_4ac45-2f7c7.jar

http://dev.mysql.com/doc/connector-j/en/
https://mvnrepository.com/artifact/mysql/mysql-connector-java

By default, the method relies on constants defined outside of it that hold the MySQL connection settings; these can also be passed in as parameters:

* MYSQL_URL
* MYSQL_DRIVER
* MYSQL_USER
* MYSQL_PASSWORD
* MYSQL_SSL_CA_PATH
* MYSQL_QUERY_TIMEOUT

Method Parameters:

* p_spark_dataframe - dataframe to write
* p_mysql_db_name - name of database to write to
* p_mysql_table_name - name of table to write to
* p_num_partitions - number of partitions; if -1, the method runs with the default number of partitions defined in the Spark environment or the specific Delta table

Method default parameters:

* p_num_partitions=-1
* url=MYSQL_URL
* driver=MYSQL_DRIVER
* user=MYSQL_USER
* password=MYSQL_PASSWORD
* ssl_ca=MYSQL_SSL_CA_PATH
* queryTimeout=MYSQL_QUERY_TIMEOUT

Usage example:

# MySQL settings defined outside of the method below:
MYSQL_DRIVER = "com.mysql.jdbc.Driver"
MYSQL_URL = "jdbc:mysql://hostname:port/database?useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false"
MYSQL_QUERY_TIMEOUT = 0

MYSQL_USER = "user@namespace"
MYSQL_PASSWORD = "example_password"
MYSQL_SSL_CA_PATH = "/mnt/alex-experiments-blob/certs/cert.txt"

# Method execution itself
write_data_mysql(p_spark_dataframe=target_data, p_mysql_dbtable=destination_db_name_column_name_construct)
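For orientation, a call like the one above presumably boils down to an optional repartition followed by a standard Spark JDBC write. The sketch below is illustrative only (write_data_mysql_sketch and its body are assumptions, not the package's actual implementation) and reuses the MYSQL_* constants from the settings above:

```python
# Illustrative sketch only -- an assumed equivalent of write_data_mysql,
# not the package's actual implementation.
def write_data_mysql_sketch(p_spark_dataframe, p_mysql_db_name, p_mysql_table_name,
                            p_num_partitions=-1):
    df = p_spark_dataframe
    if p_num_partitions != -1:
        # Repartition only when the caller requests a specific partition count.
        df = df.repartition(p_num_partitions)

    # Standard Spark JDBC write; the SSL CA (MYSQL_SSL_CA_PATH) would normally
    # be wired in through the JDBC URL or connector properties.
    (df.write
       .format("jdbc")
       .option("url", MYSQL_URL)
       .option("driver", MYSQL_DRIVER)
       .option("dbtable", "{}.{}".format(p_mysql_db_name, p_mysql_table_name))
       .option("user", MYSQL_USER)
       .option("password", MYSQL_PASSWORD)
       .option("queryTimeout", MYSQL_QUERY_TIMEOUT)
       .mode("append")
       .save())
```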

### 3. remove_columns

The remove_columns() method removes columns from a specified DataFrame.
It silently returns a result even if the user specifies a column that doesn't exist.

Usage example: destination_df = remove_columns(source_df, "SequenceNumber;Body;Non-existing-column")
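The described behaviour matches PySpark's DataFrame.drop, which also ignores names that are not present; a rough sketch of an equivalent helper (illustrative only, not the package's actual implementation):

```python
# Illustrative sketch: drop a semicolon-separated list of columns.
# DataFrame.drop silently ignores column names that do not exist,
# which matches the behaviour described above.
def remove_columns_sketch(source_df, columns_to_remove):
    column_names = [c.strip() for c in columns_to_remove.split(";") if c.strip()]
    return source_df.drop(*column_names)

destination_df = remove_columns_sketch(source_df, "SequenceNumber;Body;Non-existing-column")
```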

### 4. read_mysql

Method that fetches a table, or the result of a query, as a Spark DataFrame.
Returns a Spark DataFrame as a result.

Example usage:

read_mysql(table_name="customers")
read_mysql(table_name="h2.customers")
read_mysql(table_name="h2.customers", url=MYSQL_URL, driver=MYSQL_DRIVER, user=MYSQL_USER, password=MYSQL_PASSWORD, ssl_ca=MYSQL_SSL_CA_PATH, queryTimeout=MYSQL_QUERY_TIMEOUT)
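Assuming read_mysql wraps the standard Spark JDBC reader, the second call above is roughly equivalent to the following (illustrative sketch; `spark` is the Databricks-provided SparkSession and the MYSQL_* constants come from the write_data_mysql example):

```python
# Illustrative sketch: reading a MySQL table into a Spark DataFrame over JDBC.
customers_df = (spark.read
    .format("jdbc")
    .option("url", MYSQL_URL)
    .option("driver", MYSQL_DRIVER)
    .option("dbtable", "h2.customers")
    .option("user", MYSQL_USER)
    .option("password", MYSQL_PASSWORD)
    .option("queryTimeout", MYSQL_QUERY_TIMEOUT)
    .load())
```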

### 5. list_available_mysql_tables

Method that lists all the tables available to a particular user.
Returns a Spark DataFrame as a result.
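A minimal usage sketch, assuming the method can be called without arguments once the module-level MYSQL_* constants are defined:

```python
# List the tables visible to the configured MySQL user as a Spark DataFrame.
available_tables_df = list_available_mysql_tables()
# display() is available in Databricks notebooks.
display(available_tables_df)
```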


## Package sample usage:

#!/usr/bin/env python

from sparksafedelta import sparksafedelta
sparksafedelta.delta_write_safe(sp_df_to_write, SP_CONTEXT, DATABRICKS_TABLE_NAME)



Download files


Source Distribution

SparkSafeDelta-0.3.4.tar.gz (3.2 kB, source)

File details

Details for the file SparkSafeDelta-0.3.4.tar.gz.

File metadata

  • Download URL: SparkSafeDelta-0.3.4.tar.gz
  • Upload date:
  • Size: 3.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.2 requests/2.21.0 setuptools/40.6.3 requests-toolbelt/0.9.1 tqdm/4.28.1 CPython/3.7.1

File hashes

Hashes for SparkSafeDelta-0.3.4.tar.gz

| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 1218e523df5a0c72113020b9aafcdbfcc4a56e0924b0826226e10d89b4af2300 |
| MD5 | da36bec05f58a6407e6fa67b88198b1f |
| BLAKE2b-256 | c51f09f67396bfd25ec9a663033df4dde2b9a16371f2c5a62bd79a3b92b84d12 |

