batchit

Batch any Python iterator by count, elapsed time, or both.

from batchit import batcher

for batch in batcher(source, size=100, timeout=5.0):
    db.bulk_insert(batch)   # never waits more than 5 s; never more than 100 items

Why batchit?

more_itertools.batched() batches by count only. In real streaming workloads (Kafka consumers, database cursors, API result streams) you also need a time window: flush whatever you have after N seconds, even if the count hasn't been reached yet. Every team ends up writing this boilerplate from scratch; batchit puts it one pip install away.

|                | Count limit | Time limit | Async | Dependencies |
| -------------- | ----------- | ---------- | ----- | ------------ |
| batchit        | ✓           | ✓          | ✓     | none         |
| more-itertools | ✓           | ✗          | ✗     | 1            |
| toolz          | ✓           | ✗          | ✗     | 1            |
| hand-rolled    | maybe       | maybe      | maybe |              |
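
For a sense of what that boilerplate looks like, here is a minimal hand-rolled sketch of count-plus-time batching (illustrative only; batchit's actual implementation may differ):

import time
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")

def hand_rolled(source: Iterable[T], size: int, timeout: float) -> Iterator[list[T]]:
    """Flush when `size` items arrive or `timeout` seconds pass after the first item."""
    batch: list[T] = []
    deadline = float("inf")
    for item in source:
        if not batch:
            deadline = time.monotonic() + timeout   # window opens at the first item
        batch.append(item)
        if len(batch) >= size or time.monotonic() >= deadline:
            yield batch
            batch = []
    if batch:
        yield batch   # remainder is never dropped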

Installation

pip install batchit

No runtime dependencies. Python 3.10–3.13. Fully typed (PEP 561).

Usage

Sync — batcher

from batchit import batcher

# By size only
for batch in batcher(range(1000), size=50):
    process(batch)

# By timeout only (flush every 5 seconds)
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)

# By both — whichever fires first
for batch in batcher(db_cursor, size=200, timeout=10.0):
    write_to_s3(batch)

Async — async_batcher

from batchit import async_batcher

async for batch in async_batcher(async_source, size=100, timeout=5.0):
    await db.bulk_insert(batch)

Timeout semantics

The two variants behave differently under a slow or stalled source — know which you need:

|                   | batcher (sync)                                  | async_batcher (async)                          |
| ----------------- | ----------------------------------------------- | ---------------------------------------------- |
| How timeout fires | Checked on each item arrival                    | Fires independently via asyncio.wait_for       |
| Stalled source    | Waits until the next item arrives, then flushes | Flushes after T seconds even with no new items |
| Triggering item   | Included in the flushing batch                  | Starts the next batch                          |
| Threading         | None; single-threaded safe                      | asyncio event loop only                        |
| Source exception  | Propagates immediately                          | Propagates to consumer                         |

Rule of thumb: use batcher for sync iterables where the source drives timing (Kafka poll loops, DB cursors). Use async_batcher when you need the timeout to fire independently of item delivery (WebSocket streams, async queues, idle-timeout flushing).
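
To see the sync semantics concretely, here is a small sketch (behaviour follows the table above; exact timings depend on scheduling):

import time
from batchit import batcher

def slow_source():
    for i in range(4):
        time.sleep(3)          # items arrive at t ≈ 3, 6, 9, 12 s
        yield i

start = time.monotonic()
for batch in batcher(slow_source(), timeout=5.0):
    print(f"t={time.monotonic() - start:4.1f}s  {batch}")

# The 5 s window opens at the first item (t ≈ 3 s). The item arriving at
# t ≈ 9 s crosses the window and is included in the flush: [0, 1, 2].
# The remaining item is flushed when the source ends: [3].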

API

batcher(iterable, *, size=None, timeout=None)

| Parameter | Type          | Description                                          |
| --------- | ------------- | ---------------------------------------------------- |
| iterable  | Iterable[T]   | Any iterable to batch                                |
| size      | int \| None   | Max items per batch                                  |
| timeout   | float \| None | Max seconds per batch, measured from the first item  |

Yields list[T]. At least one of size / timeout must be provided. Remaining items are always yielded — nothing is silently dropped.
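
A quick check of both guarantees (the exception type for the missing-limit case is an assumption; the docs only say that one limit is required):

from batchit import batcher

# Remainder flush: the final short batch is still yielded.
assert list(batcher(range(7), size=3)) == [[0, 1, 2], [3, 4, 5], [6]]

# Neither size nor timeout given: expected to raise (ValueError assumed).
try:
    list(batcher(range(7)))
except ValueError:
    print("size and/or timeout is required")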

async_batcher(aiterable, *, size=None, timeout=None, maxsize=0)

Same parameters as batcher, plus:

| Parameter | Type             | Description                                                                         |
| --------- | ---------------- | ----------------------------------------------------------------------------------- |
| aiterable | AsyncIterable[T] | Any async iterable to batch                                                          |
| maxsize   | int              | Max items to buffer internally before the producer blocks. 0 = unbounded (default)  |

Accepts AsyncIterable[T], yields list[T] asynchronously. Set maxsize to apply backpressure when the source can outpace the consumer.
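
A self-contained example you can run directly (the source here is a hypothetical async generator):

import asyncio
from batchit import async_batcher

async def ticker():
    for i in range(10):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for batch in async_batcher(ticker(), size=4, timeout=1.0):
        print(batch)           # [0, 1, 2, 3], [4, 5, 6, 7], [8, 9]

asyncio.run(main())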

Patterns

Kafka consumer

from kafka import KafkaConsumer
from batchit import batcher

consumer = KafkaConsumer("events")
for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()
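
The same pattern works asynchronously; a sketch with aiokafka (a separate install, not a batchit dependency; the db client here is hypothetical):

from aiokafka import AIOKafkaConsumer
from batchit import async_batcher

async def consume(db):
    consumer = AIOKafkaConsumer(
        "events",
        bootstrap_servers="localhost:9092",
        group_id="etl",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        # AIOKafkaConsumer is itself an async iterator of messages
        async for batch in async_batcher(consumer, size=500, timeout=10.0):
            await db.bulk_insert([msg.value for msg in batch])
            await consumer.commit()
    finally:
        await consumer.stop()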

Database cursor

cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
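
Any DB-API cursor that supports iteration works; a self-contained sketch with sqlite3:

import sqlite3
from batchit import batcher

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, payload TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(i, f"event-{i}") for i in range(2500)],
)

cursor = conn.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    print(len(batch))          # 1000, 1000, 500 -- remainder included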

Async HTTP / WebSocket stream

async for batch in async_batcher(response.content, size=64, timeout=2.0):
    await storage.write(batch)

Backpressure — bounded queue

If your source can produce faster than the consumer processes, the internal queue grows without bound. Use maxsize to cap it — the producer will block naturally when the queue is full:

# Source blocked if more than 200 items are waiting to be batched
async for batch in async_batcher(fast_source(), size=50, timeout=2.0, maxsize=200):
    await slow_downstream(batch)

AI / ML pipelines

Batch embedding requests to stay within API rate limits and maximise throughput:

from batchit import async_batcher

async for batch in async_batcher(document_stream(), size=96, timeout=2.0):
    embeddings = await embed(batch)          # one API call per batch
    await vector_db.upsert(embeddings)

Batch LLM inference over a large dataset:

from batchit import batcher

for batch in batcher(prompts, size=20):
    responses = llm.generate(batch)          # single batched call
    results.extend(responses)

Stream model outputs to a downstream consumer without accumulating everything in memory:

async for batch in async_batcher(model.stream_predict(inputs), size=50, timeout=1.0):
    await sink.write(batch)

Tests

The test suite is organised by use case:

| File                | What it covers                             |
| ------------------- | ------------------------------------------ |
| tests/test_sync.py  | Core sync batcher behaviour                |
| tests/test_async.py | Core async batcher behaviour               |
| tests/test_kafka.py | Kafka consumer patterns (sync + async)     |
| tests/test_db.py    | Database cursor and file iterator patterns |

Contributing

See CONTRIBUTING.md.

License

MIT — see LICENSE.

Project details

Download files

Source distribution: batchit-0.3.0.tar.gz (13.3 kB)

Built distribution: batchit-0.3.0-py3-none-any.whl (8.1 kB)

File details: batchit-0.3.0.tar.gz

File metadata

  • Size: 13.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm   | Hash digest                                                      |
| ----------- | ---------------------------------------------------------------- |
| SHA256      | cc30d1dca3068fe46279304c43e9d303ad1b7163b109effe01a21e27c35800d0 |
| MD5         | 0a21bd904e093e605558bba828ee4b14                                 |
| BLAKE2b-256 | e74b3fe41912f40b9d9353366d99a8b3623127ea40c9d03870a2be7b8a02b780 |

Provenance

Attestation bundle published by publish.yml on Ahmedie-m/batchit. Values reflect the state when the release was signed and may no longer be current.

File details: batchit-0.3.0-py3-none-any.whl

File metadata

  • Size: 8.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm   | Hash digest                                                      |
| ----------- | ---------------------------------------------------------------- |
| SHA256      | 423b928958093bab59920fbea87fe8a0cfecdcee1964a39b1b827e12326abbbe |
| MD5         | b4166d564b0cb36ec2f1539f716834dc                                 |
| BLAKE2b-256 | 194335662445f4ca58c945dab4169b19c271215aea3575d32ef8b071658afc58 |

Provenance

Attestation bundle published by publish.yml on Ahmedie-m/batchit. Values reflect the state when the release was signed and may no longer be current.
