Skip to main content

ABC for asynchronous, point-readable, exception-free read/write queues

Project description

Queue Api

ABC for asynchronous, point-readable, exception-free read/write queues

pip install queue-api

Interfaces

ReadQueue[A]

A = TypeVar('A', covariant=True)

class ReadQueue(Generic[A]):
  async def pop(self) -> Either[QueueError, tuple[str, A]]:
    ...

  async def pop(self, id: str) -> Either[ReadError, A]:
    ...

  async def read(self) -> Either[QueueError, tuple[str, A]]:
    ...

  async def read(self, id: str) -> Either[ReadError, A]:
    ...

  async def items(self) -> AsyncIter[Either[QueueError, tuple[str, A]]]:
    ...
  
  async def keys(self) -> AsyncIter[Either[ReadError, str]]:
    ...
  
  async def values(self) -> AsyncIter[Either[ReadError, A]]:
    ...

WriteQueue[A]

A = TypeVar('A', contravariant=True)

class WriteQueue(Generic[A]):
  async def push(self, key: str, value: A) -> Either[QueueError, None]:
    ...

Queue[A] = ReadQueue[A] & WriteQueue[A]

AppendQueue[A]

class AppendQueue(WriteQueue[Sequence[A]], Generic[A]):
  async def append(self, id: str, values: Sequence[A], *, create: bool) -> bool:
    ...

R/W differences

In general, a Queue[A] can be treated as an AsyncIterable[A]. However, there are some distintions between Read- and WriteQueues

Variance

ReadQueue[A] is covariant

Since ReadQueue is immutable, someone expecting a ReadQueue[Animal] will be happy with a ReadQueue[Cat].

WriteQueue[A] is contravariant

This is a bit of a weird one: WriteQueue is mutable but not readable. Thus, someone expecting a WriteQueue[Cat] will be happy with a WriteQueue[Animal].

Operations

ReadQueues can be map-ed and filter-ed. This is akin to map and filter on AsyncIter-ables.

WriteQueues can be premap-ed and prefilter-ed.

Again a bit of a weird one: let's use an example:

async def cats(queue: WriteQueue[tuple[str, Cat]]):
  await queue.push('key1', ('Garfield', Cat(...)))
  await queue.push('key2', ('Puss in Boots', Cat(...)))

q_cats: Queue[Cat] = ...

Here, our q_cats queue wants Cats, but cats insists on yielding a tuple (name, Cat). So, we can premap a function to adapt the queue:

await cats(q_cats.premap(lambda t: t[1]))

Composition

Queue[B] is both a ReadQueue[B] and a WriteQueue[B]. However, we'll generally want to have a producer function (that receives a WriteQueue[A]) and a consumer function, that receives a ReadQueue[C].

Still, for whatever reason, we may want to actually store data in a Queue[B]. The setup is then as follows:

queue: Queue[B] = ...

async def producer(queue: WriteQueue[A]):
  ...

async def consumer(queue: ReadQueue[C]):
  ...

producer(queue.premap(f))
consumer(queue.map(g))

Now, premap always returns a WriteQueue; map always a ReadQueue. Thus, we can compose indefinetely in either direction:

producer(queue.premap(f1).premap(f2).premap(f3))
consumer(queue.map(g1).map(g2).map(g3))

Implementations

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queue_api-0.1.7.tar.gz (9.6 kB view details)

Uploaded Source

Built Distribution

queue_api-0.1.7-py3-none-any.whl (13.2 kB view details)

Uploaded Python 3

File details

Details for the file queue_api-0.1.7.tar.gz.

File metadata

  • Download URL: queue_api-0.1.7.tar.gz
  • Upload date:
  • Size: 9.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.11.6

File hashes

Hashes for queue_api-0.1.7.tar.gz
Algorithm Hash digest
SHA256 323ff1c9b7016c03cf0d9c58d1144f0df18c294c400520a875b8adea1b418eb1
MD5 3e79d500d02c652531cfa1f1ede17f06
BLAKE2b-256 41c39c7f7eb431f7985dbfc2d0c5c7fc1095354db71355cc30a922e59d8c3aa4

See more details on using hashes here.

File details

Details for the file queue_api-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: queue_api-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 13.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.11.6

File hashes

Hashes for queue_api-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 45c3157c36307722c5769cd0301a51a537650e23608f141182fc8df2dcc66c15
MD5 84e26eace1dccf1b0019660d684b133e
BLAKE2b-256 9c4618def9b9ee444d4f2570279dd4e15c22c04013cf59ef815627d2982330e2

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page