
Functions to help you interact with CADSHOUSE

Project description


spark-sdk: Functions to help Data Scientists work more effectively with a DataLake


What is it?

spark-sdk provides functions that help Data Scientists work more effectively with a DataLake, including utilities for working with Spark and PyArrow.

Main Features

Here are just a few of the things that spark-sdk does well:

  • Get a Spark session on the newest available version; the PySpark() function builds a session to your requirements.
  • Easy reading and writing of data to the data lake.
  • Support for encrypting and decrypting data with a user-supplied key.
  • Helper functions for working with the distributed file system (DataLake).

Install

Binary installers for the latest released version are available at the Python Package Index (PyPI).

# with PyPI
pip install spark-sdk

Installation from sources

To install spark-sdk from source you need Cython in addition to the normal runtime dependencies. Cython can be installed from PyPI:

pip install cython

In the spark-sdk directory (same one where you found this file after cloning the git repo), execute:

python setup.py install

Get Spark

import spark_sdk as ss

# Local mode
spark = ss.PySpark(yarn=False, num_executors=4, driver_memory='8G').spark

# Yarn mode
spark = ss.PySpark(yarn=True, driver_memory='2G', num_executors=4, executor_memory='4G').spark

# Add more Spark config: extra settings are passed as (key, value) tuples
# via add_on_config* keyword arguments
spark = ss.PySpark(yarn=False, driver_memory='8G', num_executors=4, executor_memory='4G',
                   add_on_config1=("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
                   add_on_config2=('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
                   ).spark
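
When you are finished, stop the session to release its resources. A minimal sketch, assuming PySpark() reattaches to the running session (the same stop() call appears in the Encrypt data section below):

# stop the current Spark session
ss.PySpark().stop()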

Store data to dwh

# step 1: read the data
ELT_DATE = '2021-12-01'
ELT_STR = ELT_DATE[:7]
import pandas as pd
df1 = pd.read_csv('./data.csv', sep='\t')


import spark_sdk as ss

# function to store data to the DWH
df1.to_dwh(
    hdfs_path="path/to/data/test1.parquet/",  # HDFS path
    partition_by="m",                         # time column to partition by
    partition_date=ELT_DATE,                  # partition date
    database="database_name",                 # database name
    table_name="test1",                       # table name
    repartition=True
)
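
To sanity-check the write, you can read the partition back with the ss.sql helper shown in the next section. A sketch, assuming to_dwh registered the table as database_name.test1:

# count the rows in the partition that was just written
count_df = ss.sql(f"""
SELECT COUNT(*) AS n_rows
FROM database_name.test1
WHERE m='{ELT_DATE}'
""").toPandas()
print(count_df)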

Functions to read from dwh

# method 1: using pandas
import spark_sdk as ss
import pandas as pd

pandas_df = pd.read_dwh('database.table_name', filters="""m='2022-04-01'""")

# method 2: sql
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")

df = sparkDF.toPandas()


# method 3: read_table
sparkDF = ss.read_table('database_name.table')
sparkDF = sparkDF.filter("m == '2022-04-01'")

df = sparkDF.toPandas()


# if you get a "timestamp out of range" error
sparkDF = ss.limit_timestamp(sparkDF).toPandas()

# IF YOU WANT TO DELETE THE DATA AS WELL:
# the HDFS files are linked to the table, so dropping the table
# also deletes the underlying data
ss.drop_table_and_delete_data('database_name.test1')

# IF YOU ONLY WANT TO DROP THE TABLE, NOT DELETE THE DATA
ss.drop_table('database_name.test1')

Delta format

Function to store to dwh:

df1.to_dwh(
    # end the path with .delta and the table will be stored in Delta format
    hdfs_path="path/to/data/test1.delta/",
    partition_by="m",            # time column to partition by
    partition_date=ELT_DATE,     # partition date
    database="database_name",    # database name
    table_name="test1",          # table name
    repartition=True
)
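
A second call with a later partition_date writes the next partition into the same Delta table. A sketch; df2 is a hypothetical DataFrame for the new period, and this assumes to_dwh appends per partition as in the Parquet example above:

df2.to_dwh(
    hdfs_path="path/to/data/test1.delta/",  # same Delta path as above
    partition_by="m",
    partition_date='2022-01-01',            # hypothetical next partition
    database="database_name",
    table_name="test1",
    repartition=True
)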

# read the table, not the underlying files
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")

Decrypt Data

method 1: sql

Using Spark SQL

import spark_sdk as ss

key = '1234'  # contact the Data Owner to get the key

df = ss.sql(f"""
select fdecrypt(column_name, "{key}") column_name
from database_name.table_name
limit 50000
""").toPandas()

method 2: using pandas

Using pandas

import spark_sdk as ss
df['new_column'] = df['column'].decrypt_column(key)

# decrypt_column returns the decrypted values for the column

method 3

Using the pandas apply function

from spark_sdk import decrypt
df['decrypted'] = df['encrypted'].apply(decrypt, args=("YOUR_KEY",))

Encrypt data

import spark_sdk as ss


# store to the DWH, encrypting the listed columns on the way in
df.to_dwh(
    hdfs_path="path/to/data/test1.parquet/",   # HDFS path
    partition_by="m",                          # time column to partition by
    partition_date=ELT_DATE,                   # partition date
    database="database_name",                  # database name
    table_name="test1",                        # table name
    encrypt_columns=['column1', 'column2'],    # columns to encrypt
)
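
To verify the round trip, read the table back and decrypt with fdecrypt from the section above. A sketch, assuming the same key that was used for encryption:

key = '1234'  # the key used to encrypt (contact the Data Owner)

df_check = ss.sql(f"""
select fdecrypt(column1, "{key}") column1
from database_name.test1
limit 10
""").toPandas()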

# stop the Spark session when finished
ss.PySpark().stop()

Create a YAML File Mapping Data from CSV to DWH

from spark_sdk import CreateYamlDWH
create_yaml = CreateYamlDWH(
    csv_file='data.csv',
    hdfs_path='/path/to/data/table_name.parquet',
    sep='\t',
    database_name='database_name',
    table_name='table_name',
    yaml_path='/path/to/yaml/file/'
)

create_yaml.generate_yaml_file()

Store a spark.sql.DataFrame

import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")

ELT_DATE = '2022-06-10'
sparkDF.to_dwh(
    hdfs_path="path/to/data/test6.parquet/",  # HDFS path
    partition_by="m",                         # time column to partition by
    partition_date=ELT_DATE,                  # partition date
    database="database_name",                 # database name
    table_name="test6",                       # table name
    repartition=True,
    driver_memory='4G', executor_memory='4g', num_executors='1', port='4090', yarn=True
)

Read data

# sql
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")

# function
sparkDF = ss.read_parquet("hdfs:/path/to/file.parquet")
sparkDF = ss.read_csv("hdfs:/path/to/file.csv")
sparkDF = ss.read_json("hdfs:/path/to/file.json")
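
These readers return Spark DataFrames, so they can be chained with to_dwh to ingest raw files. A sketch; the paths and table name are hypothetical, and it assumes the CSV contains the partition column:

# ingest a tab-separated CSV from HDFS into the DWH
sparkDF = ss.read_csv("hdfs:/path/to/file.csv", sep='\t')
sparkDF.to_dwh(
    hdfs_path="path/to/data/raw_ingest.parquet/",  # hypothetical target path
    partition_by="m",
    partition_date='2022-06-10',
    database="database_name",
    table_name="raw_ingest",                       # hypothetical table name
    repartition=True
)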

Working with hdfs

Supported helpers include ls, mkdir, cat, exists, info, open.

list files

ss.ls('/path/data')

create a new directory

ss.mkdir('/path/create/')

check whether a path exists

ss.exists('/check/path/exists') # return True if exists

print info

ss.info('/path/info/')
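
read file contents

# cat is listed among the supported helpers above; this sketch assumes it
# returns the raw contents of the file
ss.cat('/path/to/file.txt')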

open a file like a local file

import json

# read a JSON file from HDFS
with ss.open('/path/to/file.json', mode='rb') as file:
    data = json.load(file)

# write a JSON file to HDFS
from spark_sdk import Utf8Encoder

data = {'user': 1, 'code': '456'}
with ss.open('/path/to/file.json', mode='wb') as file:
    json.dump(data, Utf8Encoder(file), indent=4)

# shortcut helpers
json_file = ss.read_json('/path/to/file.json')
ss.write_json(data, '/path/to_file.json')

sparkDF = ss.read_csv("file:///path/to/data.csv", sep='\t') # with local file
sparkDF = ss.read_csv("/path/to/data.csv", sep='\t') # with hdfs file

# supported modes: 'rb', 'wb', 'ab'
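
Putting the helpers together, a small end-to-end sketch; the output directory is hypothetical:

import json
import spark_sdk as ss
from spark_sdk import Utf8Encoder

out_dir = '/path/to/output/'   # hypothetical directory
if not ss.exists(out_dir):     # create the directory only if it is missing
    ss.mkdir(out_dir)

with ss.open(out_dir + 'result.json', mode='wb') as file:
    json.dump({'status': 'ok'}, Utf8Encoder(file), indent=4)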
