Fast multi-backend (DuckDB / DataFusion) dataset HTTP server.
Project description
datap-rs
██████╗ █████╗ ████████╗ █████╗ ██████╗ ██████╗ ███████╗
██╔══██╗██╔══██╗╚══██╔══╝██╔══██╗██╔══██╗ ██╔══██╗██╔════╝
██║ ██║███████║ ██║ ███████║██████╔╝█████╗██████╔╝███████╗
██║ ██║██╔══██║ ██║ ██╔══██║██╔═══╝ ╚════╝██╔══██╗╚════██║
██████╔╝██║ ██║ ██║ ██║ ██║██║ ██║ ██║███████║
╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝╚══════╝
Documentation · Source · PyPI
A fast multi-backend dataset HTTP server, built in Rust and driven from Python.
datap-rs (datapress) exposes one or more Parquet or Delta datasets over a small
JSON HTTP API. It ships with two pluggable engines bundled into a single
wheel — pick one at runtime:
- DuckDB — battle-tested SQL, lazy parquet reads, low startup.
- DataFusion — pure-Rust, in-memory
RecordBatch+ equality index for low-latency point lookups.
Identical request/response shapes across both, so you can A/B them under your real workload.
Install
pip install datap-rs
# or
uv pip install datap-rs
Wheels are published for Linux (x86_64/aarch64), macOS (arm64), and Windows (x86_64) against CPython 3.9+ (abi3).
Quick start
For testing, we're using this kaggle US accidents 2016-2023 dataset.
import asyncio
from datap_rs.datapress import DataPress, DataPressConfig, DatasetConfig
async def main() -> None:
ds = DatasetConfig(
name="accidents",
source="data/accidents.parquet",
format="parquet", # or "delta"
mode="auto", # eq-index policy: "auto" | "none" | "list"
description="US accidents 2016-2023",
)
cfg = DataPressConfig(
backend="datafusion", # or "duckdb"
listen="0.0.0.0",
port=8000,
workers=8,
)
server = DataPress(cfg, datasets=[ds])
await server.run() # blocks until SIGINT
if __name__ == "__main__":
asyncio.run(main())
Hit it:
curl http://localhost:8000/api/v1/datasets
curl http://localhost:8000/api/v1/datasets/accidents/schema
curl -X POST http://localhost:8000/api/v1/datasets/accidents/query \
-H 'Content-Type: application/json' \
-d '{
"columns": ["ID","Severity","City","State"],
"predicates": [
{ "col": "State", "op": "eq", "val": "TX" },
{ "col": "Severity", "op": "gte", "val": 3 }
],
"page": 1, "page_size": 50
}'
API surface
Seven public classes, no module-level state:
| Class | Purpose |
|---|---|
DataPressConfig |
Server tuning: backend, listen, port, workers, prefix, compress, max_body_bytes, max_page_size, request_timeout_ms, shutdown_timeout_secs, metrics_enabled, metrics_path. |
DatasetConfig |
One dataset: name, source, format, mode, optional S3 + index. |
S3Config |
S3 / S3-compatible credentials and endpoint config. |
HMACKeyPair |
Access/secret key pair returned by an S3Config credentials provider. |
DataPress |
Built from a DataPressConfig + list of DatasetConfig + optional AuthConfig. await .run(). |
AuthConfig |
OIDC / OAuth2 bearer enforcement (requires the auth feature in the wheel). |
DataPressClient |
Sync HTTP client for talking to a running server (stdlib + lazy pyarrow). |
Hover any of them in your IDE for full kwarg docs.
S3 / S3-compatible sources
from datap_rs.datapress import DataPress, DataPressConfig, DatasetConfig, S3Config
s3 = S3Config(
region="us-east-1",
endpoint="http://localhost:9000", # MinIO / R2 / Wasabi / Backblaze
addressing_style="path", # or "virtual"
allow_http=True, # only for non-https endpoints
)
ds = DatasetConfig(
name="events",
source="s3://events/2025/",
format="parquet", # or "delta"
s3=s3,
)
Credentials fall back to the standard AWS env vars
(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN,
AWS_REGION) when not set inline.
Dynamic credentials provider
Pass a zero-argument callable returning an HMACKeyPair to resolve
credentials at startup (e.g. from a secrets manager). It is invoked once
when DataPress(...) is constructed, the result is cached indefinitely,
and it takes precedence over any inline access_key_id /
secret_access_key:
from datap_rs.datapress import S3Config, HMACKeyPair
def fetch_creds() -> HMACKeyPair:
secret = my_secrets_client.get("datapress/s3")
return HMACKeyPair(
access_key=secret["access_key_id"],
secret_key=secret["secret_access_key"],
)
s3 = S3Config(
region="us-east-1",
endpoint="http://localhost:9000",
allow_http=True,
credentials_provider=fetch_creds,
)
Behind a reverse proxy
Set prefix to mount every route under a URL path — handy when nginx /
Traefik / Caddy forwards the prefix verbatim:
DataPressConfig(backend="datafusion", port=8000, prefix="/datapress")
# → GET /datapress/api/v1/datasets, GET /datapress/health, ...
prefix must start with / and not end with /. Empty string (default)
mounts at the root.
Response compression
Compression is on by default and negotiated per request via the
Accept-Encoding header (gzip, brotli, zstd). Clients that want raw JSON
send Accept-Encoding: identity or omit the header. Turn it off at the
server when sitting behind a proxy that already compresses, or to save
CPU on a trusted LAN:
DataPressConfig(backend="datafusion", port=8000, compress=False)
Request limits & timeouts
Two server-side guardrails are on by default:
DataPressConfig(
backend="datafusion",
port=8000,
max_body_bytes=1_048_576, # 413 above this; default 1 MiB
max_page_size=100_000, # clamp query page_size above this
request_timeout_ms=30_000, # 504 above this; 0 disables; default 30s
shutdown_timeout_secs=30, # SIGTERM/SIGINT grace period, in seconds
)
Bodies larger than max_body_bytes are rejected with 413 Payload Too Large. Query page_size values larger than max_page_size are clamped
before the backend runs. Handlers that take longer than
request_timeout_ms are cancelled and the client sees 504 Gateway Timeout. Set the timeout to 0 to disable it entirely (useful behind a
proxy that already enforces one).
Graceful shutdown
On SIGTERM or SIGINT (Ctrl+C) the server stops accepting new
connections, then waits up to shutdown_timeout_secs seconds for
in-flight requests to finish before stopping workers. Set it lower for
faster restarts, higher for long-running query handlers.
Client
A small sync client is bundled for talking to a running server:
from datap_rs import DataPressClient
c = DataPressClient("http://127.0.0.1:8000")
c.healthz() # -> {"status": "ok"}
c.readyz() # -> {"status": "ready", "datasets": N}
c.datasets() # -> ["accidents", ...]
c.schema("accidents") # -> dict
c.count("accidents") # -> int
table = c.query("accidents", { # -> pyarrow.Table
"columns": ["State", "Severity"],
"page_size": 10_000,
})
query() requests Arrow IPC and returns a pyarrow.Table (pyarrow is
imported lazily). For the JSON envelope verbatim, use query_json().
On non-2xx responses a DataPressHTTPError is raised with .status,
.body and .payload.
Equality-index policy (DataFusion only)
DatasetConfig(
name="big",
source="data/big.parquet",
mode="list", # "auto" | "none" | "list"
index_columns=["State", "Severity"], # required for "list"
index_max_cardinality=100_000, # used by "auto"
)
auto— index every column whose distinct count stays belowindex_max_cardinality.none— skip the index; every query goes through DataFusion SQL.list— index onlyindex_columns. Best for very wide datasets.
DuckDB ignores this block.
HTTP API
Same five routes for both backends.
| Method | Path | Purpose |
|---|---|---|
| GET | /health |
Liveness probe. |
| GET | /api/v1/datasets |
List configured datasets. |
| GET | /api/v1/datasets/{name}/schema |
Inferred columns + sample row. |
| POST | /api/v1/datasets/{name}/query |
Filter + paginate. |
| POST | /api/v1/datasets/{name}/count |
Total or filtered row count. |
| POST | /api/v1/datasets/{name}/reload |
Atomic dataset reload (requires admin token). |
Query body
{
"columns": ["ID","City","State","Severity"],
"predicates": [
{ "col": "State", "op": "eq", "val": "TX" },
{ "col": "Severity", "op": "gte", "val": 3 }
],
"order_by": [ { "col": "Severity", "dir": "desc" } ],
"limit": 1000,
"page": 1,
"page_size": 50
}
| Field | Type | Default | Notes |
|---|---|---|---|
columns |
string[] |
[] |
Empty = all columns. |
predicates |
Predicate[] |
[] |
ANDed together. |
order_by |
OrderBy[] |
[] |
{ col, dir? }; dir is asc (default) or desc. |
group_by |
string[] |
[] |
Group-by columns; when set, columns is ignored. |
aggregations |
Aggregation[] |
[] |
{ col?, op, alias? }; ops: count|sum|avg|min|max. Requires group_by. |
distinct |
bool |
false |
Dedup the projected columns. Mutually exclusive with group_by / aggregations. |
limit |
int or null |
null |
Hard cap on total rows across pages. |
page |
int >= 1 |
1 |
1-based. |
page_size |
int >= 1 |
1000 |
Clamped to DataPressConfig.max_page_size (100_000 by default). |
Predicate operators
op |
val |
Meaning |
|---|---|---|
eq |
scalar | col = val |
neq |
scalar | col <> val |
gt / gte |
number / string | col > val / col >= val |
lt / lte |
number / string | col < val / col <= val |
like |
string with %/_ |
SQL LIKE |
ilike |
string with %/_ |
Case-insensitive LIKE |
in |
non-empty array | col IN (v1, v2, …) |
is_null |
omit | col IS NULL |
is_not_null |
omit | col IS NOT NULL |
Grouping / aggregation
curl -X POST http://localhost:8000/api/v1/datasets/accidents/query \
-H 'Content-Type: application/json' \
-d '{
"group_by": ["State"],
"aggregations": [
{ "op": "count" },
{ "col": "Severity", "op": "avg", "alias": "avg_sev" }
],
"order_by": [{ "col": "count", "dir": "desc" }],
"page_size": 10
}'
When group_by is non-empty the SELECT list is derived from the group
columns plus each aggregation's alias; the top-level columns field is
ignored. aggregations without group_by returns 400. order_by keys
must be a group column or aggregation alias.
Distinct
curl -X POST http://localhost:8000/api/v1/datasets/accidents/query \
-H 'Content-Type: application/json' \
-d '{ "columns": ["State"], "distinct": true, "order_by": [{"col":"State"}] }'
Mutually exclusive with group_by / aggregations.
Arrow IPC responses
Opt in per-request with the Accept header (or ?format=arrow) to skip
the JSON envelope and receive an Arrow IPC stream instead:
import requests, pyarrow.ipc as ipc, polars as pl
r = requests.post(
"http://localhost:8000/api/v1/datasets/accidents/query",
json={"columns": ["ID","State"], "page_size": 1000},
headers={"Accept": "application/vnd.apache.arrow.stream"},
)
table = ipc.open_stream(r.content).read_all() # pyarrow.Table
df = pl.from_arrow(table) # zero-copy → Polars
page, page_size = r.headers["X-Page"], r.headers["X-Page-Size"]
To read the complete result set into Polars, walk pages until the server returns fewer rows than requested:
import pyarrow as pa
import pyarrow.ipc as ipc
import polars as pl
import requests
ARROW = "application/vnd.apache.arrow.stream"
def query_all_polars(
base_url: str,
dataset: str,
body: dict,
page_size: int = 100_000,
) -> pl.DataFrame:
tables: list[pa.Table] = []
page = 1
with requests.Session() as session:
while True:
response = session.post(
f"{base_url.rstrip('/')}/api/v1/datasets/{dataset}/query",
json={**body, "page": page, "page_size": page_size},
headers={"Accept": ARROW},
)
response.raise_for_status()
table = ipc.open_stream(response.content).read_all()
tables.append(table)
if table.num_rows < page_size:
break
page += 1
table = tables[0] if len(tables) == 1 else pa.concat_tables(tables)
return pl.from_arrow(table)
Use a deterministic order_by for full exports from datasets that may be
reloaded while you page through results. Arrow IPC is supported by both
backends.
Count body
Same predicate shape, no projection or pagination:
{ "predicates": [ { "col": "State", "op": "eq", "val": "TX" } ] }
Response: { "count": <int> }. Empty body ({}) counts every row. On
materialised DataFusion datasets, the no-predicate case is O(1) and indexed
eq / in predicates short-circuit through the equality index.
curl -X POST http://localhost:8000/api/v1/datasets/accidents/count \
-H 'Content-Type: application/json' -d '{}'
# → { "count": 7728394 }
Admin reload
POST /api/v1/datasets/{name}/reload rebuilds a dataset from its source and
atomically swaps it in. Requires the X-Admin-Token header to match the
ADMIN_TOKEN env var. Endpoint is disabled when ADMIN_TOKEN is unset
(secure default).
import os
os.environ["ADMIN_TOKEN"] = "supersecret" # before constructing DataPress
curl -X POST -H "X-Admin-Token: supersecret" \
http://localhost:8000/api/v1/datasets/accidents/reload
# → { "dataset": "accidents", "rows": 7728394, "elapsed_ms": 1842 }
Reload publication is backend-specific. DataFusion uses a service-level
double buffer: it builds a new DatasetState off to the side, then
publishes it with an ArcSwap snapshot update. In-flight queries keep
using the old Arrow buffers; later queries see the new state. Peak RSS can
approach roughly twice the materialised dataset size during reload.
DuckDB delegates the heavy publication step to the engine with
CREATE OR REPLACE TABLE ... AS SELECT .... DuckDB handles that as an
ACID transaction over the table/catalog replacement: failures leave the
existing table live, and successful reloads become visible atomically to
later queries while in-flight queries continue against their starting
snapshot. DataPress then refreshes its small cached schema and row-count
metadata. Per-dataset reloads are serialised by an async mutex; reloads
of different datasets run in parallel.
Authentication (OIDC / OAuth2)
Optional bearer-token enforcement against any OpenID Connect issuer
(Keycloak, Auth0, Entra ID, Okta, Zitadel, …). Requires a wheel built
with the auth Cargo feature:
maturin build --release --features auth
Pre-built PyPI wheels include it by default.
from datap_rs.datapress import (
DataPress, DataPressConfig, DatasetConfig, AuthConfig,
)
auth = AuthConfig(
enabled=True,
issuer="https://issuer.example.com",
audience="datapress-api",
read_scopes=["datasets:read"],
reload_scopes=["datasets:reload"],
# anonymous_read=False,
# algorithms=["RS256"],
# leeway_secs=60,
# jwks_refresh_secs=3600,
# tenant_claim="/tenant_id",
# allowed_tenants=["acme"],
# admin_token_fallback=True, # honour legacy X-Admin-Token
# start_degraded=True, # boot even if JWKS fetch fails
)
server = DataPress(cfg, datasets=[ds], auth=auth)
await server.run()
When enabled=False (default) all other fields are ignored and the
server behaves exactly as before. Validation errors (missing issuer,
malformed tenant_claim, …) raise ValueError at construction time.
Call any endpoint with Authorization: Bearer <jwt>. Reload endpoints
require reload_scopes; read endpoints require read_scopes unless
anonymous_read=True.
Use your provider's issuer URL exactly as it appears in the discovery
document or JWT iss claim. /realms/<realm> is Keycloak-specific;
many providers use URLs such as https://tenant.us.auth0.com/,
https://login.microsoftonline.com/<tenant-id>/v2.0, or an Okta
authorization-server URL.
AuthConfig applies to one server instance. For strict per-dataset
scope boundaries from Python, run one DataPress instance per dataset
or access domain and use scopes such as datasets:accidents:read /
datasets:accidents:reload on that instance.
Try it locally
The repo ships a one-command Keycloak stack at
examples/keycloak/
with a pre-provisioned realm, service-account client, scopes and a test
user. docker compose up -d and point issuer at
http://localhost:8080/realms/datapress.
Choosing a backend
- DuckDB — the safe default. Handles arbitrary SQL well, manages its own buffer pool, starts up in milliseconds because it lazily reads parquet pages on demand.
- DataFusion — pick when the data fits in RAM and you repeatedly query
the same columns with equality /
INpredicates; the eq-index turns those into O(1) lookups. Also produces a leaner static binary (no vendored C++).
Both engines are compiled into the same wheel — switching is one keyword argument away.
Logging
datapress initialises env_logger on import. Control verbosity with the
standard RUST_LOG variable:
RUST_LOG=info python example.py
RUST_LOG=debug python example.py
License
MIT. See LICENSE in the source repo.
Source, issue tracker and Rust crates: https://github.com/jeroenflvr/fast-api
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distributions
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file datap_rs-0.3.3.tar.gz.
File metadata
- Download URL: datap_rs-0.3.3.tar.gz
- Upload date:
- Size: 192.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
08ddc6d73963dd40bc9ecb0473afb19f7fe5325c28e6f42645896137102b9b5c
|
|
| MD5 |
28ccfb953a64ddfc9063b682119021ec
|
|
| BLAKE2b-256 |
d2419259d481f364b60d99099a29fde789ac5f7c9225f1c3e98a9b034dcad398
|
File details
Details for the file datap_rs-0.3.3-cp39-abi3-win_amd64.whl.
File metadata
- Download URL: datap_rs-0.3.3-cp39-abi3-win_amd64.whl
- Upload date:
- Size: 69.0 MB
- Tags: CPython 3.9+, Windows x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9b6b92e265df30c26ee301e04357777d1d920ce889e0c2e318de62b95f8aa5c0
|
|
| MD5 |
b0c057b9532d5ca498773533d5739756
|
|
| BLAKE2b-256 |
0e48b15f3f5e291b8eef2a0700df2c9c3f642e3e075932c93b7671f73339efc5
|
File details
Details for the file datap_rs-0.3.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.
File metadata
- Download URL: datap_rs-0.3.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
- Upload date:
- Size: 71.9 MB
- Tags: CPython 3.9+, manylinux: glibc 2.17+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
52874dd1b3d16dd82fba349eb3211cecdc7e8c5bdc0d849d118b5ddd16bda7cb
|
|
| MD5 |
92599238a0a06d28e0eb357f1ea58267
|
|
| BLAKE2b-256 |
b5ca787414ae541a06d36ca5bfd24fcf2f3a4ca8c269224202a4aaac40fe01f6
|
File details
Details for the file datap_rs-0.3.3-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.
File metadata
- Download URL: datap_rs-0.3.3-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
- Upload date:
- Size: 67.9 MB
- Tags: CPython 3.9+, manylinux: glibc 2.17+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
056aca12154cbfc91bf0e59f3abb1da68bd975c3ab00f2a027929b5ca88b788d
|
|
| MD5 |
25e10766dba107cc8577781c2bf9beca
|
|
| BLAKE2b-256 |
f2f42f410902c00a75ce52cb7fc88d3ff818bb11a0d8bbf1d4c770862093b281
|
File details
Details for the file datap_rs-0.3.3-cp39-abi3-macosx_11_0_arm64.whl.
File metadata
- Download URL: datap_rs-0.3.3-cp39-abi3-macosx_11_0_arm64.whl
- Upload date:
- Size: 64.1 MB
- Tags: CPython 3.9+, macOS 11.0+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
61173a146baaae4aa2981423da0eb3a37e3a92569d1621dd48fc377f17feccb9
|
|
| MD5 |
30aa0ae8d3d29431a37a2f3c5a0d6a55
|
|
| BLAKE2b-256 |
3186e5671f8b37ba7772172b40fdd3de886da7d509332368602c3a9d1a1480bf
|