Batch any Python iterator by count and/or elapsed time

batchit

Batch any Python iterator by count, elapsed time, or both.

from batchit import batcher

for batch in batcher(source, size=100, timeout=5.0):
    db.bulk_insert(batch)   # at most 100 items; flushed once 5 s have passed since the batch's first item

Why batchit?

more_itertools.batched() batches by count only. Real streaming workloads (Kafka consumers, database cursors, API result streams) also need a time window: flush whatever has accumulated after N seconds, even if the count has not been reached yet. Teams tend to rewrite this boilerplate from scratch; batchit puts it one pip install away.

Installation

pip install batchit

No runtime dependencies. Python 3.10+.

Usage

Sync — batcher

from batchit import batcher

# By size only
for batch in batcher(range(1000), size=50):
    process(batch)

# By timeout only (flush every 5 seconds)
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)

# By both — whichever fires first
for batch in batcher(db_cursor, size=200, timeout=10.0):
    write_to_s3(batch)

The timeout is measured from the first item in the current batch, so no threads or background tasks are needed. Works with any iterable: generators, Kafka consumers, database cursors, file readers.
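
One way to get this behaviour without threads is to record a deadline when a batch receives its first item and compare the clock against it each time another item arrives. The sketch below illustrates that idea only; it is not batchit's actual source, and `sketch_batcher` and its `clock` parameter are hypothetical names introduced here for illustration.

```python
import time
from collections.abc import Callable, Iterable, Iterator
from typing import Optional, TypeVar

T = TypeVar("T")


def sketch_batcher(
    iterable: Iterable[T],
    *,
    size: Optional[int] = None,
    timeout: Optional[float] = None,
    clock: Callable[[], float] = time.monotonic,  # hypothetical hook, handy for tests
) -> Iterator[list[T]]:
    """Illustrative stand-in for a count/time batcher (an assumption, not batchit's code)."""
    if size is None and timeout is None:
        raise ValueError("provide size, timeout, or both")
    batch: list[T] = []
    deadline: Optional[float] = None
    for item in iterable:
        if not batch and timeout is not None:
            # The window opens when the first item of a batch arrives.
            deadline = clock() + timeout
        batch.append(item)
        full = size is not None and len(batch) >= size
        expired = deadline is not None and clock() >= deadline
        if full or expired:
            yield batch
            batch, deadline = [], None
    if batch:
        # Leftover items are yielded rather than dropped.
        yield batch
```

In this sketch the deadline is only checked when the source produces an item, so a source that blocks inside `next()` delays the flush until it yields again. That is the trade-off of avoiding threads in a synchronous loop.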

Async — async_batcher

from batchit import async_batcher

async for batch in async_batcher(async_source, size=100, timeout=5.0):
    await db.bulk_insert(batch)

The async variant uses asyncio.wait_for internally, so it flushes a batch even when the upstream source stalls — no items need to arrive to trigger the timeout.
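
The stall-proof flush can be sketched with `asyncio.wait_for` around the fetch of the next item. The code below is an assumption about how such a batcher could be written, not batchit's implementation: `sketch_async_batcher` is a hypothetical name, and wrapping the in-flight fetch in `asyncio.shield` is a design choice of this sketch so that a timeout does not cancel the read from the source.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from typing import Optional, TypeVar

T = TypeVar("T")
_DONE = object()     # the source is exhausted
_TIMEOUT = object()  # the deadline passed before the next item arrived


async def sketch_async_batcher(
    source: AsyncIterable[T],
    *,
    size: Optional[int] = None,
    timeout: Optional[float] = None,
) -> AsyncIterator[list[T]]:
    """Illustrative stand-in (an assumption, not batchit's code)."""
    if size is None and timeout is None:
        raise ValueError("provide size, timeout, or both")
    iterator = source.__aiter__()

    async def fetch() -> object:
        try:
            return await iterator.__anext__()
        except StopAsyncIteration:
            return _DONE

    loop = asyncio.get_running_loop()
    batch: list[T] = []
    deadline: Optional[float] = None
    pending: Optional[asyncio.Future] = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(fetch())
            remaining = None if deadline is None else max(0.0, deadline - loop.time())
            try:
                # shield() keeps the in-flight fetch alive when wait_for times out,
                # so the item it eventually returns is picked up on a later pass.
                item = await asyncio.wait_for(asyncio.shield(pending), remaining)
            except asyncio.TimeoutError:
                item = _TIMEOUT
            if item is _TIMEOUT:
                # The source stalled past the deadline: flush what we have.
                yield batch
                batch, deadline = [], None
                continue
            pending = None
            if item is _DONE:
                break
            if not batch and timeout is not None:
                deadline = loop.time() + timeout
            batch.append(item)
            if size is not None and len(batch) >= size:
                yield batch
                batch, deadline = [], None
        if batch:
            yield batch
    finally:
        if pending is not None:
            pending.cancel()
```

Because the deadline is only set once a batch holds an item, an idle source never produces empty batches in this sketch.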

API

batcher(iterable, *, size=None, timeout=None)

Parameter   Type            Description
iterable    Iterable[T]     Any iterable to batch
size        int | None      Max items per batch
timeout     float | None    Max seconds per batch

Yields list[T]. At least one of size / timeout must be provided. Remaining items at end of the iterable are always yielded (no silent drops).
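
For size-only batching, the "no silent drops" rule implies simple arithmetic: every batch is full except possibly the last, which holds the remainder. The helper below is a hypothetical illustration of that arithmetic (it is not part of batchit) and assumes each batch is filled completely before it is yielded.

```python
def expected_batch_sizes(n_items: int, size: int) -> list[int]:
    """Batch lengths a size-only run would produce for n_items items."""
    if size <= 0:
        raise ValueError("size must be positive")
    full, remainder = divmod(n_items, size)
    # `full` complete batches, plus one short batch if anything is left over.
    return [size] * full + ([remainder] if remainder else [])
```

For example, 1000 items with size=300 would give three batches of 300 and a final batch of 100.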

async_batcher(aiterable, *, size=None, timeout=None)

Takes the same parameters as batcher, but accepts an AsyncIterable[T] and yields list[T] asynchronously.

Real-world patterns

Kafka consumer with time-based flush:

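from kafka import KafkaConsumer
from batchit import batcher

# commit() requires a consumer group; "my-group" is a placeholder name.
# Auto-commit is disabled so offsets advance only after a successful insert.
consumer = KafkaConsumer("my-topic", group_id="my-group", enable_auto_commit=False)
for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()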

Database cursor in chunks:

cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
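
To try the cursor pattern without a real warehouse, the following self-contained sketch uses an in-memory SQLite database from the standard library. The `chunks` helper is a hypothetical stand-in for `batcher(cursor, size=...)` so the example runs without batchit installed; the table layout and row count are made up for illustration.

```python
import sqlite3
from itertools import islice


def chunks(iterable, size):
    """Stand-in for size-only batching: lists of up to `size` items."""
    iterator = iter(iterable)
    while batch := list(islice(iterator, size)):
        yield batch


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO events (payload) VALUES (?)",
    ((f"event-{i}",) for i in range(2500)),
)

# A sqlite3 cursor is iterable, so rows stream through the batching helper
# one at a time instead of being loaded with fetchall().
cursor = conn.execute("SELECT id, payload FROM events ORDER BY id")
batch_sizes = [len(batch) for batch in chunks(cursor, 1000)]
conn.close()
```

With 2500 rows and a batch size of 1000, the final batch carries the remaining 500 rows.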

Async HTTP stream:

async for batch in async_batcher(response.content, size=64, timeout=2.0):
    await storage.write(batch)

Contributing

See CONTRIBUTING.md.

License

MIT — see LICENSE.

Download files

batchit-0.1.0.tar.gz (source distribution, 8.2 kB)

  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7
  • Publisher: publish.yml on Ahmedie-m/batchit
  • SHA256: 3f2fa0cbbd226b980e37de5895ab52a088b645862e5f53a60ad63e41458bacb5
  • MD5: 4534e239ba7ba84367fa95c0dc923f9f
  • BLAKE2b-256: 689bd378b18d1e4a82098a8b50289e24ec01a7d805ef63b80b5f9e4089afecb4

batchit-0.1.0-py3-none-any.whl (built distribution, Python 3, 6.5 kB)

  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7
  • Publisher: publish.yml on Ahmedie-m/batchit
  • SHA256: 66ffa09884bc0110f62b0ac384c6586e8e25d32a69581286234dcf44c23a9e68
  • MD5: fe41bdfccbc66c2268d646bc9e135d90
  • BLAKE2b-256: e7b649c7a669cecaf7b9914dd48d0f268f01a302964eb56f3d7ecd06f8811e13