Vector Gateway Interface - Connect DuckDB to external programs via Apache Arrow
Project description
VGI (Vector Gateway Interface)
Apache Arrow-based protocol for extending DuckDB using any language.
No C++/C/Zig/Rust or compilation/linking required (unless you want to).
Created by Query.Farm
See It in Action
# my_worker.py
from typing import Annotated
from vgi import ScalarFunction, Param, Returns, Worker
import pyarrow as pa
import pyarrow.compute as pc
class Greeting(ScalarFunction):
"""Generate a greeting for each name."""
@classmethod
def compute(
cls,
name: Annotated[pa.StringArray, Param(doc="Column containing names")],
) -> Annotated[pa.StringArray, Returns()]:
return pc.binary_join_element_wise("Hello, ", name, "!")
class MyWorker(Worker):
functions = [Greeting]
if __name__ == "__main__":
MyWorker().run()
-- First time only.
INSTALL vgi FROM COMMUNITY;
LOAD vgi;
ATTACH 'my_worker' (TYPE 'vgi', LOCATION './my_worker.py');
SELECT greeting(name) FROM users;
-- "Hello, Alice!"
-- "Hello, Bob!"
Or you can launch the DuckDB CLI with
duckdb vgi:my_worker.py to start a new session with the functions you just added.
That's it. No C++ compilation, no extension versioning, no complex build process. Just a Python script that DuckDB can call.
Installation
pip install vgi
Or with uv:
uv add vgi
Why VGI?
VGI lets you extend DuckDB with Python functions that run in separate processes, communicating via Apache Arrow IPC. This means:
| Traditional Extensions | VGI Workers |
|---|---|
| C/C++ compilation required | Any language but first Python and Typescript and Go |
| Tied to DuckDB version | Version independent |
| Complex build/release cycle | Ship a script or executable |
| Runs in-process | Process isolation |
| Single-threaded | Parallel workers |
Use cases:
- Call REST APIs or external services from SQL
- Run ML inference (PyTorch, scikit-learn, etc.)
- Process data with Python libraries (pandas, numpy)
- Build custom ETL transforms
- Create domain-specific functions for your team
- Expose external data sources as queryable tables and views
Quick Start
Step 1: Create a Worker
A worker is a Python script that defines one or more functions:
#!/usr/bin/env python
# my_worker.py
from typing import Annotated
import pyarrow as pa
import pyarrow.compute as pc
from vgi import ScalarFunction, Param, Returns, Worker
class UpperCase(ScalarFunction):
"""Convert string values to uppercase."""
@classmethod
def compute(
cls,
value: Annotated[pa.StringArray, Param(doc="String value to uppercase")],
) -> Annotated[pa.StringArray, Returns()]:
return pc.utf8_upper(value)
class MyWorker(Worker):
catalog_name = "my_funcs"
functions = [UpperCase]
if __name__ == "__main__":
MyWorker().run()
Step 2: Use from DuckDB
-- Attach the worker as a catalog
ATTACH 'my_funcs' (TYPE 'vgi', LOCATION './my_worker.py');
-- Call your function
SELECT upper_case(name) FROM users;
-- Use in complex queries
SELECT id, upper_case(status) as status
FROM orders
WHERE created_at > '2024-01-01';
Step 3: There is no step 3
Your function is now available in DuckDB. Ship the Python script to your team, and they can use it immediately.
Going Further: Type-Safe Arguments
For production use, you'll want type validation. Use Param with type_bound to ensure columns have the correct type:
from typing import Annotated
from vgi import ScalarFunction, Param, Returns, Worker
import pyarrow as pa
import pyarrow.compute as pc
class AddValues(ScalarFunction):
"""Add two integer values together."""
@classmethod
def compute(
cls,
left: Annotated[pa.Int64Array, Param(type_bound=pa.types.is_integer, doc="First integer value")],
right: Annotated[pa.Int64Array, Param(type_bound=pa.types.is_integer, doc="Second integer value")],
) -> Annotated[pa.Int64Array, Returns()]:
return pc.add(left, right)
SELECT add_values(price, tax) as total FROM orders;
-- This would fail at bind time with a clear error:
-- SELECT add_values(name, price) FROM orders;
-- Error: Column 'name' has type string, expected integer
Key features of the Param/Returns API:
- Types are inferred from PyArrow array annotations (
pa.Int64Array->pa.int64()) type_boundvalidates the column's Arrow type at bind timeConstParamreceives scalar values (not columns) from SQL argumentsReturnsdeclares the output type
Function Types
VGI supports three function types:
| Type | Base Class | SQL Pattern | Use Case |
|---|---|---|---|
| Scalar | ScalarFunction |
SELECT func(col) FROM t |
Per-row transforms (1:1) |
| Table | TableFunctionGenerator |
SELECT * FROM func(args) |
Generate data |
| Table-In-Out | TableInOutFunction |
SELECT * FROM func((SELECT ...)) |
Aggregation, filtering |
Scalar Functions
Transform each row independently. Output has the same number of rows as input.
class Double(ScalarFunction):
"""Double an integer value."""
@classmethod
def compute(
cls,
value: Annotated[pa.Int64Array, Param(doc="Value to double")],
) -> Annotated[pa.Int64Array, Returns()]:
return pc.multiply(value, 2)
Table Functions
Generate output data from arguments (no input table). Each call to process() emits
a batch via out.emit() or signals completion via out.finish().
from dataclasses import dataclass
from typing import Annotated, ClassVar
import pyarrow as pa
from vgi import TableFunctionGenerator, Arg
from vgi.table_function import ProcessParams, OutputCollector
@dataclass
class CounterState:
remaining: int
current: int = 0
class Counter(TableFunctionGenerator):
"""Generate a sequence of integers."""
count: Annotated[int, Arg(0, doc="Number of rows to generate")]
FIXED_SCHEMA: ClassVar[pa.Schema] = pa.schema([("n", pa.int64())])
@classmethod
def initial_state(cls, params: ProcessParams) -> CounterState:
return CounterState(remaining=params.args.count)
@classmethod
def process(cls, params: ProcessParams, state: CounterState, out: OutputCollector) -> None:
if state.remaining <= 0:
out.finish()
return
batch_size = min(state.remaining, 1000)
values = list(range(state.current, state.current + batch_size))
out.emit(pa.RecordBatch.from_pydict({"n": values}, schema=params.output_schema))
state.current += batch_size
state.remaining -= batch_size
Table-In-Out Functions
Transform or aggregate input data. Override transform() for per-batch processing
and finish() for final output after all input is consumed.
import pyarrow as pa
import pyarrow.compute as pc
from vgi import TableInOutFunction
class FilterPositive(TableInOutFunction):
"""Keep only rows where all numeric columns are positive."""
@property
def output_schema(self) -> pa.Schema:
return self.input_schema
def transform(self, batch: pa.RecordBatch) -> pa.RecordBatch:
mask = None
for i, field in enumerate(batch.schema):
if pa.types.is_integer(field.type) or pa.types.is_floating(field.type):
col_mask = pc.greater(batch.column(i), 0)
mask = col_mask if mask is None else pc.and_(mask, col_mask)
if mask is not None:
return pc.filter(batch, mask)
return batch
Beyond Functions: Full Catalog Support
VGI workers can expose more than just functions. A worker can provide a complete database catalog with:
- Schemas - Organize objects into namespaces
- Tables - Expose external data as queryable tables
- Views - Define SQL views over your data
- Functions - Scalar, table, and table-in-out functions
ATTACH 'external_db' (TYPE 'vgi', LOCATION './my_catalog_worker.py');
-- Query tables from the attached catalog
SELECT * FROM external_db.main.users;
-- Use views
SELECT * FROM external_db.analytics.daily_summary;
-- Call functions
SELECT external_db.main.transform(col) FROM my_table;
This enables VGI workers to act as bridges to external systems—databases, APIs, file systems—presenting them as native DuckDB catalogs.
See Catalog Interface for implementation details.
Parallel Execution
Functions can run across multiple worker processes. The client automatically distributes input batches round-robin across workers and collects results.
See Function API Reference for advanced patterns like distributed aggregation.
Error Handling
Errors in your functions propagate to DuckDB with clear messages:
@classmethod
def compute(cls, value: Annotated[pa.Int64Array, Param()]) -> Annotated[pa.Int64Array, Returns()]:
raise ValueError("Something went wrong")
SELECT my_func(col) FROM my_table;
-- Error: Something went wrong
Type bound violations are caught at bind time (before processing starts):
SELECT add_values(name, price) FROM orders;
-- Error: Argument 'left': Column 'name' has type string,
-- but type bound requires: is_integer
Debugging Worker Failures
When a worker fails, the Python traceback is written to stderr. By default, the client captures this stderr and includes it in the error message (last 50 lines), so you get the full context:
ClientError: Worker Exception: function 'my_func' raised ValueError
Worker stderr:
Traceback (most recent call last):
File "my_worker.py", line 42, in compute
...
ValueError: Something went wrong
For real-time debugging, set VGI_WORKER_DEBUG=1 to stream worker logs directly to your terminal and enable DEBUG-level logging:
VGI_WORKER_DEBUG=1 python my_script.py
This is especially useful when integrating from C++ or other clients where stderr might otherwise be lost.
Testing Your Functions
Use the VGI client for integration tests:
from vgi.client import Client
from vgi import Arguments
import pyarrow as pa
batch = pa.RecordBatch.from_pydict({"name": ["alice", "bob"]})
with Client("./my_worker.py") as client:
results = list(client.scalar_function(
function_name="upper_case",
input=iter([batch]),
arguments=Arguments(positional=[pa.scalar("name")]),
))
assert results[0]["result"].to_pylist() == ["ALICE", "BOB"]
Protocol Overview
VGI uses vgi_rpc, an Apache Arrow IPC-based RPC framework, for all
client-worker communication over stdin/stdout pipes:
Client Worker
│ │
│──── bind(request) ──────────────▶ │ Function name, args, input schema
│◀─── BindResponse ──────────────── │ Output schema, opaque data
│ │
│──── init(request) ──────────────▶ │ Start processing stream
│◀─── Stream header ─────────────── │ execution_id, max_workers
│ │
│──── exchange(batch1) ───────────▶ │
│◀─── output batch 1 ────────────── │ transform(batch)
│ ... │
│──── [stream close] ─────────────▶ │ Signal end of input
│ │
│──── init(phase=FINALIZE) ───────▶ │ Start finalize stream
│◀─── final output batches ──────── │ finish() results
└───────────────────────────────────┘
External Batch Offloading (Demo Storage)
When record batches are too large for HTTP request/response bodies, VGI supports externalizing them to blob storage. The server replaces oversized batches with pointer batches containing a URL, and the client transparently fetches the data.
The example HTTP server includes a built-in demo blob store for testing this without S3 or any cloud infrastructure:
# Start with demo storage (4 KiB threshold for testing)
vgi-fixture-http --demo-storage --externalize-threshold-bytes 4096
# With zstd compression
vgi-fixture-http --demo-storage --externalize-threshold-bytes 4096 --externalize-compression zstd
When --demo-storage is enabled:
- Batches exceeding
--externalize-threshold-bytesare stored in-memory and served from/__blobs__/{id}endpoints on the same server - Clients can request upload URLs for large inputs via the
__upload_url__endpoint - The server advertises
VGI-Max-Request-Bytesand rejects oversized requests with 413
For production use, implement the ExternalStorage protocol from vgi_rpc against
your cloud storage (S3, GCS, etc.). The example server also supports S3 via --s3-bucket.
Documentation
- Function Lifecycle - Bind, init, process, finalize
- Metadata API - Function introspection
- Function API Reference - Advanced function patterns
- Catalog Interface - DuckDB ATTACH integration
Logging
Workers support --debug, --log-level, --log-format, and --log-logger options:
# Enable debug logging
vgi-fixture-worker --debug
# JSON-formatted logs for structured pipelines
vgi-fixture-worker --log-format json
# Target a specific logger
vgi-fixture-worker --log-level DEBUG --log-logger vgi.worker
You can also use the VGI_WORKER_DEBUG=1 environment variable, which enables --debug on the worker and stderr passthrough on the client without changing any code or CLI flags:
VGI_WORKER_DEBUG=1 python my_script.py
See CLI Reference for the full list of loggers and options.
Development
git clone https://github.com/query-farm/vgi-python
cd vgi-python
uv sync --all-extras # Install dependencies
uv run pytest -n auto # Run tests
uv run ruff check --fix . # Lint
uv run ruff format . # Format
uv run mypy vgi/ # Type check
Requirements
- Python >= 3.12.4
- pyarrow
- DuckDB (for SQL integration)
License
Copyright (c) 2025, 2026 Query Farm LLC.
Licensed under the Query Farm Source-Available License, Version 1.0 — see LICENSE for the binding terms. In summary (the LICENSE text governs):
- ✅ Use, copy, modify, and redistribute the code freely, including in production and for commercial purposes — your own internal use, and building products and services on top of VGI.
- 🚫 Not permitted without a separate commercial license: offering a competing VGI-equivalent product or service to third parties (hosted, embedded, or as-a-service), or operating a commercial marketplace for such services.
- ⏳ Each released version converts to the Apache License, Version 2.0, ten years after its public release.
For a commercial license or any licensing questions, contact hello@query.farm.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file vgi_python-0.8.0.tar.gz.
File metadata
- Download URL: vgi_python-0.8.0.tar.gz
- Upload date:
- Size: 2.1 MB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
12bacb99f03a1a911b2981dbf4ec12cde9ca45ba57f7bae0b25fb07221e0b329
|
|
| MD5 |
0f2c9016a435a435315d7d89c1ef2f20
|
|
| BLAKE2b-256 |
1d7807ef995f1af0130de81005447fe27b8b85d21c1bbccab36734f4ee27e026
|
Provenance
The following attestation bundles were made for vgi_python-0.8.0.tar.gz:
Publisher:
release.yml on Query-farm/vgi-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
vgi_python-0.8.0.tar.gz -
Subject digest:
12bacb99f03a1a911b2981dbf4ec12cde9ca45ba57f7bae0b25fb07221e0b329 - Sigstore transparency entry: 1826523602
- Sigstore integration time:
-
Permalink:
Query-farm/vgi-python@b3463968bf27dfcbc8e0d330266b48e83a88b100 -
Branch / Tag:
refs/tags/v0.8.0 - Owner: https://github.com/Query-farm
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@b3463968bf27dfcbc8e0d330266b48e83a88b100 -
Trigger Event:
release
-
Statement type:
File details
Details for the file vgi_python-0.8.0-py3-none-any.whl.
File metadata
- Download URL: vgi_python-0.8.0-py3-none-any.whl
- Upload date:
- Size: 566.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4547e192b94f09f1255adec6f911042afb04c95f44be52aeb6784399757bff4b
|
|
| MD5 |
8d75bd3f996d37acce389dd2afbd2100
|
|
| BLAKE2b-256 |
c9e2790f7ace137c7fb9df5ea6d44c4171a97746443450ff03d775916761a7d7
|
Provenance
The following attestation bundles were made for vgi_python-0.8.0-py3-none-any.whl:
Publisher:
release.yml on Query-farm/vgi-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
vgi_python-0.8.0-py3-none-any.whl -
Subject digest:
4547e192b94f09f1255adec6f911042afb04c95f44be52aeb6784399757bff4b - Sigstore transparency entry: 1826523737
- Sigstore integration time:
-
Permalink:
Query-farm/vgi-python@b3463968bf27dfcbc8e0d330266b48e83a88b100 -
Branch / Tag:
refs/tags/v0.8.0 - Owner: https://github.com/Query-farm
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@b3463968bf27dfcbc8e0d330266b48e83a88b100 -
Trigger Event:
release
-
Statement type: