A Python library that adds functionality to asyncio queues
Project description
QueuePlus ➕
1️⃣ version: 0.5.0
✍️ author: Mitchell Lisle
Install
pip install queueplus
Usage
You can use AioQueue with all the same functionality as a regular asyncio.Queue.
from queueplus import AioQueue
q = AioQueue()
await q.put("hello world")
message = await q.get()
# hello world
With a few extra capabilities
Iterate over a queue (can be async or not)
from queueplus import AioQueue
q = AioQueue()
[await q.put(i) for i in range(10)] # in non-async mode you would call q.put_nowait
async for row in q:
    print(row)
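To illustrate what async iteration over a queue involves, here is a minimal sketch built only on the standard library's asyncio.Queue. It is not QueuePlus's implementation: the helper name `drain` is hypothetical, and the sketch assumes that iteration ends once the queue is empty, which the examples here suggest but do not state.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def drain(queue: asyncio.Queue) -> AsyncIterator[Any]:
    """Yield the items currently in the queue and stop when it is empty.

    Hypothetical helper for illustration; not part of QueuePlus.
    """
    while True:
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # nothing left, so end the iteration
        queue.task_done()
        yield item


async def iterate_demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    # An async comprehension consumes the async generator.
    return [item async for item in drain(queue)]
```

Running `asyncio.run(iterate_demo())` returns the queued items in insertion order. A design that blocks and waits for new items instead of stopping would replace `get_nowait()` with `await queue.get()`.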
Collect all values into a list
from queueplus import AioQueue
q = AioQueue()
[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
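For comparison, the same kind of collection can be sketched in non-async code with the standard asyncio.Queue methods `put_nowait` and `get_nowait`. The function names below are hypothetical and the sketch makes no claim about how `collect` is implemented in QueuePlus.

```python
import asyncio
from typing import Any, List


def collect_nowait(queue: asyncio.Queue) -> List[Any]:
    """Remove and return every item currently in the queue, in order.

    Hypothetical helper for illustration; not part of QueuePlus.
    """
    items: List[Any] = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


def collect_demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)  # synchronous counterpart of "await queue.put(i)"
    return collect_nowait(queue)
```

Note that collecting this way empties the queue, so a second call returns an empty list.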
Run a callback every time a message is added
from queueplus import AioQueue
inq = AioQueue()
outq = AioQueue()
async def copy_to_q(message: str):
    await outq.put(message)
inq.add_consumer(copy_to_q)
await inq.put("hello world")
await inq.wait_for_consumer()
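The consumer pattern above can be approximated with plain asyncio: a background task pulls each message and awaits the callback, and `Queue.join()` waits until every message has been processed. This is a sketch under assumptions, not QueuePlus's code; `consume`, `record`, and `consumer_demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def consume(
    queue: asyncio.Queue,
    callback: Callable[[Any], Awaitable[None]],
) -> None:
    """Pass every queued message to `callback` until the task is cancelled."""
    while True:
        message = await queue.get()
        try:
            await callback(message)
        finally:
            # Mark the message as handled so queue.join() can return.
            queue.task_done()


async def consumer_demo() -> List[str]:
    inq: asyncio.Queue = asyncio.Queue()
    received: List[str] = []

    async def record(message: str) -> None:
        received.append(message)

    worker = asyncio.create_task(consume(inq, record))
    await inq.put("hello world")
    await inq.join()  # wait until the callback has run for every message
    worker.cancel()   # stop the otherwise endless consumer loop
    await asyncio.gather(worker, return_exceptions=True)
    return received
```

Here `await inq.join()` plays a role similar to waiting for the consumer to finish: it returns only after `task_done()` has been called once per message.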
Enforce a type on a queue and raise an error on violation
from queueplus import TypedAioQueue, RaiseOnViolation
q = TypedAioQueue(int, violations_strategy=RaiseOnViolation)
[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
await q.put("hello") # Raises a ViolationError
Enforce a type on a queue and discard items that violate it
from queueplus import TypedAioQueue, DiscardOnViolation
q = TypedAioQueue(int, violations_strategy=DiscardOnViolation)
[await q.put(i) for i in range(10)]
await q.put("hello")
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
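One way to picture the two violation strategies is a small asyncio.Queue subclass that checks each item with `isinstance` before enqueuing it. Everything in this sketch is an assumption made for illustration: `TypedQueueSketch` and `OnViolation` are hypothetical names, a plain `TypeError` stands in for the library's ViolationError, and the real TypedAioQueue may work differently.

```python
import asyncio
from enum import Enum
from typing import Any, Tuple, Type


class OnViolation(Enum):
    """Hypothetical stand-in for the raise and discard strategies."""
    RAISE = "raise"
    DISCARD = "discard"


class TypedQueueSketch(asyncio.Queue):
    """asyncio.Queue that only accepts instances of `item_type` via put()."""

    def __init__(self, item_type: Type, on_violation: OnViolation = OnViolation.RAISE) -> None:
        super().__init__()
        self._item_type = item_type
        self._on_violation = on_violation

    async def put(self, item: Any) -> None:
        if not isinstance(item, self._item_type):
            if self._on_violation is OnViolation.RAISE:
                raise TypeError(
                    f"expected {self._item_type.__name__}, got {type(item).__name__}"
                )
            return  # DISCARD: drop the item silently
        await super().put(item)


async def typed_demo() -> Tuple[bool, int]:
    strict = TypedQueueSketch(int)
    await strict.put(1)
    try:
        await strict.put("hello")
        raised = False
    except TypeError:
        raised = True

    lenient = TypedQueueSketch(int, OnViolation.DISCARD)
    for item in (1, "hello", 2):
        await lenient.put(item)
    return raised, lenient.qsize()
```

The sketch only guards `put`; a fuller version would apply the same check to `put_nowait`.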
Full example
from queueplus import AioQueue
import asyncio
async def main():
    q = AioQueue()
    await q.put("hello")
    await q.put("world")
    async for item in q:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())
Download files
Source Distribution
queueplus-0.5.0.tar.gz (6.0 kB)
Built Distribution
Hashes for queueplus-0.5.0-py2.py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 667fb9fb5450241cc047a2425b42118ffe7bb0a2ccdc830324445b5131a5ca09
MD5 | d37d73ce160bcec35fcd452d8dac8115
BLAKE2b-256 | 31c83c6f67dc7007a6694e0cdc9ff195e16c2e4f74d52bf805fb3ea87e51888a