Simple real time data stream manipulation.
Project description
Gully
Gully is a simple framework for manipulating asynchronous streams of data.
Installation
pip install gully
Usage
import asyncio
import gully


async def observer(item):
    # Called for every value that passes the pipeline.
    print(item)


async def main():
    stream = gully.Gully()
    stream.watch(observer)
    # Only "foobar" passes the filter; it is then upper-cased.
    stream.add_filter(lambda item: item == "foobar")
    stream.add_mapping(lambda item: item.upper())
    await stream.push("foobar")
    await stream.push("baz")
    await stream.push("foobar")


asyncio.run(main())
Output
FOOBAR
FOOBAR
Documentation
gully.Gully(watch: Sequence[Gully] = None, *, filters: Sequence[Callable], mappings: Sequence[Callable], max_size: int = -1, loop: EventLoop = None)
Gully is a stream. It can observe other gullies, and it can be observed by coroutines. Any number of gullies can be passed as args; the new gully will observe them and aggregate their pushes. By default a gully retains an unlimited history of its pushes. This can be changed by setting the max_size keyword arg to any value greater than 0.
property Gully.loop: asyncio.AbstractEventLoop
This is the loop that the gully will use to run observers.
property Gully.history: gully.HistoryView
The current history of pushes to the gully. This is a view that cannot be assigned to. It is restricted by the max_size setting that was given to the gully.
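As a rough illustration of a size-limited history, the sketch below uses a standard-library deque. It is not gully's implementation: make_history is a hypothetical helper, and dropping the oldest entries first is an assumption, since the documentation above only says the history is restricted by max_size.

```python
from collections import deque


def make_history(max_size: int = -1) -> deque:
    """Hypothetical helper: unbounded unless max_size is greater than 0."""
    return deque(maxlen=max_size if max_size > 0 else None)


history = make_history(max_size=2)
for value in ("a", "b", "c"):
    history.append(value)

# Assumption: the oldest entry is discarded once the limit is reached.
print(list(history))  # ['b', 'c']
```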
property Gully.pipeline: gully.Pipeline
The pipeline that is run when pushing a new item into the gully. The gully will only ever call the pipeline with a single argument, so all steps added to the pipeline must support only receiving a single argument.
method Gully.push(value: Any)
Pushes a value into the gully. This will run the pipeline to map and filter the value. It will only add it to the history and call the observers if a filter doesn't reject the value.
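The push behaviour described above can be pictured with a small stand-alone sketch. MiniStream and Rejected are hypothetical names used only for illustration and are not part of gully; calling observers one after another is also an assumption.

```python
import asyncio


class Rejected(Exception):
    """Hypothetical signal that a filter step rejected the value."""


class MiniStream:
    """Illustrative stand-in for a stream; not gully's implementation."""

    def __init__(self):
        self.steps = []
        self.history = []
        self.observers = []

    async def push(self, value):
        try:
            for step in self.steps:
                value = step(value)
        except Rejected:
            return  # rejected values skip the history and the observers
        self.history.append(value)
        for observer in self.observers:  # assumption: sequential calls
            await observer(value)


def only_foobar(item):
    if item != "foobar":
        raise Rejected(item)
    return item


seen = []


async def record(item):
    seen.append(item)


async def main():
    stream = MiniStream()
    stream.observers.append(record)
    stream.steps.extend([only_foobar, str.upper])
    for value in ("foobar", "baz", "foobar"):
        await stream.push(value)
    return stream.history


history = asyncio.run(main())
print(seen)  # ['FOOBAR', 'FOOBAR']
```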
method Gully.watch(callback: Callable[[Any], Awaitable[None]])
Registers a coroutine to observe new values that are pushed into the gully.
method Gully.filter(*predicates: Callable[[Any], bool], max_size: int = -1) -> Gully
Branches the gully into a new gully which uses the given filter predicates. The branched gully can have a custom max_size set.
method Gully.map(mapping: Callable[[Any], Any], max_size: int = -1) -> Gully
Branches the gully into a new gully which uses the given mapping callbacks. The branched gully can have a custom max_size set.
method Gully.add_filter(*predicates: Callable[[Any], Any])
Adds the given filter predicates to the gully pipeline. These cannot be removed; use the filter method to create a new gully that has the desired filter predicates if they need to be disabled later.
This wraps each filter predicate in a function that will raise NotAFilterMatch if the filter predicate returns False. This will cause the pipeline to stop and push will ignore the current item, not adding it to the history and not calling the observers.
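The wrapping described above might look roughly like the following. This is a sketch under assumptions: NotAFilterMatch is redefined locally as a stand-in rather than imported from gully, as_filter_step is a hypothetical name, and treating any falsy result as a rejection is an assumption (the text only mentions False).

```python
class NotAFilterMatch(Exception):
    """Local stand-in for the exception named above; not imported from gully."""


def as_filter_step(predicate):
    """Hypothetical helper: turn a predicate into a pipeline step."""

    def step(item):
        # Assumption: any falsy result counts as a rejection.
        if not predicate(item):
            raise NotAFilterMatch(item)
        return item  # matching items pass through unchanged

    return step


is_foobar = as_filter_step(lambda item: item == "foobar")
print(is_foobar("foobar"))  # foobar

rejected = False
try:
    is_foobar("baz")
except NotAFilterMatch:
    rejected = True
print(rejected)  # True
```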
method Gully.add_mapping(*mappings: Callable[[Any], Any])
Adds the given mapping callbacks to the gully pipeline. These cannot be removed; use the map method to create a new gully that has the desired mapping callbacks if they need to be disabled later.
method Gully.stop_watching(callback: Union[Callable, Observer])
Removes an observer from the gully. This will accept either the original callback, or an observer object that wraps that callback.
gully.Observable(gully.Gully)
Simple wrapper for callback coroutines. This allows the observer to be enabled or disabled. The observer must be provided a start function that enables the callback to observe new events, and a stop function that disables it.
This can be used as a stand-in for the callback in sets, as a dictionary key, or when stopping a watcher on a gully object.
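One way such a wrapper could behave is sketched below. Everything here is hypothetical: the class name ObserverSketch, the enable and disable method names, and the constructor parameters are assumptions made for illustration, and hashing and comparing by the wrapped callback is only one plausible reading of "stand-in for the callback".

```python
class ObserverSketch:
    """Hypothetical wrapper; names and signature are not gully's API."""

    def __init__(self, callback, start, stop):
        self.callback = callback
        self._start = start
        self._stop = stop

    def enable(self):
        self._start(self.callback)

    def disable(self):
        self._stop(self.callback)

    def __hash__(self):
        # Hash like the callback so both find the same set or dict slot.
        return hash(self.callback)

    def __eq__(self, other):
        # Compare equal to the raw callback or to another wrapper of it.
        return getattr(other, "callback", other) == self.callback


async def on_item(item):
    print(item)


active = set()
observer = ObserverSketch(on_item, start=active.add, stop=active.discard)

observer.enable()
print(observer in active)  # True

observer.disable()
print(len(active))  # 0
```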
gully.Pipeline
A simple action pipeline that allows steps to be run in order.
method Pipeline.add(*steps: Callable[[Any], Any])
Adds any number of steps to the pipeline.
method Pipeline.run(item: Any, *args, **kwargs)
Runs the pipeline. Each step receives the return value of the previous step in place of the first argument.
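The threading of the first argument can be sketched as a plain loop. run_steps is a hypothetical function, not gully's Pipeline, and passing the extra positional and keyword arguments unchanged to every step is an assumption.

```python
def run_steps(steps, item, *args, **kwargs):
    """Hypothetical sketch: feed each step's return value into the next."""
    for step in steps:
        # Assumption: extra arguments are forwarded unchanged to every step.
        item = step(item, *args, **kwargs)
    return item


def append(text, suffix):
    return text + suffix


def shout(text, suffix):
    return text.upper()


result = run_steps([append, shout], "foo", "bar")
print(result)  # FOOBAR
```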
File details
Details for the file gully-0.3.1.tar.gz.
File metadata
- Download URL: gully-0.3.1.tar.gz
- Upload date:
- Size: 6.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.4 CPython/3.9.0 Darwin/20.5.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 0fda18398f0f232930c698927335453eb5f4c7c7ec37ffbd72fa07867c7f99fe
MD5 | c0a50f4c12961c901859dc6122dacd65
BLAKE2b-256 | 37e11b7d1ea309fa954278041e362dee8e7fa75419091cb89edd21ce56c7d0ae
File details
Details for the file gully-0.3.1-py3-none-any.whl.
File metadata
- Download URL: gully-0.3.1-py3-none-any.whl
- Upload date:
- Size: 6.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.4 CPython/3.9.0 Darwin/20.5.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | ac0326934c466b6d3052306ce2a1c39aa84d782cc0384d7a5a7a5e954683085b
MD5 | 40eef50b679ef6dbfc14e9b3de857d35
BLAKE2b-256 | 29fc9d22a7976255f0646c464fd7c88aef6af961afe53eee4ae921cd02aa5001