Skip to main content

A tiny library for dataflow programming.

Project description

MiniGasket banner

MiniGasket: A Tiny Flow-Based Programming Library

pipeline status coverage report PyPI PyPI - Python Version PyPI - License

MiniGasket is a tiny library for facilitating flow-based programming.

Installation

Installation is simple using pip:

$ pip install pyminigasket

MiniGasket has no external dependencies.

Examples

Directing data flows:

import minigasket
from minigasket import SourceBase


class StringSource(minigasket.SourceBase):
    def emit(self, value: str):
        self.send(value)


class ToUpper(minigasket.SourceSinkBase):
    def __init__(self):
        super().__init__()

    def receive(self, sender: SourceBase, message: str) -> None:
        self.send(message.upper())


class Appender(minigasket.SourceSinkBase):
    def __init__(self, value: str):
        super().__init__()
        self._value = value

    def receive(self, sender: SourceBase, message) -> None:
        self.send(message + self._value)


class Collector(minigasket.SinkBase):
    def __init__(self):
        super().__init__()
        self.received = []

    def receive(self, sender: SourceBase, message) -> None:
        self.received.append(message)


source = StringSource()
sink = Collector()

source >> sink
source >> ToUpper() >> sink
source >> Appender('!') >> sink

source.emit('hello')
source.emit('world')
assert sink.received == ['hello', 'HELLO', 'hello!', 'world', 'WORLD', 'world!']

Creating a filter:

import random

import minigasket
from minigasket import SourceBase


class RandomNumberSource(minigasket.SourceBase):
    def emit(self):
        self.send(random.randrange(100))


class EvenNumberFilter(minigasket.FilterBase):
    def predicate(self, sender: SourceBase, message: int) -> bool:
        return (message % 2) == 0


class Collector(minigasket.SinkBase):
    def __init__(self):
        super().__init__()
        self.received = []

    def receive(self, sender: SourceBase, message) -> None:
        self.received.append(message)


source = RandomNumberSource()
even_sink = Collector()
odd_sink = Collector()

filt = EvenNumberFilter()
source >> filt >> even_sink
filt.rejected >> odd_sink

for _ in range(10):
    source.emit()

print('EVENS:', even_sink.received)
print('ODDS:', odd_sink.received)

Multiple sources:

import minigasket


class IntSource(minigasket.SourceBase):
    def emit(self, value: int):
        self.send(value)


class IncrementDecrement(minigasket.SinkBase):
    def __init__(self):
        super().__init__()
        self.incremented = minigasket.SourceProxy(self)
        self.decremented = minigasket.SourceProxy(self)

    def receive(self, sender, message: int) -> None:
        self.incremented.send(message + 1)
        self.decremented.send(message - 1)


class Collector(minigasket.SinkBase):
    def __init__(self):
        super().__init__()
        self.received = []

    def receive(self, sender, message) -> None:
        self.received.append(message)


source = IntSource()
sink_increment = Collector()
sink_decrement = Collector()

incdec = IncrementDecrement()
source >> incdec
incdec.incremented >> sink_increment
incdec.decremented >> sink_decrement

source.emit(1)
source.emit(2)
source.emit(3)

assert sink_increment.received == [2, 3, 4]
assert sink_decrement.received == [0, 1, 2]

Multiple Sinks:

import minigasket


class StringSource(minigasket.SourceBase):
    def emit(self, value: str):
        self.send(value)


class MultiPrint(object):
    def __init__(self):
        super().__init__()
        self.sink_a = minigasket.SinkProxy(self.receive_a)
        self.sink_b = minigasket.SinkProxy(self.receive_b)

    def receive_a(self, sender, message) -> None:
        print('Got message from sink A:', message)

    def receive_b(self, sender, message) -> None:
        print('Got message from sink B:', message)


source_a = StringSource()
source_b = StringSource()

sink = MultiPrint()

source_a >> sink.sink_a
source_b >> sink.sink_b

source_a.emit('Hello to sink A!')
source_b.emit('Hello to sink B!')

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyminigasket-0.2.0.tar.gz (5.2 kB view details)

Uploaded Source

Built Distribution

pyminigasket-0.2.0-py3-none-any.whl (4.9 kB view details)

Uploaded Python 3

File details

Details for the file pyminigasket-0.2.0.tar.gz.

File metadata

  • Download URL: pyminigasket-0.2.0.tar.gz
  • Upload date:
  • Size: 5.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.5 CPython/3.8.0 Windows/10

File hashes

Hashes for pyminigasket-0.2.0.tar.gz
Algorithm Hash digest
SHA256 c10b95628d763b96bb2b75dc7fedf510d735510d7802baf34400a5bf708e099b
MD5 70bf201677abc1f32585be297c833896
BLAKE2b-256 6d267c623f01d063e59c36e920fc87fd15734c4e7402c10e49d2b3d5ce1e9788

See more details on using hashes here.

File details

Details for the file pyminigasket-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pyminigasket-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 4.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.5 CPython/3.8.0 Windows/10

File hashes

Hashes for pyminigasket-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8378e5b3863aa5f0bd01a0334a37ed2689b95741e622b9d61f72ac647b357672
MD5 6810d5646e5bdb33ae32b65cea31b5d1
BLAKE2b-256 a9761e466688e01ff3e3694c162b2a01ec1ac018d3dfca3c8be11cccf1f93650

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page