Build and run queries against data
Project description
DataFusion with Python
This is a Python library that binds to Apache Arrow's in-memory, Rust-based query engine, DataFusion. It allows you to build a logical plan through a DataFrame API against Parquet or CSV files and obtain the result back as pyarrow record batches.
Being written in Rust, this code makes strong guarantees about thread safety and the absence of memory leaks.
We lock the GIL only to convert the results back to pyarrow arrays and to run UDFs.
How to use it
Simple usage:
```python
import datafusion
import pyarrow

# an alias
f = datafusion.functions

# create a context
ctx = datafusion.ExecutionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
    f.col("a") + f.col("b"),
    f.col("a") - f.col("b"),
)

# execute and collect the first (and only) batch
result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])
```
UDFs
```python
def is_null(array: pyarrow.Array) -> pyarrow.Array:
    return array.is_null()

udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_())

df = df.select(udf(f.col("a")))
```
UDAFs
```python
import pyarrow
import pyarrow.compute

class Accumulator:
    """
    Interface of a user-defined accumulation.
    """
    def __init__(self):
        self._sum = pyarrow.scalar(0.0)

    def to_scalars(self) -> [pyarrow.Scalar]:
        return [self._sum]

    def update(self, values: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())

    def merge(self, states: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())

    def evaluate(self) -> pyarrow.Scalar:
        return self._sum

df = ...

udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()])

df = df.aggregate(
    [],
    [udaf(f.col("a"))]
)
How to install
```shell
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple datafusion
```
We haven't configured CI/CD to publish wheels to PyPI yet, so you can only install it from the test index above or build it in development mode. Building requires Rust and Cargo; see below.
How to develop
This assumes that you have Rust and Cargo installed. We use the workflow recommended by pyo3 and maturin.
Bootstrap:
```shell
# fetch this repo
git clone git@github.com:jorgecarleitao/datafusion-python.git
cd datafusion-python

# prepare development environment (used to build wheel / install in development)
python -m venv venv
venv/bin/pip install maturin==0.8.2 toml==0.10.1

# used for testing
venv/bin/pip install pyarrow==1.0.0
```
Whenever the Rust code changes (your changes or via git pull):
```shell
venv/bin/maturin develop
venv/bin/python -m unittest discover tests
```
Hashes for datafusion-0.1.2-cp38-cp38-macosx_10_7_x86_64.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | d38cc28167fc540bec5e8e7ef679e544820eaf42b20db422f0cd60343199d2be |
| MD5 | 33085a49e46736ada8d9c955c1200951 |
| BLAKE2b-256 | 90e2971fe2b6f79189beee1253d16d370c89a21c461b2d4552e268b2f399ea10 |

Hashes for datafusion-0.1.2-cp37-cp37m-macosx_10_7_x86_64.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | c25886aade9f6973e6f9bb8dcdcb233e56b48f0007ff0b64c448d9fb8fb7f47c |
| MD5 | e62706fcdffd2b582bf33ba8378901fa |
| BLAKE2b-256 | 16129e42c54c5f9071414efd3f86d35880d58abc3507be6c6ac2daedd51b1ad6 |