Batch any Python iterator by count and/or elapsed time
batchit
Batch any Python iterator by count, elapsed time, or both.
```python
from batchit import batcher

for batch in batcher(source, size=100, timeout=5.0):
    db.bulk_insert(batch)  # at most 100 items per batch; the 5 s window starts at each batch's first item
```
Why batchit?
more-itertools' batched() batches by count only. In real streaming workloads
(Kafka consumers, database cursors, API result streams) you also need a time
window: flush whatever you have after N seconds, even if the count hasn't
been reached yet. Teams tend to rewrite this boilerplate from scratch; batchit
packages it behind a single pip install.
Installation
```shell
pip install batchit
```
No runtime dependencies. Python 3.10+.
Usage
Sync — batcher
```python
from batchit import batcher

# By size only
for batch in batcher(range(1000), size=50):
    process(batch)

# By timeout only (flush 5 s after each batch's first item)
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)

# By both: whichever limit is hit first
for batch in batcher(db_cursor, size=200, timeout=10.0):
    write_to_s3(batch)
```
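The timeout is measured from the first item in the current batch and is checked as items are pulled from the source, so no threads or background tasks are needed. The trade-off is that a source which blocks while idle (for example, a Kafka consumer with no new messages) can hold a partial batch past the timeout: without a background thread, the flush can only happen once the next item arrives or the iterable ends. Works with any iterable: generators, Kafka consumers, database cursors, file readers.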
Async — async_batcher
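```python
from batchit import async_batcher

# consume() is only a wrapper: async for must run inside a coroutine
async def consume(async_source):
    async for batch in async_batcher(async_source, size=100, timeout=5.0):
        await db.bulk_insert(batch)
```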
The async variant uses asyncio.wait_for internally, so it can flush a partial
batch even while the upstream source is stalled: no new item has to arrive for
the timeout to fire.
API
batcher(iterable, *, size=None, timeout=None)
| Parameter | Type | Description |
|---|---|---|
| iterable | Iterable[T] | Any iterable to batch |
| size | int \| None | Max items per batch |
| timeout | float \| None | Max seconds per batch |
Yields list[T]. At least one of size / timeout must be provided.
When the iterable is exhausted, any remaining items are yielded as a final, possibly smaller batch (no silent drops).
async_batcher(aiterable, *, size=None, timeout=None)
Same parameters, accepts AsyncIterable[T], yields list[T] asynchronously.
Real-world patterns
Kafka consumer with time-based flush:
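```python
from kafka import KafkaConsumer
from batchit import batcher

# commit() requires a consumer group; auto-commit is off so offsets are
# committed only after each batch is written ("my-group" is a placeholder).
consumer = KafkaConsumer("my-topic", group_id="my-group", enable_auto_commit=False)
for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()
```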
Database cursor in chunks:
```python
cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
```
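For the size-only cursor case, the DB-API's own cursor.fetchmany(size) produces the same fixed-size chunks, just without a time window. The sketch below runs against a throwaway in-memory SQLite table; the events schema and its 2,500 rows are invented for illustration, and the list of batch sizes stands in for the hypothetical warehouse.insert_many().

```python
import sqlite3

# Throwaway in-memory table with made-up rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO events (name) VALUES (?)", [(f"e{i}",) for i in range(2500)])

cursor = conn.cursor()
cursor.execute("SELECT id, name FROM events ORDER BY id")

batch_sizes = []
while True:
    rows = cursor.fetchmany(1000)  # up to 1000 rows; an empty list once exhausted
    if not rows:
        break
    batch_sizes.append(len(rows))  # stand-in for warehouse.insert_many(rows)

conn.close()
print(batch_sizes)  # [1000, 1000, 500]
```

Once a time limit matters as well (for example, a slow query feeding a live sink), that is the case batcher(cursor, size=..., timeout=...) covers.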
Async HTTP stream:
```python
async def archive(response):  # async for must run inside a coroutine
    async for batch in async_batcher(response.content, size=64, timeout=2.0):
        await storage.write(batch)
```
Contributing
See CONTRIBUTING.md.
License
MIT — see LICENSE.
File details
Details for the file batchit-0.1.0.tar.gz.
File metadata
- Download URL: batchit-0.1.0.tar.gz
- Upload date:
- Size: 8.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3f2fa0cbbd226b980e37de5895ab52a088b645862e5f53a60ad63e41458bacb5 |
| MD5 | 4534e239ba7ba84367fa95c0dc923f9f |
| BLAKE2b-256 | 689bd378b18d1e4a82098a8b50289e24ec01a7d805ef63b80b5f9e4089afecb4 |
Provenance
The following attestation bundles were made for batchit-0.1.0.tar.gz:
- Publisher: publish.yml on Ahmedie-m/batchit
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: batchit-0.1.0.tar.gz
- Subject digest: 3f2fa0cbbd226b980e37de5895ab52a088b645862e5f53a60ad63e41458bacb5
- Sigstore transparency entry: 1199312397
- Permalink: Ahmedie-m/batchit@cd6d8c1c02beae94603335a2ab309a034c07944a
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/Ahmedie-m
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@cd6d8c1c02beae94603335a2ab309a034c07944a
- Trigger Event: push
File details
Details for the file batchit-0.1.0-py3-none-any.whl.
File metadata
- Download URL: batchit-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 66ffa09884bc0110f62b0ac384c6586e8e25d32a69581286234dcf44c23a9e68 |
| MD5 | fe41bdfccbc66c2268d646bc9e135d90 |
| BLAKE2b-256 | e7b649c7a669cecaf7b9914dd48d0f268f01a302964eb56f3d7ecd06f8811e13 |
Provenance
The following attestation bundles were made for batchit-0.1.0-py3-none-any.whl:
- Publisher: publish.yml on Ahmedie-m/batchit
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: batchit-0.1.0-py3-none-any.whl
- Subject digest: 66ffa09884bc0110f62b0ac384c6586e8e25d32a69581286234dcf44c23a9e68
- Sigstore transparency entry: 1199312400
- Permalink: Ahmedie-m/batchit@cd6d8c1c02beae94603335a2ab309a034c07944a
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/Ahmedie-m
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@cd6d8c1c02beae94603335a2ab309a034c07944a
- Trigger Event: push