
Stream Processing Library using Polars


polars-streaming

This library helps you process streaming data with Polars.

Installation

pip install polars-streaming

Install from source

Alternatively, clone the latest version of the repository and install it directly from the source code:

pip install -e .

Quick tour

Socket

>>> from polars_streaming import StreamProcessor

>>> s = StreamProcessor()
>>> s.readStream.format('socket').options({'host':'localhost','port':12345}).load()

>>> def transformation(df):
...     # Add your transformation code here
...     df = df.sum()  # For example, compute the sum of each column
...     return df  # Return the transformed DataFrame
...

>>> s.add_transform(transformation)
>>> s.writeStream.format('console').trigger('3 seconds')

>>> s.start()
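The socket example needs something listening on `localhost:12345` for the stream processor to read from. Below is a minimal standard-library test server. It assumes, without confirmation from this README, that the socket source connects as a client and reads newline-delimited UTF-8 text, as Spark's socket source does. `serve_lines` is a hypothetical helper, not part of polars-streaming.

```python
import socket
from typing import Iterable


def serve_lines(server: socket.socket, lines: Iterable[str]) -> None:
    """Accept one client on a listening socket, send each line, then disconnect.

    Hypothetical test helper. Assumes the polars-streaming socket source
    connects as a client and reads newline-delimited UTF-8 text.
    """
    conn, _addr = server.accept()  # blocks until a client (the stream processor) connects
    with conn:
        for line in lines:
            # Terminate every record with a newline so a line-based reader can split them.
            conn.sendall((line + "\n").encode("utf-8"))
```

For example, run `with socket.create_server(("localhost", 12345)) as srv: serve_lines(srv, ["1", "2", "3"])` in a separate process, then start the stream processor. The record format the socket source expects (bare values, CSV, JSON) is not stated here, so the sample payload is a guess.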

Kafka

>>> from polars_streaming import StreamProcessor

>>> s = StreamProcessor()
>>> s.readStream.format('kafka').options({'kafka.bootstrap.servers': 'localhost',
...                                       'subscribe': 'topic_name',
...                                       'startingOffsets': 'earliest',
...                                       'kafka.group.id': 'g1'}).load()

>>> def transformation(df):
...     # Add your transformation code here
...     df = df.sum()  # For example, compute the sum of each column
...     return df  # Return the transformed DataFrame
...

>>> s.add_transform(transformation)
>>> s.writeStream.format('console').trigger('10 seconds')

>>> s.start()
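Both streaming examples call `trigger(...)` with an interval such as `'3 seconds'` or `'10 seconds'`. One plausible reading, by analogy with processing-time triggers in Spark Structured Streaming, is a micro-batch loop: records that arrive during each interval are collected into one batch, passed through the registered transformation, and handed to the sink. The sketch below models that idea in plain Python on timestamped records. `parse_trigger` and `micro_batches` are hypothetical helpers, the trigger-string format is assumed, lists stand in for DataFrames, and `sum` stands in for the `df.sum()` transformation. This is not polars-streaming's actual implementation.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def parse_trigger(spec: str) -> float:
    """Convert a trigger string such as '10 seconds' to seconds (assumed format)."""
    amount, unit = spec.split()
    factors = {"second": 1, "seconds": 1, "minute": 60, "minutes": 60}
    return float(amount) * factors[unit]


def micro_batches(
    records: Iterable[Tuple[float, int]],
    interval: float,
    transform: Callable[[List[int]], int],
) -> List[int]:
    """Group (arrival_time, value) records into consecutive windows of
    `interval` seconds and apply `transform` to each non-empty window, in order."""
    batches: Dict[int, List[int]] = {}
    for ts, value in records:
        window = int(ts // interval)  # index of the interval this record arrived in
        batches.setdefault(window, []).append(value)
    return [transform(batches[w]) for w in sorted(batches)]
```

For instance, with a `'3 seconds'` trigger, `micro_batches([(0.5, 1), (2.0, 2), (3.5, 3)], parse_trigger('3 seconds'), sum)` yields `[3, 3]`: the first two records share the first window and the third falls in the second.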

File Sources

>>> from polars_streaming import StreamProcessor

>>> s = StreamProcessor()
>>> s.readStream.format('csv').load('read_path_of_file_source')  # Read the CSV file at this path once it is created

>>> def transformation(df):
...     # Add your transformation code here
...     df = df.sum()  # For example, compute the sum of each column
...     return df  # Return the transformed DataFrame
...

>>> s.add_transform(transformation)
>>> s.writeStream.option('path', 'write_path').format('avro')  # Write the processed data to write_path in Avro format

>>> s.start()
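The CSV source reads a file from `read_path_of_file_source` once it is created. If the source notices files as soon as they appear (an assumption; the README does not describe how files are detected), it could read a file that is still being written. A common defensive pattern is to write to a hidden temporary file in the same directory and then rename it into place with `os.replace`, which is atomic on POSIX systems when both paths are on the same filesystem. `drop_csv` is a hypothetical helper, and the sketch assumes the source ignores the temporary `.tmp` file.

```python
import csv
import os
import tempfile
from typing import Iterable, Sequence


def drop_csv(directory: str, name: str, header: Sequence[str],
             rows: Iterable[Sequence[object]]) -> str:
    """Write a CSV file to `directory/name` so it only appears once complete."""
    # Hidden temp file in the same directory, so the final rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows)
        final_path = os.path.join(directory, name)
        os.replace(tmp_path, final_path)  # atomic rename on POSIX, same filesystem
    except BaseException:
        # Remove the partially written temp file before re-raising.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

For example, `drop_csv("read_path_of_file_source", "batch_0001.csv", ["value"], [[1], [2], [3]])`, assuming `read_path_of_file_source` is a directory that the source watches.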

Sources

  • Socket
  • File Sources
    • CSV
    • JSON
    • AVRO
    • PARQUET
  • Kafka

Sinks

  • Console
  • File Sinks
    • CSV
    • JSON
    • AVRO
    • PARQUET
  • MongoDB
  • ElasticSearch

Download files

Source Distribution

polars-streaming-0.3.0.tar.gz (15.6 kB)

Built Distribution

polars_streaming-0.3.0-py3-none-any.whl (18.0 kB)

File details

Details for the file polars-streaming-0.3.0.tar.gz.

File metadata

  • Download URL: polars-streaming-0.3.0.tar.gz
  • Size: 15.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.7.16

File hashes

Hashes for polars-streaming-0.3.0.tar.gz
Algorithm Hash digest
SHA256 b2e69b5a737b65f1d5c12b180870e34a8fba9da076d0ad335b7603dc407e306b
MD5 c3ea636a967e2af8bf03052efa1da47c
BLAKE2b-256 a8a9d132a3cc42c41b7038661eb27a1e121a2c508c9bfee0b075aed7a6055e28

File details

Details for the file polars_streaming-0.3.0-py3-none-any.whl.

File hashes

Hashes for polars_streaming-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ea7c59eef153b4add113286e9160930a63ebe76a17cd712e92a3c5f40e93507e
MD5 a0d05d8f95da82cd78d69d67a97137fb
BLAKE2b-256 0a6d24ec1bb746770a6669b6db5315df76aa770323508aa2b42cee5584059b07
