Python SDK for real-time processing
Project description
RisingWave Python SDK
Simple Python SDK for event-driven applications with RisingWave.
Quick start
1. Install risingwave-py (PyPI)
pip install risingwave-py psycopg2-binary # or psycopg2
2. Run RisingWave
You can install RisingWave standlone on your laptop via:
# Download and install RisingWave standalone
curl https://risingwave.com/sh | sh
# start RisingWave on macOS
risingwave
# start RisingWave on linux
./risingwave
You can also provision a free-tier cluster in RisingWave Cloud
3. Interact with RisingWave in Python
Initialization
from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd
import threading
# Init to connect to RisingWave instance on localhost
# You can also init with a connection string: RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))
rw = RisingWave(
RisingWaveConnOptions.from_connection_info(
host="localhost", port=4566, user="root", password="root", database="dev"
)
)
Insert and query data in DataFrame via SQL
# Insert a dataframe into a test_product table
test_df1 = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [123.4, 456.7],
}
)
rw.insert(table_name="test_product", data=test_df1)
# Fetch data from the test_product table via SQL
rw.fetch("SELECT * FROM test_product", format=OutputFormat.DATAFRAME)
Subscribe changes from a table
# Subscribe to changes in the test_product table in a separate thread.
# Print out the changes to console when they occur.
def subscribe_product_change():
rw.on_change(
subscribe_from="test_product",
handler=lambda x: print(x),
output_format=OutputFormat.DATAFRAME,
)
threading.Thread(target=subscribe_product_change).start()
# Insert a new dataframe into the table test_product
test_df2 = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [78.9, 10.11],
}
)
rw.insert(table_name="test_product", data=test_df2)
### You should be able to see the changes for produce in console now!
Define your streaming job via materialized view in SQL
# Create a materialized view to calculate the average price of each product
mv = rw.mv(
name="test_product_avg_price_mv",
stmt="SELECT product, avg(price) as avg_price from test_product GROUP BY product",
)
# Fetch data from the materialized view via SQL
rw.fetch("SELECT * FROM test_product_avg_price_mv", format=OutputFormat.DATAFRAME)
Subscribe changes from your streaming job
# Subscribe to changes in avg price for each produce.
# Print out the changes to console when they occur.
def subscribe_product_avg_price_change():
mv.on_change(
handler=lambda x: print(x),
output_format=OutputFormat.DATAFRAME,
)
threading.Thread(target=subscribe_product_avg_price_change).start()
# Insert a new dataframe into the test_product
test_df3 = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [200, 0.11],
}
)
rw.insert(table_name="test_product", data=test_df3)
### You should be able to see the changes in for product and product avg price console now!
Demo
You can also check the demo in our repo.
# Run the simple demo
uv run examples/demo.py simple
# Run the Binance demo
uv run examples/demo.py boll
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
risingwave_py-0.0.2.tar.gz
(46.4 kB
view details)
Built Distribution
File details
Details for the file risingwave_py-0.0.2.tar.gz
.
File metadata
- Download URL: risingwave_py-0.0.2.tar.gz
- Upload date:
- Size: 46.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 |
4a305cc78b84833a97cce2ef9a77f17eb9ed69fcadbf855ba6cc3c405a644746
|
|
MD5 |
891dbd135f20c9d69be3032dc1d79880
|
|
BLAKE2b-256 |
dbe5508c5daa9eb6c7ee2dd04093a7aaa76b3c28b21a77fadac307cc6bb97e0c
|
File details
Details for the file risingwave_py-0.0.2-py3-none-any.whl
.
File metadata
- Download URL: risingwave_py-0.0.2-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 |
7cb39e68ec3d9fd88ed5b328cf5529e6aa41d7d2170c112dac76377eca2adf54
|
|
MD5 |
3a243bfd8f2e623ce37f01755686ea48
|
|
BLAKE2b-256 |
9393ab3f168e40aac7d68260bdb4acee8b93882fb7b3054181c53562bfb0287a
|