Batch any Python iterator by count and/or elapsed time
Project description
batchit
Tiny batching for Python pipelines, streams, and agent workflows.
Batch any Python iterator by count, weight, elapsed time, or any combination — in one pip install, with no dependencies.
from batchit import batcher, async_batcher
# Sync — flush every 100 items or when the next item reveals 5 s have passed
for batch in batcher(source, size=100, timeout=5.0):
db.bulk_insert(batch)
# Async — flush every 100 items or after 5 s of silence from the source
async for batch in async_batcher(source, size=100, timeout=5.0):
await db.bulk_insert(batch)
# Weighted — flush when cumulative token count exceeds 4096
for batch in batcher(prompts, max_weight=4096, weight=count_tokens):
llm.generate(batch)
# Queue draining — batch directly from an asyncio.Queue
from batchit import drain_queue, STOP
async for batch in drain_queue(queue, size=50, timeout=2.0):
await sink.write(batch)
AI / agent pipelines
# Batch embedding requests before upserting to a vector store
async for batch in async_batcher(document_stream(), size=96, timeout=2.0):
vectors = await embed(batch)
await vector_store.upsert(vectors)
# Token-budget batching for LLM inference
for batch in batcher(prompts, max_weight=4096, weight=count_tokens):
responses = llm.generate(batch)
# Batch agent tool outputs before writing to Postgres
async for batch in async_batcher(tool_result_stream(), size=50, timeout=1.0):
await db.execute("INSERT INTO results ...", batch)
# Batch scraped records before bulk API submission
for batch in batcher(scraper.records(), size=200, timeout=10.0):
api.bulk_submit(batch)
# Batch trace/log events before shipping to observability backend
async for batch in async_batcher(event_stream(), size=500, timeout=5.0, maxsize=1000):
await telemetry.ingest(batch)
# Inspect why each batch flushed
from batchit import batcher_with_meta
for result in batcher_with_meta(source, size=100, timeout=5.0):
print(result.reason, result.count, result.age)
process(result.items)
Why not just write this yourself?
You could — it's not a lot of code. But here's what you'd need to get right:
- Sync and async variants with consistent semantics
- Partial final batch always flushed, never silently dropped
- Timeout measured correctly from the first item in the batch, not wall clock
- Async timeout fires independently of item delivery (via
asyncio.wait_for) — not just on arrival - Weight-based flushing for token budgets, byte limits, or any custom metric
- Queue draining for producer/consumer patterns with
asyncio.Queue - Flush metadata (
reason,age,count) for observability and adaptive logic - Backpressure support via bounded internal queue
- Exception propagation from the source through to the consumer
- PEP 561 typing so your IDE and type checker understand the API
- Tested across Python 3.10–3.13
pip install batchit and move on.
Why batchit?
more-itertools.batched() batches by count only. In real streaming workloads
(Kafka consumers, database cursors, API result streams) you also need a time
window and often a weight limit. Every team writes this boilerplate. batchit is the one-liner that replaces it.
| Count limit | Weight limit | Time limit | Async | Queue drain | Backpressure | Dependencies | |
|---|---|---|---|---|---|---|---|
batchit |
✓ | ✓ | ✓ | ✓ | ✓ | ✓ | none |
more-itertools |
✓ | ✗ | ✗ | ✗ | ✗ | ✗ | 1 |
toolz |
✓ | ✗ | ✗ | ✗ | ✗ | ✗ | 1 |
| hand-rolled | maybe | maybe | maybe | maybe | maybe | maybe | — |
Installation
pip install batchit
No runtime dependencies. Python 3.10–3.13. Fully typed (PEP 561). Works with generators, Kafka consumers, database cursors, file readers, async queues, and any other iterable.
Usage
Sync — batcher
from batchit import batcher
# By size only
for batch in batcher(range(1000), size=50):
process(batch)
# By timeout only
for batch in batcher(kafka_consumer, timeout=5.0):
send_to_api(batch)
# By both — whichever fires first
for batch in batcher(db_cursor, size=200, timeout=10.0):
write_to_s3(batch)
# By weight — flush when token budget is exhausted
for batch in batcher(prompts, max_weight=4096, weight=count_tokens):
llm.generate(batch)
# Combined weight + size + timeout
for batch in batcher(records, size=100, max_weight=1_000_000, weight=len, timeout=10.0):
upload(batch)
Sync with metadata — batcher_with_meta
from batchit import batcher_with_meta
for result in batcher_with_meta(source, size=100, timeout=5.0):
print(f"Flushed {result.count} items after {result.age:.2f}s (reason: {result.reason})")
process(result.items)
BatchResult fields: items, reason ("size" | "weight" | "timeout" | "final"), count, age.
Async — async_batcher
from batchit import async_batcher
async for batch in async_batcher(async_source, size=100, timeout=5.0):
await db.bulk_insert(batch)
# With backpressure — producer blocks if more than 200 items are queued
async for batch in async_batcher(fast_source, size=50, timeout=2.0, maxsize=200):
await slow_downstream(batch)
# With weight limit
async for batch in async_batcher(prompt_stream(), max_weight=4096, weight=count_tokens):
await llm.generate_batch(batch)
Async with metadata — async_batcher_with_meta
from batchit import async_batcher_with_meta
async for result in async_batcher_with_meta(source, size=100, timeout=5.0):
metrics.record(result.reason, result.count, result.age)
await process(result.items)
Queue draining — drain_queue
from batchit import drain_queue, STOP
import asyncio
queue: asyncio.Queue[str] = asyncio.Queue()
# Put STOP into the queue to signal end of stream
async for batch in drain_queue(queue, size=50, timeout=2.0):
await sink.write(batch)
# Custom sentinel
async for batch in drain_queue(queue, size=50, sentinel="<<END>>"):
await sink.write(batch)
Convenience aliases
from batchit import batch_by_size, batch_by_timeout
for batch in batch_by_size(records, 100): # same as batcher(records, size=100)
process(batch)
for batch in batch_by_timeout(stream, 5.0): # same as batcher(stream, timeout=5.0)
flush(batch)
Timeout semantics
The two variants behave differently under a slow or stalled source:
batcher (sync) |
async_batcher (async) |
|
|---|---|---|
| How timeout fires | Checked on each item arrival | Fires independently via asyncio.wait_for |
| Stalled source | Waits until the next item arrives, then flushes | Flushes after T seconds even with no new items |
| Triggering item | Included in the flushing batch | Starts the next batch |
| Threading | None — single-threaded safe | asyncio event loop only |
| Source exception | Propagates immediately | Propagates to consumer |
Rule of thumb: use batcher for sync iterables where the source drives timing. Use async_batcher when you need the timeout to fire independently of item delivery.
API
batcher(iterable, *, size=None, timeout=None, max_weight=None, weight=..., min_size=0)
| Parameter | Type | Description |
|---|---|---|
iterable |
Iterable[T] |
Any iterable to batch |
size |
int | None |
Max items per batch |
timeout |
float | None |
Max seconds per batch, measured from the first item |
max_weight |
float | None |
Max total weight per batch |
weight |
Callable[[T], float] |
Weight function for each item. Default: 1.0 per item |
min_size |
int |
Minimum items before a timeout flush fires. Default: 0 |
Yields list[T]. At least one of size, timeout, or max_weight must be provided.
batcher_with_meta(iterable, *, size=None, timeout=None, max_weight=None, weight=..., min_size=0)
Same parameters as batcher. Yields BatchResult[T] with .items, .reason, .count, .age.
async_batcher(aiterable, *, size=None, timeout=None, max_weight=None, weight=..., min_size=0, maxsize=0)
| Parameter | Type | Description |
|---|---|---|
aiterable |
AsyncIterable[T] |
Any async iterable to batch |
size |
int | None |
Max items per batch |
timeout |
float | None |
Max seconds per batch, measured from the first item |
max_weight |
float | None |
Max total weight per batch |
weight |
Callable[[T], float] |
Weight function for each item. Default: 1.0 per item |
min_size |
int |
Minimum items before a timeout flush fires. Default: 0 |
maxsize |
int |
Internal queue cap for backpressure. 0 = unbounded |
Yields list[T] asynchronously.
async_batcher_with_meta(aiterable, *, size=None, timeout=None, max_weight=None, weight=..., min_size=0, maxsize=0)
Same parameters as async_batcher. Yields BatchResult[T] asynchronously.
drain_queue(queue, *, size=None, timeout=None, sentinel=STOP)
| Parameter | Type | Description |
|---|---|---|
queue |
asyncio.Queue[T] |
Queue to drain |
size |
int | None |
Max items per batch |
timeout |
float | None |
Max seconds to wait for the next item before flushing |
sentinel |
object |
Value that signals end of stream. Default: STOP |
Yields list[T] asynchronously. At least one of size or timeout must be provided.
BatchResult[T]
| Field | Type | Description |
|---|---|---|
items |
list[T] |
The batch contents |
reason |
"size" | "weight" | "timeout" | "final" |
Why the batch was flushed |
count |
int |
Number of items (len(items)) |
age |
float |
Seconds elapsed since the first item in this batch arrived |
Patterns
Kafka consumer
from kafka import KafkaConsumer
from batchit import batcher
consumer = KafkaConsumer("events")
for batch in batcher(consumer, size=500, timeout=10.0):
db.bulk_insert([msg.value for msg in batch])
consumer.commit()
Database cursor
cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
warehouse.insert_many(batch)
LLM token-budget batching
from batchit import batcher
def count_tokens(text: str) -> float:
return len(text.split()) # or use tiktoken
for batch in batcher(prompts, max_weight=4096, weight=count_tokens):
responses = llm.generate(batch)
LLM embedding pipeline
from batchit import async_batcher
async for batch in async_batcher(document_stream(), size=96, timeout=2.0):
response = await openai_client.embeddings.create(input=batch, model="...")
await vector_db.upsert(response.data)
Agent tool results to Postgres
async for batch in async_batcher(tool_outputs(), size=50, timeout=1.0):
await conn.executemany("INSERT INTO tool_results VALUES ($1, $2)", batch)
Adaptive flushing with metadata
from batchit import batcher_with_meta
for result in batcher_with_meta(source, size=100, timeout=5.0, max_weight=4096, weight=len):
if result.reason == "timeout" and result.count < 10:
metrics.increment("small_timeout_batch")
process(result.items)
Producer/consumer queue draining
import asyncio
from batchit import drain_queue, STOP
queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=1000)
async def producer():
async for event in event_source():
await queue.put(event)
await queue.put(STOP)
async def consumer():
async for batch in drain_queue(queue, size=50, timeout=2.0):
await api.bulk_submit(batch)
await asyncio.gather(producer(), consumer())
Web crawler bulk insert
for batch in batcher(crawler.records(), size=200, timeout=10.0):
api.bulk_submit(batch)
Tests
The test suite is organised by use case:
| File | What it covers |
|---|---|
tests/test_sync.py |
Core sync batcher behaviour |
tests/test_async.py |
Core async batcher behaviour |
tests/test_weighted.py |
Weighted batching (max_weight, min_size) — sync and async |
tests/test_drain_queue.py |
Queue draining (drain_queue, STOP) |
tests/test_kafka.py |
Kafka consumer patterns (sync + async) |
tests/test_db.py |
Database cursor and file iterator patterns |
Roadmap
- v0.1 — stable core: sync + async batching by size and timeout
- v0.2 — PEP 561 typing, Python 3.13, async exception propagation, pattern test suites
- v0.3 — async backpressure via bounded queue (
maxsize) - v0.4 — weighted batching (
max_weight,weight), flush metadata (batcher_with_meta), queue draining (drain_queue),min_sizeguard, convenience aliases - next — docs site, additional convenience helpers
Contributing
See CONTRIBUTING.md.
License
MIT — see LICENSE.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file batchit-0.4.0.tar.gz.
File metadata
- Download URL: batchit-0.4.0.tar.gz
- Upload date:
- Size: 21.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d18f630bb33c2228f2f0f104bfeef48f2ae293f39db03010d03b84a012022746
|
|
| MD5 |
f159682bfade40b5b443c78b83de87f7
|
|
| BLAKE2b-256 |
dd4d5ad03cb2684d1393dbb3f6060dc1411f2637e1dda024a493709588c2bc02
|
Provenance
The following attestation bundles were made for batchit-0.4.0.tar.gz:
Publisher:
publish.yml on Ahmedie-m/batchit
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
batchit-0.4.0.tar.gz -
Subject digest:
d18f630bb33c2228f2f0f104bfeef48f2ae293f39db03010d03b84a012022746 - Sigstore transparency entry: 1207654961
- Sigstore integration time:
-
Permalink:
Ahmedie-m/batchit@9b1c59108eb08ae2334eafdbc7ad269f5d80555d -
Branch / Tag:
refs/tags/v0.4.0 - Owner: https://github.com/Ahmedie-m
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@9b1c59108eb08ae2334eafdbc7ad269f5d80555d -
Trigger Event:
push
-
Statement type:
File details
Details for the file batchit-0.4.0-py3-none-any.whl.
File metadata
- Download URL: batchit-0.4.0-py3-none-any.whl
- Upload date:
- Size: 12.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cf4bfd8d0fd19ea358f08d632335fcefda804ea08d8bc0216ed72c87d7b275fe
|
|
| MD5 |
427477bb17942988dd782a34eeaa125c
|
|
| BLAKE2b-256 |
4706beb6de67d9c5e2eface2239a3b17bdff577f7966385bc558f68fb8ec1e23
|
Provenance
The following attestation bundles were made for batchit-0.4.0-py3-none-any.whl:
Publisher:
publish.yml on Ahmedie-m/batchit
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
batchit-0.4.0-py3-none-any.whl -
Subject digest:
cf4bfd8d0fd19ea358f08d632335fcefda804ea08d8bc0216ed72c87d7b275fe - Sigstore transparency entry: 1207654985
- Sigstore integration time:
-
Permalink:
Ahmedie-m/batchit@9b1c59108eb08ae2334eafdbc7ad269f5d80555d -
Branch / Tag:
refs/tags/v0.4.0 - Owner: https://github.com/Ahmedie-m
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@9b1c59108eb08ae2334eafdbc7ad269f5d80555d -
Trigger Event:
push
-
Statement type: