
Functions to help Data Scientists work more effectively with the DWH

Project description

Install package

pip install spark-sdk --upgrade

Store data to DWH

# step 1: read the data
ELT_DATE = '2021-12-01'
ELT_STR = ELT_DATE[:7]
import pandas as pd
df1 = pd.read_csv('./data.csv', sep='\t')


import spark_sdk as ss

# step 2: store to DWH
df1.to_dwh(
    hdfs_path="path/to/data/test1.parquet/", # HDFS path
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test1" # table name
)
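# Optional (illustrative sketch, not part of the library's docs): backfill several
# monthly partitions by calling to_dwh once per partition_date. The month list
# below is made up, and this assumes each call writes df1 into the given
# partition exactly as in the example above.
for elt_date in ['2021-10-01', '2021-11-01', '2021-12-01']:
    df1.to_dwh(
        hdfs_path="path/to/data/test1.parquet/",
        partition_by="m",
        partition_date=elt_date,
        database="database_name",
        table_name="test1"
    )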


# function to read from DWH
# method 1: using pandas
import spark_sdk as ss
import pandas as pd

pandas_df = pd.read_dwh('database_name.table_name', filters="""m='2022-04-01'""")

# method 2: sql
sparkDF = ss.sql("""
SELECT *
FROM database_name.table_name
WHERE m='2022-04-01'
""")

df = sparkDF.toPandas()


# method 3: read_table
sparkDF = ss.read_table('database_name.table_name')
sparkDF = sparkDF.filter("m == '2022-04-01'")

df = sparkDF.toPandas()


# IF YOU WANT TO DELETE THE DATA AS WELL
# The table is linked to its HDFS files,
# so dropping it this way also deletes the underlying data
ss.drop_table_and_delete_data('database_name.test1')

# IF YOU ONLY WANT TO DROP THE TABLE AND KEEP THE DATA
ss.drop_table('database_name.test1')
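A quick way to see the difference between the two calls is to check the backing HDFS path afterwards with ss.exists (covered in the "Working with HDFS" section below). This is only an illustrative sketch; the paths and the second table name are placeholders.

import spark_sdk as ss

# after drop_table the table metadata is gone but the files remain on HDFS
ss.drop_table('database_name.test1')
ss.exists("path/to/data/test1.parquet/")         # expected: True

# after drop_table_and_delete_data the underlying files are removed as well
ss.drop_table_and_delete_data('database_name.test2')
ss.exists("path/to/data/test2.parquet/")         # expected: False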

Decrypt Data

method 1: sql

Using Spark SQL

import spark_sdk as ss

key = '1234' # contact the Data Owner to get the key

df = ss.sql(f"""
SELECT fdecrypt(column_name, "{key}") AS column_name
FROM database_name.table_name
LIMIT 50000
""").toPandas()

method 2: using pandas

Using pandas

import spark_sdk as ss
df['new_column'] = df['column'].decrypt_column(key)

# decrypt_column returns the decrypted values as a new column

method 3

Using pandas apply function

from spark_sdk import decrypt
df['decrypted'] = df['encrypted'].apply(decrypt, args=("YOUR_KEY",))
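If several columns are encrypted with the same key, the apply-based approach from method 3 can simply be looped over a list of columns. This is a sketch; the column names and the key are placeholders.

from spark_sdk import decrypt

key = 'YOUR_KEY'  # contact the Data Owner to get the key
for col in ['column1', 'column2']:  # placeholder column names
    df[f'{col}_decrypted'] = df[col].apply(decrypt, args=(key,))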

Encrypt data

import spark_sdk as ss


# function to store to DWH with encryption
df.to_dwh(
    hdfs_path="path/to/data/test1.parquet/", # HDFS path
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test1", # table name
    encrypt_columns=['column1', 'column2'], # list of column names to encrypt
    keys_path='/path/to/store/keys.json' # required when encrypt_columns is set; path where the keys are stored
)

ss.PySpark().stop()
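To check the result, the table can be read back and one of the encrypted columns decrypted with decrypt_column from the section above. This is only a sketch: the key value is a placeholder and must match the key stored at keys_path.

import pandas as pd
import spark_sdk as ss

pandas_df = pd.read_dwh('database_name.test1', filters="""m='2021-12-01'""")
key = '1234'  # placeholder; use the key stored at keys_path
pandas_df['column1_plain'] = pandas_df['column1'].decrypt_column(key)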

Create a YAML File Mapping Data from CSV to DWH

from spark_sdk import CreateYamlDWH

create_yaml = CreateYamlDWH(
    csv_file='data.csv',
    hdfs_path='/path/to/data/table_name.parquet',
    sep='\t',
    database_name='database_name',
    table_name='table_name',
    yaml_path='/path/to/yaml/file/'
)

create_yaml.generate_yaml_file()

Store spark.sql.DataFrame

import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")

ELT_DATE = '2022-06-10'
sparkDF.to_dwh(
    hdfs_path="path/to/data/test6.parquet/", # HDFS path
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test6", # table name
    driver_memory='4g', executor_memory='4g', num_executors='1', port='4090', yarn=True
)
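As a quick sanity check (an illustrative sketch, not part of the library's docs), the table can be read back with ss.sql just like in the read examples above:

check = ss.sql("""
SELECT COUNT(*) AS n
FROM database_name.test6
""").toPandas()
print(check)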

Working with HDFS

ls, mkdir, cat, exists, info, open

# list files
ss.ls('/path/data')

# create a new path
ss.mkdir('/path/create/')

# check whether a path exists
ss.exists('/check/path/exists') # return True if exists

# print info
ss.info('/path/info/')

# open an HDFS file like a normal file
import json
with ss.open('/path/to/file.json', mode='rb') as file:
    data = json.load(file)

# write a JSON file to HDFS
import json
from spark_sdk import Utf8Encoder
data = {'user': 1, 'code': '456'}
with ss.open('/path/to/file.json', mode='wb') as file:
    json.dump(data, Utf8Encoder(file), indent=4)


ss.read_json('/path/to/file.json')
ss.write_json(data, '/path/to_file.json')

sparkDF = ss.read_csv("file:///path/to/data.csv", sep='\t') # with local file
sparkDF = ss.read_csv("/path/to/data.csv", sep='\t') # with hdfs file

# mode in ['rb', 'wb', 'ab']
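ss.open can also be used to copy a local file up to HDFS. A minimal sketch, with placeholder paths:

import spark_sdk as ss

# copy a local file to HDFS via ss.open in write mode
with open('/local/path/data.csv', 'rb') as src, ss.open('/path/to/data.csv', mode='wb') as dst:
    dst.write(src.read())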
