Simple real-time data stream manipulation.

Gully

Gully is a simple framework for manipulating asynchronous streams of data.

Installation

pip install gully

Usage

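import asyncio
import gully

# Observers are coroutines that receive each item accepted by the stream.
async def observer(item):
    print(item)

async def main():
    stream = gully.Gully()
    stream.watch(observer)
    # Only items equal to "foobar" pass the filter.
    stream.add_filter(lambda item: item == "foobar")
    # Items that pass the filter are upper-cased before reaching observers.
    stream.add_mapping(lambda item: item.upper())
    await stream.push("foobar")
    await stream.push("baz")  # rejected by the filter, so nothing is printed
    await stream.push("foobar")

asyncio.run(main())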

Output

FOOBAR
FOOBAR

Documentation

gully.Gully(watch: Sequence[Gully] = None, *, filters: Sequence[Callable], mappings: Sequence[Callable], max_size: int = -1, loop: EventLoop = None)

Gully is a stream. It can observe other gullies, and it can be observed by coroutines. Any number of gullies can be passed as args; the new gully will observe them and aggregate their pushes. By default, a gully retains an unlimited history of its pushes. This can be changed by setting the max_size keyword arg to any value greater than 0.
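The effect of max_size on the history can be pictured with a bounded deque. This is a standalone sketch, not gully's implementation: the helper name make_history is hypothetical, and it assumes that the oldest pushes are dropped first once the limit is reached, which the description above does not state.

```python
from collections import deque

def make_history(max_size=-1):
    """Return a history container (hypothetical helper).

    Assumption: any max_size of 0 or less means "unlimited",
    mirroring the documented default of -1.
    """
    return deque(maxlen=max_size if max_size > 0 else None)

unlimited = make_history()
bounded = make_history(max_size=2)

for value in ("a", "b", "c"):
    unlimited.append(value)
    bounded.append(value)

print(list(unlimited))  # ['a', 'b', 'c']
print(list(bounded))    # ['b', 'c'] (the oldest entry was discarded)
```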

  • property Gully.loop: asyncio.AbstractEventLoop This is the loop that the gully will use to run observers.

  • property Gully.history: gully.HistoryView The current history of pushes to the gully. This is a view and cannot be assigned to. Its length is limited by the max_size setting that was given to the gully.

  • property Gully.pipeline: gully.Pipeline The pipeline that is run when pushing a new item into the gully. The gully only ever calls the pipeline with a single argument, so every step added to the pipeline must be callable with exactly one argument.

method Gully.push(value: Any)

Pushes a value into the gully. This runs the pipeline to map and filter the value. The value is added to the history and passed to the observers only if no filter rejects it.

method Gully.watch(callback: Callable[[Any], Awaitable[None]])

Registers a coroutine to observe new values that are pushed into the gully.

method Gully.filter(*predicates: Callable[[Any], bool], max_size: int = -1) -> Gully

Branches the gully into a new gully which uses the given filter predicates. The branched gully can have a custom max_size set.

method Gully.map(mapping: Callable[[Any], Any], max_size: int = -1) -> Gully

Branches the gully into a new gully which uses the given mapping callbacks. The branched gully can have a custom max_size set.
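The idea of branching can be illustrated with a toy stream in which each branch is simply a new stream that observes its parent. This is a sketch under assumptions, not gully's implementation: MiniStream and recorder are hypothetical names, history and max_size are left out, and each branch here takes a single predicate or mapping.

```python
import asyncio

class MiniStream:
    """Toy stand-in for a stream (hypothetical, not part of gully)."""

    def __init__(self, predicate=None, mapping=None):
        self._predicate = predicate
        self._mapping = mapping
        self._observers = []

    def watch(self, callback):
        self._observers.append(callback)

    async def push(self, value):
        # Drop the value if this stream has a predicate that rejects it.
        if self._predicate is not None and not self._predicate(value):
            return
        if self._mapping is not None:
            value = self._mapping(value)
        for callback in self._observers:
            await callback(value)

    def filter(self, predicate):
        # The branch observes this stream; the parent itself is unchanged.
        branch = MiniStream(predicate=predicate)
        self.watch(branch.push)
        return branch

    def map(self, mapping):
        branch = MiniStream(mapping=mapping)
        self.watch(branch.push)
        return branch

def recorder(bucket):
    """Build an observer coroutine that appends each value to bucket."""
    async def record(value):
        bucket.append(value)
    return record

async def demo():
    seen = {"source": [], "evens": [], "doubled": []}
    source = MiniStream()
    source.watch(recorder(seen["source"]))
    evens = source.filter(lambda n: n % 2 == 0)
    evens.watch(recorder(seen["evens"]))
    doubled = evens.map(lambda n: n * 2)
    doubled.watch(recorder(seen["doubled"]))
    for n in range(4):
        await source.push(n)
    return seen

results = asyncio.run(demo())
print(results)
# {'source': [0, 1, 2, 3], 'evens': [0, 2], 'doubled': [0, 4]}
```

Observers of the source still see every value, while each branch sees only what its own predicate or mapping produces.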

method Gully.add_filter(*predicates: Callable[[Any], Any])

Adds the given filter predicates to the gully pipeline. These cannot be removed. If the predicates need to be disabled later, use the filter method instead to create a new gully that has them.

This wraps each filter predicate in a function that will raise NotAFilterMatch if the filter predicate returns False. This will cause the pipeline to stop and push will ignore the current item, not adding it to the history and not calling the observers.
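The wrapping described above can be sketched without gully installed. The exception class below is a local stand-in that only borrows the name NotAFilterMatch, and as_filter_step and run_steps are hypothetical helpers. One further assumption: the sketch rejects on any falsy return value, whereas the description above only mentions False.

```python
class NotAFilterMatch(Exception):
    """Local stand-in for the exception named above."""

def as_filter_step(predicate):
    """Wrap a predicate so it returns the item unchanged or raises."""
    def step(item):
        if not predicate(item):
            raise NotAFilterMatch(item)
        return item
    return step

def run_steps(item, steps):
    """Run steps in order and return (accepted, result)."""
    try:
        for step in steps:
            item = step(item)
    except NotAFilterMatch:
        # A rejected item stops the run; later steps never see it.
        return False, None
    return True, item

steps = [as_filter_step(lambda s: s == "foobar"), str.upper]
print(run_steps("foobar", steps))  # (True, 'FOOBAR')
print(run_steps("baz", steps))     # (False, None)
```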

method Gully.add_mapping(*mappings: Callable[[Any], Any])

Adds the given mapping callbacks to the gully pipeline. These cannot be removed. If the mappings need to be disabled later, use the map method instead to create a new gully that has them.

method Gully.stop_watching(callback: Union[Callable, Observer])

Removes an observer from the gully. This will accept either the original callback, or an observer object that wraps that callback.

gully.Observable(gully.Gully)

Simple wrapper for callback coroutines. This allows the observer to be enabled or disabled. The observer must be provided a start function that enables the callback to observe new events, and a stop function that disables it.

This can be used as a stand-in for the callback in sets, as a dictionary key, or when stopping a watcher on a gully object.

gully.Pipeline

A simple action pipeline that allows steps to be run in order.

method Pipeline.add(*steps: Callable[[Any], Any])

Adds any number of steps to the pipeline.

method Pipeline.run(item: Any, *args, **kwargs)

Runs the pipeline. Each step receives the return value of the previous step in place of the first argument.
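A minimal sketch of this chaining, written as a standalone class rather than gully.Pipeline itself. MiniPipeline is a hypothetical name, and two details are assumptions: that extra positional and keyword arguments are forwarded unchanged to every step, and that run returns the final value.

```python
class MiniPipeline:
    """Toy pipeline that threads its first argument through each step."""

    def __init__(self):
        self._steps = []

    def add(self, *steps):
        self._steps.extend(steps)

    def run(self, item, *args, **kwargs):
        for step in self._steps:
            # The result of each step becomes the first argument of the next.
            item = step(item, *args, **kwargs)
        return item

pipeline = MiniPipeline()
pipeline.add(
    lambda item, factor: item * factor,  # 3 * 2 = 6
    lambda item, factor: item + factor,  # 6 + 2 = 8
)
result = pipeline.run(3, 2)
print(result)  # 8
```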
