# Databend Python UDF

Write user-defined functions in Python for Databend.
## Quick Start

Install the package:

```bash
pip install databend-udf
```

Define and serve a UDF (`my_udf.py`):

```python
from databend_udf import udf, UDFServer

@udf(input_types=["INT", "INT"], result_type="INT")
def add(a: int, b: int) -> int:
    return a + b

if __name__ == '__main__':
    server = UDFServer("0.0.0.0:8815")
    server.add_function(add)
    server.serve()
```

Start the server:

```bash
python3 my_udf.py
```

Register and call the function in Databend:

```sql
CREATE FUNCTION add (INT, INT) RETURNS INT
LANGUAGE python HANDLER = 'add' ADDRESS = 'http://0.0.0.0:8815';

SELECT add(1, 2); -- 3
```
## Data Types
| SQL Type | Python Type | Example |
|---|---|---|
| BOOLEAN | bool | True |
| TINYINT | int | 127 |
| SMALLINT | int | 32767 |
| INT | int | 42 |
| BIGINT | int | 1000000 |
| UINT8 | int | 255 |
| UINT16 | int | 65535 |
| UINT32 | int | 1000000 |
| UINT64 | int | 1000000 |
| FLOAT | float | 3.14 |
| DOUBLE | float | 3.14159 |
| DECIMAL(P,S) | decimal.Decimal | Decimal("99.99") |
| DATE | datetime.date | date(2024, 1, 15) |
| TIMESTAMP | datetime.datetime | datetime(2024, 1, 15, 10, 30) |
| VARCHAR | str | "hello" |
| BINARY | bytes | b"data" |
| VARIANT | any | {"key": "value"} |
| ARRAY(T) | list[T] | [1, 2, 3] |
| MAP(K,V) | dict[K,V] | {"a": 1, "b": 2} |
| TUPLE(T...) | tuple(T...) | (1, "hello", True) |
| VECTOR(N) | list[float] | [0.1, 0.2, 0.3] |
Note: SQL `NULL` maps to Python `None`.
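The Python-side values in the table above are ordinary standard-library types. For example, a `DECIMAL(10,2)` result should carry scale 2, which the `decimal` module handles directly, and `DATE` arithmetic is plain `datetime` (a stdlib-only sketch, independent of the UDF server):

```python
import datetime
from decimal import Decimal

# DECIMAL(10,2): keep two fractional digits after arithmetic
price = (Decimal("19.99") * Decimal("1.1")).quantize(Decimal("0.01"))

# DATE arithmetic uses datetime.date / datetime.timedelta
d = datetime.date(2024, 1, 15) + datetime.timedelta(days=10)

print(price)  # 21.99
print(d)      # 2024-01-25
```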
## Examples

### Basic Types
```python
from databend_udf import udf
import datetime
from decimal import Decimal

@udf(input_types=["INT"], result_type="INT")
def double(x: int) -> int:
    return x * 2

@udf(input_types=["VARCHAR"], result_type="VARCHAR")
def upper(s: str) -> str:
    return s.upper()

@udf(input_types=["DATE", "INT"], result_type="DATE")
def add_days(d: datetime.date, days: int) -> datetime.date:
    return d + datetime.timedelta(days=days)

@udf(input_types=["DECIMAL(10,2)"], result_type="DECIMAL(10,2)")
def add_tax(price: Decimal) -> Decimal:
    return price * Decimal("1.1")
```
### Complex Types

```python
from typing import List, Dict, Tuple

@udf(input_types=["ARRAY(INT)"], result_type="INT")
def array_sum(arr: List[int]) -> int:
    return sum(arr)

@udf(input_types=["MAP(VARCHAR, INT)"], result_type="INT")
def map_sum(m: Dict[str, int]) -> int:
    return sum(m.values())

@udf(input_types=["TUPLE(INT, VARCHAR)"], result_type="VARCHAR")
def tuple_format(t: Tuple[int, str]) -> str:
    return f"{t[0]}: {t[1]}"

@udf(input_types=["VECTOR(128)"], result_type="VECTOR(128)")
def normalize_vector(v: List[float]) -> List[float]:
    norm = sum(x * x for x in v) ** 0.5
    return [x / norm for x in v] if norm > 0 else v
```
### NULL Handling

```python
from typing import Optional

# Option 1: skip_null=True - NULL input returns NULL
@udf(input_types=["INT"], result_type="INT", skip_null=True)
def double(x: int) -> int:
    return x * 2

# Option 2: skip_null=False - Handle NULL manually
@udf(input_types=["INT"], result_type="INT", skip_null=False)
def double_or_zero(x: Optional[int]) -> int:
    return x * 2 if x is not None else 0

# Nullable array elements
@udf(input_types=["ARRAY(INT NULL)"], result_type="INT", skip_null=False)
def sum_non_null(arr: List[Optional[int]]) -> int:
    return sum(x for x in arr if x is not None)
```
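The `skip_null=True` behavior amounts to short-circuiting `None` before your handler runs; a stdlib-only sketch of the idea (illustrative, not the library's actual wrapper):

```python
from typing import Callable

def skip_null_wrapper(fn: Callable) -> Callable:
    # If any argument is None (SQL NULL), return None without calling fn
    def wrapped(*args):
        if any(a is None for a in args):
            return None
        return fn(*args)
    return wrapped

double = skip_null_wrapper(lambda x: x * 2)
print(double(21))    # 42
print(double(None))  # None
```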
### Table Functions

With `batch_mode=True`, the handler receives each input column as a Python list and returns one dict per output row:

```python
@udf(
    input_types=["INT", "INT"],
    result_type=[("left", "INT"), ("right", "INT"), ("sum", "INT")],
    batch_mode=True,
)
def expand_pairs(left: List[int], right: List[int]):
    return [
        {"left": l, "right": r, "sum": l + r}
        for l, r in zip(left, right)
    ]
```
```sql
SELECT * FROM expand_pairs([1, 2, 3], [10, 20, 30]);
```
## I/O-Bound Functions

For I/O-bound functions (e.g., network requests, file operations), use `io_threads` to enable parallel processing of rows within a batch:

```python
import time

@udf(input_types=["INT"], result_type="INT", io_threads=32)
def fetch_data(id: int) -> int:
    time.sleep(0.1)  # Simulates an I/O operation
    return id * 2
```

Note: `io_threads` controls parallelism within a single batch, not across requests.
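Conceptually, `io_threads` dispatches each row of a batch to a thread pool and collects the results in order, similar to this stdlib sketch (an assumption about the mechanism, not the library's actual implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def process_batch(fn, rows, io_threads=32):
    # Rows within one batch run concurrently; results keep input order
    with ThreadPoolExecutor(max_workers=io_threads) as pool:
        return list(pool.map(fn, rows))

print(process_batch(lambda x: x * 2, [1, 2, 3], io_threads=4))  # [2, 4, 6]
```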
## Concurrency Limiting

To protect your UDF server from being overwhelmed, use `max_concurrency` to limit the number of concurrent requests per function:

```python
from databend_udf import udf, UDFServer

@udf(input_types=["INT"], result_type="INT", max_concurrency=10)
def expensive_operation(x: int) -> int:
    # Only 10 concurrent requests allowed;
    # additional requests will wait for a slot
    return x * 2
```
By default, when the limit is reached, new requests wait until a slot becomes available. You can set a timeout to reject requests that wait too long:
```python
@udf(
    input_types=["INT"],
    result_type="INT",
    max_concurrency=10,
    concurrency_timeout=30,  # Wait up to 30 seconds, then reject
)
def expensive_operation(x: int) -> int:
    return x * 2
```
When the timeout expires, a `ConcurrencyLimitExceeded` error is raised.
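The limiter behaves like a counting semaphore with an optional acquire timeout. A stdlib-only sketch of the pattern (the `Limiter` class here is hypothetical; only the `ConcurrencyLimitExceeded` name comes from the library):

```python
import threading

class ConcurrencyLimitExceeded(Exception):
    pass

class Limiter:
    def __init__(self, max_concurrency, timeout=None):
        self._sem = threading.Semaphore(max_concurrency)
        self._timeout = timeout  # None = wait forever

    def __enter__(self):
        # Block until a slot frees up, or raise after the timeout
        if not self._sem.acquire(timeout=self._timeout):
            raise ConcurrencyLimitExceeded("no slot available")
        return self

    def __exit__(self, *exc):
        self._sem.release()

limiter = Limiter(max_concurrency=1, timeout=0.01)
with limiter:
    pass  # slot acquired, released on exit
```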
You can combine `io_threads` and `max_concurrency`:

```python
@udf(
    input_types=["INT"],
    result_type="INT",
    io_threads=32,           # 32 threads for I/O within each request
    max_concurrency=5,       # But only 5 concurrent requests allowed
    concurrency_timeout=60,  # Wait up to 60s for a slot
)
def api_call(id: int) -> int:
    # Protected from too many concurrent requests
    return fetch_from_api(id)
```
## Configuration

### Databend Config

Edit `databend-query.toml`:

```toml
[query]
enable_udf_server = true
udf_server_allow_list = ["http://0.0.0.0:8815"]
```
### UDF Decorator Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `input_types` | `List[str]` | - | Input SQL types |
| `result_type` | `str \| List[Tuple]` | - | Return SQL type or table schema |
| `name` | `str` | `None` | Custom function name |
| `skip_null` | `bool` | `False` | Auto-return NULL for NULL inputs |
| `io_threads` | `int` | `32` | Thread pool size for parallel row processing |
| `batch_mode` | `bool` | `False` | Enable for table functions |
| `max_concurrency` | `int` | `None` | Max concurrent requests (`None` = unlimited) |
| `concurrency_timeout` | `float` | `None` | Seconds to wait for a slot (`None` = wait forever) |
## Additional Resources

- Comprehensive Examples - All data types
- Test Cases - Usage examples
## Development

Format code with Ruff:

```bash
pip install ruff
ruff format python/databend_udf python/tests
```

Inspired by RisingWave Python API.