Skip to main content

Build and run queries against data

Project description

DataFusion in Python

This is a Python library that binds to Apache Arrow in-memory query engine DataFusion.

Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.

It also allows you to use UDFs and UDAFs for complex operations.

The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations.

Its query engine, DataFusion, is written in Rust, which makes strong assumptions about thread safety and lack of memory leaks.

Technically, zero-copy is achieved via the c data interface.

How to use it

Simple usage:

import datafusion
import pyarrow

# an alias
f = datafusion.functions

# create a context
ctx = datafusion.ExecutionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
    f.col("a") + f.col("b"),
    f.col("a") - f.col("b"),
)

# execute and collect the first (and only) batch
result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])

UDFs

def is_null(array: pyarrow.Array) -> pyarrow.Array:
    return array.is_null()

udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_())

df = df.select(udf(f.col("a")))

UDAF

import pyarrow
import pyarrow.compute


class Accumulator:
    """
    Interface of a user-defined accumulation.
    """
    def __init__(self):
        self._sum = pyarrow.scalar(0.0)

    def to_scalars(self) -> [pyarrow.Scalar]:
        return [self._sum]

    def update(self, values: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())

    def merge(self, states: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())

    def evaluate(self) -> pyarrow.Scalar:
        return self._sum


df = ...

udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()])

df = df.aggregate(
    [],
    [udaf(f.col("a"))]
)

How to install

pip install datafusion

How to develop

This assumes that you have rust and cargo installed. We use the workflow recommended by pyo3 and maturin.

Bootstrap:

# fetch this repo
git clone git@github.com:jorgecarleitao/datafusion-python.git

cd datafusion-python

# prepare development environment (used to build wheel / install in development)
python -m venv venv
venv/bin/pip install maturin==0.8.2 toml==0.10.1

Whenever rust code changes (your changes or via git pull):

venv/bin/maturin develop
venv/bin/python -m unittest discover tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for datafusion, version 0.2.0
Filename, size File type Python version Upload date Hashes
Filename, size datafusion-0.2.0-cp36-cp36m-macosx_10_7_x86_64.whl (3.6 MB) File type Wheel Python version cp36 Upload date Hashes View
Filename, size datafusion-0.2.0-cp36-cp36m-manylinux2010_x86_64.whl (5.1 MB) File type Wheel Python version cp36 Upload date Hashes View
Filename, size datafusion-0.2.0-cp36-none-win_amd64.whl (3.5 MB) File type Wheel Python version cp36 Upload date Hashes View
Filename, size datafusion-0.2.0-cp37-cp37m-macosx_10_7_x86_64.whl (3.6 MB) File type Wheel Python version cp37 Upload date Hashes View
Filename, size datafusion-0.2.0-cp37-cp37m-manylinux2010_x86_64.whl (5.1 MB) File type Wheel Python version cp37 Upload date Hashes View
Filename, size datafusion-0.2.0-cp37-none-win_amd64.whl (3.5 MB) File type Wheel Python version cp37 Upload date Hashes View
Filename, size datafusion-0.2.0-cp38-cp38-macosx_10_7_x86_64.whl (3.6 MB) File type Wheel Python version cp38 Upload date Hashes View
Filename, size datafusion-0.2.0-cp38-cp38-manylinux2010_x86_64.whl (5.1 MB) File type Wheel Python version cp38 Upload date Hashes View
Filename, size datafusion-0.2.0-cp38-none-win_amd64.whl (3.5 MB) File type Wheel Python version cp38 Upload date Hashes View
Filename, size datafusion-0.2.0-cp39-cp39-manylinux2010_x86_64.whl (5.1 MB) File type Wheel Python version cp39 Upload date Hashes View
Filename, size datafusion-0.2.0.tar.gz (14.7 kB) File type Source Python version None Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page