Python edition of RumbleDB, a JSONiq engine
RumbleDB for Python
by Abishek Ramdas and Ghislain Fourny
This is the Python edition of RumbleDB, which brings JSONiq to the world of Spark and DataFrames. JSONiq is a language considerably more powerful than SQL, as it can process messy, heterogeneous datasets, from kilobytes to petabytes, with very little coding effort.
The Python edition of RumbleDB is currently only a prototype (alpha) and probably unstable. We welcome bug reports.
About RumbleDB
RumbleDB is a JSONiq engine that works both with very small amounts of data and very large amounts of data. It works with JSON, CSV, text, Parquet, etc (and soon XML). It works on your laptop as well as on any Spark cluster (AWS, company clusters, etc).
It automatically detects and switches between execution modes in a way transparent to the user, bringing the convenience of data independence to the world of messy data.
It is an academic project, natively in Java, carried out at ETH Zurich by many students over more than 8 years: Stefan Irimescu, Renato Marroquin, Rodrigo Bruno, Falko Noé, Ioana Stefan, Andrea Rinaldi, Stevan Mihajlovic, Mario Arduini, Can Berker Çıkış, Elwin Stephan, David Dao, Zirun Wang, Ingo Müller, Dan-Ovidiu Graur, Thomas Zhou, Olivier Goerens, Alexandru Meterez, Pierre Motard, Remo Röthlisberger, Dominik Bruggisser, David Loughlin, David Buzatu, Marco Schöb, Maciej Byczko, Matteo Agnoletto, Dwij Dixit.
It is free and open source, under an Apache 2.0 license, which can also be used commercially (but on an as-is basis with no guarantee).
High-level information on the library
A RumbleSession is a wrapper around a SparkSession that additionally makes sure the RumbleDB environment is in scope.
JSONiq queries are invoked with rumble.jsoniq() in a way similar to the way Spark SQL queries are invoked with spark.sql().
Any number of Python DataFrames can be attached to external JSONiq variables used in the query. It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can read many files in many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly, with simple built-in function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See RumbleDB's documentation.
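For instance, a query can read a JSON Lines file directly with such a built-in function call. The sketch below is illustrative only; the file name and field names are hypothetical:

```jsoniq
(: read a JSON Lines file (hypothetical path) and keep the names of adults :)
for $person in json-lines("people.jsonl")
where $person.age ge 18
return $person.name
```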
The resulting sequence of items can be retrieved as DataFrame, as an RDD, as a Python list, or with a streaming iteration over the items.
The individual items can be processed using the RumbleDB Item API.
Alternatively, it is possible to directly get a Python list of JSON values, or a streaming iteration of JSON values. This is a convenience that makes it unnecessary to use the Item API, especially for a first-time user.
It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Spark.
The design goal is that it should be possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc.
Any feedback or error reports are very welcome.
Installation
Install with
pip install jsoniq
Important note: since the jsoniq package depends on pyspark 4, Java 17 or Java 21 is required. If another version of Java is installed, any Python program attempting to create a RumbleSession will fail with an error message on stderr that contains explanations.
Sample code
We will make more documentation available as we go. In the meantime, the sample code below should run as-is after installing the library.
You can copy-paste it into a Python file and execute it with Python.
from jsoniq import RumbleSession
# The syntax to start a session is similar to that of Spark.
# A RumbleSession is a SparkSession that additionally knows about RumbleDB.
# All attributes and methods of SparkSession are also available on RumbleSession.
rumble = RumbleSession.builder.appName("PyRumbleExample").getOrCreate()
# Creating a DataFrame is also similar to Spark (but using the rumble object).
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
columns = ["Name", "Age"]
df = rumble.createDataFrame(data, columns)
# This is how to bind a JSONiq variable to a DataFrame. You can bind as many variables as you want.
rumble.bindDataFrameAsVariable('$a', df)
# This is how to run a query. This is similar to spark.sql().
# Since variable $a was bound to a DataFrame, it is automatically declared as an external variable
# and can be used in the query. In JSONiq, it is logically a sequence of objects.
res = rumble.jsoniq('$a.Name')
# There are several ways to collect the output, depending on the user's needs but also
# on the query supplied.
# This returns a list containing one or several of "DataFrame", "RDD", "PUL", "Local".
# If DataFrame is in the list, df() can be invoked.
# If RDD is in the list, rdd() can be invoked.
# If Local is in the list, items() or json() can be invoked, as well as the local iterator API.
modes = res.availableOutputs()
for mode in modes:
    print(mode)
#########################################################
###### Manipulating DataFrames with SQL and JSONiq ######
#########################################################
# If the output of the JSONiq query is structured (i.e., RumbleDB was able to detect a schema),
# then we can extract a regular data frame that can be further processed with spark.sql() or rumble.jsoniq().
df = res.df()
df.show()
# We are continuously working on schema detection, and RumbleDB will get better at it over time.
# JSONiq is a very powerful language and can also produce heterogeneous output "by design". Then you need
# to use rdd() instead of df(), or to collect the list of JSON values (see further down). Remember
# that availableOutputs() tells you what is at your disposal.
# A DataFrame output by JSONiq can be reused as input to a Spark SQL query.
# (Remember that rumble is a wrapper around a SparkSession object, so you can use rumble.sql() just like spark.sql())
df.createTempView("input")
df2 = rumble.sql("SELECT * FROM input").toDF("name")
df2.show()
# A DataFrame output by Spark SQL can be reused as input to a JSONiq query.
rumble.bindDataFrameAsVariable('$b', df2)
seq2 = rumble.jsoniq("for $i in 1 to 5 return $b")
df3 = seq2.df()
df3.show()
# And a DataFrame output by JSONiq can be reused as input to another JSONiq query.
rumble.bindDataFrameAsVariable('$b', df3)
seq3 = rumble.jsoniq("$b[position() lt 3]")
df4 = seq3.df()
df4.show()
#########################
##### Local access ######
#########################
# This materializes the rows as items.
# The items are accessed with the RumbleDB Item API.
items = res.items()
for result in items:
    print(result.getStringValue())
# This streams through the items one by one.
res.open()
while res.hasNext():
    print(res.next().getStringValue())
res.close()
################################################################################################################
###### Native Python/JSON Access for bypassing the Item API (but losing on the richer JSONiq type system) ######
################################################################################################################
# This method directly gets the results as JSON values (dicts, lists, strings, ints, etc.).
jlist = res.json()
for value in jlist:
    print(value)
# This streams through the JSON values one by one.
res.open()
while res.hasNext():
    print(res.nextJSON())
res.close()
# This gets an RDD of JSON values that can be processed with Python.
rdd = res.rdd()
print(rdd.count())
for value in rdd.take(10):
    print(value)
###################################################
###### Write back to the disk (or data lake) ######
###################################################
# It is also possible to write the output to a file, locally or on a cluster. The API is similar to that of Spark DataFrames.
# Note that it creates a directory and stores the (potentially very large) output in a sharded directory.
# RumbleDB was already tested with up to 64 AWS machines and 100s of TBs of data.
# Of course the examples below are so small that it makes more sense to process the results locally with Python,
# but this shows how GBs or TBs of data obtained from JSONiq can be written back to disk.
seq = rumble.jsoniq("$a.Name")
seq.write().mode("overwrite").json("outputjson")
seq.write().mode("overwrite").parquet("outputparquet")
seq = rumble.jsoniq("1+1")
seq.write().mode("overwrite").text("outputtext")
############################################
##### More complex, standalone queries #####
############################################
seq = rumble.jsoniq("""
let $stores :=
[
{ "store number" : 1, "state" : "MA" },
{ "store number" : 2, "state" : "MA" },
{ "store number" : 3, "state" : "CA" },
{ "store number" : 4, "state" : "CA" }
]
let $sales := [
{ "product" : "broiler", "store number" : 1, "quantity" : 20 },
{ "product" : "toaster", "store number" : 2, "quantity" : 100 },
{ "product" : "toaster", "store number" : 2, "quantity" : 50 },
{ "product" : "toaster", "store number" : 3, "quantity" : 50 },
{ "product" : "blender", "store number" : 3, "quantity" : 100 },
{ "product" : "blender", "store number" : 3, "quantity" : 150 },
{ "product" : "socks", "store number" : 1, "quantity" : 500 },
{ "product" : "socks", "store number" : 2, "quantity" : 10 },
{ "product" : "shirt", "store number" : 3, "quantity" : 10 }
]
let $join :=
for $store in $stores[], $sale in $sales[]
where $store."store number" = $sale."store number"
return {
"nb" : $store."store number",
"state" : $store.state,
"sold" : $sale.product
}
return [$join]
""");
print(seq.json());
seq = rumble.jsoniq("""
for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
group by $store-number := $product.store-number
order by $store-number ascending
return {
"store" : $store-number,
"products" : [ distinct-values($product.product) ]
}
""");
print(seq.json());
How to learn JSONiq, and more query examples
Even more example queries can be found in the RumbleDB documentation, and you can also look at the JSONiq documentation and tutorials.
Last updates
Version 0.1.0 alpha 11
- Fix an issue when feeding a DataFrame output by rumble.jsoniq() back to a new JSONiq query (as a variable).
Version 0.1.0 alpha 10
- Add an explicit explanation on stderr if the Java version is not properly set, together with hints.
Version 0.1.0 alpha 9
- Upgrade to Spark 4, which aligns the internal scala versions to 2.13 and should remove some errors. Requires Java 17 or 21.
Version 0.1.0 alpha 8
- Ability to write back a sequence of items to local disk, HDFS, S3... in various formats (JSON, CSV, Parquet...).
- Automatically declare external variables bound to DataFrames, to improve user-friendliness.
- Simplified the function names to make them more intuitive (json(), items(), df(), rdd(), etc).
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file jsoniq-0.1.0a11.tar.gz.
File metadata
- Download URL: jsoniq-0.1.0a11.tar.gz
- Upload date:
- Size: 26.3 MB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ac42b1f743b6fa990e859fd278528e56894c50632fea602c5d6c61a12042328d |
| MD5 | 1be6da4d485e337af545728c0a8d4b7d |
| BLAKE2b-256 | 936cf92fac74cac320ea768e6c333759b7f10d88d07f5c82c492742461904f55 |
Provenance
The following attestation bundles were made for jsoniq-0.1.0a11.tar.gz:
Publisher: deploy.yml on RumbleDB/python-jsoniq
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: jsoniq-0.1.0a11.tar.gz
- Subject digest: ac42b1f743b6fa990e859fd278528e56894c50632fea602c5d6c61a12042328d
- Sigstore transparency entry: 281732252
- Sigstore integration time:
- Permalink: RumbleDB/python-jsoniq@289751ecd0d7af9daa741e596d0070af48d7fdf7
- Branch / Tag: refs/tags/0.1.0a11
- Owner: https://github.com/RumbleDB
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: deploy.yml@289751ecd0d7af9daa741e596d0070af48d7fdf7
- Trigger Event: push
File details
Details for the file jsoniq-0.1.0a11-py3-none-any.whl.
File metadata
- Download URL: jsoniq-0.1.0a11-py3-none-any.whl
- Upload date:
- Size: 26.2 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 04d8fe3267c403e937e00ba70d009c1da46f631344d5bee50dae6dec9d1bcb3d |
| MD5 | 58b18c162b2a2c2649468a51f06d5846 |
| BLAKE2b-256 | ddb9321316d41e7c8035823a2977a0324d2477c5cc38da3f4e85facc4e2c91aa |
Provenance
The following attestation bundles were made for jsoniq-0.1.0a11-py3-none-any.whl:
Publisher: deploy.yml on RumbleDB/python-jsoniq
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: jsoniq-0.1.0a11-py3-none-any.whl
- Subject digest: 04d8fe3267c403e937e00ba70d009c1da46f631344d5bee50dae6dec9d1bcb3d
- Sigstore transparency entry: 281732259
- Sigstore integration time:
- Permalink: RumbleDB/python-jsoniq@289751ecd0d7af9daa741e596d0070af48d7fdf7
- Branch / Tag: refs/tags/0.1.0a11
- Owner: https://github.com/RumbleDB
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: deploy.yml@289751ecd0d7af9daa741e596d0070af48d7fdf7
- Trigger Event: push