Size and time aware deduplicated asynchronous buffer.
Stdbuf
Size and time bounded asynchronous buffer with deduplication.
The buffer's contents are flushed as soon as either the number of buffered items reaches the maximum size or the time since the first insertion reaches the time limit, whichever happens first.
Inspired by the ClickHouse Buffer table engine and used for the same purpose.
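The flush rule can be sketched in a few lines. The class below is a hypothetical, synchronous illustration: the name `FlushRuleSketch`, its methods, and the injectable `clock` are assumptions made for this example and are not part of the stdbuf API. It only shows when a batch becomes due and how duplicates are dropped, not how Stdbuf implements asynchronous waiting.

```python
import time
from typing import Callable, Dict, Generic, Hashable, List, TypeVar

T = TypeVar("T", bound=Hashable)


class FlushRuleSketch(Generic[T]):
    """Hypothetical illustration of the flush rule; not the Stdbuf implementation."""

    def __init__(
        self,
        max_size: int,
        max_time: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_size = max_size
        self.max_time = max_time
        self._clock = clock
        # A dict drops duplicates and preserves insertion order.
        self._items: Dict[T, None] = {}
        self._first_insert = 0.0

    def put(self, item: T) -> None:
        if item in self._items:
            return  # duplicates are ignored
        if not self._items:
            # The timer starts at the first insertion into an empty buffer.
            self._first_insert = self._clock()
        self._items[item] = None

    def should_flush(self) -> bool:
        if not self._items:
            return False  # nothing buffered, so nothing is due
        if len(self._items) >= self.max_size:
            return True  # size limit reached
        # Time limit reached, measured from the first insertion.
        return self._clock() - self._first_insert >= self.max_time

    def flush(self) -> List[T]:
        batch = list(self._items)
        self._items.clear()
        return batch
```

Passing a fake `clock` makes the time-based branch easy to exercise without sleeping. In an asynchronous buffer the same two conditions would decide when a pending `get()` wakes up.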
Note
Multiple concurrent consumers are not supported.
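If batches need to be processed in parallel, one possible workaround is to keep a single reader and hand its batches to a pool of workers through an `asyncio.Queue`. The sketch below is a generic asyncio pattern, not something stdbuf provides: `fan_out`, `get_batch`, `handle`, `workers`, and `batches` are hypothetical names chosen for this example, and `get_batch` stands in for any zero-argument coroutine function that returns one batch.

```python
import asyncio
from typing import Awaitable, Callable, List


async def fan_out(
    get_batch: Callable[[], Awaitable[List[int]]],
    handle: Callable[[List[int]], Awaitable[None]],
    workers: int,
    batches: int,
) -> None:
    """Read `batches` batches from one source and process them with `workers` tasks."""
    queue: "asyncio.Queue[List[int]]" = asyncio.Queue(maxsize=workers)

    async def worker() -> None:
        while True:
            batch = await queue.get()
            try:
                await handle(batch)
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    try:
        for _ in range(batches):
            # Only this coroutine awaits get_batch(), so the source
            # still sees exactly one consumer.
            await queue.put(await get_batch())
        # Wait until every queued batch has been handled.
        await queue.join()
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

The bounded queue applies backpressure: when all workers are busy, the reader stops pulling new batches until one of them finishes.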
Usage
import asyncio
import time

from stdbuf import Stdbuf


async def produce(buf: Stdbuf[int]):
    for i in range(2 ** 16):
        await buf.put(i)
        # Duplicates are ignored.
        await buf.put(i)


async def consume(buf: Stdbuf[int]):
    while True:
        start = time.perf_counter()
        # Get data at least every 0.5 seconds.
        # May return earlier if the buffer is full.
        data = await buf.get()
        elapsed = time.perf_counter() - start
        assert len(data) <= 16
        assert elapsed <= 0.5 + 1e-2


async def main():
    with Stdbuf(16, 0.5, True) as buf:
        done, pending = await asyncio.wait(
            {
                asyncio.create_task(produce(buf)),
                asyncio.create_task(consume(buf)),
            },
            return_when=asyncio.FIRST_COMPLETED,
        )
        # The consumer loops forever, so cancel whatever is still running.
        for task in pending:
            task.cancel()


if __name__ == "__main__":
    asyncio.run(main())