Async batching queue with pluggable flush policies

A lightweight async batching queue for Python. Collect items and flush them in batches: by record count or time interval, or as fast as your sink can consume them. Zero dependencies. Cancel-safe.

Flush policies: IntervalPolicy (flush at N records or after M seconds, whichever comes first) and NaturalPolicy (flush as soon as the previous flush completes).

Install

pip install flushq

Usage

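import asyncio
from flushq import FlushQueue, IntervalPolicy

# Event, db, and event_stream() stand in for your application's own code.
async def save_to_db(events: list[Event]):
    await db.bulk_insert(events)

async def main():
    # Flush at 500 records or after 2 seconds, whichever comes first.
    policy = IntervalPolicy(max_wait_seconds=2.0, max_records=500)

    async with FlushQueue(flush_fn=save_to_db, policy=policy) as q:
        async for event in event_stream():
            await q.enqueue(event)

asyncio.run(main())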

Flushes to save_to_db whenever 500 events accumulate or 2 seconds pass, whichever comes first. Backpressure is handled automatically: when the internal buffer is full, await q.enqueue(event) does not return until there is room.
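
One way to picture the count-or-time rule is the loop below. It is a minimal sketch built on the standard asyncio.Queue, not flushq's actual implementation. The names collect_batch and demo are hypothetical, and starting the timer at the first item of each batch is an assumption of this sketch.

```python
import asyncio


async def collect_batch(
    queue: asyncio.Queue, max_records: int, max_wait_seconds: float
) -> list:
    """Wait for one item, then gather more until either limit is reached."""
    batch = [await queue.get()]  # assumption: the wait window opens at the first item
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_seconds
    while len(batch) < max_records:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # time limit reached
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # nothing else arrived before the deadline
    return batch


async def demo() -> list[list[int]]:
    # A bounded queue: `await queue.put(...)` would wait whenever 10 items are pending.
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    for i in range(5):
        queue.put_nowait(i)
    first = await collect_batch(queue, max_records=3, max_wait_seconds=0.05)
    second = await collect_batch(queue, max_records=3, max_wait_seconds=0.05)
    return [first, second]
```

Here the first batch closes on the record limit and the second closes on the timeout. The bounded asyncio.Queue in demo is one common way to get the kind of backpressure described above, since put suspends the producer while the queue is full.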

NaturalPolicy — no tuning required

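from flushq import FlushQueue, NaturalPolicy

# Event, api, and event_stream() stand in for your application's own code.
async def send_to_api(events: list[Event]):
    await api.bulk_send(events)

policy = NaturalPolicy()

# Run this inside a coroutine, for example under asyncio.run().
async with FlushQueue(flush_fn=send_to_api, policy=policy) as q:
    async for event in event_stream():
        await q.enqueue(event)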

Flushes as soon as the previous flush completes, with whatever has accumulated in the meantime. When send_to_api is slow, batches grow larger. When it's fast, batches stay small. No max_wait_seconds or max_records to tune — throughput self-regulates to match your sink.

If you want a ceiling on batch size:

policy = NaturalPolicy(max_records=1000)
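
The self-regulating behaviour can be illustrated with a small loop over asyncio.Queue. This is a sketch of the general idea under stated assumptions, not flushq's internals: natural_flush_loop, slow_sink, and demo are hypothetical names, and the optional max_records argument only mimics the ceiling described above.

```python
import asyncio
import contextlib
from typing import Awaitable, Callable, Optional


async def natural_flush_loop(
    queue: asyncio.Queue,
    flush_fn: Callable[[list], Awaitable[None]],
    max_records: Optional[int] = None,
) -> None:
    """Flush whatever has accumulated each time the previous flush finishes."""
    while True:
        batch = [await queue.get()]  # idle until at least one item exists
        # Drain everything that piled up while the last flush was running.
        while max_records is None or len(batch) < max_records:
            try:
                batch.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        await flush_fn(batch)  # a slow sink here means a bigger next batch


async def demo() -> list[list[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    batches: list[list[int]] = []

    async def slow_sink(batch: list) -> None:
        batches.append(batch)
        await asyncio.sleep(0.05)  # simulate a slow downstream call

    queue.put_nowait(0)
    worker = asyncio.create_task(natural_flush_loop(queue, slow_sink))
    await asyncio.sleep(0.01)  # let the first flush start
    for i in range(1, 5):
        queue.put_nowait(i)  # these arrive while the sink is busy
    await asyncio.sleep(0.1)  # the second flush picks them up together
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker
    return batches
```

In demo the first flush carries a single item, and the four items that arrive while the sink is busy go out together in the next flush. That is the "slow sink, larger batches" effect with no interval or count to tune.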

Deduplication

Pass dedupe_key to drop duplicate items within each batch. The first occurrence is kept.

# Reuses save_to_db and policy from the examples above; run inside a coroutine.
async with FlushQueue(
    flush_fn=save_to_db,
    policy=policy,
    dedupe_key=lambda e: e.id,
) as q:
    async for event in event_stream():
        await q.enqueue(event)

If two events with the same id land in the same flush window, only the first is passed to save_to_db. Deduplication is scoped per batch — the same key can appear across separate flushes.
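
The "first occurrence wins, per batch" rule can be sketched as a plain function. This only illustrates the semantics described above and assumes nothing about how flushq implements them; dedupe_batch is a hypothetical helper, and the dict-based events are made up for the example.

```python
from typing import Callable, Hashable, Iterable, TypeVar

T = TypeVar("T")


def dedupe_batch(items: Iterable[T], key: Callable[[T], Hashable]) -> list[T]:
    """Keep the first item seen for each key, preserving arrival order."""
    seen: set[Hashable] = set()
    unique: list[T] = []
    for item in items:
        k = key(item)
        if k not in seen:  # later items with the same key are dropped
            seen.add(k)
            unique.append(item)
    return unique


batch = [
    {"id": 1, "value": "a"},
    {"id": 2, "value": "b"},
    {"id": 1, "value": "c"},  # duplicate id within the same batch
]
deduped = dedupe_batch(batch, key=lambda e: e["id"])
```

Because seen is created fresh on every call, an id dropped as a duplicate in one batch can still appear in the next one, which matches the per-batch scope described above.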
