Async batching queue with pluggable flush policies
Project description
A lightweight async batching queue for Python. Collect items and flush them in batches, either when a record count or time interval is reached, or automatically as fast as your sink can consume them. Zero dependencies. Cancellation-safe.
Flush policies: IntervalPolicy (flush at N records or M seconds, whichever comes first) and NaturalPolicy (flush as soon as the previous flush completes).
Install
    pip install flushq
Usage
    import asyncio
    from flushq import FlushQueue, IntervalPolicy

    # Event, db, and event_stream() stand in for your own application code.
    async def save_to_db(events: list[Event]):
        await db.bulk_insert(events)

    async def main():
        policy = IntervalPolicy(max_wait_seconds=2.0, max_records=500)
        async with FlushQueue(flush_fn=save_to_db, policy=policy) as q:
            async for event in event_stream():
                await q.enqueue(event)

    asyncio.run(main())
Flushes to save_to_db whenever 500 events accumulate or 2 seconds pass, whichever comes first.
Backpressure is handled automatically: await q.enqueue(...) waits while the internal buffer is full.
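To make the "N records or M seconds" rule and the backpressure behaviour concrete, here is a minimal standard-library sketch. It is not flushq's implementation; `collect_batch` and `demo` are hypothetical names, and a bounded `asyncio.Queue` is assumed as the internal buffer purely for illustration.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, max_records: int,
                        max_wait_seconds: float) -> list:
    """Wait for one item, then gather more until either limit is reached."""
    loop = asyncio.get_running_loop()
    batch = [await queue.get()]                  # wait until there is something to flush
    deadline = loop.time() + max_wait_seconds    # the time window starts at the first item
    while len(batch) < max_records:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break                                # time limit hit before the count limit
    return batch


async def demo(total: int = 20) -> list:
    # A bounded queue gives backpressure: put() waits while the queue is full.
    queue: asyncio.Queue = asyncio.Queue(maxsize=8)
    sizes = []

    async def producer():
        for i in range(total):
            await queue.put(i)

    async def consumer():
        seen = 0
        while seen < total:
            batch = await collect_batch(queue, max_records=5, max_wait_seconds=0.05)
            sizes.append(len(batch))
            seen += len(batch)

    await asyncio.gather(producer(), consumer())
    return sizes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Every batch holds at most 5 items, and the producer never gets more than 8 items ahead of the consumer. The exact batch sizes depend on scheduling, so none are shown here.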
NaturalPolicy — no tuning required
    from flushq import FlushQueue, NaturalPolicy

    async def send_to_api(events: list[Event]):
        await api.bulk_send(events)

    # Run the following inside a coroutine (async def).
    policy = NaturalPolicy()
    async with FlushQueue(flush_fn=send_to_api, policy=policy) as q:
        async for event in event_stream():
            await q.enqueue(event)
Flushes as soon as the previous flush completes, with whatever has accumulated in the meantime.
When send_to_api is slow, batches grow larger. When it's fast, batches stay small.
No max_wait_seconds or max_records to tune — throughput self-regulates to match your sink.
If you want a ceiling on batch size:
    policy = NaturalPolicy(max_records=1000)
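The self-regulating behaviour described for NaturalPolicy can be pictured as a drain loop: wait for one item, take whatever else is already queued (optionally capped), flush, and repeat. The sketch below uses only the standard library and is an illustration of the idea, not flushq's internals; `natural_flush_loop`, `demo`, and the `_STOP` sentinel are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional

_STOP = object()  # hypothetical sentinel that tells the loop to finish


async def natural_flush_loop(queue: asyncio.Queue,
                             flush_fn: Callable[[list], Awaitable[None]],
                             max_records: Optional[int] = None) -> None:
    """Flush whatever has accumulated each time the previous flush completes."""
    while True:
        first = await queue.get()        # idle until at least one item arrives
        if first is _STOP:
            return
        batch = [first]
        stop = False
        # Drain without waiting; items that arrive during a slow flush pile up
        # in the queue and end up in the next, larger batch.
        while not queue.empty() and (max_records is None or len(batch) < max_records):
            item = queue.get_nowait()
            if item is _STOP:
                stop = True
                break
            batch.append(item)
        await flush_fn(batch)
        if stop:
            return


async def demo(max_records: Optional[int]) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(10):                  # pretend 10 items piled up during a slow flush
        queue.put_nowait(i)
    queue.put_nowait(_STOP)
    sizes = []

    async def sink(batch: list) -> None:
        sizes.append(len(batch))

    await natural_flush_loop(queue, sink, max_records)
    return sizes


if __name__ == "__main__":
    print(asyncio.run(demo(None)))       # [10]
    print(asyncio.run(demo(4)))          # [4, 4, 2]
```

Without a cap, the ten waiting items go out as one batch. With a cap of 4, they are split into batches of 4, 4, and 2.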
Deduplication
Pass dedupe_key to drop duplicate items within each batch. The first occurrence is kept.
    async with FlushQueue(
        flush_fn=save_to_db,
        policy=policy,
        dedupe_key=lambda e: e.id,
    ) as q:
        async for event in event_stream():
            await q.enqueue(event)
If two events with the same id land in the same flush window, only the first is passed to save_to_db.
Deduplication is scoped per batch — the same key can appear across separate flushes.
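As a rough illustration of "first occurrence wins, scoped per batch", the following standalone sketch shows one way such a filter could work. It is an assumption about the general technique, not flushq's code; `dedupe_batch` is a hypothetical helper and the events are plain dicts for simplicity.

```python
from typing import Callable, Hashable, Iterable, TypeVar

T = TypeVar("T")


def dedupe_batch(items: Iterable[T], key: Callable[[T], Hashable]) -> list:
    """Return items in order, keeping only the first item for each key."""
    seen = set()          # keys already emitted in *this* batch only
    unique = []
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            unique.append(item)
    return unique


batch_1 = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "c"}]
batch_2 = [{"id": 1, "v": "d"}]

first = dedupe_batch(batch_1, key=lambda e: e["id"])
second = dedupe_batch(batch_2, key=lambda e: e["id"])

print([e["v"] for e in first])    # ['a', 'b']
print([e["v"] for e in second])   # ['d']
```

Because the `seen` set is rebuilt for every call, id 1 is dropped the second time it appears in `batch_1` but passes through again in `batch_2`.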
Download files
File details
Details for the file flushq-0.1.0.tar.gz.
File metadata
- Download URL: flushq-0.1.0.tar.gz
- Upload date:
- Size: 4.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ecd122af4fc3e10a89bceb601f190f435f26dfe5cc7bec8ef157d11344f5d433 |
| MD5 | d588a66d469ff685831b4ae88c210661 |
| BLAKE2b-256 | b4561f69b994c5274c029701ae7314d7f35add2c97f6c40818dc35cac140b7f9 |
File details
Details for the file flushq-0.1.0-py3-none-any.whl.
File metadata
- Download URL: flushq-0.1.0-py3-none-any.whl
- Upload date:
- Size: 5.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8b7ca940242a51f8ac8a9c97d5d1d964b0e205bbc4bacd28298fe00b959e5a98 |
| MD5 | bac8c896382ec7405380ba4f62322445 |
| BLAKE2b-256 | 5ed4d475df67b5770e9a09f1e1ed2de2af6c4c0ef31f09d12d6b6bf4911ec80a |