Functions to help Data Scientists work more effectively with a DWH
What is it?
spark-sdk is a collection of functions that help Data Scientists work more effectively with a data lake. It includes utilities for working with Spark and PyArrow.
Main Features
Here are just a few of the things that spark-sdk does well:
- Get a Spark session easily: the PySpark() function creates a session that matches your requirements, using the newest available version.
- Read and write data to the data lake with simple one-liners.
- Encrypt or decrypt data columns with a user-supplied key.
- Helper functions for working with the distributed file system (data lake).
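Taken together, a typical session looks like the minimal sketch below; the paths, database name, and table name are placeholders, and every function used here is documented in the sections that follow.
import spark_sdk as ss
import pandas as pd

# get a Spark session (see "Get Spark")
spark = ss.PySpark(yarn=False, num_executors=4, driver_memory='8G').spark

# store a pandas DataFrame to the DWH (see "Store data to DWH")
df = pd.read_csv('./data.csv', sep='\t')
df.to_dwh(
    hdfs_path="path/to/data/example.parquet/",
    partition_by="m",
    partition_date='2022-04-01',
    database="database_name",
    table_name="example",
    repartition=True
)

# read it back (see "Read data")
pandas_df = pd.read_dwh('database_name.example', filters="""m='2022-04-01'""")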
Install
Binary installers for the latest released version are available at the Python Package Index (PyPI).
# with PyPI
pip install spark-sdk
Dependencies
- pyspark - Apache Spark Python API
- pyarrow - Python library for Apache Arrow
- pandas - Powerful data structures for data analysis, time series, and statistics
Installation from source
To install spark-sdk from source you need Cython in addition to the normal dependencies above. Cython can be installed from PyPI:
pip install cython
In the spark-sdk directory (the same one where you found this file after cloning the git repo), execute:
python setup.py install
Get Spark
import spark_sdk as ss
spark = ss.PySpark(yarn=False, num_executors=4, driver_memory='8G').spark
# Yarn mode
spark = ss.PySpark(yarn=True, driver_memory='2G', num_executors=4, executor_memory='4G').spark
# Add more spark config
spark = ss.PySpark(yarn=False, driver_memory='8G', num_executors=4, executor_memory='4G',
                   add_on_config1=("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
                   add_on_config2=('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
                   ).spark
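The object returned by .spark is a regular SparkSession, so standard PySpark calls work on it unchanged; for example (plain PySpark API, nothing spark-sdk specific):
# confirm the session is alive and check the Spark version
print(spark.version)
spark.sql("SELECT 1 AS ok").show()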
Store data to DWH
# step 1 read data
ELT_DATE = '2021-12-01'
ELT_STR = ELT_DATE[:7]
import pandas as pd
df1 = pd.read_csv('./data.csv', sep='\t')
import spark_sdk as ss
# function to store data to the DWH
df1.to_dwh(
    hdfs_path="path/to/data/test1.parquet/",  # path on HDFS
    partition_by="m",                         # time column to partition by
    partition_date=ELT_DATE,                  # partition date
    database="database_name",                 # database name
    table_name="test1",                       # table name
    repartition=True
)
# functions to read from the DWH
# method 1: using pandas
import spark_sdk as ss
import pandas as pd
pandas_df = pd.read_dwh('database.table_name', filters="""m='2022-04-01'""")
# method 2: sql
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")
df = sparkDF.toPandas()
# method 3: read_table
sparkDF = ss.read_table('database_name.table')
sparkDF = sparkDF.filter("m == '2022-04-01'")
df = sparkDF.toPandas()
# if you get a "timestamp out of range" error
sparkDF = ss.limit_timestamp(sparkDF).toPandas()
# if you want to delete the table AND its data:
# the HDFS files are linked to the table, so dropping it this way also deletes the underlying data
ss.drop_table_and_delete_data('database_name.test1')

# if you only want to drop the table and keep the data:
ss.drop_table('database_name.test1')
Delta format
Use to_dwh as before; ending hdfs_path with .delta stores the table in Delta format:
df1.to_dwh(
    hdfs_path="path/to/data/test1.delta/",  # ending the path with .delta stores the table in Delta format
    partition_by="m",                       # time column to partition by
    partition_date=ELT_DATE,                # partition date
    database="database_name",               # database name
    table_name="test1",                     # table name
    repartition=True
)
# read the table, not the underlying files
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")
Decrypt Data
method 1: sql
Using Spark SQL
import spark_sdk as ss
key = '1234' # contact the Data Owner to get the key
df = ss.sql(f"""
select fdecrypt(column_name, "{key}") column_name
from database_name.table_name
limit 50000
""").toPandas()
method 2: using pandas
Using pandas
import spark_sdk as ss
df['new_column'] = df['column'].decrypt_column(key)
# returns the decrypted values, assigned here to the new column
method 3: apply
Using the pandas apply function
from spark_sdk import decrypt
df['decrypted'] = df['encrypted'].apply(decrypt, args=("YOUR_KEY",))
Encrypt data
import spark_sdk as ss
# function to store encrypted data to the DWH
df.to_dwh(
    hdfs_path="path/to/data/test1.parquet/",   # path on HDFS
    partition_by="m",                          # time column to partition by
    partition_date=ELT_DATE,                   # partition date
    database="database_name",                  # database name
    table_name="test1",                        # table name
    encrypt_columns=['column1', 'column2'],    # list of column names to encrypt
    keys_path='/path/to/store/keys.json'       # required when encrypt_columns is set; the keys are stored here
)
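Once stored with encrypt_columns, the data can be read back with the decryption helpers shown in the Decrypt Data section; a sketch, assuming you retrieve the key that was written to keys_path (or obtain it from the Data Owner):
key = '...'  # placeholder: the key saved in keys_path / provided by the Data Owner
df = ss.sql(f"""
    select fdecrypt(column1, "{key}") column1
    from database_name.test1
""").toPandas()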
ss.PySpark().stop()
Create a YAML file mapping data from CSV to DWH
from spark_sdk import CreateYamlDWH
create_yaml = CreateYamlDWH(
    csv_file='data.csv',
    hdfs_path='/path/to/data/table_name.parquet',
    sep='\t',
    database_name='database_name',
    table_name='table_name',
    yaml_path='/path/to/yaml/file/'
)
create_yaml.generate_yaml_file()
Store spark.sql.DataFrame
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")
ELT_DATE = '2022-06-10'
sparkDF.to_dwh(
    hdfs_path="path/to/data/test6.parquet/",  # path on HDFS
    partition_by="m",                         # time column to partition by
    partition_date=ELT_DATE,                  # partition date
    database="database_name",                 # database name
    table_name="test6",                       # table name
    repartition=True,
    driver_memory='4G', executor_memory='4g', num_executors='1', port='4090', yarn=True
)
Read data
# sql
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")
# function
sparkDF = ss.read_parquet("hdfs:/path/to/file.parquet")
sparkDF = ss.read_csv("hdfs:/path/to/file.csv")
sparkDF = ss.read_json("hdfs:/path/to/file.json")
Working with HDFS
Supported helpers: ls, mkdir, cat, exists, info, open
list files
ss.ls('/path/data')
create a new directory
ss.mkdir('/path/create/')
check whether a path exists
ss.exists('/check/path/exists') # return True if exists
print info about a path
ss.info('/path/info/')
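print file contents (assuming ss.cat follows the usual filesystem convention, as in pyarrow, and returns the file's raw bytes):
content = ss.cat('/path/to/file.txt')  # assumption: returns bytes, like pyarrow's fs.cat
print(content.decode('utf-8'))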
open a file on HDFS like a local file
import json
# read a JSON file from HDFS
with ss.open('/path/to/file.json', mode='rb') as file:
    data = json.load(file)

# write a JSON file to HDFS
from spark_sdk import Utf8Encoder
data = {'user': 1, 'code': '456'}
with ss.open('/path/to/file.json', mode='wb') as file:
    json.dump(data, Utf8Encoder(file), indent=4)
json_file = ss.read_json('/path/to/file.json')
ss.write_json(data, '/path/to_file.json')
sparkDF = ss.read_csv("file:///path/to/data.csv", sep='\t') # with local file
sparkDF = ss.read_csv("/path/to/data.csv", sep='\t') # with hdfs file
# supported open modes: 'rb', 'wb', 'ab'
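Because 'ab' is among the supported modes, appending should work the same way as writing; a minimal sketch with a placeholder path, assuming the handle behaves like a binary file object:
with ss.open('/path/to/log.txt', mode='ab') as file:
    file.write(b'appended line\n')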