Databend UDF Server

Write user-defined functions in Python for Databend.

Quick Start

from databend_udf import udf, UDFServer

@udf(input_types=["INT", "INT"], result_type="INT")
def add(a: int, b: int) -> int:
    return a + b

if __name__ == '__main__':
    server = UDFServer("0.0.0.0:8815")
    server.add_function(add)
    server.serve()
Run the server:

python3 my_udf.py

Then register and call the function in Databend:

CREATE FUNCTION add (INT, INT) RETURNS INT
LANGUAGE python HANDLER = 'add' ADDRESS = 'http://0.0.0.0:8815';

SELECT add(1, 2);  -- 3

Data Types

SQL Type      | Python Type       | Example
------------- | ----------------- | ------------------------------
BOOLEAN       | bool              | True
TINYINT       | int               | 127
SMALLINT      | int               | 32767
INT           | int               | 42
BIGINT        | int               | 1000000
UINT8         | int               | 255
UINT16        | int               | 65535
UINT32        | int               | 1000000
UINT64        | int               | 1000000
FLOAT         | float             | 3.14
DOUBLE        | float             | 3.14159
DECIMAL(P,S)  | decimal.Decimal   | Decimal("99.99")
DATE          | datetime.date     | date(2024, 1, 15)
TIMESTAMP     | datetime.datetime | datetime(2024, 1, 15, 10, 30)
VARCHAR       | str               | "hello"
BINARY        | bytes             | b"data"
VARIANT       | any               | {"key": "value"}
ARRAY(T)      | list[T]           | [1, 2, 3]
MAP(K,V)      | dict[K,V]         | {"a": 1, "b": 2}
TUPLE(T...)   | tuple(T...)       | (1, "hello", True)
VECTOR(N)     | list[float]       | [0.1, 0.2, 0.3]

Note: SQL NULL maps to Python None.
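
A few of the mappings above, exercised in plain Python (the values are illustrative, matching the table's examples):

```python
import datetime
from decimal import Decimal

# DECIMAL(P,S) -> decimal.Decimal: exact arithmetic, no binary-float rounding
assert Decimal("99.99") + Decimal("0.01") == Decimal("100.00")

# DATE -> datetime.date, TIMESTAMP -> datetime.datetime
ts = datetime.datetime(2024, 1, 15, 10, 30)
assert ts.date() == datetime.date(2024, 1, 15)

# ARRAY(T) -> list, MAP(K,V) -> dict, TUPLE(...) -> tuple
row = ([1, 2, 3], {"a": 1, "b": 2}, (1, "hello", True))
assert sum(row[0]) == 6 and row[1]["b"] == 2 and row[2][1] == "hello"
```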

Examples

Basic Types

from databend_udf import udf
import datetime
from decimal import Decimal

@udf(input_types=["INT"], result_type="INT")
def double(x: int) -> int:
    return x * 2

@udf(input_types=["VARCHAR"], result_type="VARCHAR")
def upper(s: str) -> str:
    return s.upper()

@udf(input_types=["DATE", "INT"], result_type="DATE")
def add_days(d: datetime.date, days: int) -> datetime.date:
    return d + datetime.timedelta(days=days)

@udf(input_types=["DECIMAL(10,2)"], result_type="DECIMAL(10,2)")
def add_tax(price: Decimal) -> Decimal:
    return price * Decimal("1.1")

Complex Types

from typing import List, Dict, Tuple

@udf(input_types=["ARRAY(INT)"], result_type="INT")
def array_sum(arr: List[int]) -> int:
    return sum(arr)

@udf(input_types=["MAP(VARCHAR, INT)"], result_type="INT")
def map_sum(m: Dict[str, int]) -> int:
    return sum(m.values())

@udf(input_types=["TUPLE(INT, VARCHAR)"], result_type="VARCHAR")
def tuple_format(t: Tuple[int, str]) -> str:
    return f"{t[0]}: {t[1]}"

@udf(input_types=["VECTOR(128)"], result_type="VECTOR(128)")
def normalize_vector(v: List[float]) -> List[float]:
    norm = sum(x * x for x in v) ** 0.5
    return [x / norm for x in v] if norm > 0 else v
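
VARIANT values arrive as plain Python objects (dict, list, or scalar). The core logic of a VARIANT handler, shown standalone without the decorator so it runs as-is:

```python
import json

def variant_pretty(v) -> str:
    # A VARIANT argument may be a dict, list, or scalar; serialize it
    # deterministically so equal values produce equal strings
    return json.dumps(v, sort_keys=True)

# On the server you would register it with something like:
# @udf(input_types=["VARIANT"], result_type="VARCHAR")
```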

NULL Handling

from typing import Optional

# Option 1: skip_null=True - NULL input returns NULL
@udf(input_types=["INT"], result_type="INT", skip_null=True)
def double(x: int) -> int:
    return x * 2

# Option 2: skip_null=False - Handle NULL manually
@udf(input_types=["INT"], result_type="INT", skip_null=False)
def double_or_zero(x: Optional[int]) -> int:
    return x * 2 if x is not None else 0

# Nullable array elements
@udf(input_types=["ARRAY(INT NULL)"], result_type="INT", skip_null=False)
def sum_non_null(arr: List[Optional[int]]) -> int:
    return sum(x for x in arr if x is not None)

Table Functions

With batch_mode=True, the handler receives each argument as the full column for the current batch (a list) and returns one dict per output row. Passing a list of (name, type) pairs as result_type defines the output schema.

@udf(
    input_types=["INT", "INT"],
    result_type=[("left", "INT"), ("right", "INT"), ("sum", "INT")],
    batch_mode=True,
)
def expand_pairs(left: List[int], right: List[int]):
    return [
        {"left": l, "right": r, "sum": l + r}
        for l, r in zip(left, right)
    ]

Call it as a table function:

SELECT * FROM expand_pairs([1, 2, 3], [10, 20, 30]);

I/O Bound Functions

For I/O-bound functions (e.g., network requests, file operations), use io_threads to enable parallel processing of rows within a batch:

import time

@udf(input_types=["INT"], result_type="INT", io_threads=32)
def fetch_data(id: int) -> int:
    time.sleep(0.1)  # Simulates I/O operation
    return id * 2

Note: io_threads controls parallelism within a single batch, not across requests.
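
Conceptually, io_threads fans the rows of a batch out to a thread pool. This is only an illustrative sketch of the pattern, not the library's implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def run_batch(fn, rows, io_threads=32):
    # Apply fn to each row on a thread pool; pool.map preserves input order,
    # so results line up with the batch's rows
    with ThreadPoolExecutor(max_workers=io_threads) as pool:
        return list(pool.map(fn, rows))
```

Threads only help when fn releases the GIL (network or file I/O); for CPU-bound work they add overhead without speedup.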

Concurrency Limiting

To protect your UDF server from being overwhelmed, use max_concurrency to limit the number of concurrent requests per function:

from databend_udf import udf, UDFServer

@udf(input_types=["INT"], result_type="INT", max_concurrency=10)
def expensive_operation(x: int) -> int:
    # Only 10 concurrent requests allowed
    # Additional requests will wait for a slot
    return x * 2

By default, when the limit is reached, new requests wait until a slot becomes available. You can set a timeout to reject requests that wait too long:

@udf(
    input_types=["INT"],
    result_type="INT",
    max_concurrency=10,
    concurrency_timeout=30,  # Wait up to 30 seconds, then reject
)
def expensive_operation(x: int) -> int:
    return x * 2

When the timeout expires, a ConcurrencyLimitExceeded error is raised.
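
The limiting behaves like a semaphore with a timed acquire. A minimal sketch of the mechanism, assuming nothing about the library's internals (the exception class below is a stand-in for the library's own):

```python
import threading
from typing import Optional

class ConcurrencyLimitExceeded(Exception):
    """Stand-in for the library's error: no slot freed up within the timeout."""

class Limiter:
    def __init__(self, max_concurrency: int, timeout: Optional[float] = None):
        self._sem = threading.Semaphore(max_concurrency)
        self._timeout = timeout

    def __enter__(self):
        # Semaphore.acquire blocks up to `timeout` seconds for a free slot;
        # timeout=None waits forever (the default behavior described above)
        if not self._sem.acquire(timeout=self._timeout):
            raise ConcurrencyLimitExceeded("no free slot")
        return self

    def __exit__(self, *exc):
        self._sem.release()
```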

You can combine io_threads and max_concurrency:

@udf(
    input_types=["INT"],
    result_type="INT",
    io_threads=32,           # 32 threads for I/O within each request
    max_concurrency=5,       # But only 5 concurrent requests allowed
    concurrency_timeout=60,  # Wait up to 60s for a slot
)
def api_call(id: int) -> int:
    # Protected from too many concurrent requests
    return fetch_from_api(id)

Configuration

Databend Config

Edit databend-query.toml:

[query]
enable_udf_server = true
udf_server_allow_list = ["http://0.0.0.0:8815"]

UDF Decorator Parameters

Parameter           | Type               | Default | Description
------------------- | ------------------ | ------- | -------------------------------------------------
input_types         | List[str]          | -       | Input SQL types
result_type         | str or List[Tuple] | -       | Return SQL type, or table schema for table functions
name                | str                | None    | Custom function name
skip_null           | bool               | False   | Auto-return NULL for NULL inputs
io_threads          | int                | 32      | Thread pool size for parallel row processing
batch_mode          | bool               | False   | Enable for table functions
max_concurrency     | int                | None    | Max concurrent requests (None = unlimited)
concurrency_timeout | float              | None    | Seconds to wait for a slot (None = wait forever)

Development

Format code with Ruff:

pip install ruff
ruff format python/databend_udf python/tests

Inspired by RisingWave Python API.
