asyncio-powered coroutine worker pool
Project description
aioutilities
asyncio-powered coroutine worker pool. No more juggling bounded semaphores and annoying timeouts: the pool handles both and lets you work through millions of pieces of data efficiently.
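For context, the manual pattern alluded to here usually looks something like the sketch below: a bounded semaphore caps concurrency and `asyncio.wait_for` enforces a per-call timeout. It uses only the standard library. The names `double`, `run_bounded`, `limit`, and `timeout` are illustrative assumptions and are not part of aioutilities.

```python
import asyncio


async def double(value: int) -> int:
    """Hypothetical unit of work standing in for real I/O."""
    await asyncio.sleep(0.01)
    return value * 2


async def run_bounded(values: list[int], limit: int, timeout: float) -> list[int]:
    """Run `double` over `values` with at most `limit` calls in flight."""
    semaphore = asyncio.BoundedSemaphore(limit)

    async def guarded(value: int) -> int:
        async with semaphore:
            # Cancel the call if it takes longer than `timeout` seconds.
            return await asyncio.wait_for(double(value), timeout=timeout)

    # gather() returns results in the same order as the inputs.
    return await asyncio.gather(*(guarded(v) for v in values))


if __name__ == "__main__":
    print(asyncio.run(run_bounded(list(range(5)), limit=2, timeout=1.0)))
```

Note that this sketch creates one coroutine per input up front, which becomes costly for very large inputs; a fixed-size worker pool avoids that.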
Installation
python -m pip install -U aioutilities
Credits
This project is a refactor built on top of https://github.com/CaliDog/asyncpool
Example Usage
from asyncio import Queue, ensure_future, run, sleep

from aioutilities.pool import AioPool


async def example_coro(initial_number: int, result_queue: Queue[int | None]) -> None:
    result = initial_number * 2
    print(f"Processing Value! -> {initial_number} * 2 = {result}")
    await sleep(1)
    await result_queue.put(result)


async def result_reader(queue: Queue[int | None]) -> None:
    # Consume results until the `None` sentinel arrives.
    while True:
        value = await queue.get()
        if value is None:
            break
        print(f"Got value! -> {value}")


async def example() -> None:
    result_queue = Queue[int | None]()
    reader_future = ensure_future(result_reader(result_queue))

    # Start a worker pool with 10 coroutines. Each one invokes `example_coro`
    # and waits for it to complete or for 5 minutes to pass.
    pool = AioPool[int](
        name="ExamplePool",
        task=example_coro,
        worker_qty=10,
        timeout=300,
    )
    async with pool.spawn() as workers:
        for i in range(50):
            await workers.push(i, result_queue)

    # Signal the reader that no more results are coming, then wait for it.
    await result_queue.put(None)
    await reader_future


def run_example() -> None:
    run(example())
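To make the mechanics concrete, here is a minimal standard-library sketch of the general worker-pool idea: a fixed number of worker coroutines pull items from a shared queue, and each call is bounded by a timeout. This is not the aioutilities implementation; `square`, `worker`, and `run_pool` are hypothetical names chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def square(value: int) -> int:
    """Hypothetical task standing in for real work."""
    await asyncio.sleep(0.01)
    return value * value


async def worker(
    jobs: asyncio.Queue,
    results: list[int],
    task: Callable[[int], Awaitable[int]],
    timeout: float,
) -> None:
    """Pull items from `jobs` forever; the pool cancels us when done."""
    while True:
        item = await jobs.get()
        try:
            results.append(await asyncio.wait_for(task(item), timeout=timeout))
        except asyncio.TimeoutError:
            pass  # Assumption for this sketch: timed-out items are dropped.
        finally:
            jobs.task_done()


async def run_pool(
    items: Iterable[int],
    task: Callable[[int], Awaitable[int]],
    worker_qty: int,
    timeout: float,
) -> list[int]:
    # A bounded queue makes put() wait when all workers are busy.
    jobs: asyncio.Queue = asyncio.Queue(maxsize=worker_qty)
    results: list[int] = []
    workers = [
        asyncio.create_task(worker(jobs, results, task, timeout))
        for _ in range(worker_qty)
    ]
    for item in items:
        await jobs.put(item)
    await jobs.join()  # Wait until every queued item has been processed.
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_pool(range(10), square, worker_qty=3, timeout=1.0))))
```

Because only `worker_qty` coroutines exist at any time and the queue is bounded, memory use stays flat regardless of how many items are pushed. Results arrive in completion order, so the usage line sorts them before printing.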
Download files
Source Distribution
aioutilities-1.0.3.tar.gz (5.4 kB)
Built Distribution
Hashes for aioutilities-1.0.3-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 02c9152ad1045a06b4866d916739cb5fc12a01cbdc4e59376da55874d840b03e
MD5 | 5c4a64a46550691aeeb5e56cc9de87b0
BLAKE2b-256 | e2f2a0099694168129ff1f4fbe1ac4fdb3ec489bd3fbfebb83d94577c7776361
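A downloaded file can be checked against the SHA256 digest listed above with the standard library's `hashlib`. The sketch below is one way to do it; `sha256_of` and `matches` are hypothetical helper names, and the local path in the usage note is an assumption about where the wheel was saved.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path: Path, expected_hex: str) -> bool:
    """Compare the file's digest with a published hex digest."""
    return sha256_of(path) == expected_hex.strip().lower()
```

For example, `matches(Path("aioutilities-1.0.3-py3-none-any.whl"), "02c9152ad1045a06b4866d916739cb5fc12a01cbdc4e59376da55874d840b03e")` should return `True` for an intact copy of the wheel saved in the current directory.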