PIT join library for PySpark
Spark-PIT - Utility library for PIT-joins in PySpark
Description
This project aims to expose different ways of executing PIT joins, also called ASOF joins, in PySpark. It was created as part of a research project evaluating different ways of executing PIT joins in Spark.
Prerequisite
In order to run this project in PySpark, the JAR file of the Scala implementation must be available in your Spark session.
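As a minimal sketch, the JAR can be put on the classpath when the session is created; the JAR path below is a placeholder, not the actual artifact name:

from pyspark.sql import SparkSession

# Start a session with the Scala implementation on the classpath.
# The path to the JAR is a placeholder; point it at your own build.
spark = (
    SparkSession.builder
    .appName("spark-pit-example")
    .config("spark.jars", "/path/to/spark-pit-assembly.jar")
    .getOrCreate()
)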
Quickstart
1. Creating the context
The object PitContext is the entry point for all of the functionality of the library. You can initialize this context with the following code:
from pyspark.sql import SQLContext
from ackuq.pit import PitContext

# `spark` is an existing SparkSession, e.g. the one provided by the PySpark shell.
sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
2. Performing a PIT join
There are currently three ways of executing a PIT join: an early stop sort merge, a union ASOF algorithm, or exploding intermediate tables.
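The examples below assume two DataFrames, df1 and df2, that each have an id column and an integer ts timestamp column. A minimal sketch of such data (hypothetical values, chosen only for illustration):

df1 = spark.createDataFrame([(1, 100), (1, 200), (2, 150)], ["id", "ts"])
df2 = spark.createDataFrame([(1, 90), (1, 190), (2, 140)], ["id", "ts"])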
2.1. Early stop sort merge
# Inner PIT join: each df1 row is matched with the latest df2 row
# whose timestamp does not exceed df1's, within the same id.
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts) & (df1.id == df2.id))
2.1.2. Adding tolerance
In this implementation, a tolerance can be added to reject matches whose timestamps differ by more than some value. To use it, set the third argument of the UDF to the desired maximum difference between two timestamps.
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id))
2.1.3. Left outer join
Left outer joins are supported in this implementation. The difference from a regular inner join is that every left row appears in the resulting table whether or not it was matched with a right row; left rows that did not find a match have the values of their right columns set to null.
pit_join = df1.join(
    df2,
    pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id),
    "left",
)
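As a usage sketch (not part of the library's API), the left rows that did not find a match can be isolated by filtering on a null right-hand column:

# Left rows without a match have all right-hand columns set to null.
unmatched = pit_join.filter(df2.ts.isNull())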
2.2. Union merge
pit_join = pit_context.union(
    left=df1,
    right=df2,
    left_prefix="df1_",   # prefixes distinguish the two sides' columns in the result
    right_prefix="df2_",
    left_ts_column="ts",  # timestamp columns are passed by name here
    right_ts_column="ts",
    partition_cols=["id"],
)
2.3. Exploding PIT join
pit_join = pit_context.exploding(
    left=df1,
    right=df2,
    left_ts_column=df1["ts"],   # timestamp columns are passed as Column objects here
    right_ts_column=df2["ts"],
    partition_cols=[df1["id"], df2["id"]],
)
Development
Testing
To run the tests for this package, you must first build the Scala project into a JAR file and export its path as an environment variable:
export SCALA_PIT_JAR=<PATH_TO_JAR_FILE>
To run all the tests, run the following command in the Python directory:
python -m unittest discover -s tests
File details
Details for the file spark-pit-0.5.0.tar.gz.
File metadata
- Download URL: spark-pit-0.5.0.tar.gz
- Upload date:
- Size: 5.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.0 CPython/3.10.1
File hashes
Algorithm | Hash digest
---|---
SHA256 | 47f68ba1a7b70f8c66bbba4920284f4a5adf6805ec3f4a22e4d18d1a4a0c78ad
MD5 | 9052587631ac21fb0ba84183d82326fb
BLAKE2b-256 | be4a8511920ad344d0521a5ba9c6c4091cc343023d8d89433522f36d455ab892
File details
Details for the file spark_pit-0.5.0-py3-none-any.whl.
File metadata
- Download URL: spark_pit-0.5.0-py3-none-any.whl
- Upload date:
- Size: 5.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.0 CPython/3.10.1
File hashes
Algorithm | Hash digest
---|---
SHA256 | 9529241d53b0abdb57e4651251ac4ac6980ada2ba101bde48390df027f84c08c
MD5 | 5a7b95d9ef7efd75f9e73fa753c57c8c
BLAKE2b-256 | 81a9bf792df8d108a596a9d0284ec6fa0fa677864a96971d86a0935abec44ba4