
connectBy hierarchy query in Spark

Project description

pyspark-connectby

Spark does not support the hierarchical query connectBy as of version 3.5.0. There is an open PR to add recursive CTE support, but it is not available yet.

This library is an attempt to add a connectBy method to DataFrame.

Concept

Hierarchical queries are an important feature that many relational databases, such as Oracle, DB2, MySQL, Snowflake, and Redshift, support either directly or through recursive CTEs.

Example in Redshift:

select emp_id, name, manager_id, level
from employee
start with emp_id = 1
connect by prior emp_id = manager_id;

With this library, you can use connectBy() on a DataFrame:

from pyspark_connectby import connectBy
from pyspark.sql import SparkSession

schema = 'emp_id string, manager_id string, name string'
data = [['1', None, 'Carlos'],
        ['11', '1', 'John'],
        ['111', '11', 'Jorge'],
        ['112', '11', 'Kwaku'],
        ['113', '11', 'Liu'],
        ['2', None, 'Mat']
        ]
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)
df2 = df.connectBy(prior='emp_id', to='manager_id', start_with='1')
df2.show()

The result:

+------+----------+-----+-----------------+----------+------+
|emp_id|START_WITH|LEVEL|CONNECT_BY_ISLEAF|manager_id|  name|
+------+----------+-----+-----------------+----------+------+
|     1|         1|    1|            false|      null|Carlos|
|    11|         1|    2|            false|         1|  John|
|   111|         1|    3|             true|        11| Jorge|
|   112|         1|    3|             true|        11| Kwaku|
|   113|         1|    3|             true|        11|   Liu|
+------+----------+-----+-----------------+----------+------+

Note the pseudo columns in the query result:

  • START_WITH
  • LEVEL
  • CONNECT_BY_ISLEAF
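To make the pseudo columns concrete, here is a small pure-Python sketch of connect-by semantics over the same sample data. This is only an illustration of what START_WITH, LEVEL, and CONNECT_BY_ISLEAF mean, not the library's actual implementation (which operates on Spark DataFrames):

```python
# Pure-Python sketch of connect-by semantics; NOT the library's implementation.
rows = [
    ('1', None, 'Carlos'),
    ('11', '1', 'John'),
    ('111', '11', 'Jorge'),
    ('112', '11', 'Kwaku'),
    ('113', '11', 'Liu'),
    ('2', None, 'Mat'),
]

def connect_by(rows, start_with):
    """Expand the hierarchy below `start_with`, emitting the pseudo columns."""
    children = {}
    names = {}
    for emp_id, manager_id, name in rows:
        children.setdefault(manager_id, []).append(emp_id)
        names[emp_id] = name
    out = []

    def walk(emp_id, level):
        kids = children.get(emp_id, [])
        out.append({'emp_id': emp_id,
                    'START_WITH': start_with,      # the root this row was reached from
                    'LEVEL': level,                # depth in the tree, root = 1
                    'CONNECT_BY_ISLEAF': not kids, # true when the node has no children
                    'name': names[emp_id]})
        for kid in kids:
            walk(kid, level + 1)

    walk(start_with, 1)
    return out

result = connect_by(rows, '1')
for r in result:
    print(r)
```

Walking from emp_id '1' yields the same five rows as the table above: Carlos at LEVEL 1, John at LEVEL 2, and the three leaves (Jorge, Kwaku, Liu) at LEVEL 3 with CONNECT_BY_ISLEAF true.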

Installation

Python Version >= 3.7

$ pip install --upgrade pyspark-connectby

Usage

from pyspark_connectby import connectBy

df = ...

df.connectBy(prior='emp_id', to='manager_id', start_with='1')  # start with `emp_id` 1

df.transform(connectBy, prior='emp_id', to='manager_id', start_with='1')  # or via the df.transform() method

df.connectBy(prior='emp_id', to='manager_id')  # without start_with, it traverses from each node

df.connectBy(prior='emp_id', to='manager_id', start_with=['1', '2'])  # start with a list of top node ids
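The start_with variants can also be sketched in plain Python. Again this is an illustration of the documented behavior, not the library's code; in particular, the handling of an omitted start_with (treating every node as a start) is an assumption based on the comment above:

```python
# Pure-Python sketch of the start_with variants; NOT the library's implementation.
rows = [
    ('1', None, 'Carlos'),
    ('11', '1', 'John'),
    ('111', '11', 'Jorge'),
    ('112', '11', 'Kwaku'),
    ('113', '11', 'Liu'),
    ('2', None, 'Mat'),
]

def expand(rows, start_with=None):
    """Return (emp_id, START_WITH, LEVEL, CONNECT_BY_ISLEAF) tuples."""
    children = {}
    for emp_id, manager_id, _name in rows:
        children.setdefault(manager_id, []).append(emp_id)
    if isinstance(start_with, str):
        starts = [start_with]              # single start node
    elif start_with is not None:
        starts = list(start_with)          # a list of top node ids
    else:
        # Assumed semantics of omitting start_with: traverse from each node.
        starts = [emp_id for emp_id, _m, _n in rows]
    out = []

    def walk(node, root, level):
        kids = children.get(node, [])
        out.append((node, root, level, not kids))
        for kid in kids:
            walk(kid, root, level + 1)

    for s in starts:
        walk(s, s, 1)
    return out

expand(rows, start_with=['1', '2'])  # rows below both top nodes, tagged by their root
```

With start_with=['1', '2'], the subtree under '1' contributes five rows and the childless node '2' contributes one, each row carrying its own START_WITH root.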

Download files


Source Distribution

pyspark_connectby-1.1.3.tar.gz (3.2 kB)

Built Distribution

pyspark_connectby-1.1.3-py3-none-any.whl (4.0 kB)
