API for an asynchronous, point-readable queue
Project description
Queue Api
ABC for asynchronous, point-readable, exception-free read/write queues
Interfaces
ReadQueue[A]
A = TypeVar('A', covariant=True)
class ReadQueue(Generic[A]):
async def pop(self) -> Either[QueueError, tuple[str, A]]:
...
async def pop(self, id: str) -> Either[ReadError, A]:
...
async def read(self) -> Either[QueueError, tuple[str, A]]:
...
async def read(self, id: str) -> Either[ReadError, A]:
...
async def items(self) -> AsyncIter[Either[QueueError, tuple[str, A]]]:
...
async def keys(self) -> AsyncIter[Either[ReadError, str]]:
...
async def values(self) -> AsyncIter[Either[ReadError, A]]:
...
WriteQueue[A]
A = TypeVar('A', contravariant=True)
class WriteQueue(Generic[A]):
async def push(self, key: str, value: A) -> Either[QueueError, None]:
...
Queue[A] = ReadQueue[A] & WriteQueue[A]
AppendQueue[A]
class AppendQueue(WriteQueue[Sequence[A]], Generic[A]):
async def append(self, id: str, values: Sequence[A], *, create: bool) -> bool:
...
R/W differences
In general, a Queue[A]
can be treated as an AsyncIterable[A]
. However, there are some distintions between Read-
and WriteQueues
Variance
ReadQueue[A]
is covariant
Since ReadQueue
is immutable, someone expecting a ReadQueue[Animal]
will be happy with a ReadQueue[Cat]
.
WriteQueue[A]
is contravariant
This is a bit of a weird one: WriteQueue
is mutable but not readable. Thus, someone expecting a WriteQueue[Cat]
will be happy with a WriteQueue[Animal]
.
Operations
ReadQueue
s can bemap
-ed andfilter
-ed. This is akin tomap
andfilter
onAsyncIter
-ables.
WriteQueue
s can bepremap
-ed andprefilter
-ed.
Again a bit of a weird one: let's use an example:
async def cats(queue: WriteQueue[tuple[str, Cat]]):
await queue.push('key1', ('Garfield', Cat(...)))
await queue.push('key2', ('Puss in Boots', Cat(...)))
q_cats: Queue[Cat] = ...
Here, our q_cats
queue wants Cat
s, but cats
insists on yielding a tuple (name, Cat)
. So, we can premap
a function to adapt the queue:
await cats(q_cats.premap(lambda t: t[1]))
Composition
Queue[B]
is both a ReadQueue[B]
and a WriteQueue[B]
. However, we'll generally want to have a producer function (that receives a WriteQueue[A]
) and a consumer function, that receives a ReadQueue[C]
.
Still, for whatever reason, we may want to actually store data in a Queue[B]
. The setup is then as follows:
queue: Queue[B] = ...
async def producer(queue: WriteQueue[A]):
...
async def consumer(queue: ReadQueue[C]):
...
producer(queue.premap(f))
consumer(queue.map(g))
Now, premap
always returns a WriteQueue
; map
always a ReadQueue
. Thus, we can compose indefinetely in either direction:
producer(queue.premap(f1).premap(f2).premap(f3))
consumer(queue.map(g1).map(g2).map(g3))
Implementations
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for queue_api-0.1.6-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | ac7b7b1d0604de2c1582b2e295ca4f596a1273e9fbda0ff66e481bf3a0beef4d |
|
MD5 | d1453318a586d9dd648ac30434461d0d |
|
BLAKE2b-256 | 8f89e649657cfa179176ffa395568a10a216071980b66b0e2ac20c5b7d960d04 |