RisingWave Python API
RisingWave Python UDF SDK
This library provides a Python SDK for creating user-defined functions (UDFs) in RisingWave.
For a detailed guide on using Python UDFs in RisingWave, refer to the RisingWave documentation.
Introduction
RisingWave supports user-defined functions implemented as external functions. With the RisingWave Python UDF SDK, you define UDFs in Python and run them in a separate Python process that acts as a UDF server. RisingWave then calls the UDF server remotely to execute the defined functions.
Installation
```shell
pip install risingwave
```
Usage
Define functions in a Python file:
```python
# udf.py
import socket
import struct

from risingwave.udf import udf, udtf, UdfServer


# Define a scalar function
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    while y != 0:
        (x, y) = (y, x % y)
    return x


# Define a scalar function that returns multiple values (within a struct).
# The input is expected to be an IPv4 packet with a 20-byte header (no IP options)
# followed by the TCP header.
# Note: SMALLINT is a signed 16-bit type, so ports above 32767 do not fit in it.
@udf(input_types=['BYTEA'], result_type='STRUCT<VARCHAR, VARCHAR, SMALLINT, SMALLINT>')
def extract_tcp_info(tcp_packet: bytes):
    # Source and destination IPv4 addresses: bytes 12..19 of the IP header
    src_addr, dst_addr = struct.unpack('!4s4s', tcp_packet[12:20])
    # Source and destination ports: first 4 bytes of the TCP header
    src_port, dst_port = struct.unpack('!HH', tcp_packet[20:24])
    src_addr = socket.inet_ntoa(src_addr)
    dst_addr = socket.inet_ntoa(dst_addr)
    return src_addr, dst_addr, src_port, dst_port


# Define a table function
@udtf(input_types='INT', result_types='INT')
def series(n):
    for i in range(n):
        yield i


# Start a UDF server and register every function defined above
if __name__ == '__main__':
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(extract_tcp_info)
    server.add_function(series)
    server.serve()
```
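Because the function bodies are ordinary Python, their logic can be checked without starting a server. The sketch below copies the body of `extract_tcp_info` into an undecorated helper (`parse_ipv4_tcp` and `build_packet` are hypothetical names, not part of the SDK) and feeds it a synthetic packet built with `struct.pack`. Like the original function, it assumes an IPv4 header of exactly 20 bytes (no IP options) followed by the TCP header; checksums are left as zero because the parser never reads them.

```python
import socket
import struct


def parse_ipv4_tcp(packet: bytes):
    """Same logic as extract_tcp_info, without the @udf decorator (hypothetical helper)."""
    # Source and destination IPv4 addresses live at byte offsets 12..19 of the IP header.
    src_addr, dst_addr = struct.unpack('!4s4s', packet[12:20])
    # With a 20-byte IP header, the TCP source/destination ports sit at offsets 20..23.
    src_port, dst_port = struct.unpack('!HH', packet[20:24])
    return socket.inet_ntoa(src_addr), socket.inet_ntoa(dst_addr), src_port, dst_port


def build_packet(src: str, dst: str, sport: int, dport: int) -> bytes:
    """Build a minimal synthetic IPv4 + TCP header pair (hypothetical test helper)."""
    ip_header = struct.pack(
        '!BBHHHBBH4s4s',
        0x45,                # version 4, IHL 5 (20-byte header)
        0,                   # DSCP/ECN
        40,                  # total length: 20-byte IP header + 20-byte TCP header
        0,                   # identification
        0,                   # flags / fragment offset
        64,                  # TTL
        socket.IPPROTO_TCP,  # protocol
        0,                   # header checksum (not computed)
        socket.inet_aton(src),
        socket.inet_aton(dst),
    )
    # Ports, sequence number, ack number, data offset (5 words), SYN flag, window, checksum, urgent pointer
    tcp_header = struct.pack('!HHIIBBHHH', sport, dport, 0, 0, 5 << 4, 0x02, 65535, 0, 0)
    return ip_header + tcp_header


packet = build_packet('192.168.0.1', '10.0.0.2', 12345, 80)
print(parse_ipv4_tcp(packet))  # ('192.168.0.1', '10.0.0.2', 12345, 80)
```

Keeping the logic in plain helpers like this makes it easy to unit-test before wrapping it with `@udf`.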
Start the UDF server:
```shell
python3 udf.py
```
To create functions in RisingWave, use the following syntax:
```sql
create function <name> ( <arg_type>[, ...] )
    [ returns <ret_type> | returns table ( <column_name> <column_type> [, ...] ) ]
    as <name_defined_in_server> using link '<udf_server_address>';
```
- The `as` parameter specifies the function name defined in the UDF server.
- The `link` parameter specifies the address of the UDF server.
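The statement is plain text, so it can be generated programmatically when many functions are registered at once. The helper below is a hypothetical sketch (`create_function_sql` is not part of the SDK) that assembles the statement from the pieces described above. It does no quoting or escaping, so it should only be used with trusted, hard-coded inputs.

```python
def create_function_sql(name, arg_types, link, returns=None, table_columns=None, name_in_server=None):
    """Build a CREATE FUNCTION statement for a function hosted on a UDF server.

    Exactly one of `returns` (scalar function) or `table_columns`
    (table function, given as (column_name, column_type) pairs) must be set.
    """
    if (returns is None) == (table_columns is None):
        raise ValueError("specify exactly one of returns or table_columns")
    args = ", ".join(arg_types)
    if returns is not None:
        ret = f"returns {returns}"
    else:
        cols = ", ".join(f"{col} {typ}" for col, typ in table_columns)
        ret = f"returns table ({cols})"
    # The `as` name defaults to the SQL name; override it if the server uses a different one.
    target = name_in_server or name
    return f"create function {name}({args}) {ret}\nas {target} using link '{link}';"


print(create_function_sql("gcd", ["int", "int"], "http://localhost:8815", returns="int"))
# create function gcd(int, int) returns int
# as gcd using link 'http://localhost:8815';
```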
For example:
```sql
create function gcd(int, int) returns int
as gcd using link 'http://localhost:8815';

create function series(int) returns table (x int)
as series using link 'http://localhost:8815';

select gcd(25, 15);
select * from series(10);
```
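Since the UDF server simply runs the Python bodies, the results of these two queries can be previewed by running the same logic locally. The undecorated copies below are for illustration only; in RisingWave, each value yielded by `series` becomes one row of the `x` column.

```python
def gcd(x: int, y: int) -> int:
    # Euclid's algorithm, same body as the @udf version in udf.py
    while y != 0:
        (x, y) = (y, x % y)
    return x


def series(n: int):
    # Table function body: each yielded value becomes one output row
    for i in range(n):
        yield i


print(gcd(25, 15))       # 5
print(list(series(10)))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```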
Data Types
The RisingWave Python UDF SDK supports the following data types:
SQL Type | Python Type | Notes |
---|---|---|
BOOLEAN | bool | |
SMALLINT | int | |
INT | int | |
BIGINT | int | |
REAL | float | |
DOUBLE PRECISION | float | |
DECIMAL | decimal.Decimal | |
DATE | datetime.date | |
TIME | datetime.time | |
TIMESTAMP | datetime.datetime | |
INTERVAL | MonthDayNano / (int, int, int) | A MonthDayNano exposes its fields as `months`, `days`, and `nanoseconds` |
VARCHAR | str | |
BYTEA | bytes | |
JSONB | any | |
T[] | list[T] | |
STRUCT<> | tuple | |
Other types | Not supported yet | |
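An INTERVAL arrives as a three-part value (months, days, nanoseconds). The sketch below is a hypothetical function body (`interval_to_seconds` is not part of the SDK) that unpacks it positionally, which works for a plain `(int, int, int)` tuple and should also work for a `MonthDayNano`, assuming it behaves like a named tuple. Because months and days have no fixed length in seconds, it assumes 30-day months and 86,400-second days, so the result is an approximation; it returns a `decimal.Decimal` (DECIMAL in SQL) to keep the nanosecond part exact.

```python
from decimal import Decimal

SECONDS_PER_DAY = 86_400
DAYS_PER_MONTH = 30  # assumption: calendar months have no fixed length


def interval_to_seconds(interval) -> Decimal:
    """Approximate an INTERVAL value as a number of seconds.

    `interval` is anything that unpacks as (months, days, nanoseconds).
    """
    months, days, nanoseconds = interval
    total_days = months * DAYS_PER_MONTH + days
    return Decimal(total_days * SECONDS_PER_DAY) + Decimal(nanoseconds) / Decimal(10**9)


print(interval_to_seconds((0, 1, 1_500_000_000)))  # 86401.5
```

Such a body could be registered like the functions in `udf.py`, for example with `@udf(input_types=['INTERVAL'], result_type='DECIMAL')`, though this particular type pairing is an untested assumption.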
File details
Details for the file risingwave-0.1.1.tar.gz.
File metadata
- Download URL: risingwave-0.1.1.tar.gz
- Size: 10.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | ddfb031c3582852f077c36f99dcd540f5fa4b73e44f950c0d926bdb59795095a |
MD5 | 7a15693c25bb281087cb630899ff403f |
BLAKE2b-256 | e0aa25dce50fde98c1973380bf58bff18d206fa3ece5a0e6dcdfdceedc1df267 |
File details
Details for the file risingwave-0.1.1-py3-none-any.whl.
File metadata
- Download URL: risingwave-0.1.1-py3-none-any.whl
- Size: 11.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 236b58f2cc8cb5525baec6e6710d1ce9aedad0212b00d1d4dce275dae2ddd379 |
MD5 | a28f28b48f55c79da29ff5af848e6fc3 |
BLAKE2b-256 | 587a31a1f80960031357f0f0a90f234c547e77e0076493eeb294dffa5b7dda2b |