Optimus is the missing framework to profile, clean, process and do ML on data in a distributed fashion using Apache Spark (PySpark).

To launch a live notebook server and test Optimus, use the Binder or Colab badges in the project repository.
## Installation (pip)

In your terminal just type:

```
pip install optimuspyspark
```
## Requirements

- Apache Spark >= 2.4.0
- Python >= 3.6
## Examples

You can go to the 10 minutes to Optimus notebook, where you can find the basics to start working.

You can also go to the examples folder to find specific notebooks about data cleaning, data munging, profiling, data enrichment, and how to create ML and DL models.

Also check out the Cheat Sheet.
## Documentation
## Feedback

Feedback is what drives Optimus' future, so please take a couple of minutes to help shape the Optimus Roadmap: https://optimusdata.typeform.com/to/aEnYRY

Also, if you have a suggestion or feature request, use https://github.com/ironmussa/optimus/issues
## Start Optimus

```python
from optimus import Optimus
op = Optimus()
```

You can also use an already created Spark session:

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('optimus').getOrCreate()
op = Optimus(spark)
```
## Loading data

Optimus can load data in CSV, JSON, Parquet, Avro, and Excel formats, from a local file or a URL.

```python
# csv
df = op.load.csv("examples/data/foo.csv")

# json
# Use a local file
df = op.load.json("examples/data/foo.json")

# Use a URL
df = op.load.json("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.json")

# parquet
df = op.load.parquet("examples/data/foo.parquet")

# avro
# df = op.load.avro("examples/data/foo.avro").table(5)

# excel
df = op.load.excel("examples/data/titanic3.xls")
```
You can also create a dataframe from scratch:
```python
from pyspark.sql.types import *
from datetime import date, datetime

df = op.create.df(
    [
        ("names", "str", True),
        ("height(ft)", "int", True),
        ("function", "str", True),
        ("rank", "int", True),
        ("age", "int", True),
        ("weight(t)", "float", True),
        ("japanese name", ArrayType(StringType()), True),
        ("last position seen", "str", True),
        ("date arrival", "str", True),
        ("last date seen", "str", True),
        ("attributes", ArrayType(FloatType()), True),
        ("DateType", DateType(), True),
        ("Timestamp", TimestampType(), True),
        ("Cybertronian", "bool", True),
        ("NullType", "null", True),
    ],
    [
        ("Optim'us", 28, "Leader", 10, 5000000, 4.3, ["Inochi", "Convoy"], "19.442735,-99.201111", "1980/04/10",
         "2016/09/10", [8.5344, 4300.0], date(2016, 9, 10), datetime(2014, 6, 24), True,
         None),
        ("bumbl#ebéé ", 17, "Espionage", 7, 5000000, 2.0, ["Bumble", "Goldback"], "10.642707,-71.612534", "1980/04/10",
         "2015/08/10", [5.334, 2000.0], date(2015, 8, 10), datetime(2014, 6, 24), True,
         None),
        ("ironhide&", 26, "Security", 7, 5000000, 4.0, ["Roadbuster"], "37.789563,-122.400356", "1980/04/10",
         "2014/07/10", [7.9248, 4000.0], date(2014, 6, 24), datetime(2014, 6, 24), True,
         None),
        ("Jazz", 13, "First Lieutenant", 8, 5000000, 1.80, ["Meister"], "33.670666,-117.841553", "1980/04/10",
         "2013/06/10", [3.9624, 1800.0], date(2013, 6, 24), datetime(2014, 6, 24), True, None),
        ("Megatron", None, "None", 10, 5000000, 5.70, ["Megatron"], None, "1980/04/10", "2012/05/10", [None, 5700.0],
         date(2012, 5, 10), datetime(2014, 6, 24), True, None),
        ("Metroplex_)^$", 300, "Battle Station", 8, 5000000, None, ["Metroflex"], None, "1980/04/10", "2011/04/10",
         [91.44, None], date(2011, 4, 10), datetime(2014, 6, 24), True, None),
    ], infer_schema=True).h_repartition(1)
```
With .table() you have a beautiful way to show your data, with extra information like the column number, the column data type, and marked white spaces.

```python
df.table()
```
## Cleaning and Processing

Optimus V2 was created to make data cleaning a breeze. The API was designed to be super easy for newcomers and very familiar for people coming from Pandas. Optimus expands the Spark DataFrame functionality by adding .rows and .cols attributes.

For example, you can load data from a URL, transform it, and apply some predefined cleaning functions:
```python
# This is a custom function
def func(value, arg):
    return "this was a number"

new_df = df\
    .rows.sort("rank", "desc")\
    .withColumn('new_age', df.age)\
    .cols.lower(["names", "function"])\
    .cols.date_transform("date arrival", "yyyy/MM/dd", "dd-MM-YYYY")\
    .cols.years_between("date arrival", "dd-MM-YYYY", output_cols="from arrival")\
    .cols.remove_accents("names")\
    .cols.remove_special_chars("names")\
    .rows.drop(df["rank"] > 8)\
    .cols.rename(str.lower)\
    .cols.trim("*")\
    .cols.unnest("japanese name", output_cols="other names")\
    .cols.unnest("last position seen", separator=",", output_cols="pos")\
    .cols.drop(["last position seen", "japanese name", "date arrival", "cybertronian", "nulltype"])

# Other steps you could chain in:
# .cols.apply_by_dtypes("product", func=func, func_return_type="string", data_type="integer")\
# .cols.replace("product", "taaaccoo", "taco")\
# .cols.replace("product", ["piza", "pizzza"], "pizza")\
```
You transform this:

```python
df.table()
```

Into this:

```python
new_df.table()
```
Note that you can use Optimus functions, native Spark functions (.withColumn()), and all the methods available on a Spark DataFrame at the same time. To learn about all the Optimus functionality, please go to these notebooks.
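For instance, here is a minimal sketch mixing the two APIs in one chain (the "age_plus_one" column name is just illustrative):

```python
from pyspark.sql import functions as F

# Chain a native Spark call (.withColumn) together with an Optimus
# method (.cols.lower); Optimus keeps the DataFrame API intact
mixed_df = df \
    .withColumn("age_plus_one", F.col("age") + 1) \
    .cols.lower(["names"])

mixed_df.table()
```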
## Custom functions

Spark has multiple ways to transform your data, like RDDs, Column Expressions, UDFs, and Pandas UDFs. In Optimus we created apply() and apply_expr(), which handle all the implementation complexity.

Here you apply a function to the "height(ft)" column, adding 1 and 2 to the current column value, all powered by Pandas UDF:
```python
def func(value, args):
    return value + args[0] + args[1]

df.cols.apply("height(ft)", func, "int", [1, 2]).table()
```
If you want to apply a Column Expression, use apply_expr() like this. In this case the function divides the current column value by 20:

```python
from pyspark.sql import functions as F

def func(col_name, args):
    return F.col(col_name) / 20

df.cols.apply("height(ft)", func=func, args=20).table()
```
You can change the table output back to ASCII if you wish:

```python
op.output("ascii")
```

To return to HTML, just:

```python
op.output("html")
```
## Data profiling

Optimus comes with a powerful and unique data profiler. Besides basic and advanced stats like min, max, kurtosis, mad, etc., it also lets you know what type of data every column has. For example, if a string column holds strings, integers, floats, bools, and dates, Optimus can give you a unique overview of your data. Just run df.profile("*") to profile all the columns. For more info about the profiler, please go to this notebook.
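A minimal example, using the dataframe created earlier:

```python
# Profile every column and render the report
df.profile("*")
```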
Let's load a "big" dataset:

```python
df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/Meteorite_Landings.csv").h_repartition()
op.profiler.run(df, "name", infer=False)
```
![Profiler output](images/profiler.png)
For date data types, Optimus can give you extra information:

```python
op.profiler.run(df, "year", infer=True)
```

![Profiler with inferred data types](images/profiler1.png)
## Plots

Besides histograms and frequency plots, you also have scatter plots and box plots, all powered by PySpark:

```python
df = op.load.excel("examples/data/titanic3.xls")
df = df.rows.drop_na(["age", "fare"])

df.plot.scatter(["fare", "age"], buckets=30)
df.plot.box("age")
df.plot.correlation(["age", "fare", "survived"])
```
## Outliers

Get the outliers using IQR:

```python
df.outliers.iqr("age").select().table()
```

Remove the outliers using IQR:

```python
df.outliers.iqr("age").drop().table()
```

You can also use z_score, modified_z_score, or mad:

```python
df.outliers.z_score("age", threshold=2).drop()
df.outliers.modified_z_score("age", threshold=2).drop()
df.outliers.mad("age", threshold=2).drop()
```
## Database connection

Optimus has handy tools to connect to databases and extract information. Optimus can handle Redshift, PostgreSQL, and MySQL.

```python
# Put your db credentials here
db = op.connect(
    db_type="redshift",
    url="iron.******.us-east-1.redshift.amazonaws.com",
    database="******",
    user="******",
    password="******",
    port="5439")

# Show all table names
db.tables()

# Show a summary of every table
db.table.show("*", 20)

# Get a table as a dataframe
db.table_to_df("tablename")
```
## Data enrichment

You can connect to any external API to enrich your data using Optimus. Optimus uses MongoDB to download the data and then merge it with the Spark DataFrame. You need to install MongoDB.

Let's load a tiny dataset we can enrich:

```python
import requests

df = op.load.json("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.json")

def func_request(params):
    # You can use here whatever header or auth info you need to send.
    # For more information see the requests library.
    url = "https://jsonplaceholder.typicode.com/todos/" + str(params["id"])
    return requests.get(url)

def func_response(response):
    # Here you can parse the response
    return response["title"]

e = op.enrich()

df_result = e.run(df, func_request, func_response, calls=60, period=60, max_tries=8)

df_result.table()
```
## Machine Learning

Machine Learning is one of the last steps, and the goal of most data science workflows.

Apache Spark created a library called MLlib, where they coded great algorithms for Machine Learning. Now with the ML library we can take advantage of the DataFrame API and its optimizations to easily create Machine Learning pipelines.

Even though this task is not extremely hard, it is not easy either. The way most Machine Learning models work on Spark is not straightforward, and they need a lot of feature engineering to work. That's why we created the feature engineering section inside Optimus.

One of the best "tree" models for machine learning is Random Forest. What about creating an RF model with just one line? With Optimus it's really easy:
```python
df_cancer = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/tests/data_cancer.csv")

columns = ['diagnosis', 'radius_mean', 'texture_mean', 'perimeter_mean', 'area_mean', 'smoothness_mean',
           'compactness_mean', 'concavity_mean', 'concave points_mean', 'symmetry_mean',
           'fractal_dimension_mean']

df_predict, rf_model = op.ml.random_forest(df_cancer, columns, "diagnosis")
```
This will create a DataFrame with the predictions of the Random Forest model.

Let's see the predictions compared with the actual labels:

```python
df_predict.cols.select(["label", "prediction"]).table()
```

The rf_model variable contains the Random Forest model for further analysis.
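As a sketch, assuming rf_model behaves like a standard Spark ML random forest model (check the Optimus docs for the exact return type), you can inspect it directly:

```python
# Assumption: rf_model is a pyspark.ml RandomForestClassificationModel
print(rf_model.featureImportances)   # contribution of each input feature
print(rf_model.toDebugString[:500])  # first lines of the tree structure
```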
## Contributing to Optimus

Contributions go far beyond pull requests and commits. We are very happy to receive any kind of contribution, including:

- Documentation updates, enhancements, designs, or bug fixes.
- Spelling or grammar fixes.
- README.md corrections or redesigns.
- Adding unit or functional tests.
- Triaging GitHub issues, especially determining whether an issue still persists or is reproducible.
- Searching #optimusdata on Twitter and helping someone else who needs help.
- Blogging, speaking about, or creating tutorials about Optimus and its many features.
- Helping others in our Optimus Gitter channel.
## Backers

[Become a backer] and get your image on our README on GitHub with a link to your site.

## Sponsors

[Become a sponsor] and get your image on our README on GitHub with a link to your site.
## Optimus for Spark 1.6.x

Optimus' main stable branch now targets Spark 2.3.1. The 1.6.x version is now under maintenance; the last tagged release for that Spark version is 0.4.0. We strongly suggest that you use the 2.x version of the framework, because new improvements and features will only be added to this version.
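If you still need the legacy version, you could pin the install to that last release (assuming it is published on PyPI under the same package name):

```
# Hypothetical pin for the Spark 1.6.x line; verify the release exists on PyPI
pip install optimuspyspark==0.4.0
```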
## Core Team

Argenis Leon and Favio Vazquez

## Contributors

Here are the amazing people that make Optimus possible:

## License

Apache 2.0 © Iron