Skip to main content

Python SDK for real-time processing

Project description

RisingWave Python SDK

Simple Python SDK for event-driven applications with RisingWave.

Quick start

1. Install risingwave-py (PyPI)

pip install risingwave-py psycopg2-binary # or psycopg2

2. Run RisingWave

You can install RisingWave standlone on your laptop via:

# Download and install RisingWave standalone
curl https://risingwave.com/sh | sh

# start RisingWave on macOS
risingwave

# start RisingWave on linux
./risingwave

You can also provision a free-tier cluster in RisingWave Cloud

3. Interact with RisingWave in Python

Initialization

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd
import threading

# Init to connect to RisingWave instance on localhost
# You can also init with a connection string: RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)

Insert and query data in DataFrame via SQL

# Insert a dataframe into a test_product table
test_df1 = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
    }
)
rw.insert(table_name="test_product", data=test_df1)

# Fetch data from the test_product table via SQL
rw.fetch("SELECT * FROM test_product", format=OutputFormat.DATAFRAME)

Subscribe changes from a table

# Subscribe to changes in the test_product table in a separate thread.
# Print out the changes to console when they occur.
def subscribe_product_change():
    rw.on_change(
        subscribe_from="test_product",
        handler=lambda x: print(x),
        output_format=OutputFormat.DATAFRAME,
    )


threading.Thread(target=subscribe_product_change).start()


# Insert a new dataframe into the table test_product
test_df2 = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [78.9, 10.11],
    }
)
rw.insert(table_name="test_product", data=test_df2)


### You should be able to see the changes for produce in console now!

Define your streaming job via materialized view in SQL

# Create a materialized view to calculate the average price of each product
mv = rw.mv(
    name="test_product_avg_price_mv",
    stmt="SELECT product, avg(price) as avg_price from test_product GROUP BY product",
)

# Fetch data from the materialized view via SQL
rw.fetch("SELECT * FROM test_product_avg_price_mv", format=OutputFormat.DATAFRAME)

Subscribe changes from your streaming job

# Subscribe to changes in avg price for each produce.
# Print out the changes to console when they occur.
def subscribe_product_avg_price_change():
    mv.on_change(
        handler=lambda x: print(x),
        output_format=OutputFormat.DATAFRAME,
    )


threading.Thread(target=subscribe_product_avg_price_change).start()


# Insert a new dataframe into the test_product
test_df3 = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [200, 0.11],
    }
)
rw.insert(table_name="test_product", data=test_df3)


### You should be able to see the changes in for product and product avg price console now!

Demo

You can also check the demo in our repo.

# Run the simple demo
uv run examples/demo.py simple

# Run the Binance demo
uv run examples/demo.py boll

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

risingwave_py-0.0.2.tar.gz (46.4 kB view details)

Uploaded Source

Built Distribution

risingwave_py-0.0.2-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file risingwave_py-0.0.2.tar.gz.

File metadata

  • Download URL: risingwave_py-0.0.2.tar.gz
  • Upload date:
  • Size: 46.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.9

File hashes

Hashes for risingwave_py-0.0.2.tar.gz
Algorithm Hash digest
SHA256 4a305cc78b84833a97cce2ef9a77f17eb9ed69fcadbf855ba6cc3c405a644746
MD5 891dbd135f20c9d69be3032dc1d79880
BLAKE2b-256 dbe5508c5daa9eb6c7ee2dd04093a7aaa76b3c28b21a77fadac307cc6bb97e0c

See more details on using hashes here.

File details

Details for the file risingwave_py-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for risingwave_py-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 7cb39e68ec3d9fd88ed5b328cf5529e6aa41d7d2170c112dac76377eca2adf54
MD5 3a243bfd8f2e623ce37f01755686ea48
BLAKE2b-256 9393ab3f168e40aac7d68260bdb4acee8b93882fb7b3054181c53562bfb0287a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page