ibis-stream
ibis-stream provides streaming insert/upsert helpers for Ibis backends.
Right now, the project focuses on BigQuery: inside a context manager, it patches the Ibis BigQuery `insert`/`upsert` methods so that in-memory data is written through the BigQuery Storage Write API.
Why this exists
For in-memory payloads, Storage Write can be a better fit for low-latency incremental writes than the default insert path.
Current scope
- BigQuery support via `ibis_stream.bigquery.stream_mode`
- `insert` support, including `overwrite=True`
- `upsert` support via a temporary staging table + `MERGE`
- Automatic fallback to the original Ibis behavior when data is not an in-memory payload
Snowflake (coming soon)
Planned support will target Snowflake streaming ingestion via Snowpipe Streaming.
Expected implementation shape:
- `insert`: coerce in-memory data to Arrow-compatible batches and stream them through Snowpipe Streaming channels.
- `upsert`: stream into a staging table, then execute `MERGE` into the target table on the configured key.
- `overwrite=True`: truncate the target table before ingesting the replacement batch set.
Likely requirements:
- Snowflake account and role permissions for streaming ingest and target-table writes.
- A backend-specific configuration surface for connection/auth settings.
This is similar in goal to BigQuery Storage Write (low-latency ingest), but the SDK/API surface and operational model are Snowflake-specific.
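To make the planned shape concrete, here is a hypothetical usage sketch. Nothing in it exists yet: `ibis_stream.snowflake` and its `stream_mode` are assumptions that simply mirror the current BigQuery API.

```python
# Hypothetical sketch only: ibis_stream.snowflake does not exist yet.
# It assumes the Snowflake helper mirrors the BigQuery stream_mode API.
import ibis
import pandas as pd

from ibis_stream.snowflake import stream_mode  # assumed future module

con = ibis.snowflake.connect(
    user="my_user",
    password="my_password",
    account="my_account",
    database="MY_DB/MY_SCHEMA",
)
rows = pd.DataFrame({"id": [1, 2], "payload": ["a", "b"]})

with stream_mode():
    # insert: Arrow batches through Snowpipe Streaming channels
    con.insert("target_table", rows)
    # upsert: staging table + MERGE on the configured key
    con.upsert("target_table", rows, "id")
```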
Redshift (coming soon)
Planned support will use Redshift-native loading patterns for in-memory data.
Expected implementation shape:
- `insert`: convert input to columnar micro-batches (for example Parquet), write to a temporary S3 prefix, then load with `COPY`.
- `upsert`: `COPY` into a staging table, then `MERGE` into the target table on the configured key.
- `overwrite=True`: truncate the target table before running the replacement load.
Likely requirements:
- Configured temporary S3 bucket/prefix for batch files.
- IAM role/policy and Redshift permissions needed for `COPY`, `MERGE`, and staging-table lifecycle operations.
Redshift also has Streaming Ingestion for Kinesis/MSK event streams, but for application-driven DataFrame/Arrow writes, the COPY-based flow is typically the practical fit.
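As a rough illustration of that flow (none of this is part of ibis-stream today), a staged load could look like the following; the bucket, IAM role ARN, and table names are placeholders.

```python
# Illustrative sketch of the planned COPY-based flow; not part of ibis-stream today.
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

# Convert the in-memory batch to Parquet and stage it in S3.
batch = pa.table({"id": [1, 2, 3], "payload": ["a", "b", "c"]})
pq.write_table(batch, "/tmp/batch.parquet")
boto3.client("s3").upload_file(
    "/tmp/batch.parquet", "my-staging-bucket", "ibis-stream/batch.parquet"
)

copy_sql = """
COPY staging_table
FROM 's3://my-staging-bucket/ibis-stream/batch.parquet'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
FORMAT AS PARQUET;
"""

merge_sql = """
MERGE INTO target_table
USING staging_table
ON target_table.id = staging_table.id
WHEN MATCHED THEN UPDATE SET payload = staging_table.payload
WHEN NOT MATCHED THEN INSERT (id, payload)
    VALUES (staging_table.id, staging_table.payload);
"""
# Each statement would then be executed on the Redshift connection,
# e.g. via con.raw_sql(copy_sql) followed by con.raw_sql(merge_sql).
```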
Installation
Install with pip:
python -m pip install "ibis-stream[bigquery]"
For development dependencies:
```bash
python -m pip install --group dev ".[bigquery]"
```
Quick start
```python
import ibis
import pandas as pd
from google.cloud import bigquery

from ibis_stream.bigquery import stream_mode

client = bigquery.Client(project="my-project", location="US")
con = ibis.bigquery.connect(
    project_id="my-project",
    dataset_id="my_dataset",
    client=client,
    location="US",
)

rows = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "payload": ["a", "b", "c"],
    }
)

with stream_mode():
    con.insert("target_table", rows)
```
Upsert example:
```python
with stream_mode():
    con.upsert("target_table", rows, "id")
```
API
stream_mode(...)
Context manager that installs BigQuery `insert`/`upsert` hooks for the duration of the `with` block.
```python
stream_mode(
    target_batch_bytes=8 * 1024 * 1024,
    max_batch_rows=50_000,
    append_timeout_seconds=120.0,
)
```
- `target_batch_bytes`: target serialized Arrow batch size. Must be `> 0` and `<= 9 * 1024 * 1024`.
- `max_batch_rows`: maximum rows per emitted batch chunk.
- `append_timeout_seconds`: timeout for each append request (`None` disables the timeout).
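For example, a block that favors smaller, more frequent appends might look like this; the values are illustrative, not recommendations:

```python
with stream_mode(
    target_batch_bytes=2 * 1024 * 1024,  # stay well under the 9 MiB cap
    max_batch_rows=10_000,
    append_timeout_seconds=60.0,
):
    con.insert("target_table", rows)
```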
StorageWriteInsertError
Raised for Storage Write failures surfaced by this patch layer (for example, row-level append errors).
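A minimal handling sketch, assuming the exception is importable from `ibis_stream.bigquery` alongside `stream_mode`:

```python
from ibis_stream.bigquery import StorageWriteInsertError, stream_mode

try:
    with stream_mode():
        con.insert("target_table", rows)
except StorageWriteInsertError as exc:
    # Row-level append errors surface here instead of as generic backend errors.
    print(f"streaming insert failed: {exc}")
```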
Supported in-memory inputs
- `pyarrow.Table`
- `pyarrow.RecordBatch`
- `pyarrow.RecordBatchReader`
- pandas DataFrame
- polars DataFrame
- `list[dict]` and similar objects that Ibis can coerce to a memtable
- `ibis.memtable(...)` expressions
Non-in-memory table expressions fall back to the original backend insert/upsert implementation.
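For illustration, each of the following inserts takes the streaming path inside one `stream_mode()` block, while the final expression-backed insert falls back to the original implementation (`source_table` is a placeholder):

```python
import pyarrow as pa
import polars as pl

with stream_mode():
    con.insert("target_table", pa.table({"id": [4], "payload": ["d"]}))      # pyarrow.Table
    con.insert("target_table", pl.DataFrame({"id": [5], "payload": ["e"]}))  # polars DataFrame
    con.insert("target_table", [{"id": 6, "payload": "f"}])                  # list[dict]

    # Not an in-memory payload: falls back to the original Ibis insert.
    con.insert("target_table", con.table("source_table"))
```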
Testing
Run unit tests:
```bash
pytest -q tests/unit
```
Run integration tests:
```bash
python scripts/integration.py run --dialect bigquery
```
Enable live BigQuery integration tests:
```bash
export BQ_LIVE_TESTS=1
export BQ_PROJECT=<your-gcp-project>
export BQ_DATASET=<existing-dataset>
export BQ_LOCATION=US  # optional

pytest -q tests/integration/bigquery/test_live.py -m "integration and bigquery"
```
Development with Pixi
```bash
pixi run lint
pixi run format-check
pixi run test-unit
pixi run test-integration-bigquery
```
License
Apache-2.0. See LICENSE.