
Python SDK for real-time processing


RisingWave Python SDK

Simple Python SDK for event-driven applications with RisingWave.

Quick start

1. Install risingwave-py (PyPI)

pip install risingwave-py

2. Run RisingWave

You can install RisingWave in standalone mode on your laptop with:

# Download and install RisingWave standalone
curl https://risingwave.com/sh | sh

# start RisingWave on macOS
risingwave

# start RisingWave on Linux
./risingwave

You can also provision a free-tier cluster on RisingWave Cloud.
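Before moving on, it can help to confirm that the instance is accepting connections. The sketch below assumes RisingWave listens on localhost:4566, the host and port used in the connection example that follows; `is_listening` is a hypothetical helper, not part of risingwave-py.

```python
import socket


def is_listening(host: str = "localhost", port: int = 4566, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper: it only checks that something accepts TCP connections
    on the port, not that the listener is actually RisingWave.
    """
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host could not be resolved
        return False
```

Calling `is_listening()` right after starting `risingwave` should return `True` once the server is up; `False` usually means it is still starting or is bound to a different port.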

3. Interact with RisingWave in Python

Initialization

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd
import threading

# Init to connect to RisingWave instance on localhost
# You can also init with a connection string: RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)
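The comment above mentions an equivalent connection-string form. As a sketch, the hypothetical helper below assembles such a PostgreSQL-style URI from the same five fields, percent-encoding the user, password, and database name so characters like `@` or `:` cannot break the URI. It assumes the SDK accepts a standard `postgresql://user:password@host:port/database` string, as that comment suggests.

```python
from urllib.parse import quote


def build_dsn(host: str, port: int, user: str, password: str, database: str) -> str:
    """Assemble a postgresql:// URI from individual connection fields (hypothetical helper)."""
    # safe="" makes quote() encode every reserved character, including "/", "@" and ":"
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )
```

For the local defaults, `build_dsn("localhost", 4566, "root", "root", "dev")` yields `postgresql://root:root@localhost:4566/dev`, the same string shown in the comment, so it could be passed to `RisingWaveConnOptions(...)` in that form.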

Insert and query data in DataFrame via SQL

# Insert a dataframe into a test_product table
test_df1 = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
    }
)
rw.insert(table_name="test_product", data=test_df1)

# Fetch data from the test_product table via SQL
rw.fetch("SELECT * FROM test_product", format=OutputFormat.DATAFRAME)
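To make the insert step concrete at the SQL level, here is a hypothetical helper that turns column data shaped like the dict passed to `pd.DataFrame` above into a parameterized `INSERT` statement plus row tuples, using the `%s` placeholder style of PostgreSQL drivers such as psycopg. It only illustrates the idea; it is not how risingwave-py implements `rw.insert`, which may work differently.

```python
from typing import Any, Dict, List, Sequence, Tuple


def to_insert_statement(
    table_name: str, columns: Dict[str, Sequence[Any]]
) -> Tuple[str, List[Tuple[Any, ...]]]:
    """Build an INSERT with %s placeholders and the row tuples to bind (hypothetical helper)."""
    names = list(columns)
    if len({len(values) for values in columns.values()}) > 1:
        raise ValueError("all columns must have the same number of values")
    placeholders = ", ".join(["%s"] * len(names))
    # Note: identifiers are interpolated as-is; pass only trusted table/column names
    sql = f"INSERT INTO {table_name} ({', '.join(names)}) VALUES ({placeholders})"
    # zip(*...) transposes the per-column lists into per-row tuples
    rows = list(zip(*(columns[name] for name in names)))
    return sql, rows
```

With a DB-API driver, the returned pair could be passed to `cursor.executemany(sql, rows)`; the values travel as bound parameters rather than being formatted into the SQL text.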

Subscribe to changes from a table

# Subscribe to changes in the test_product table in a separate thread.
# Print out the changes to console when they occur.
def subscribe_product_change():
    rw.on_change(
        subscribe_from="test_product",
        handler=lambda x: print(x),
        output_format=OutputFormat.DATAFRAME,
    )


threading.Thread(target=subscribe_product_change).start()


# Insert a new dataframe into the table test_product
test_df2 = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [78.9, 10.11],
    }
)
rw.insert(table_name="test_product", data=test_df2)


# You should now see the changes to test_product printed in the console.
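Printing from the subscription thread works for a demo, but an application usually wants to process changes on its own thread. A common pattern, sketched below under the assumption that `on_change` blocks and invokes `handler` once per batch of changes, is to have the handler put each batch on a `queue.Queue` and consume it elsewhere. `fake_subscription` stands in for `rw.on_change` so the sketch runs without a RisingWave instance; it and `make_queue_handler` are hypothetical names.

```python
import queue
import threading
from typing import Any, Callable, List


def make_queue_handler(q: "queue.Queue[Any]") -> Callable[[Any], None]:
    """Return a handler that forwards each change batch to q instead of printing it."""
    def handler(changes: Any) -> None:
        q.put(changes)  # Queue.put is thread-safe
    return handler


def fake_subscription(handler: Callable[[Any], None], batches: List[Any]) -> None:
    """Hypothetical stand-in for rw.on_change: calls handler once per batch."""
    for batch in batches:
        handler(batch)


changes_q: "queue.Queue[Any]" = queue.Queue()
worker = threading.Thread(
    target=fake_subscription,
    args=(make_queue_handler(changes_q), [["foo", 78.9], ["bar", 10.11]]),
    daemon=True,  # a daemon thread does not keep the process alive at exit
)
worker.start()

# Drain the queue on the main thread; stop once no batch arrives for 1 second
received: List[Any] = []
while True:
    try:
        received.append(changes_q.get(timeout=1.0))
    except queue.Empty:
        break
worker.join()
```

In the real script, `make_queue_handler(changes_q)` would take the place of `lambda x: print(x)`, and `daemon=True` lets the program exit even while a blocking subscription is still running.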

Define your streaming job via materialized view in SQL

# Create a materialized view to calculate the average price of each product
mv = rw.mv(
    name="test_product_avg_price_mv",
    stmt="SELECT product, avg(price) as avg_price from test_product GROUP BY product",
)

# Fetch data from the materialized view via SQL
rw.fetch("SELECT * FROM test_product_avg_price_mv", format=OutputFormat.DATAFRAME)
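Conceptually, the materialized view keeps `avg(price)` current without rescanning `test_product`: it can hold a running sum and count per product and adjust them as rows arrive. The plain-Python sketch below illustrates that idea of incremental aggregation for inserts only (a real engine must also subtract on deletes and updates); it is not RisingWave's internal implementation, and `IncrementalAvg` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


class IncrementalAvg:
    """Maintain avg(price) GROUP BY product under inserts via running sum and count."""

    def __init__(self) -> None:
        self._sum: Dict[str, float] = defaultdict(float)
        self._count: Dict[str, int] = defaultdict(int)

    def insert(self, rows: Iterable[Tuple[str, float]]) -> None:
        # Each inserted row touches only its own group's state: O(1) work per row
        for product, price in rows:
            self._sum[product] += price
            self._count[product] += 1

    def result(self) -> Dict[str, float]:
        # Same answer as SELECT product, avg(price) ... GROUP BY product
        return {p: self._sum[p] / self._count[p] for p in self._count}


view = IncrementalAvg()
view.insert([("foo", 123.4), ("bar", 456.7)])  # rows from test_df1
view.insert([("foo", 78.9), ("bar", 10.11)])   # rows from test_df2
```

After both inserts, `view.result()` gives roughly 101.15 for `foo` and 233.405 for `bar`, which is what the `SELECT` on `test_product_avg_price_mv` should report at that point.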

Subscribe to changes from your streaming job

# Subscribe to changes in the average price of each product.
# Print out the changes to console when they occur.
def subscribe_product_avg_price_change():
    mv.on_change(
        handler=lambda x: print(x),
        output_format=OutputFormat.DATAFRAME,
    )


threading.Thread(target=subscribe_product_avg_price_change).start()


# Insert a new dataframe into the test_product table
test_df3 = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [200, 0.11],
    }
)
rw.insert(table_name="test_product", data=test_df3)


# You should now see the changes to both test_product and the average product price printed in the console.
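After `test_df3` is inserted, both averages move, so the subscriber on `test_product_avg_price_mv` should see a change for each product. The sketch below computes that expected change set by diffing two snapshots of the view's result, which can be handy when reasoning about or testing the output. `diff_snapshots` and its `insert`/`delete`/`update` labels are hypothetical; the actual change format delivered by `mv.on_change` is defined by the SDK and may look different.

```python
from typing import Dict, List, Optional, Tuple

# (operation, product, old average, new average)
Change = Tuple[str, str, Optional[float], Optional[float]]


def diff_snapshots(old: Dict[str, float], new: Dict[str, float]) -> List[Change]:
    """Return one change per product whose average was added, removed, or modified."""
    changes: List[Change] = []
    for product in sorted(old.keys() | new.keys()):
        before, after = old.get(product), new.get(product)
        if before is None:
            changes.append(("insert", product, None, after))
        elif after is None:
            changes.append(("delete", product, before, None))
        elif before != after:
            changes.append(("update", product, before, after))
    return changes


# Averages after test_df1 + test_df2, and after test_df3 as well
before = {"foo": (123.4 + 78.9) / 2, "bar": (456.7 + 10.11) / 2}
after = {"foo": (123.4 + 78.9 + 200) / 3, "bar": (456.7 + 10.11 + 0.11) / 3}
```

Here `diff_snapshots(before, after)` reports an update for both `bar` (to about 155.64) and `foo` (to about 134.1), matching the two changes the subscription is expected to surface.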

Demo

You can also run the demo from the project repository:

python3 -m venv venv
source ./venv/bin/activate
python3 demo.py simple


Download files

Download the file for your platform.

Source Distribution

risingwave_py-0.0.1.tar.gz (13.5 kB)

Built Distribution

risingwave_py-0.0.1-py3-none-any.whl (12.3 kB, Python 3)

File details

Details for the file risingwave_py-0.0.1.tar.gz.

File metadata

  • Download URL: risingwave_py-0.0.1.tar.gz
  • Size: 13.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.6

File hashes

Hashes for risingwave_py-0.0.1.tar.gz
Algorithm    Hash digest
SHA256       6c01cf78bf7f0283c1b73176f1afe624e1131a23e90940dd5371378b137d0f96
MD5          1037e161191574bd8bf36a333fec3bb5
BLAKE2b-256  341df504f5801ce9377966d85ed83211b513eb06ce534de08b56cbdedfcc619f


File details

Details for the file risingwave_py-0.0.1-py3-none-any.whl.

File hashes

Hashes for risingwave_py-0.0.1-py3-none-any.whl
Algorithm    Hash digest
SHA256       776852c8fc4a554fb581c1cc49fd5cd5fd7a89f4999c8072334b65caa18116d3
MD5          e5f212374f46b34160018de880295062
BLAKE2b-256  22467ae4894cddab522da4ba549ddd3897512386831e89983b923ae3b3f0553b
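The SHA256 digests listed for each file let you check that a download is intact. A minimal sketch using only the standard library's `hashlib`: it streams the file in chunks and compares the hex digest against the published value. `sha256_of` and `verify_sha256` are hypothetical helper names, and the path is wherever you saved the download.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 16) -> str:
    """Return the hex SHA256 digest of a file, reading it in fixed-size chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        # iter() with a sentinel keeps reading until read() returns b"" (end of file)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_sha256(path: Path, expected: str) -> bool:
    """Compare a file's SHA256 with an expected hex digest (case-insensitive)."""
    return sha256_of(path) == expected.strip().lower()
```

For example, `verify_sha256(Path("risingwave_py-0.0.1.tar.gz"), "6c01cf78bf7f0283c1b73176f1afe624e1131a23e90940dd5371378b137d0f96")` should return `True` for an untampered source archive. pip's hash-checking mode (`--require-hashes`, with `--hash=sha256:<digest>` entries in a requirements file) performs the same check at install time.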

