RisingWave Python API
RisingWave Python UDF SDK
This library provides a Python SDK for creating user-defined functions (UDFs) in RisingWave.
For a detailed guide on using Python UDFs in RisingWave, see the RisingWave documentation.
Introduction
RisingWave supports user-defined functions implemented as external functions. With the RisingWave Python UDF SDK, users can define custom UDFs using Python and start a Python process as a UDF server. RisingWave can then remotely access the UDF server to execute the defined functions.
Installation
pip install risingwave
Usage
Define functions in a Python file:
# udf.py
from risingwave.udf import udf, udtf, UdfServer
import struct
import socket

# Define a scalar function
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    while y != 0:
        (x, y) = (y, x % y)
    return x

# Define a scalar function that returns multiple values (within a struct)
@udf(input_types=['BYTEA'], result_type='STRUCT<VARCHAR, VARCHAR, SMALLINT, SMALLINT>')
def extract_tcp_info(tcp_packet: bytes):
    src_addr, dst_addr = struct.unpack('!4s4s', tcp_packet[12:20])
    src_port, dst_port = struct.unpack('!HH', tcp_packet[20:24])
    src_addr = socket.inet_ntoa(src_addr)
    dst_addr = socket.inet_ntoa(dst_addr)
    return src_addr, dst_addr, src_port, dst_port

# Define a table function
@udtf(input_types='INT', result_types='INT')
def series(n):
    for i in range(n):
        yield i

# Start a UDF server
if __name__ == '__main__':
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(series)
    server.serve()
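The byte offsets in `extract_tcp_info` appear to assume an IPv4 header without options (20 bytes) followed directly by the TCP header. The sketch below checks that logic locally with the standard library only; the function body is copied from the UDF above without its decorator, and `build_sample_packet` is a hypothetical helper introduced purely for illustration.

```python
import socket
import struct


def extract_tcp_info(tcp_packet: bytes):
    # Same logic as the UDF above, minus the @udf decorator, so it runs
    # without the SDK installed.
    src_addr, dst_addr = struct.unpack('!4s4s', tcp_packet[12:20])
    src_port, dst_port = struct.unpack('!HH', tcp_packet[20:24])
    return socket.inet_ntoa(src_addr), socket.inet_ntoa(dst_addr), src_port, dst_port


def build_sample_packet(src: str, dst: str, src_port: int, dst_port: int) -> bytes:
    """Hypothetical helper: a 20-byte IPv4 header (no options) plus the TCP port fields."""
    ip_header = bytearray(20)
    ip_header[0] = 0x45                        # IPv4, header length of 5 words (20 bytes)
    ip_header[9] = 6                           # protocol number 6 = TCP
    ip_header[12:16] = socket.inet_aton(src)   # source address at bytes 12..15
    ip_header[16:20] = socket.inet_aton(dst)   # destination address at bytes 16..19
    tcp_ports = struct.pack('!HH', src_port, dst_port)  # first 4 bytes of the TCP header
    return bytes(ip_header) + tcp_ports


packet = build_sample_packet('192.168.0.1', '10.0.0.2', 5000, 80)
info = extract_tcp_info(packet)
print(info)  # ('192.168.0.1', '10.0.0.2', 5000, 80)
```

Port numbers are unsigned 16-bit values, so ports above 32767 fall outside the signed `SMALLINT` range. A wider type such as `INT` may be the safer declaration; how the SDK handles such values is not covered here.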
Start the UDF server:
python3 udf.py
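Before registering functions in RisingWave, it can help to confirm that the server process is accepting connections. The following is a minimal sketch using only the standard library; `is_udf_server_reachable` is a hypothetical helper, not part of the SDK, and it only checks that the TCP port is open, not that the UDF protocol is working.

```python
import socket


def is_udf_server_reachable(host: str = "localhost", port: int = 8815,
                            timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable.
        return False


# Prints True once `python3 udf.py` is running on the default port used above.
print(is_udf_server_reachable())
```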
To create functions in RisingWave, use the following syntax:
create function <name> ( <arg_type>[, ...] )
[ returns <ret_type> | returns table ( <column_name> <column_type> [, ...] ) ]
as <name_defined_in_server> using link '<udf_server_address>';
- The `as` parameter specifies the function name defined in the UDF server.
- The `link` parameter specifies the address of the UDF server.
For example:
create function gcd(int, int) returns int
as gcd using link 'http://localhost:8815';
create function series(int) returns table (x int)
as series using link 'http://localhost:8815';
select gcd(25, 15);
select * from series(10);
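As a sanity check on what these queries should return, the same logic can be run as plain Python. This sketch uses undecorated copies of `gcd` and `series` from `udf.py`, so it needs neither the SDK nor a running server. It assumes each value yielded by the table function becomes one row of column `x`.

```python
def gcd(x, y):
    # Euclid's algorithm, as in udf.py.
    while y != 0:
        x, y = y, x % y
    return x


def series(n):
    # Yields 0, 1, ..., n - 1, one value per output row.
    for i in range(n):
        yield i


print(gcd(25, 15))       # 5
print(list(series(10)))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```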
Data Types
The RisingWave Python UDF SDK supports the following data types:
SQL Type | Python Type | Notes |
---|---|---|
BOOLEAN | bool | |
SMALLINT | int | |
INT | int | |
BIGINT | int | |
REAL | float | |
DOUBLE PRECISION | float | |
DECIMAL | decimal.Decimal | |
DATE | datetime.date | |
TIME | datetime.time | |
TIMESTAMP | datetime.datetime | |
INTERVAL | MonthDayNano / (int, int, int) | Fields can be obtained by months(), days() and nanoseconds() from MonthDayNano |
VARCHAR | str | |
BYTEA | bytes | |
JSONB | any | |
T[] | list[T] | |
STRUCT<> | tuple | |
...others | | Not supported yet. |
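To illustrate the mapping, the sketch below shows the Python values a function body would work with for `T[]`, `DECIMAL`, `DATE`, and `STRUCT<>`. It is written as a plain function so it runs without the SDK. The function name `daily_total` is hypothetical, and the type strings in the comment are an assumption modelled on the `STRUCT<...>` example in the usage section.

```python
import datetime
from decimal import Decimal


# If registered as a UDF, this might be declared with something like
# input_types=['DECIMAL[]', 'DATE'] and result_type='STRUCT<DECIMAL, VARCHAR>'.
# Those type strings are an assumption and are not confirmed by the table above.
def daily_total(prices, day):
    # prices: list of decimal.Decimal (T[] maps to list[T])
    # day: datetime.date (DATE)
    total = sum(prices, Decimal('0'))
    # A tuple return value corresponds to a STRUCT result.
    return total, day.isoformat()


result = daily_total([Decimal('1.10'), Decimal('2.20')], datetime.date(2024, 1, 31))
print(result)  # (Decimal('3.30'), '2024-01-31')
```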