Async queue with selective retrieval
Async Selective Queue
Python library for an asynchronous queue with the ability to selectively retrieve elements from the queue.
Overview
AsyncSelectiveQueue lets you enqueue items and have consumers wait for specific items, optionally selected by a filter, without dropping everything else. Unmatched items remain in the queue until explicitly retrieved or flushed. It is ideal for noisy event buses and test harnesses where you only care about a subset of events but still want to inspect or clear the rest later.
Installation
pip install async-selective-queue
Usage
import asyncio

from async_selective_queue import AsyncSelectiveQueue


async def producer(q: AsyncSelectiveQueue[str]):
    for e in ["foo", "bar", "baz", "qux"]:
        await q.put(e)


async def test_consumer(q: AsyncSelectiveQueue[str]):
    # Only care about events containing "a"
    matches = await q.get_all(select=lambda s: "a" in s)
    print("Matched:", matches)
    # Non‑matching items ("foo", "qux") remain in queue
    print("Still queued:", q.flush())


async def main():
    # Fill the queue first, then run the selective consumer
    q: AsyncSelectiveQueue[str] = AsyncSelectiveQueue()
    await producer(q)
    await test_consumer(q)


asyncio.run(main())
API Reference
await put(value: T) -> None
Enqueue and notify waiters.
await get(select: Optional[Callable[[T], bool]] = None, *, timeout: float = 5) -> T
- Removes and returns the first matching item (or oldest if no select).
- Waits up to timeout seconds, then raises asyncio.TimeoutError.
- Non‑matching items are untouched.
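The behaviour listed for get can be sketched with a small stand-in class. This is a minimal sketch under stated assumptions, not the library's implementation: SketchQueue, its _find helper, and the choice to wrap the wait in asyncio.wait_for are all hypothetical and only mirror the semantics described above.

```python
import asyncio
from collections import deque
from typing import Callable, Deque, Generic, Optional, TypeVar

T = TypeVar("T")


class SketchQueue(Generic[T]):
    """Hypothetical stand-in that mirrors the documented get() semantics."""

    def __init__(self) -> None:
        self._items: Deque[T] = deque()
        self._cond = asyncio.Condition()

    async def put(self, value: T) -> None:
        async with self._cond:
            self._items.append(value)
            self._cond.notify_all()  # every waiter re-checks its own predicate

    def _find(self, select: Optional[Callable[[T], bool]]) -> int:
        # Index of the first matching item (oldest item if select is None), or -1.
        for i, item in enumerate(self._items):
            if select is None or select(item):
                return i
        return -1

    async def get(
        self,
        select: Optional[Callable[[T], bool]] = None,
        *,
        timeout: float = 5,
    ) -> T:
        async def wait_and_pop() -> T:
            async with self._cond:
                # Sleep until some queued item satisfies the predicate.
                await self._cond.wait_for(lambda: self._find(select) >= 0)
                i = self._find(select)
                item = self._items[i]
                del self._items[i]  # only the matched item is removed
                return item

        # Raises asyncio.TimeoutError if nothing matches in time.
        return await asyncio.wait_for(wait_and_pop(), timeout)


async def main() -> None:
    q = SketchQueue()
    for e in ["foo", "bar", "baz"]:
        await q.put(e)
    print(await q.get(select=lambda s: s.startswith("b"), timeout=1))
    print(list(q._items))


if __name__ == "__main__":
    asyncio.run(main())
```

In this sketch a timed-out get cancels only the internal wait, so the queued items are left exactly as they were.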
get_nowait(select: Optional[Callable[[T], bool]] = None) -> Optional[T]
- Non‑blocking version of get.
- Returns a matching item or None.
- Everything else stays in queue.
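The non-blocking lookup described for get_nowait amounts to a single scan over the queued items. The helper below is a hypothetical illustration over a plain deque (take_first is not part of the library) and assumes the same "first match, else None" rule.

```python
from collections import deque
from typing import Callable, Deque, Optional, TypeVar

T = TypeVar("T")


def take_first(
    items: Deque[T],
    select: Optional[Callable[[T], bool]] = None,
) -> Optional[T]:
    """Remove and return the first matching item, or None if nothing matches."""
    for i, item in enumerate(items):
        if select is None or select(item):
            del items[i]  # safe: we return immediately after mutating
            return item
    return None


queued = deque(["foo", "bar", "baz"])
first_with_a = take_first(queued, lambda s: "a" in s)
print(first_with_a, list(queued))
```

Because None doubles as the "no match" result, a queue that may itself hold None values cannot distinguish the two cases from the return value alone; checking empty() first is one way around that.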
get_all(select: Optional[Callable[[T], bool]] = None) -> List[T]
- Atomically removes all matching items and returns them.
- If select is None, drains entire queue.
- Non‑matching items remain in their original order for later retrieval or inspection.
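The order-preserving split that get_all describes can be illustrated with a single pass over the queued items. This is a sketch only: take_all is a hypothetical helper over a plain deque, and it assumes that atomicity comes from doing the whole partition synchronously, with no await in between, so no other coroutine can observe a half-updated queue.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, TypeVar

T = TypeVar("T")


def take_all(
    items: Deque[T],
    select: Optional[Callable[[T], bool]] = None,
) -> List[T]:
    """Remove and return every matching item; keep the rest in order."""
    if select is None:
        matched = list(items)  # no filter: drain everything
        items.clear()
        return matched
    matched: List[T] = []
    kept: List[T] = []
    for item in items:
        (matched if select(item) else kept).append(item)
    items.clear()
    items.extend(kept)  # survivors keep their original relative order
    return matched


queued = deque(["foo", "bar", "baz", "qux"])
matched = take_all(queued, lambda s: "a" in s)
print(matched, list(queued))
```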
flush() -> List[T]
- Clears queue and returns all items present at time of call.
empty() -> bool
- Returns True if no items are queued.
Concurrency & Nuances
- Uses asyncio.Condition to avoid busy‑waiting.
- Selective retrieval wakes only when matching items arrive; others stay queued.
- Useful for:
  - Noisy event buses: pull only relevant events.
  - Testing: assert on a subset of actions, then inspect or flush the rest.
- Cancelling a pending get leaves the queue unchanged.
- FIFO ordering for non‑selective operations; relative order preserved for get_all.
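The waiting and cancellation notes above can be reproduced with a bare asyncio.Condition and a deque. The sketch below is an assumption about how such a waiter could be written, not the library's code: wait_for_match and demo are hypothetical names. In this sketch every notification briefly wakes the waiter so it can re-check its predicate, and it returns only once a match is present; whether the library avoids those intermediate wakeups is not stated in the source.

```python
import asyncio
from collections import deque


async def wait_for_match(items, cond, select):
    """Block on the condition until an item matches, then remove and return it."""
    async with cond:
        await cond.wait_for(lambda: any(select(x) for x in items))
        for i, x in enumerate(items):
            if select(x):
                del items[i]
                return x


async def demo():
    items = deque(["foo"])
    cond = asyncio.Condition()

    def wants_bar(s):
        return s == "bar"

    # 1) Cancel a pending waiter: the queued items are not touched.
    waiter = asyncio.create_task(wait_for_match(items, cond, wants_bar))
    await asyncio.sleep(0)  # let the waiter start and block
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    after_cancel = list(items)

    # 2) A non-matching item does not complete the waiter; a matching one does.
    waiter = asyncio.create_task(wait_for_match(items, cond, wants_bar))
    await asyncio.sleep(0)
    async with cond:
        items.append("qux")
        cond.notify_all()
    await asyncio.sleep(0)  # waiter re-checks its predicate and keeps waiting
    still_waiting = not waiter.done()
    async with cond:
        items.append("bar")
        cond.notify_all()
    got = await waiter
    return after_cancel, still_waiting, got, list(items)


result = asyncio.run(demo())
print(result)
```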
File details
Details for the file async_selective_queue-0.1.1.post0.tar.gz.
File metadata
- Download URL: async_selective_queue-0.1.1.post0.tar.gz
- Upload date:
- Size: 8.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.7.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b3b0a564b07a0d0a32a097b392f520a4bd106f18bc3ee12e78daecc800613d23 |
| MD5 | 21df65d26663baca19d4c702c0dcf78e |
| BLAKE2b-256 | dfde953ff86bfc518b4d1049b422386be15c68cfc913b764170651d2b48890b7 |
File details
Details for the file async_selective_queue-0.1.1.post0-py3-none-any.whl.
File metadata
- Download URL: async_selective_queue-0.1.1.post0-py3-none-any.whl
- Upload date:
- Size: 7.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.7.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5478101066603d8e1959b08329e853b9b1cc0612debc101f0f87d1379154d7a9 |
| MD5 | 608cd2a2dcb89b4ad0efc7d25a1cb5e6 |
| BLAKE2b-256 | 4cebd76ea412c219b71cf8609342fc9242f98820c487b35b8d0525dcbd89247d |