Python SDK for real-time processing
Project description
RisingWave Python SDK
Simple Python SDK for event-driven applications with RisingWave.
Quick start
1. Install risingwave-py (PyPI)
pip install risingwave-py
2. Run RisingWave
You can install RisingWave standlone on your laptop via:
# Download and install RisingWave standalone
curl https://risingwave.com/sh | sh
# start RisingWave on macOS
risingwave
# start RisingWave on linux
./risingwave
You can also provision a free-tier cluster in RisingWave Cloud
3. Interact with RisingWave in Python
Initialization
from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd
import threading
# Init to connect to RisingWave instance on localhost
# You can also init with a connection string: RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))
rw = RisingWave(
RisingWaveConnOptions.from_connection_info(
host="localhost", port=4566, user="root", password="root", database="dev"
)
)
Insert and query data in DataFrame via SQL
# Insert a dataframe into a test_product table
test_df1 = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [123.4, 456.7],
}
)
rw.insert(table_name="test_product", data=test_df1)
# Fetch data from the test_product table via SQL
rw.fetch("SELECT * FROM test_product", format=OutputFormat.DATAFRAME)
Subscribe changes from a table
# Subscribe to changes in the test_product table in a separate thread.
# Print out the changes to console when they occur.
def subscribe_product_change():
rw.on_change(
subscribe_from="test_product",
handler=lambda x: print(x),
output_format=OutputFormat.DATAFRAME,
)
threading.Thread(target=subscribe_product_change).start()
# Insert a new dataframe into the table test_product
test_df2 = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [78.9, 10.11],
}
)
rw.insert(table_name="test_product", data=test_df2)
### You should be able to see the changes for produce in console now!
Define your streaming job via materialized view in SQL
# Create a materialized view to calculate the average price of each product
mv = rw.mv(
name="test_product_avg_price_mv",
stmt="SELECT product, avg(price) as avg_price from test_product GROUP BY product",
)
# Fetch data from the materialized view via SQL
rw.fetch("SELECT * FROM test_product_avg_price_mv", format=OutputFormat.DATAFRAME)
Subscribe changes from your stremaing job
# Subscribe to changes in avg price for each produce.
# Print out the changes to console when they occur.
def subscribe_product_avg_price_change():
mv.on_change(
handler=lambda x: print(x),
output_format=OutputFormat.DATAFRAME,
)
threading.Thread(target=subscribe_product_avg_price_change).start()
# Insert a new dataframe into the test_product
test_df3 = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [200, 0.11],
}
)
rw.insert(table_name="test_product", data=test_df3)
### You should be able to see the changes in for product and product avg price console now!
Demo
You can also check the demo in our repo.
python3 -m venv
source ./venv/bin/activate
python3 demo.py simple
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
risingwave_py-0.0.1.tar.gz
(13.5 kB
view details)
Built Distribution
File details
Details for the file risingwave_py-0.0.1.tar.gz
.
File metadata
- Download URL: risingwave_py-0.0.1.tar.gz
- Upload date:
- Size: 13.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6c01cf78bf7f0283c1b73176f1afe624e1131a23e90940dd5371378b137d0f96 |
|
MD5 | 1037e161191574bd8bf36a333fec3bb5 |
|
BLAKE2b-256 | 341df504f5801ce9377966d85ed83211b513eb06ce534de08b56cbdedfcc619f |
File details
Details for the file risingwave_py-0.0.1-py3-none-any.whl
.
File metadata
- Download URL: risingwave_py-0.0.1-py3-none-any.whl
- Upload date:
- Size: 12.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 776852c8fc4a554fb581c1cc49fd5cd5fd7a89f4999c8072334b65caa18116d3 |
|
MD5 | e5f212374f46b34160018de880295062 |
|
BLAKE2b-256 | 22467ae4894cddab522da4ba549ddd3897512386831e89983b923ae3b3f0553b |