A Python library that adds functionality to asyncio queues
QueuePlus ➕
1️⃣ version: 0.6.0
✍️ author: Mitchell Lisle
Install
pip install queueplus
Usage
You can use AioQueue with all the same functionality as a regular asyncio.Queue.
from queueplus import AioQueue
q = AioQueue()
await q.put("hello world")
message = await q.get()
# hello world
With a few extra capabilities
Iterate over a queue (asynchronously or synchronously)
from queueplus import AioQueue
q = AioQueue()
[await q.put(i) for i in range(10)] # in non-async mode you would call q.put_nowait
async for row in q:
    print(row)
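For intuition, async iteration over a queue can be sketched with the standard library alone. The `DrainingQueue` class below is a hypothetical illustration, not QueuePlus's implementation, and it assumes that iteration should end as soon as the queue is empty rather than wait for more items.

```python
import asyncio


class DrainingQueue(asyncio.Queue):
    """Hypothetical asyncio.Queue subclass that supports `async for`."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Assumption: stop iterating once nothing is left in the queue,
        # instead of blocking until another item arrives.
        try:
            return self.get_nowait()
        except asyncio.QueueEmpty:
            raise StopAsyncIteration


async def drain_demo():
    q = DrainingQueue()
    for i in range(3):
        await q.put(i)
    # Async comprehension: consumes the queue through __anext__.
    return [item async for item in q]
```

Calling `asyncio.run(drain_demo())` returns the queued items in FIFO order and leaves the queue empty.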
Collect all values into a list
from queueplus import AioQueue
q = AioQueue()
[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
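As a rough picture of what collecting a queue involves, the sketch below drains a plain `asyncio.Queue` into a list. The standalone `collect` helper is hypothetical and only mirrors the idea; how QueuePlus implements its own `collect` method is not shown here.

```python
import asyncio


def collect(queue: asyncio.Queue) -> list:
    """Hypothetical helper: remove and return everything currently queued."""
    items = []
    while True:
        try:
            # get_nowait() never blocks; it raises QueueEmpty when drained.
            items.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            return items


async def collect_demo():
    q = asyncio.Queue()
    for i in range(5):
        q.put_nowait(i)
    return collect(q), q.empty()
```

Because the helper uses `get_nowait`, it returns only what is already queued and never waits for producers that are still running.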
Create a callback that runs every time a message is added
from queueplus import AioQueue
inq = AioQueue()
outq = AioQueue()
async def copy_to_q(message: str):
    await outq.put(message)
inq.add_consumer(copy_to_q)
await inq.put("hello world")
await inq.wait_for_consumer()
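The consumer pattern can be approximated with a background task on a plain `asyncio.Queue`. This is a minimal sketch under stated assumptions: `run_consumer` is a hypothetical helper, and `queue.join()` stands in for waiting on the consumer; it is not a description of how `add_consumer` or `wait_for_consumer` work internally.

```python
import asyncio


async def run_consumer(queue: asyncio.Queue, callback) -> None:
    """Hypothetical worker: pass each queued message to an async callback."""
    while True:
        message = await queue.get()
        try:
            await callback(message)
        finally:
            # Mark the message as handled so queue.join() can return.
            queue.task_done()


async def consumer_demo():
    inq = asyncio.Queue()
    outq = asyncio.Queue()

    # Copy every message from inq to outq in the background.
    worker = asyncio.create_task(run_consumer(inq, outq.put))
    await inq.put("hello world")
    await inq.join()  # wait until the callback has handled every message

    # The worker loops forever, so stop it explicitly and wait for it to exit.
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return outq.get_nowait()
```

Running `asyncio.run(consumer_demo())` returns the message that the callback copied into `outq`.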
Enforce a type on a queue and raise an error on violation
from queueplus import TypedAioQueue, RaiseOnViolation
q = TypedAioQueue(int, violations_strategy=RaiseOnViolation)
[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
await q.put("hello") # Raises a ViolationError
Enforce a type on a queue and discard values that violate it
from queueplus import TypedAioQueue, DiscardOnViolation
q = TypedAioQueue(int, violations_strategy=DiscardOnViolation)
[await q.put(i) for i in range(10)]
await q.put("hello")
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
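The two violation strategies can be illustrated with a small standard-library sketch. `TypeCheckedQueue` and its `discard` flag are hypothetical names chosen for this example, and the sketch raises the built-in `TypeError` rather than the library's `ViolationError`; it shows the general idea, not QueuePlus's code.

```python
import asyncio


class TypeCheckedQueue(asyncio.Queue):
    """Hypothetical queue that only accepts instances of `item_type`."""

    def __init__(self, item_type: type, discard: bool = False, maxsize: int = 0):
        super().__init__(maxsize)
        self.item_type = item_type
        self.discard = discard

    def _accepts(self, item) -> bool:
        if isinstance(item, self.item_type):
            return True
        if self.discard:
            return False  # "discard" strategy: drop the item silently
        # "raise" strategy: reject the item loudly
        raise TypeError(
            f"expected {self.item_type.__name__}, got {type(item).__name__}"
        )

    async def put(self, item):
        if self._accepts(item):
            await super().put(item)

    def put_nowait(self, item):
        if self._accepts(item):
            super().put_nowait(item)


async def typed_demo():
    strict = TypeCheckedQueue(int)
    await strict.put(1)
    try:
        await strict.put("hello")
        raised = False
    except TypeError:
        raised = True

    lenient = TypeCheckedQueue(int, discard=True)
    await lenient.put(1)
    await lenient.put("hello")  # dropped without an error
    await lenient.put(2)
    return raised, strict.qsize(), lenient.qsize()
```

The check uses `isinstance`, so subclasses of the declared type are accepted as well; a stricter design could compare `type(item)` directly.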
Full example
from queueplus import AioQueue
import asyncio
async def main():
    q = AioQueue()
    await q.put("hello")
    await q.put("world")
    async for item in q:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())