Data transformations with Python
This is a collection of Python jobs that transform data. The jobs use PySpark to process larger volumes of data and are meant to run on a Spark cluster (via spark-submit).
Prerequisites
Please make sure you have the following installed and can run them:
- Python (3.6 or later)
- Pipenv
- Java (1.8 or later)
Install all dependencies
pipenv install
Run tests
Run unit tests
pipenv run unit-test
Run integration tests
pipenv run integration-test
Create .egg package
pipenv run packager
Use linter
pipenv run linter
Jobs
Sample
A Hello World Program
There is a dump of the datalake for this under resources/word_count/words.txt, a plain text file.
Input
A simple *.txt file containing text.
Output
A single *.csv file containing data similar to:
"word","count"
"a","3"
"an","5"
...
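The transformation above can be sketched in plain Python; the actual job in jobs/word_count.py uses PySpark, and the normalization rule (lowercasing, whitespace split) is an assumption, not confirmed by this README:

```python
from collections import Counter

def count_words(text):
    # Lowercase and split on whitespace, then tally each word.
    # This mirrors the "word","count" output shown above.
    words = text.lower().split()
    return Counter(words)

sample = "an apple a day an apple"
counts = count_words(sample)
```

Here `counts["an"]` is 2 and `counts["a"]` is 1; the Spark job would produce the same pairs, one row per distinct word.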
Run the job
Please make sure to package the code before submitting the Spark job (pipenv run packager).
pipenv run spark-submit \
--master local \
--py-files dist/data_transformations-0.1.0-py3.6.egg \
jobs/word_count.py \
<INPUT_FILE_PATH> \
<OUTPUT_PATH>
Citibike
For analytics purposes, the BI department of a bike share company would like to present dashboards displaying the distance each bike was ridden. A *.csv file contains historical data of previous bike rides. This input file needs to be processed in multiple steps; a pipeline runs these jobs.
There is a dump of the datalake for this under resources/citibike/citibike.csv with historical data.
Ingest
Reads a *.csv file and transforms it to Parquet format. The column names are sanitized (whitespace replaced with underscores, e.g. "start station id" becomes "start_station_id").
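The column sanitization can be sketched as a simple string transformation; the real job applies it to the DataFrame's column names, and `sanitize_column` is a hypothetical helper name:

```python
def sanitize_column(name):
    # Replace whitespace in a column header with underscores,
    # e.g. "start station id" -> "start_station_id".
    return name.replace(" ", "_")

header = ["tripduration", "starttime", "start station id", "start station name"]
sanitized = [sanitize_column(c) for c in header]
```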
Input
Historical bike ride *.csv file:
"tripduration","starttime","stoptime","start station id","start station name","start station latitude",...
364,"2017-07-01 00:00:00","2017-07-01 00:06:05",539,"Metropolitan Ave & Bedford Ave",40.71534825,...
...
Output
*.parquet files containing the same content:
"tripduration","starttime","stoptime","start_station_id","start_station_name","start_station_latitude",...
364,"2017-07-01 00:00:00","2017-07-01 00:06:05",539,"Metropolitan Ave & Bedford Ave",40.71534825,...
...
Run the job
Please make sure to package the code before submitting the Spark job (pipenv run packager).
pipenv run spark-submit \
--master local \
--py-files dist/data_transformations-0.1.0-py3.6.egg \
jobs/citibike_ingest.py \
<INPUT_FILE_PATH> \
<OUTPUT_PATH>
Distance calculation
This job takes bike trip information and calculates the "as the crow flies" distance traveled for each trip. It reads the Parquet files produced by the ingest job.
Hint:
- For the distance calculation, consider using the Haversine formula as an option.
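A minimal sketch of the Haversine formula from the hint above, in plain Python; the job itself would apply this per row (e.g. via a UDF or built-in column functions), and the function name and kilometer unit are assumptions:

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle ("as the crow flies") distance between two
    # latitude/longitude points, in kilometers.
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
```

For reference, one degree of longitude along the equator is roughly 111.2 km, which this formula reproduces.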
Input
Historical bike ride *.parquet files:
"tripduration",...
364,...
...
Output
*.parquet files containing the historical data with an additional distance column holding the calculated distance:
"tripduration",...,"distance"
364,...,1.34
...
Run the job
Please make sure to package the code before submitting the Spark job (pipenv run packager).
pipenv run spark-submit \
--master local \
--py-files dist/data_transformations-0.1.0-py3.6.egg \
jobs/citibike_distance_calculation.py \
<INPUT_PATH> \
<OUTPUT_PATH>