A Python library that adds functionality to asyncio queues
QueuePlus ➕
1️⃣ version: 0.7.0
✍️ author: Mitchell Lisle
Install
pip install queueplus
Usage
You can use AioQueue with all the same functionality as a regular asyncio.Queue.
from queueplus import AioQueue
q = AioQueue()
await q.put("hello world")
message = await q.get()
# hello world
With a few extra capabilities
Iterate over a queue (can be async or not)
from queueplus import AioQueue
q = AioQueue()
for i in range(10):
    await q.put(i)  # in non-async code, call q.put_nowait(i) instead
async for row in q:
    print(row)
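To make the idea of iterating over a queue concrete, here is a minimal sketch built only on the standard library. It is not QueuePlus's implementation: the class name `IterableQueue` and the helper `drain` are hypothetical, and the sketch assumes that iteration should stop as soon as the queue is empty rather than wait for more items.

```python
import asyncio


class IterableQueue(asyncio.Queue):
    """Hypothetical stand-in: an asyncio.Queue that supports `async for`."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Assumption: end the loop once the queue is empty instead of
        # blocking until another item arrives.
        try:
            return self.get_nowait()
        except asyncio.QueueEmpty:
            raise StopAsyncIteration from None


async def drain(n: int) -> list:
    q = IterableQueue()
    for i in range(n):
        await q.put(i)
    # An async comprehension consumes the queue through __anext__.
    return [item async for item in q]
```

Running `asyncio.run(drain(5))` consumes the five queued integers in insertion order and then exits the loop, because the empty queue signals the end of iteration.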
Collect all values into a list
from queueplus import AioQueue
q = AioQueue()
for i in range(10):
    await q.put(i)
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
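With a plain asyncio.Queue, collecting everything into a list can be approximated by popping items until the queue reports that it is empty. The helper `collect_all` and the coroutine `collect_demo` below are hypothetical names for illustration and do not show how QueuePlus implements `collect`.

```python
import asyncio


def collect_all(q: asyncio.Queue) -> list:
    """Hypothetical helper: pop every queued item into a list without awaiting."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except asyncio.QueueEmpty:
            # Nothing left to read, so return what was gathered.
            return items


async def collect_demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(10):
        q.put_nowait(i)
    return collect_all(q)
```

Note that this sketch never calls `task_done()`, so it should not be combined with `join()` on the same queue.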
Create a callback that runs every time a message is added
from queueplus import AioQueue
inq = AioQueue()
outq = AioQueue()
async def copy_to_q(message: str):
    await outq.put(message)
inq.add_consumer(copy_to_q)
await inq.put("hello world")
await inq.wait_for_consumer()
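The consumer pattern above can be sketched with the standard library alone: a background task reads each message, passes it to the callback, and marks it done so that `join()` knows when everything has been handled. The names `run_consumer` and `copy_demo` are hypothetical, and this is an assumption about how such a feature could work, not a description of QueuePlus internals.

```python
import asyncio
from typing import Awaitable, Callable


async def run_consumer(
    q: asyncio.Queue, callback: Callable[[object], Awaitable[None]]
) -> None:
    # Loop forever: take one message, hand it to the callback, then mark it
    # done so q.join() can tell when every message has been processed.
    while True:
        message = await q.get()
        try:
            await callback(message)
        finally:
            q.task_done()


async def copy_demo() -> list:
    inq: asyncio.Queue = asyncio.Queue()
    outq: asyncio.Queue = asyncio.Queue()

    async def copy_to_q(message: object) -> None:
        await outq.put(message)

    consumer = asyncio.create_task(run_consumer(inq, copy_to_q))
    await inq.put("hello world")
    await inq.join()   # wait until the consumer has handled every message
    consumer.cancel()  # the consumer loops forever, so stop it explicitly

    copied = []
    while not outq.empty():
        copied.append(outq.get_nowait())
    return copied
```

Calling `asyncio.run(copy_demo())` returns the messages that the callback copied into the second queue.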
Enforce a type on a queue and raise an error if it is violated
from queueplus import TypedAioQueue, RaiseOnViolation
q = TypedAioQueue(int, violations_strategy=RaiseOnViolation)
for i in range(10):
    await q.put(i)
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
await q.put("hello") # Raises a ViolationError
Enforce a type on a queue and discard the item if it is violated
from queueplus import TypedAioQueue, DiscardOnViolation
q = TypedAioQueue(int, violations_strategy=DiscardOnViolation)
for i in range(10):
    await q.put(i)
await q.put("hello")
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
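The two violation behaviours (raise or discard) can be illustrated with a small type-checking subclass of asyncio.Queue. Everything named here (`CheckedQueue`, `TypeViolation`, the `raise_on_violation` flag, `typed_demo`) is hypothetical and chosen for this sketch; it is not how `TypedAioQueue` or its strategy classes are implemented.

```python
import asyncio
from typing import Any


class TypeViolation(TypeError):
    """Hypothetical error type for this sketch."""


class CheckedQueue(asyncio.Queue):
    """Hypothetical stand-in: a queue that only accepts one item type."""

    def __init__(self, item_type: type, raise_on_violation: bool = True) -> None:
        super().__init__()
        self.item_type = item_type
        self.raise_on_violation = raise_on_violation

    async def put(self, item: Any) -> None:
        # Only put() is checked here; put_nowait() would bypass the check.
        if not isinstance(item, self.item_type):
            if self.raise_on_violation:
                raise TypeViolation(
                    f"expected {self.item_type.__name__}, got {type(item).__name__}"
                )
            return  # discard the offending item silently
        await super().put(item)


async def typed_demo(raise_on_violation: bool) -> list:
    q = CheckedQueue(int, raise_on_violation=raise_on_violation)
    for i in range(3):
        await q.put(i)
    await q.put("hello")  # wrong type: raises or is dropped, per the flag
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items
```

With `raise_on_violation=False` the string is dropped and only the integers remain; with `True` the final `put` raises before anything is read back.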
Full example
from queueplus import AioQueue
import asyncio
async def main():
    q = AioQueue()
    await q.put("hello")
    await q.put("world")
    async for item in q:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())