
PIT join library for PySpark

Project description

Spark-PIT - Utility library for PIT-joins in PySpark

Description

This project aims to expose different ways of executing PIT joins, also called ASOF joins, in PySpark. It was created as part of a research project evaluating different ways of executing PIT joins in Spark.

Prerequisite

In order to run this project in PySpark, you will need the JAR file of the Scala implementation to be available in your Spark session.
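For example, one way to make the JAR available is to point the spark.jars configuration at it when building the session. A minimal sketch, assuming the JAR has already been built; the path below is a placeholder for its actual location:

from pyspark.sql import SparkSession

# Build a session with the Scala implementation on the classpath.
# The JAR path is a placeholder; use wherever your artifact lives.
spark = (
    SparkSession.builder
    .appName("pit-quickstart")
    .config("spark.jars", "/path/to/spark-pit.jar")
    .getOrCreate()
)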

Quickstart

1. Creating the context

The object PitContext is the entry point for all of the functionality of the library. You can initialize this context with the following code:

from pyspark.sql import SQLContext
from ackuq.pit import PitContext

# `spark` is an existing SparkSession
sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)

2. Performing a PIT join

There are currently three ways of executing a PIT join: an early stop sort merge, a union ASOF algorithm, or exploding intermediate tables.
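The examples below assume two DataFrames, df1 and df2, that share an id key column and a ts timestamp column. A minimal sketch of such inputs, with made-up values:

# Toy event tables: a PIT join matches each left row with the latest
# right row whose timestamp is at or before the left row's timestamp,
# within the same id.
df1 = spark.createDataFrame(
    [(1, 100, "a1"), (1, 200, "a2"), (2, 150, "b1")],
    ["id", "ts", "value"],
)
df2 = spark.createDataFrame(
    [(1, 90, "x1"), (1, 180, "x2"), (2, 140, "y1")],
    ["id", "ts", "value"],
)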

2.1. Early stop sort merge

pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts) & (df1.id == df2.id))

2.1.2. Adding tolerance

In this implementation, a tolerance can be added so that two rows only match if their timestamps differ by at most some value. To use this, set the third argument of the UDF to the desired maximum difference between two timestamps, as an integer in the same unit as the timestamp columns.

pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id))

2.1.3. Left outer join

Left outer joins are supported in this implementation. The main difference between a regular inner join and a left outer join is that every left row is part of the resulting table, whether or not it was matched with a right row. In the resulting table, all the left rows that did not find a match have the values of the right columns set to null.

pit_join = df1.join(
    df2,
    pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id),
    "left"
)

2.2. Union merge
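
The union merge implementation takes the timestamp and partitioning columns by name, along with prefixes that distinguish the left and right columns in the result: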

pit_join = pit_context.union(
    left=df1,
    right=df2,
    left_prefix="df1_",
    right_prefix="df2_",
    left_ts_column="ts",
    right_ts_column="ts",
    partition_cols=["id"],
)

2.3. Exploding PIT join
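
The exploding PIT join instead takes the timestamp and partitioning columns as Column expressions: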

pit_join = pit_context.exploding(
    left=df1,
    right=df2,
    left_ts_column=df1["ts"],
    right_ts_column=df2["ts"],
    partition_cols=[df1["id"], df2["id"]],
)

Development

Testing

To run the tests for this package, you must first build the Scala implementation into a JAR file and export its path as an environment variable:

export SCALA_PIT_JAR=<PATH_TO_JAR_FILE>

To run all the tests, run the following command in the Python directory:

python -m unittest discover -s tests

Download files

Download the file for your platform.

Source Distribution

spark-pit-0.5.0.tar.gz (5.1 kB)

Built Distribution

spark_pit-0.5.0-py3-none-any.whl (5.6 kB)

File details

Details for the file spark-pit-0.5.0.tar.gz.

File metadata

  • Download URL: spark-pit-0.5.0.tar.gz
  • Upload date:
  • Size: 5.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.0 CPython/3.10.1

File hashes

Hashes for spark-pit-0.5.0.tar.gz

  • SHA256: 47f68ba1a7b70f8c66bbba4920284f4a5adf6805ec3f4a22e4d18d1a4a0c78ad
  • MD5: 9052587631ac21fb0ba84183d82326fb
  • BLAKE2b-256: be4a8511920ad344d0521a5ba9c6c4091cc343023d8d89433522f36d455ab892


File details

Details for the file spark_pit-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: spark_pit-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 5.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.0 CPython/3.10.1

File hashes

Hashes for spark_pit-0.5.0-py3-none-any.whl

  • SHA256: 9529241d53b0abdb57e4651251ac4ac6980ada2ba101bde48390df027f84c08c
  • MD5: 5a7b95d9ef7efd75f9e73fa753c57c8c
  • BLAKE2b-256: 81a9bf792df8d108a596a9d0284ec6fa0fa677864a96971d86a0935abec44ba4

