Skip to main content

A Python library that adds functionality to asyncio queues

Project description

QueuePlus ➕

1️⃣ version: 0.7.0

✍️ author: Mitchell Lisle

A Python library that adds functionality to asyncio queues

Install

pip install queueplus

Usage

You can use AioQueue with all the same functionality as a regular asyncio.Queue.

from queueplus import AioQueue

q = AioQueue()
await q.put("hello world")

message = await q.get()
# hello world

With a few extra capabilities

Iterate over a queue (can be async or not)

from queueplus import AioQueue
q = AioQueue()

[await q.put(i) for i in range(10)] # in non-async mode you would call q.put_nowait

async for row in q:
    print(row)

Collect all values into a list

from queueplus import AioQueue
q = AioQueue()

[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4 ,5 ,6 ,7, 8, 9]

Create a callback everytime a message is added

from queueplus import AioQueue
inq = AioQueue()
outq = AioQueue()

async def copy_to_q(message: str):
    await outq.put(message)

inq.add_consumer(copy_to_q)

inq.put("hello world")

await inq.wait_for_consumer()

Enforce a type on a queue, error if violated

from queueplus import TypedAioQueue, RaiseOnViolation
q = TypedAioQueue(int, violations_strategy=RaiseOnViolation)

[await q.put(i) for i in range(10)]
messages = q.collect()
# [0, 1, 2, 3, 4 ,5 ,6 ,7, 8, 9]

await q.put("hello") # Raises a ViolationError

Enforce a type on a queue, ignore if violated

from queueplus import TypedAioQueue, DiscardOnViolation
q = TypedAioQueue(int, violations_strategy=DiscardOnViolation)

[await q.put(i) for i in range(10)]
await q.put("hello")

messages = q.collect()
# [0, 1, 2, 3, 4 ,5 ,6 ,7, 8, 9]

Full example

from queueplus import AioQueue
import asyncio

async def main():
    q = AioQueue()
    await q.put("hello")
    await q.put("world")
    
    async for item in q:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queueplus-0.7.0.tar.gz (6.6 kB view hashes)

Uploaded Source

Built Distribution

queueplus-0.7.0-py2.py3-none-any.whl (4.4 kB view hashes)

Uploaded Python 2 Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page