
Simple real-time data stream manipulation.


Gully

Gully is a simple framework for manipulating streams of data. It provides both synchronous and asynchronous access to those streams.

Installation

pip install gully

Usage

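import asyncio
import gully

async def monitor_stream_using_iterator(stream: gully.DataStream):
    # Replays the cached values, then waits for new ones until the stream is closed.
    async for item in stream:
        print(item)

async def monitor_stream_using_future(stream: gully.DataStream):
    # stream.next is replaced after every push and cancelled when the stream closes.
    try:
        while not stream.next.done():
            item = await stream.next
            print(item)
    except asyncio.CancelledError:
        pass  # The stream was closed while waiting.

async def main():
    data_stream = gully.DataStream()
    filtered = data_stream.filter(lambda stream, item: item == "foobar")
    mapped = filtered.map(lambda stream, item: item.upper())

    watcher = asyncio.create_task(monitor_stream_using_iterator(mapped))
    for value in ("foo", "foobar", "bar"):
        data_stream.push(value)
        await asyncio.sleep(0.01)  # Give the views and the watcher time to run.
    mapped.close()  # Cancels mapped.next, which stops the watcher.
    await watcher

asyncio.run(main())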

Documentation

gully.DataStream(max_size: int = -1, loop: asyncio.AbstractEventLoop = None)

Provides the interface to a data stream. It supports both regular and async iteration, and it offers item access for reading its history.

The data stream caches up to max_size past values, or an unlimited number if max_size is less than 1. Cached values can be read by index (e.g. stream[1]), len(stream) returns the number of cached items, and the expression value in stream reports whether a value is in the data stream.
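The caching rules above can be sketched with collections.deque, whose maxlen argument discards the oldest entries once the bound is reached. HistoryCache is a hypothetical class written for illustration, not gully's implementation. It also assumes that index 0 refers to the oldest value; the documentation does not say which end stream[0] addresses.

```python
from collections import deque
from typing import Any, Deque, Iterator


class HistoryCache:
    """Hypothetical stand-in for a DataStream's value cache (not gully's code)."""

    def __init__(self, max_size: int = -1) -> None:
        # deque(maxlen=None) is unbounded; otherwise the oldest items are
        # discarded once max_size items are stored.
        self._items: Deque[Any] = deque(maxlen=max_size if max_size >= 1 else None)

    def push(self, value: Any) -> None:
        self._items.append(value)

    def __getitem__(self, index: int) -> Any:
        # Assumption: index 0 is the oldest cached value.
        return self._items[index]

    def __len__(self) -> int:
        return len(self._items)

    def __contains__(self, value: Any) -> bool:
        return value in self._items

    def __iter__(self) -> Iterator[Any]:
        # Oldest to newest, matching the documented iteration order.
        return iter(self._items)


cache = HistoryCache(max_size=3)
for value in range(5):
    cache.push(value)
print(list(cache), len(cache), 0 in cache, cache[0])  # [2, 3, 4] 3 False 2
```

With max_size=3, pushing five values leaves only the newest three, so 0 has been evicted.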

Iterating over the data stream walks the cache from oldest to newest. An async iterator does the same and then waits for new values to be pushed.

  • property DataStream.max_size: int
    The maximum size configured for the data stream's cache.

  • property DataStream.next: asyncio.Future
    The future that will receive the next value pushed into the data stream. It is replaced by a new future after every push and is cancelled when the stream is closed.

method DataStream.push(value: Any)

Pushes a value into the data stream.

method DataStream.close()

Closes the stream and cancels the next future, stopping all watchers, whether they were created synchronously, asynchronously, or through an iterator.
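The interplay between next, push() and close() can be modelled with plain asyncio futures. SketchStream is a hypothetical stand-in written for this page, not gully's implementation: it assumes an unbounded list cache and that the stream is created inside a running event loop. Its async iterator replays the cache, waits on next for new values, and ends once close() has cancelled that future.

```python
import asyncio
from typing import Any, AsyncIterator, List


class SketchStream:
    """Hypothetical model of DataStream's next/push()/close() (not gully's code)."""

    def __init__(self) -> None:
        # Assumes construction inside a running event loop.
        self._loop = asyncio.get_running_loop()
        self._history: List[Any] = []  # unbounded cache, for simplicity
        self._closed = False
        self._next: asyncio.Future = self._loop.create_future()

    @property
    def next(self) -> asyncio.Future:
        return self._next

    def push(self, value: Any) -> None:
        if self._closed:
            raise RuntimeError("stream is closed")
        self._history.append(value)
        # Swap in a fresh future first, then resolve the old one, so anything
        # woken by the result already sees the replacement.
        resolved, self._next = self._next, self._loop.create_future()
        resolved.set_result(value)

    def close(self) -> None:
        self._closed = True
        self._next.cancel()  # wakes every watcher awaiting the next value

    async def __aiter__(self) -> AsyncIterator[Any]:
        index = 0
        while True:
            # Yield everything not seen yet, oldest first (cache plus new pushes).
            while index < len(self._history):
                yield self._history[index]
                index += 1
            if self._closed:
                return
            try:
                await self._next
            except asyncio.CancelledError:
                if not self._closed:
                    raise  # the consuming task itself was cancelled
                return


async def demo() -> List[Any]:
    stream = SketchStream()
    stream.push("a")  # cached before anyone is watching

    async def watch() -> List[Any]:
        return [item async for item in stream]

    watcher = asyncio.create_task(watch())
    await asyncio.sleep(0)  # let the watcher replay the cache and start waiting
    stream.push("b")
    stream.push("c")
    stream.close()
    return await watcher


print(asyncio.run(demo()))  # ['a', 'b', 'c']
```

Reading new values by position in the cache, rather than only from the resolved future, means the watcher cannot miss values pushed in quick succession.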

method DataStream.on_next(callback: Callable[[DataStreamBase, Any], None])

Registers a function to be called when the next future is set. The callback will be passed the data stream it was registered on and the value that was pushed.

method DataStream.iterate_first(limit: int) -> DataStreamIterator

Returns an iterator that yields at most limit items, always starting at the oldest item in the cache. It can be used synchronously or asynchronously, but only the async form waits for new values to be pushed.
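The limit works like taking a prefix of an iterator. take() and take_async() are hypothetical helpers, not gully functions. The synchronous one stops when the cached values run out, while the asynchronous one keeps awaiting new values until limit items have arrived, which is the distinction drawn above. Treating a non-positive limit as "yield nothing" is an assumption.

```python
import asyncio
import itertools
from typing import Any, AsyncIterable, AsyncIterator, Iterable, List


def take(items: Iterable[Any], limit: int) -> List[Any]:
    # Synchronous case: stops at `limit` or when the cached values run out.
    return list(itertools.islice(items, max(limit, 0)))


async def take_async(items: AsyncIterable[Any], limit: int) -> AsyncIterator[Any]:
    # Asynchronous case: keeps awaiting new values until `limit` have been yielded.
    if limit <= 0:
        return
    count = 0
    async for item in items:
        yield item
        count += 1
        if count >= limit:
            return


async def live_values() -> AsyncIterator[int]:
    # Stand-in for a stream that never runs out of new values.
    for n in itertools.count():
        await asyncio.sleep(0)
        yield n


async def demo() -> List[int]:
    source = live_values()
    try:
        return [item async for item in take_async(source, 3)]
    finally:
        await source.aclose()  # close the endless source explicitly


print(take(["a", "b", "c", "d"], 2))  # ['a', 'b']
print(asyncio.run(demo()))  # [0, 1, 2]
```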

method DataStream.filter(predicate: Callable[[DataStreamBase, Any], bool], max_size: int = -1) -> DataStreamFilteredView

Creates a data stream view that receives only the values the predicate accepts. The predicate should return True for every value that should be let into the view.

method DataStream.map(mapper: Callable[[DataStreamBase, Any], Any], max_size: int = -1) -> DataStreamMappedView

Creates a data stream view that passes every value pushed to the stream through the mapping function and pushes the returned value into the view.

gully.DataStreamFilteredView

Behaves like a normal DataStream, but it monitors another stream and pushes to itself only the values that pass the predicate. Its cache history begins when it is created, so it has no access to values the monitored stream received earlier.

gully.DataStreamMappedView

Behaves like a normal DataStream, but it monitors another stream and passes each new value through the mapping function before pushing the result to itself. Its cache history begins when it is created, so it has no access to values the monitored stream received earlier.
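Both views can be understood as streams that subscribe to their source through an on_next-style callback. ViewSketch is a hypothetical, synchronous model written for this page rather than gully's implementation. It omits max_size and assumes the predicate and mapper receive the source stream. It also shows why a view's history starts at creation: values pushed before the view registered its callback never reach it.

```python
from typing import Any, Callable, List

# Callbacks receive the stream they were registered on and the pushed value.
Callback = Callable[["ViewSketch", Any], None]


class ViewSketch:
    """Hypothetical, synchronous model of gully's filter/map views (not gully's code)."""

    def __init__(self) -> None:
        self.history: List[Any] = []
        self._callbacks: List[Callback] = []

    def on_next(self, callback: Callback) -> None:
        self._callbacks.append(callback)

    def push(self, value: Any) -> None:
        self.history.append(value)
        for callback in self._callbacks:
            callback(self, value)

    def filter(self, predicate: Callable[["ViewSketch", Any], bool]) -> "ViewSketch":
        view = ViewSketch()

        def forward(source: "ViewSketch", value: Any) -> None:
            if predicate(source, value):
                view.push(value)

        # Registering here means the view only sees values pushed from now on.
        self.on_next(forward)
        return view

    def map(self, mapper: Callable[["ViewSketch", Any], Any]) -> "ViewSketch":
        view = ViewSketch()
        self.on_next(lambda source, value: view.push(mapper(source, value)))
        return view


source = ViewSketch()
source.push("foobar")  # pushed before the views exist, so they never see it
shouted = source.filter(lambda s, item: item == "foobar").map(lambda s, item: item.upper())
source.push("foo")
source.push("foobar")
print(source.history, shouted.history)  # ['foobar', 'foo', 'foobar'] ['FOOBAR']
```

Chaining works because each view is itself a stream: the map view subscribes to the filter view, which subscribes to the source.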

Download files

Source distribution: gully-0.1.0.tar.gz (4.6 kB)

Built distribution: gully-0.1.0-py3-none-any.whl (4.6 kB, Python 3)
