# batchit

Tiny batching for Python pipelines, streams, and agent workflows. Batch any Python iterator by count, elapsed time, or both — in one `pip install`, with no dependencies.
```python
from batchit import batcher, async_batcher

# Sync — flush every 100 items or when the next item reveals 5 s have passed
for batch in batcher(source, size=100, timeout=5.0):
    db.bulk_insert(batch)

# Async — flush every 100 items or after 5 s of silence from the source
async for batch in async_batcher(source, size=100, timeout=5.0):
    await db.bulk_insert(batch)
```
## AI / agent pipelines
```python
# Batch embedding requests before upserting to a vector store
async for batch in async_batcher(document_stream(), size=96, timeout=2.0):
    vectors = await embed(batch)
    await vector_store.upsert(vectors)

# Batch agent tool outputs before writing to Postgres
async for batch in async_batcher(tool_result_stream(), size=50, timeout=1.0):
    await db.execute("INSERT INTO results ...", batch)

# Batch scraped records before bulk API submission
for batch in batcher(scraper.records(), size=200, timeout=10.0):
    api.bulk_submit(batch)

# Batch trace/log events before shipping to observability backend
async for batch in async_batcher(event_stream(), size=500, timeout=5.0, maxsize=1000):
    await telemetry.ingest(batch)
```
## Why not just write this yourself?
You could — it's not a lot of code. But here's what you'd need to get right:
- Sync and async variants with consistent semantics
- Partial final batch always flushed, never silently dropped
- Timeout measured correctly from the first item in the batch, not wall clock
- Async timeout that fires independently of item delivery (via `asyncio.wait_for`), not just on arrival
- Backpressure support via a bounded internal queue
- Exception propagation from the source through to the consumer
- PEP 561 typing so your IDE and type checker understand the API
- Tested across Python 3.10–3.13
`pip install batchit` and move on.
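For a sense of what those bullets cost, here is a minimal hand-rolled sync sketch (an illustration under our own names; `naive_batcher` is not batchit's API) covering only the count/time core and the final partial flush:

```python
import time

def naive_batcher(source, size=None, timeout=None):
    """Hand-rolled sketch: batch an iterable by count and/or elapsed time.

    The timeout is measured from the first item of the current batch, and
    in this sync version it is only *checked* when the next item arrives,
    so a stalled source delays the flush.
    """
    if size is None and timeout is None:
        raise ValueError("provide size and/or timeout")
    batch = []
    started = 0.0
    for item in source:
        if not batch:
            started = time.monotonic()   # clock starts at the first item
        batch.append(item)
        full = size is not None and len(batch) >= size
        expired = timeout is not None and time.monotonic() - started >= timeout
        if full or expired:
            yield batch
            batch = []
    if batch:
        yield batch                      # partial final batch, never dropped
```

Even this sketch omits the async variant, backpressure, and an independently firing timeout from the checklist above.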
## Why batchit?
`more_itertools.batched()` batches by count only. In real streaming workloads (Kafka consumers, database cursors, API result streams) you also need a time window. Every team ends up writing this boilerplate; batchit is the one-liner that replaces it.
| | Count limit | Time limit | Async | Backpressure | Dependencies |
|---|---|---|---|---|---|
| `batchit` | ✓ | ✓ | ✓ | ✓ | none |
| `more-itertools` | ✓ | ✗ | ✗ | ✗ | 1 |
| `toolz` | ✓ | ✗ | ✗ | ✗ | 1 |
| hand-rolled | maybe | maybe | maybe | maybe | — |
## Installation

```shell
pip install batchit
```
No runtime dependencies. Python 3.10–3.13. Fully typed (PEP 561). Works with generators, Kafka consumers, database cursors, file readers, async queues, and any other iterable.
## Usage

### Sync — `batcher`
```python
from batchit import batcher

# By size only
for batch in batcher(range(1000), size=50):
    process(batch)

# By timeout only
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)

# By both — whichever fires first
for batch in batcher(db_cursor, size=200, timeout=10.0):
    write_to_s3(batch)
```
### Async — `async_batcher`
```python
from batchit import async_batcher

async for batch in async_batcher(async_source, size=100, timeout=5.0):
    await db.bulk_insert(batch)

# With backpressure — producer blocks if more than 200 items are queued
async for batch in async_batcher(fast_source, size=50, timeout=2.0, maxsize=200):
    await slow_downstream(batch)
```
## Timeout semantics
The two variants behave differently under a slow or stalled source:
| | `batcher` (sync) | `async_batcher` (async) |
|---|---|---|
| How timeout fires | Checked on each item arrival | Fires independently via `asyncio.wait_for` |
| Stalled source | Waits until the next item arrives, then flushes | Flushes after T seconds even with no new items |
| Triggering item | Included in the flushing batch | Starts the next batch |
| Threading | None — single-threaded safe | asyncio event loop only |
| Source exception | Propagates immediately | Propagates to consumer |
Rule of thumb: use `batcher` for sync iterables where the source drives timing. Use `async_batcher` when you need the timeout to fire independently of item delivery.
## API

### `batcher(iterable, *, size=None, timeout=None)`
| Parameter | Type | Description |
|---|---|---|
| `iterable` | `Iterable[T]` | Any iterable to batch |
| `size` | `int \| None` | Max items per batch |
| `timeout` | `float \| None` | Max seconds per batch, measured from the first item |

Yields `list[T]`. At least one of `size` / `timeout` must be provided.
### `async_batcher(aiterable, *, size=None, timeout=None, maxsize=0)`
| Parameter | Type | Description |
|---|---|---|
| `aiterable` | `AsyncIterable[T]` | Any async iterable to batch |
| `size` | `int \| None` | Max items per batch |
| `timeout` | `float \| None` | Max seconds per batch, measured from the first item |
| `maxsize` | `int` | Internal queue cap for backpressure; 0 = unbounded |

Yields `list[T]` asynchronously.
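The `maxsize` backpressure described above is the standard bounded-queue behaviour: `await queue.put(...)` suspends the producer once the queue is full. A sketch of that general mechanism (our own `demo` names, not batchit's internals):

```python
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=2)        # bounded queue = backpressure
    puts = []

    async def producer():
        for i in range(3):
            await q.put(i)              # suspends while the queue is full
            puts.append(i)

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)           # producer is now parked on put(2)
    before = list(puts)                 # only the first two puts completed
    await q.get()                       # draining one item frees the producer
    await task
    return before, list(puts)

before, after = asyncio.run(demo())
```

A fast source therefore slows to the consumer's pace instead of buffering unboundedly in memory.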
## Patterns

### Kafka consumer
```python
from kafka import KafkaConsumer
from batchit import batcher

consumer = KafkaConsumer("events")
for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()
```
### Database cursor

```python
cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
```
### LLM embedding pipeline

```python
from batchit import async_batcher

async for batch in async_batcher(document_stream(), size=96, timeout=2.0):
    response = await openai_client.embeddings.create(input=batch, model="...")
    await vector_db.upsert(response.data)
```
### Agent tool results to Postgres

```python
async for batch in async_batcher(tool_outputs(), size=50, timeout=1.0):
    await conn.executemany("INSERT INTO tool_results VALUES ($1, $2)", batch)
```
### Web crawler bulk insert

```python
for batch in batcher(crawler.records(), size=200, timeout=10.0):
    api.bulk_submit(batch)
```
## Tests

The test suite is organised by use case:

| File | What it covers |
|---|---|
| `tests/test_sync.py` | Core sync batcher behaviour |
| `tests/test_async.py` | Core async batcher behaviour |
| `tests/test_kafka.py` | Kafka consumer patterns (sync + async) |
| `tests/test_db.py` | Database cursor and file iterator patterns |
## Roadmap

- v0.1 — stable core: sync + async batching by size and timeout
- v0.2 — PEP 561 typing, Python 3.13 support, async exception propagation, pattern test suites
- v0.3 — async backpressure via bounded queue (`maxsize`)
- next — docs site, additional convenience helpers
## Contributing

See CONTRIBUTING.md.

## License

MIT — see LICENSE.