Stream Processing Library using Polars
Project description
polars-streaming
This library helps to process streaming data using Polars.
Installation
pip install polars-streaming
Install from sources
Alternatively, you can also clone the latest version from the repository and install it directly from the source code:
pip install -e .
Quick tour
Socket
>>> from polars_streaming import StreamProcessor
>>> s = StreamProcessor()
>>> s.readStream.format('socket').options({'host':'localhost','port':12345}).load()
>>> def transformation(df):
>>> # Add your transformation code here
>>> df = df.sum() # For example purpose, I am calculating the sum.
>>> return df # Return the transformed dataframe
>>> s.add_transform(transformation)
>>> s.writeStream.format('console').trigger('3 seconds')
>>> s.start()
Kafka
>>> from polars_streaming import StreamProcessor
>>> s = StreamProcessor()
>>> s.readStream.format('kafka').options({'kafka.bootstrap.servers':'localhost','subscribe': 'topic_name',
'startingOffsets': 'earliest',
'kafka.group.id': 'g1'}).load()
>>> def transformation(df):
>>> # Add your transformation code here
>>> df = df.sum() # For example purpose, I am calculating the sum.
>>> return df # Return the transformed dataframe
>>> s.add_transform(transformation)
>>> s.writeStream.format('console').trigger('10 seconds')
>>> s.start()
File Sources
>>> from polars_streaming import StreamProcessor
>>> s = StreamProcessor()
>>> s.readStream.format('csv').load('read_path_of_file_source') # Reads the csv file from the path once it is created
>>> def transformation(df):
>>> # Add your transformation code here
>>> df = df.sum() # For example purpose, I am calculating the sum.
>>> return df # Return the transformed dataframe
>>> s.add_transform(transformation)
>>> s.writeStream.option('path','write_path').format('avro') # Write the processed data to the write path in avro format
>>> s.start()
Sources
- Socket
- File Sources
- CSV
- JSON
- AVRO
- PARQUET
- Kafka
Sinks
- Console
- File Sources
- CSV
- JSON
- AVRO
- PARQUET
- MongoDB
- ElasticSearch
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
polars-streaming-0.3.0.tar.gz
(15.6 kB
view hashes)
Built Distribution
Close
Hashes for polars_streaming-0.3.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | ea7c59eef153b4add113286e9160930a63ebe76a17cd712e92a3c5f40e93507e |
|
MD5 | a0d05d8f95da82cd78d69d67a97137fb |
|
BLAKE2b-256 | 0a6d24ec1bb746770a6669b6db5315df76aa770323508aa2b42cee5584059b07 |