Async queue with selective retrieval

Async Selective Queue

Python library for an asynchronous queue with the ability to selectively retrieve elements from the queue.

Overview

AsyncSelectiveQueue lets you enqueue items and have consumers wait for specific items, optionally chosen with a filter function, without dropping everything else. Unmatched items remain in the queue until they are explicitly retrieved or flushed. This makes it a good fit for noisy event buses and test harnesses where you only care about a subset of events but still want to inspect or clear the rest later.

Installation

pip install async-selective-queue

Usage

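import asyncio
from async_selective_queue import AsyncSelectiveQueue

async def producer(q: AsyncSelectiveQueue[str]) -> None:
    for e in ["foo", "bar", "baz", "qux"]:
        await q.put(e)

async def test_consumer(q: AsyncSelectiveQueue[str]) -> None:
    # Only care about events containing "a".
    # get_all is listed without await in the API reference, so it is called synchronously.
    matches = q.get_all(select=lambda s: "a" in s)
    print("Matched:", matches)
    # Non-matching items ("foo", "qux") remain in the queue
    print("Still queued:", q.flush())

async def main() -> None:
    # Assumes the constructor takes no arguments
    q: AsyncSelectiveQueue[str] = AsyncSelectiveQueue()
    await producer(q)
    await test_consumer(q)

asyncio.run(main())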

API Reference

await put(value: T) -> None

Enqueue and notify waiters.

await get(select: Optional[Callable[[T], bool]] = None, *, timeout: float = 5) -> T

  • Removes and returns the first matching item (or the oldest item if select is None).
  • Waits up to timeout seconds, then raises asyncio.TimeoutError.
  • Non‑matching items are untouched.
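
Because get raises asyncio.TimeoutError when nothing matches in time, callers that treat "no match" as a normal outcome may want a small wrapper. The sketch below is one way to do that; get_or_none is a hypothetical helper name, not part of the library, and it only relies on the get signature documented above.

```python
import asyncio
from typing import Any, Callable, Optional


async def get_or_none(
    q: Any,  # any object exposing the documented get(select, *, timeout) coroutine
    select: Optional[Callable[[Any], bool]] = None,
    *,
    timeout: float = 5,
) -> Optional[Any]:
    """Return the first matching item, or None if the wait times out.

    Hypothetical helper for illustration; not provided by the library.
    """
    try:
        return await q.get(select, timeout=timeout)
    except asyncio.TimeoutError:
        # get() is documented to raise asyncio.TimeoutError once timeout expires
        return None
```

With a real queue this would be called as `await get_or_none(q, lambda s: "a" in s, timeout=0.5)`. Note that a None result is ambiguous if None is itself a valid queue item.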

get_nowait(select: Optional[Callable[[T], bool]] = None) -> Optional[T]

  • Non‑blocking version of get.
  • Returns a matching item or None.
  • Everything else stays in queue.

get_all(select: Optional[Callable[[T], bool]] = None) -> List[T]

  • Atomically removes all matching items and returns them.
  • If select is None, drains entire queue.
  • Non‑matching items remain in their original order for later retrieval or inspection.
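
The selection rule described for get_all can be illustrated on a plain list. This is only a model of the documented behavior (matching items are taken, the rest keep their original order); split_matching is a hypothetical function written for this example and does not come from the library.

```python
from typing import Callable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def split_matching(
    items: List[T], select: Optional[Callable[[T], bool]] = None
) -> Tuple[List[T], List[T]]:
    """Return (taken, remaining), each in the original relative order."""
    taken: List[T] = []
    remaining: List[T] = []
    for item in items:
        # With no selector, every item is taken (the "drain" case)
        if select is None or select(item):
            taken.append(item)
        else:
            remaining.append(item)
    return taken, remaining


taken, remaining = split_matching(["foo", "bar", "baz", "qux"], lambda s: "a" in s)
print(taken)      # ['bar', 'baz']
print(remaining)  # ['foo', 'qux']
```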

flush() -> List[T]

  • Clears queue and returns all items present at time of call.

empty() -> bool

  • Returns True if no items are queued.

Concurrency & Nuances

  • Uses asyncio.Condition to avoid busy‑waiting.
  • A selective get returns only once a matching item arrives; non-matching items stay queued.
  • Useful for:
    • Noisy event buses: pull only relevant events.
    • Testing: assert on a subset of actions, then inspect or flush the rest.
  • Cancelling a pending get leaves the queue unchanged.
  • FIFO ordering for non‑selective operations; relative order preserved for get_all.
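
To make the points above concrete, here is a minimal sketch of how a selective queue could be built on asyncio.Condition. It is not the library's source: SketchSelectiveQueue and its internals are assumptions made for illustration, and only put, get, and flush are modeled.

```python
import asyncio
from typing import Callable, Generic, List, Optional, Tuple, TypeVar

T = TypeVar("T")


class SketchSelectiveQueue(Generic[T]):
    """Illustrative stand-in, not the library's implementation."""

    def __init__(self) -> None:
        self._items: List[T] = []
        self._cond = asyncio.Condition()

    def _index_of(self, select: Optional[Callable[[T], bool]]) -> Optional[int]:
        # Index of the first item accepted by select (or the oldest item)
        for i, item in enumerate(self._items):
            if select is None or select(item):
                return i
        return None

    async def put(self, value: T) -> None:
        async with self._cond:
            self._items.append(value)
            # Wake all waiters; each one re-checks its own selector
            self._cond.notify_all()

    async def get(
        self, select: Optional[Callable[[T], bool]] = None, *, timeout: float = 5
    ) -> T:
        async def wait_for_match() -> T:
            async with self._cond:
                while True:
                    index = self._index_of(select)
                    if index is not None:
                        return self._items.pop(index)
                    # Sleeps without busy-waiting until put() notifies
                    await self._cond.wait()

        # A timeout or cancellation while waiting removes nothing from the queue
        return await asyncio.wait_for(wait_for_match(), timeout)

    def flush(self) -> List[T]:
        items, self._items = self._items, []
        return items


async def demo() -> Tuple[str, List[str]]:
    q: SketchSelectiveQueue[str] = SketchSelectiveQueue()
    waiter = asyncio.create_task(q.get(lambda s: s == "baz", timeout=1))
    await asyncio.sleep(0.01)  # let the waiter start and block
    for item in ["foo", "bar", "baz"]:
        await q.put(item)
    return await waiter, q.flush()


print(asyncio.run(demo()))  # ('baz', ['foo', 'bar'])
```

In this sketch every put wakes all waiters and each waiter goes back to sleep if its selector still finds nothing, which is simple but does some redundant checks when many consumers are waiting.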
