Skip to main content

ABC for asynchronous, point-readable, exception-free read/write queues

Project description

Queue Api

ABC for asynchronous, point-readable, exception-free read/write queues

pip install queue-api

Interfaces

ReadQueue[A]

A = TypeVar('A', covariant=True)

class ReadQueue(Generic[A]):
  async def pop(self) -> Either[QueueError, tuple[str, A]]:
    ...

  async def pop(self, id: str) -> Either[ReadError, A]:
    ...

  async def read(self) -> Either[QueueError, tuple[str, A]]:
    ...

  async def read(self, id: str) -> Either[ReadError, A]:
    ...

  async def items(self) -> AsyncIter[Either[QueueError, tuple[str, A]]]:
    ...
  
  async def keys(self) -> AsyncIter[Either[ReadError, str]]:
    ...
  
  async def values(self) -> AsyncIter[Either[ReadError, A]]:
    ...

WriteQueue[A]

A = TypeVar('A', contravariant=True)

class WriteQueue(Generic[A]):
  async def push(self, key: str, value: A) -> Either[QueueError, None]:
    ...

Queue[A] = ReadQueue[A] & WriteQueue[A]

AppendQueue[A]

class AppendQueue(WriteQueue[Sequence[A]], Generic[A]):
  async def append(self, id: str, values: Sequence[A], *, create: bool) -> bool:
    ...

R/W differences

In general, a Queue[A] can be treated as an AsyncIterable[A]. However, there are some distintions between Read- and WriteQueues

Variance

ReadQueue[A] is covariant

Since ReadQueue is immutable, someone expecting a ReadQueue[Animal] will be happy with a ReadQueue[Cat].

WriteQueue[A] is contravariant

This is a bit of a weird one: WriteQueue is mutable but not readable. Thus, someone expecting a WriteQueue[Cat] will be happy with a WriteQueue[Animal].

Operations

ReadQueues can be map-ed and filter-ed. This is akin to map and filter on AsyncIter-ables.

WriteQueues can be premap-ed and prefilter-ed.

Again a bit of a weird one: let's use an example:

async def cats(queue: WriteQueue[tuple[str, Cat]]):
  await queue.push('key1', ('Garfield', Cat(...)))
  await queue.push('key2', ('Puss in Boots', Cat(...)))

q_cats: Queue[Cat] = ...

Here, our q_cats queue wants Cats, but cats insists on yielding a tuple (name, Cat). So, we can premap a function to adapt the queue:

await cats(q_cats.premap(lambda t: t[1]))

Composition

Queue[B] is both a ReadQueue[B] and a WriteQueue[B]. However, we'll generally want to have a producer function (that receives a WriteQueue[A]) and a consumer function, that receives a ReadQueue[C].

Still, for whatever reason, we may want to actually store data in a Queue[B]. The setup is then as follows:

queue: Queue[B] = ...

async def producer(queue: WriteQueue[A]):
  ...

async def consumer(queue: ReadQueue[C]):
  ...

producer(queue.premap(f))
consumer(queue.map(g))

Now, premap always returns a WriteQueue; map always a ReadQueue. Thus, we can compose indefinetely in either direction:

producer(queue.premap(f1).premap(f2).premap(f3))
consumer(queue.map(g1).map(g2).map(g3))

Implementations

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queue_api-0.1.7.tar.gz (9.6 kB view hashes)

Uploaded Source

Built Distribution

queue_api-0.1.7-py3-none-any.whl (13.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page