Batch any Python iterator by count and/or elapsed time

batchit

Batch any Python iterator by count, elapsed time, or both.

from batchit import batcher

for batch in batcher(source, size=100, timeout=5.0):
    db.bulk_insert(batch)   # flushes at 100 items or 5 s, whichever comes first

Why batchit?

more_itertools.batched() batches by count only. In real streaming workloads (Kafka consumers, database cursors, API result streams) you also need a time window: flush whatever you have after N seconds, even if the count hasn't been reached yet. Teams tend to write this boilerplate from scratch; batchit packages it behind a single pip install.

                 Count limit   Time limit   Async   Dependencies
batchit          yes           yes          yes     none
more-itertools   yes           no           no      1
toolz            yes           no           no      1
hand-rolled      maybe         maybe        maybe

Installation

pip install batchit

No runtime dependencies. Python 3.10–3.13. Fully typed (PEP 561).

Usage

Sync — batcher

from batchit import batcher

# By size only
for batch in batcher(range(1000), size=50):
    process(batch)

# By timeout only (flush every 5 seconds)
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)

# By both — whichever fires first
for batch in batcher(db_cursor, size=200, timeout=10.0):
    write_to_s3(batch)

The timeout is measured from the first item in the current batch and needs no threads or background tasks, which implies the deadline is checked as items arrive rather than while the source is blocked. It works with any iterable: generators, Kafka consumers, database cursors, file readers.

Async — async_batcher

from batchit import async_batcher

async for batch in async_batcher(async_source, size=100, timeout=5.0):
    await db.bulk_insert(batch)

The async variant uses asyncio.wait_for internally, so it flushes a batch even when the upstream source stalls — no items need to arrive to trigger the timeout.

API

batcher(iterable, *, size=None, timeout=None)

Parameter   Type            Description
iterable    Iterable[T]     Any iterable to batch
size        int | None      Max items per batch
timeout     float | None    Max seconds per batch

Yields list[T]. At least one of size and timeout must be provided. Items left over when the iterable is exhausted are always yielded as a final batch (no silent drops).
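For count-only batching, the "no silent drops" rule pins down the batch lengths you should expect. The helper below is a hypothetical illustration of that arithmetic, not part of batchit.

```python
def expected_batch_sizes(n_items: int, size: int) -> list[int]:
    """Lengths of the batches a count-only batcher yields for n_items items."""
    full, remainder = divmod(n_items, size)
    # Every full batch has `size` items; any leftover forms one final batch.
    return [size] * full + ([remainder] if remainder else [])


print(expected_batch_sizes(10, 4))  # [4, 4, 2]
```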

async_batcher(aiterable, *, size=None, timeout=None)

Same parameters, accepts AsyncIterable[T], yields list[T] asynchronously.

Real-world patterns

Kafka consumer with time-based flush:

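from kafka import KafkaConsumer
from batchit import batcher

# Manual commits need a consumer group; "my-group" is a placeholder name.
consumer = KafkaConsumer(
    "my-topic",
    group_id="my-group",
    enable_auto_commit=False,  # commit only after the batch is stored
)
for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()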

Database cursor in chunks:

cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
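The cursor pattern can be tried against an in-memory SQLite database. So that the sketch runs without batchit installed, it uses count_batches, a hypothetical count-only stand-in for batcher(cursor, size=...); the table and column names are also made up for illustration.

```python
import sqlite3
from collections.abc import Iterable, Iterator
from itertools import islice


def count_batches(rows: Iterable[tuple], size: int) -> Iterator[list[tuple]]:
    """Hypothetical count-only stand-in for batcher(rows, size=size)."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, kind TEXT)")
conn.executemany("INSERT INTO events (kind) VALUES (?)", [("click",)] * 7)

# A DB-API cursor is iterable and yields one row tuple at a time,
# so it can be passed straight to a batching function.
cursor = conn.execute("SELECT id, kind FROM events ORDER BY id")
batch_sizes = [len(batch) for batch in count_batches(cursor, size=3)]
print(batch_sizes)  # [3, 3, 1]
conn.close()
```

Because rows are pulled lazily from the cursor, only one batch is held in memory at a time.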

Async HTTP stream:

async for batch in async_batcher(response.content, size=64, timeout=2.0):
    await storage.write(batch)

Contributing

See CONTRIBUTING.md.

License

MIT — see LICENSE.

Download files

Source Distribution

batchit-0.2.0.tar.gz (11.9 kB view details)

Uploaded Source

Built Distribution

batchit-0.2.0-py3-none-any.whl (7.1 kB view details)

Uploaded Python 3

File details

Details for the file batchit-0.2.0.tar.gz.

File metadata

  • Download URL: batchit-0.2.0.tar.gz
  • Upload date:
  • Size: 11.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for batchit-0.2.0.tar.gz
Algorithm Hash digest
SHA256 f06b9a50fe07becc1a047b4385b94987fd041c2baeeb35557b9ca63d4230ea5f
MD5 94e870c31acd3cbb3707057f0cf2d817
BLAKE2b-256 e00352eaec8dc60cddf836c36ba83ffa8016854a9fcc75476048d1836b240bf9

Provenance

The following attestation bundles were made for batchit-0.2.0.tar.gz:

Publisher: publish.yml on Ahmedie-m/batchit

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file batchit-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: batchit-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 7.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for batchit-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7f4d8c390a7e4849e50f66e1755eb46f044a90eaf05ebaca1741031f3a5abd87
MD5 9c1c39e1e912fbd62aea850dfbc76422
BLAKE2b-256 1d81a670a7238e4bf5733131d68a6761d6ac2c5c47a7e93dd90fb3f4b4b6b55c

Provenance

The following attestation bundles were made for batchit-0.2.0-py3-none-any.whl:

Publisher: publish.yml on Ahmedie-m/batchit

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
