# batchit

Batch any Python iterator by count, elapsed time, or both.
```python
from batchit import batcher

for batch in batcher(source, size=100, timeout=5.0):
    db.bulk_insert(batch)  # at most 100 items per batch, flushed once the 5 s window elapses
```
## Why batchit?

`more_itertools.batched()` batches by count only. In real streaming workloads
(Kafka consumers, database cursors, API result streams) you also need a time
window: flush whatever you have after N seconds, even if the count hasn't
been reached yet. Every team writes this boilerplate from scratch; batchit
makes it one `pip install` instead.
| Library | Count limit | Time limit | Async | Dependencies |
|---|---|---|---|---|
| `batchit` | ✓ | ✓ | ✓ | none |
| `more-itertools` | ✓ | ✗ | ✗ | 1 |
| `toolz` | ✓ | ✗ | ✗ | 1 |
| hand-rolled | maybe | maybe | maybe | — |
## Installation

```shell
pip install batchit
```

No runtime dependencies. Python 3.10–3.13. Fully typed (PEP 561).
## Usage

### Sync — `batcher`

```python
from batchit import batcher

# By size only
for batch in batcher(range(1000), size=50):
    process(batch)

# By timeout only (flush every 5 seconds)
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)

# By both — whichever fires first
for batch in batcher(db_cursor, size=200, timeout=10.0):
    write_to_s3(batch)
```
### Async — `async_batcher`

```python
from batchit import async_batcher

async for batch in async_batcher(async_source, size=100, timeout=5.0):
    await db.bulk_insert(batch)
```
## Timeout semantics

The two variants behave differently under a slow or stalled source — know which you need:

| | `batcher` (sync) | `async_batcher` (async) |
|---|---|---|
| How timeout fires | Checked on each item arrival | Fires independently via `asyncio.wait_for` |
| Stalled source | Waits until the next item arrives, then flushes | Flushes after T seconds even with no new items |
| Triggering item | Included in the flushing batch | Starts the next batch |
| Threading | None — single-threaded safe | asyncio event loop only |
| Source exception | Propagates immediately | Propagates to the consumer |
Rule of thumb: use batcher for sync iterables where the source drives timing (Kafka poll loops, DB cursors). Use async_batcher when you need the timeout to fire independently of item delivery (WebSocket streams, async queues, idle-timeout flushing).
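The sync column of the table can be illustrated with a short pure-Python sketch. This is a simplified illustration, not batchit's actual implementation, and `sync_batcher_sketch` is a hypothetical name:

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def sync_batcher_sketch(
    iterable: Iterable[T], size: int, timeout: float
) -> Iterator[list[T]]:
    """Flush when the batch is full or the time window has elapsed.
    The clock is checked only when an item arrives, so a stalled
    source delays the flush, matching the table above."""
    batch: list[T] = []
    started = 0.0
    for item in iterable:
        if not batch:
            started = time.monotonic()  # timeout measured from the first item
        batch.append(item)  # the triggering item joins the flushing batch
        if len(batch) >= size or time.monotonic() - started >= timeout:
            yield batch
            batch = []
    if batch:  # remaining items are always yielded
        yield batch

print(list(sync_batcher_sketch(range(7), size=3, timeout=60.0)))
# → [[0, 1, 2], [3, 4, 5], [6]]
```

Because the clock is consulted only on arrival, a source that produces nothing leaves the partial batch waiting; that is exactly the case where `async_batcher` is the better fit.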
## API

### `batcher(iterable, *, size=None, timeout=None)`

| Parameter | Type | Description |
|---|---|---|
| `iterable` | `Iterable[T]` | Any iterable to batch |
| `size` | `int \| None` | Max items per batch |
| `timeout` | `float \| None` | Max seconds per batch, measured from the first item |

Yields `list[T]`. At least one of `size` / `timeout` must be provided.
Remaining items are always yielded — nothing is silently dropped.
### `async_batcher(aiterable, *, size=None, timeout=None, maxsize=0)`

Same parameters as `batcher`, plus:

| Parameter | Type | Description |
|---|---|---|
| `aiterable` | `AsyncIterable[T]` | Any async iterable to batch |
| `maxsize` | `int` | Max items to buffer internally before the producer blocks. `0` = unbounded (default) |

Accepts `AsyncIterable[T]`, yields `list[T]` asynchronously. Set `maxsize` to apply
backpressure when the source can outpace the consumer.
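One plausible way to get the "timeout fires independently" behaviour is a producer task feeding an internal `asyncio.Queue`, with the consumer bounding each wait via `asyncio.wait_for`. The sketch below is an illustration under that assumption, not batchit's real source, and it restarts the timer on each wait rather than measuring from the batch's first item:

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking source exhaustion

async def async_batcher_sketch(
    source: AsyncIterable[T], size: int, timeout: float, maxsize: int = 0
) -> AsyncIterator[list[T]]:
    """A producer task feeds a queue, so the consumer's wait_for deadline
    can fire even when the source is idle. maxsize bounds the queue,
    blocking the producer when the consumer falls behind."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def produce() -> None:
        async for item in source:
            await queue.put(item)  # suspends while the bounded queue is full
        await queue.put(_DONE)

    producer = asyncio.ensure_future(produce())
    batch: list[T] = []
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                if batch:  # timer fired: flush whatever we have
                    yield batch
                    batch = []
                continue
            if item is _DONE:
                break
            batch.append(item)
            if len(batch) >= size:
                yield batch
                batch = []
        if batch:
            yield batch
    finally:
        producer.cancel()
```

Decoupling production from consumption through a queue is what lets the deadline fire with no items in flight, and it is also where the `maxsize` backpressure naturally lives.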
## Patterns

### Kafka consumer

```python
from kafka import KafkaConsumer
from batchit import batcher

consumer = KafkaConsumer("events")

for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()
```
### Database cursor

```python
cursor.execute("SELECT * FROM events")

for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
```
### Async HTTP / WebSocket stream

```python
async for batch in async_batcher(response.content, size=64, timeout=2.0):
    await storage.write(batch)
```
### Backpressure — bounded queue

If your source can produce faster than the consumer processes, the internal queue
grows without bound. Use `maxsize` to cap it — the producer will block naturally
when the queue is full:

```python
# Source blocked if more than 200 items are waiting to be batched
async for batch in async_batcher(fast_source(), size=50, timeout=2.0, maxsize=200):
    await slow_downstream(batch)
```
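The blocking comes straight from `asyncio.Queue`: once a bound is set, `put()` suspends until the consumer frees a slot. A standalone demonstration of that primitive (not batchit code; all names here are illustrative):

```python
import asyncio

async def demo() -> list[str]:
    events: list[str] = []
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)  # cap at 2 buffered items

    async def producer() -> None:
        for i in range(4):
            await queue.put(i)  # suspends once two items are already waiting
            events.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(4):
            await asyncio.sleep(0)  # yield so the producer runs first
            events.append(f"got {await queue.get()}")

    await asyncio.gather(producer(), consumer())
    return events

events = asyncio.run(demo())
# with maxsize=2, "put 2" cannot complete until "got 0" has freed a slot
assert events.index("put 2") > events.index("got 0")
```

The same mechanism is why a well-chosen `maxsize` turns a runaway producer into one that simply pauses.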
### AI / ML pipelines

Batch embedding requests to stay within API rate limits and maximise throughput:

```python
from batchit import async_batcher

async for batch in async_batcher(document_stream(), size=96, timeout=2.0):
    embeddings = await embed(batch)  # one API call per batch
    await vector_db.upsert(embeddings)
```

Batch LLM inference over a large dataset:

```python
from batchit import batcher

for batch in batcher(prompts, size=20):
    responses = llm.generate(batch)  # single batched call
    results.extend(responses)
```

Stream model outputs to a downstream consumer without accumulating everything in memory:

```python
async for batch in async_batcher(model.stream_predict(inputs), size=50, timeout=1.0):
    await sink.write(batch)
```
## Tests

The test suite is organised by use case:

| File | What it covers |
|---|---|
| `tests/test_sync.py` | Core sync batcher behaviour |
| `tests/test_async.py` | Core async batcher behaviour |
| `tests/test_kafka.py` | Kafka consumer patterns (sync + async) |
| `tests/test_db.py` | Database cursor and file iterator patterns |
## Contributing

See CONTRIBUTING.md.

## License

MIT — see LICENSE.
## File details

### `batchit-0.3.0.tar.gz`

- Size: 13.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

| Algorithm | Hash digest |
|---|---|
| SHA256 | `cc30d1dca3068fe46279304c43e9d303ad1b7163b109effe01a21e27c35800d0` |
| MD5 | `0a21bd904e093e605558bba828ee4b14` |
| BLAKE2b-256 | `e74b3fe41912f40b9d9353366d99a8b3623127ea40c9d03870a2be7b8a02b780` |
### Provenance

Attestation bundle for `batchit-0.3.0.tar.gz`:

- Publisher: `publish.yml` on Ahmedie-m/batchit
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: batchit-0.3.0.tar.gz
- Subject digest: cc30d1dca3068fe46279304c43e9d303ad1b7163b109effe01a21e27c35800d0
- Sigstore transparency entry: 1199332544
- Permalink: Ahmedie-m/batchit@f1232ab19d6b3fb31f7b5fe31212d1f780794152
- Branch / Tag: refs/tags/v0.3.0
- Owner: https://github.com/Ahmedie-m
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@f1232ab19d6b3fb31f7b5fe31212d1f780794152
- Trigger Event: push
### `batchit-0.3.0-py3-none-any.whl`

- Size: 8.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

| Algorithm | Hash digest |
|---|---|
| SHA256 | `423b928958093bab59920fbea87fe8a0cfecdcee1964a39b1b827e12326abbbe` |
| MD5 | `b4166d564b0cb36ec2f1539f716834dc` |
| BLAKE2b-256 | `194335662445f4ca58c945dab4169b19c271215aea3575d32ef8b071658afc58` |
### Provenance

Attestation bundle for `batchit-0.3.0-py3-none-any.whl`:

- Publisher: `publish.yml` on Ahmedie-m/batchit
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: batchit-0.3.0-py3-none-any.whl
- Subject digest: 423b928958093bab59920fbea87fe8a0cfecdcee1964a39b1b827e12326abbbe
- Sigstore transparency entry: 1199332564
- Permalink: Ahmedie-m/batchit@f1232ab19d6b3fb31f7b5fe31212d1f780794152
- Branch / Tag: refs/tags/v0.3.0
- Owner: https://github.com/Ahmedie-m
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@f1232ab19d6b3fb31f7b5fe31212d1f780794152
- Trigger Event: push