Skip to main content

A Python library that adds functionality to asyncio queues

Project description

QueuePlus ➕

1️⃣ version: 0.7.0

✍️ author: Mitchell Lisle

A Python library that adds functionality to asyncio queues

Install

pip install queueplus

Usage

You can use AioQueue with all the same functionality as a regular asyncio.Queue.

from queueplus import AioQueue

q = AioQueue()
await q.put("hello world")

message = await q.get()
# hello world

With a few extra capabilities

Iterate over a queue (can be async or not)

from queueplus import AioQueue
q = AioQueue()

[await q.put(i) for i in range(10)] # in non-async mode you would call q.put_nowait

async for row in q:
    print(row)

Collect all values into a list

from queueplus import AioQueue
q = AioQueue()

[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4 ,5 ,6 ,7, 8, 9]

Create a callback everytime a message is added

from queueplus import AioQueue
inq = AioQueue()
outq = AioQueue()

async def copy_to_q(message: str):
    await outq.put(message)

inq.add_consumer(copy_to_q)

inq.put("hello world")

await inq.wait_for_consumer()

Enforce a type on a queue, error if violated

from queueplus import TypedAioQueue, RaiseOnViolation
q = TypedAioQueue(int, violations_strategy=RaiseOnViolation)

[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4 ,5 ,6 ,7, 8, 9]

await q.put("hello") # Raises a ViolationError

Enforce a type on a queue, ignore if violated

from queueplus import TypedAioQueue, DiscardOnViolation
q = TypedAioQueue(int, violations_strategy=DiscardOnViolation)

[await q.put(i) for i in range(10)]
await q.put("hello")

messages = q.collect()
# [0, 1, 2, 3, 4 ,5 ,6 ,7, 8, 9]

Full example

from queueplus import AioQueue
import asyncio

async def main():
    q = AioQueue()
    await q.put("hello")
    await q.put("world")
    
    async for item in q:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queueplus-0.7.0.tar.gz (6.6 kB view details)

Uploaded Source

Built Distribution

queueplus-0.7.0-py2.py3-none-any.whl (4.4 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file queueplus-0.7.0.tar.gz.

File metadata

  • Download URL: queueplus-0.7.0.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.12.1

File hashes

Hashes for queueplus-0.7.0.tar.gz
Algorithm Hash digest
SHA256 0211ca5d9e01ea1434ff26ad2a8f179fe754f3c8e285949febf26d002b078e95
MD5 14118c58802d5872b566bbe7dafb702e
BLAKE2b-256 5bfe340238fe2c0f9537d8768c5c9cd8e31de686e75d68bd68591019499e76b2

See more details on using hashes here.

File details

Details for the file queueplus-0.7.0-py2.py3-none-any.whl.

File metadata

  • Download URL: queueplus-0.7.0-py2.py3-none-any.whl
  • Upload date:
  • Size: 4.4 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.12.1

File hashes

Hashes for queueplus-0.7.0-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 2686adcb173def742acd4c7d764f262b6897bb7f2a9542fd79ae83290e4213bb
MD5 9702ce37cfdfe64c1a108c15fa2c4a05
BLAKE2b-256 675cdc07bb9e7ed9160d416fed4bd335f80ae6db84930cf0524d1bd1e8183b47

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page