Function to help Data Scientist work more effectively with DWH
Project description
spark-sdk: Function to help Data Scientist work more effectively with DataLake
What is it?
spark-sdk Function to help Data Scientist work more effectively with DataLake. Include different function work with spark, pyarrow
Main Features
Here are just a few of the things that spark-sdk does well:
- Get your spark with newest version update, PySpark() function to get your spark requirement.
- Easy to read and write data to data lake.
- Support user using key to encrypt or decrypt data
- Support function to work with distributed system (datalake)
Install
Binary installers for the latest released version are available at the Python Package Index (PyPI).
# with PyPI
pip install spark-sdk
Dependencies
- pyspark - Apache Spark Python API
- pyarrow - Python library for Apache Arrow
- pandas - Powerful data structures for data analysis, time series, and statistics
Installation from sources
To install spark-sdk from source you need Cython in addition to the normal dependencies above. Cython can be installed from PyPI:
pip install cython
In the spark-sdk
directory (same one where you found this file after
cloning the git repo), execute:
python setup.py install
Get Spark
import spark_sdk as ss
spark = ss.PySpark(yarn=False, num_executors=4, driver_memory='8G').spark
# Yarn mode
spark = ss.PySpark(yarn=True, driver_memory='2G', num_executors=4, executor_memory='4G').spark
# Add more spark config
spark = ss.PySpark(yarn=False, driver_memory='8G', num_executors=4, executor_memory='4G',
add_on_config1=("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog"),
add_on_config2=('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
).spark
Store data to dwh
# step 1 read data
ELT_DATE = '2021-12-01'
ELT_STR = ELT_DATE[:7]
import pandas as pd
df1 = pd.read_csv('./data.csv', sep='\t')
import spark_sdk as ss
# function to store to dwh
df1.to_dwh(
hdfs_path="path/to/data/test1.parquet/", # path hdfs
partition_by="m", # column time want to partition
partition_date=ELT_DATE, # partition date
database="database_name", # database name
table_name="test1", # table name
repartition=True
)
Function to read from dwh
# method 1: using pandas
import spark_sdk as ss
import pandas as pd
pandas_df = pd.read_dwh('database.table_name', filters="""m='2022-04-01'""")
# method 2: sql
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")
df = sparkDF.toPandas()
# method 3: read_table
sparkDF = ss.read_table('database_name.table')
sparkDF = sparkDF.filter("m == '2022-04-01'")
df = sparkDF.toPandas()
# IF got error timestamp out of range
sparkDF = ss.limit_timestamp(sparkDF).toPandas()
# IF YOU WANT TO DELETE DATA
# Link hdfs_file to table
# When to drop it also delete data
ss.drop_table_and_delete_data('database_name.test1')
# IF JUST WANT TO DELETE TABLE NOT DELETE DATA
ss.drop_table('database_name.test1')
Delta format
function to store to dwh
df1.to_dwh(
# just end path with delta then table will be store in delta format
hdfs_path="path/to/data/test1.delta/",
partition_by="m", # column time want to partition
partition_date=ELT_DATE, # partition date
database="database_name", # database name
table_name="test1",
repartition=True# table name
)
# read table not read file
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")
Decrypt Data
method 1: sql
Using Spark SQL
import spark_sdk as ps
key = '1234' # contact Data Owner to get key
df = ps.sql(f"""
select fdecrypt(column_name, "{key}") column_name
from database_name.table_name
limit 50000
""").toPandas()
method 2: using pandas
Using pandas
import spark_sdk as ss
df['new_column'] = df['column'].decrypt_column(key)
## function will return dataframe with column_decrypted
method 3
Using pandas apply function
from spark_sdk import decrypt
df['decrypted'] = df['encrypted'].apply(decrypt,args=("YOUR_KEY",))
Encrypt data
import spark_sdk as ss
# function to store to dwh
df.to_dwh(
hdfs_path="path/to/data/test1.parquet/", # path hdfs
partition_by="m", # column time want to partition
partition_date=ELT_DATE, # partition date
database="database_name", # database name
table_name="test1", # table name
encrypt_columns=['column1','column2'], # list column name need to encrypt
)
ss.PySpark().stop()
Create Yaml File Mapping data from CSV to DWH
from spark_sdk import CreateYamlDWH
create_yaml = CreateYamlDWH(
csv_file = 'data.csv',
hdfs_path = '/path/to/data/table_name.parquet',
sep = '\t',
database_name = 'database_name',
table_name = 'table_name',
yaml_path = '/path/to/yaml/file/'
)
create_yaml.generate_yaml_file()
Store spark.sql.DataFrame
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")
ELT_DATE = '2022-06-10'
sparkDF.to_dwh(
hdfs_path="path/to/data/test6.parquet/", # path hdfs
partition_by="m", # column time want to partition
partition_date=ELT_DATE, # partition date
database="database_name", # database name
table_name="test6", # table name
repartition=True,
driver_memory='4G', executor_memory='4g', num_executors='1', port='4090', yarn=True
)
Read data
# sql
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")
# function
sparkDF = ss.read_parquet("hdfs:/path/to/file.parquet")
sparkDF = ss.read_csv("hdfs:/path/to/file.csv")
sparkDF = ss.read_json("hdfs:/path/to/file.json")
Working with hdfs
mkdir, cat, exists, info, open
list file
ss.ls('/path/data')
create new path
ss.mkdir('/path/create/')
create new path
ss.exists('/check/path/exists') # return True if exists
print info
ss.info('/path/info/')
open file like local file
import json
with ss.open('/path/to/file.json', mode='rb') as file:
data = json.load(file)
import json
from spark_sdk import Utf8Encoder
data = {'user': 1, 'code': '456'}
with ss.open('/path/to/file.json', mode='wb') as file:
json.dump(data, Utf8Encoder(file), indent=4)
json__file = ss.read_json('/path/to/file.json')
ss.write_json(data, '/path/to_file.json')
sparkDF = ss.read_csv("file:///path/to/data.csv", sep='\t') # with local file
sparkDF = ss.read_csv("/path/to/data.csv", sep='\t') # with hdfs file
#### mode in ['rb', 'wb', 'ab']
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
File details
Details for the file spark_sdk-0.5.6rc8-py3-none-any.whl
.
File metadata
- Download URL: spark_sdk-0.5.6rc8-py3-none-any.whl
- Upload date:
- Size: 36.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.8.8
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 495dbf1f64712d6947b46dc307a5222a675735be48cb3e4f863f593efd9e8d7e |
|
MD5 | a6d218ca3926aad725c3c24f326e64b4 |
|
BLAKE2b-256 | 085885cb0a388149bcf2adbcab187753db4a54b08a4af5681cc238ec54d0ee14 |