Streaming insert/upsert helpers for Ibis backends


ibis-stream

ibis-stream provides streaming insert helpers for Ibis backends.

Right now, the project focuses on BigQuery: inside a context manager, it patches Ibis BigQuery insert/upsert so in-memory data is written through the BigQuery Storage Write API.

Why this exists

For in-memory payloads, the Storage Write API can be a better fit for low-latency incremental writes than the default insert path.

Current scope

  • BigQuery support via ibis_stream.bigquery.stream_mode
  • insert support, including overwrite=True (see the example at the end of Quick start)
  • upsert support via a temporary staging table + merge
  • Automatic fallback to the original Ibis behavior when data is not an in-memory payload

Snowflake (coming soon)

Planned support will target streaming ingestion via Snowpipe Streaming.

Expected implementation shape:

  • insert: coerce in-memory data to Arrow-compatible batches and stream them through Snowpipe Streaming channels.
  • upsert: stream into a staging table, then execute MERGE into the target table on the configured key (sketched after this list).
  • overwrite=True: truncate target table before ingesting the replacement batch set.
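
As a rough sketch of the planned upsert step (the Snowpipe Streaming ingest itself is elided here, and all table, column, and key names are placeholders):

# Hypothetical MERGE step after streaming rows into a staging table.
# "staging_table", "target_table", and the "id" key are placeholders.
merge_sql = """
MERGE INTO target_table t
USING staging_table s
    ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.payload = s.payload
WHEN NOT MATCHED THEN INSERT (id, payload) VALUES (s.id, s.payload)
"""
con.raw_sql(merge_sql)  # con: e.g. an ibis.snowflake.connect(...) connection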

Likely requirements:

  • Snowflake account and role permissions for streaming ingest and target-table writes.
  • A backend-specific configuration surface for connection/auth settings.

This is similar in goal to BigQuery Storage Write (low-latency ingest), but the SDK/API surface and operational model are Snowflake-specific.

Redshift (coming soon)

Planned support will use Redshift-native loading patterns for in-memory data.

Expected implementation shape:

  • insert: convert input to columnar micro-batches (for example Parquet), write to a temporary S3 prefix, then load with COPY (sketched after this list).
  • upsert: COPY into a staging table, then MERGE into the target table on the configured key.
  • overwrite=True: truncate target table before running the replacement load.
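
A hedged sketch of that COPY-based insert flow; the bucket, key prefix, and IAM role below are placeholders you would supply through configuration:

import io

import boto3
import pyarrow as pa
import pyarrow.parquet as pq

# Serialize one micro-batch to Parquet in memory.
batch = pa.table({"id": [1, 2, 3], "payload": ["a", "b", "c"]})
buf = io.BytesIO()
pq.write_table(batch, buf)

# Stage the file under a temporary S3 prefix.
boto3.client("s3").put_object(
    Bucket="my-temp-bucket",
    Key="ibis-stream/batch-0000.parquet",
    Body=buf.getvalue(),
)

# Load the staged files into Redshift with COPY, then clean up the prefix.
copy_sql = """
COPY target_table
FROM 's3://my-temp-bucket/ibis-stream/'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
FORMAT AS PARQUET
"""
# Execute copy_sql on the Redshift connection (e.g. via raw_sql).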

Likely requirements:

  • Configured temporary S3 bucket/prefix for batch files.
  • IAM role/policy and Redshift permissions needed for COPY, MERGE, and staging-table lifecycle operations.

Redshift also has Streaming Ingestion for Kinesis/MSK event streams, but for application-driven DataFrame/Arrow writes, the COPY-based flow is typically the practical fit.

Installation

Install with pip:

python -m pip install "ibis-stream[bigquery]"

For development dependencies (the --group flag requires pip 25.1 or newer):

python -m pip install --group dev ".[bigquery]"

Quick start

import ibis
import pandas as pd
from google.cloud import bigquery

from ibis_stream.bigquery import stream_mode

client = bigquery.Client(project="my-project", location="US")
con = ibis.bigquery.connect(
    project_id="my-project",
    dataset_id="my_dataset",
    client=client,
    location="US",
)

rows = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "payload": ["a", "b", "c"],
    }
)

with stream_mode():
    con.insert("target_table", rows)

Upsert example:

with stream_mode():
    con.upsert("target_table", rows, "id")
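
insert also accepts overwrite=True, as listed under Current scope; this call replaces the table contents with rows instead of appending:

with stream_mode():
    con.insert("target_table", rows, overwrite=True)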

API

stream_mode(...)

Context manager that installs BigQuery insert/upsert hooks for the duration of the with block.

stream_mode(
    target_batch_bytes=8 * 1024 * 1024,
    max_batch_rows=50_000,
    append_timeout_seconds=120.0,
)
  • target_batch_bytes: target serialized Arrow batch size. Must be > 0 and <= 9 * 1024 * 1024.
  • max_batch_rows: maximum rows per emitted batch chunk.
  • append_timeout_seconds: timeout for each append request (None disables timeout).
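
For example, to cap batches at 10,000 rows and use a shorter per-append timeout:

with stream_mode(max_batch_rows=10_000, append_timeout_seconds=30.0):
    con.insert("target_table", rows)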

StorageWriteInsertError

Raised for Storage Write failures surfaced by this patch layer (for example, row-level append errors).
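
A minimal handling sketch, assuming StorageWriteInsertError is importable from ibis_stream.bigquery (the exact module path is not documented above):

from ibis_stream.bigquery import StorageWriteInsertError, stream_mode

with stream_mode():
    try:
        con.insert("target_table", rows)
    except StorageWriteInsertError:
        # Row-level append errors land here; log or retry as appropriate.
        raise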

Supported in-memory inputs

  • pyarrow.Table
  • pyarrow.RecordBatch
  • pyarrow.RecordBatchReader
  • pandas DataFrame
  • polars DataFrame
  • list[dict] and similar objects that Ibis can coerce to a memtable
  • ibis.memtable(...) expressions

Non-in-memory table expressions fall back to the original backend insert/upsert implementation.
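
For instance, assuming the connection from Quick start and a placeholder existing table "src_table", an Arrow table is streamed while a backend table expression takes the original path:

import pyarrow as pa

arrow_rows = pa.table({"id": [4], "payload": ["d"]})

with stream_mode():
    con.insert("target_table", arrow_rows)              # in-memory: streamed via Storage Write
    con.insert("target_table", con.table("src_table"))  # table expression: original insert path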

Testing

Run unit tests:

pytest -q tests/unit

Run integration tests:

python scripts/integration.py run --dialect bigquery

Enable live BigQuery integration tests:

export BQ_LIVE_TESTS=1
export BQ_PROJECT=<your-gcp-project>
export BQ_DATASET=<existing-dataset>
export BQ_LOCATION=US  # optional

pytest -q tests/integration/bigquery/test_live.py -m "integration and bigquery"

Development with Pixi

pixi run lint
pixi run format-check
pixi run test-unit
pixi run test-integration-bigquery

License

Apache-2.0. See LICENSE.

