A tiny library for dataflow programming.
Project description
MiniGasket: A Tiny Flow-Based Programming Library
MiniGasket is a tiny library for facilitating flow-based programming.
Installation
Installation is simple using pip
:
$ pip install pyminigasket
MiniGasket has no external dependencies.
Examples
Directing data flows:
import minigasket
from minigasket import SourceBase
class StringSource(minigasket.SourceBase):
def emit(self, value: str):
self.send(value)
class ToUpper(minigasket.SourceSinkBase):
def __init__(self):
super().__init__()
def receive(self, sender: SourceBase, message: str) -> None:
self.send(message.upper())
class Appender(minigasket.SourceSinkBase):
def __init__(self, value: str):
super().__init__()
self._value = value
def receive(self, sender: SourceBase, message) -> None:
self.send(message + self._value)
class Collector(minigasket.SinkBase):
def __init__(self):
super().__init__()
self.received = []
def receive(self, sender: SourceBase, message) -> None:
self.received.append(message)
source = StringSource()
sink = Collector()
source >> sink
source >> ToUpper() >> sink
source >> Appender('!') >> sink
source.emit('hello')
source.emit('world')
assert sink.received == ['hello', 'HELLO', 'hello!', 'world', 'WORLD', 'world!']
Creating a filter:
import random
import minigasket
from minigasket import SourceBase
class RandomNumberSource(minigasket.SourceBase):
def emit(self):
self.send(random.randrange(100))
class EvenNumberFilter(minigasket.FilterBase):
def predicate(self, sender: SourceBase, message: int) -> bool:
return (message % 2) == 0
class Collector(minigasket.SinkBase):
def __init__(self):
super().__init__()
self.received = []
def receive(self, sender: SourceBase, message) -> None:
self.received.append(message)
source = RandomNumberSource()
even_sink = Collector()
odd_sink = Collector()
filt = EvenNumberFilter()
source >> filt >> even_sink
filt.rejected >> odd_sink
for _ in range(10):
source.emit()
print('EVENS:', even_sink.received)
print('ODDS:', odd_sink.received)
Multiple sources:
import minigasket
class IntSource(minigasket.SourceBase):
def emit(self, value: int):
self.send(value)
class IncrementDecrement(minigasket.SinkBase):
def __init__(self):
super().__init__()
self.incremented = minigasket.SourceProxy(self)
self.decremented = minigasket.SourceProxy(self)
def receive(self, sender, message: int) -> None:
self.incremented.send(message + 1)
self.decremented.send(message - 1)
class Collector(minigasket.SinkBase):
def __init__(self):
super().__init__()
self.received = []
def receive(self, sender, message) -> None:
self.received.append(message)
source = IntSource()
sink_increment = Collector()
sink_decrement = Collector()
incdec = IncrementDecrement()
source >> incdec
incdec.incremented >> sink_increment
incdec.decremented >> sink_decrement
source.emit(1)
source.emit(2)
source.emit(3)
assert sink_increment.received == [2, 3, 4]
assert sink_decrement.received == [0, 1, 2]
Multiple Sinks:
import minigasket
class StringSource(minigasket.SourceBase):
def emit(self, value: str):
self.send(value)
class MultiPrint(object):
def __init__(self):
super().__init__()
self.sink_a = minigasket.SinkProxy(self.receive_a)
self.sink_b = minigasket.SinkProxy(self.receive_b)
def receive_a(self, sender, message) -> None:
print('Got message from sink A:', message)
def receive_b(self, sender, message) -> None:
print('Got message from sink B:', message)
source_a = StringSource()
source_b = StringSource()
sink = MultiPrint()
source_a >> sink.sink_a
source_b >> sink.sink_b
source_a.emit('Hello to sink A!')
source_b.emit('Hello to sink B!')
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pyminigasket-0.2.0.tar.gz
(5.2 kB
view details)
Built Distribution
File details
Details for the file pyminigasket-0.2.0.tar.gz
.
File metadata
- Download URL: pyminigasket-0.2.0.tar.gz
- Upload date:
- Size: 5.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.0.5 CPython/3.8.0 Windows/10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c10b95628d763b96bb2b75dc7fedf510d735510d7802baf34400a5bf708e099b |
|
MD5 | 70bf201677abc1f32585be297c833896 |
|
BLAKE2b-256 | 6d267c623f01d063e59c36e920fc87fd15734c4e7402c10e49d2b3d5ce1e9788 |
File details
Details for the file pyminigasket-0.2.0-py3-none-any.whl
.
File metadata
- Download URL: pyminigasket-0.2.0-py3-none-any.whl
- Upload date:
- Size: 4.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.0.5 CPython/3.8.0 Windows/10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8378e5b3863aa5f0bd01a0334a37ed2689b95741e622b9d61f72ac647b357672 |
|
MD5 | 6810d5646e5bdb33ae32b65cea31b5d1 |
|
BLAKE2b-256 | a9761e466688e01ff3e3694c162b2a01ec1ac018d3dfca3c8be11cccf1f93650 |