Batch processing with progress tracking and error handling.

philiprehberger-batch-iter

Installation

pip install philiprehberger-batch-iter

Usage

from philiprehberger_batch_iter import batch

# Split any iterable into fixed-size batches
for chunk in batch(range(10), size=3):
    print(chunk)
# [0, 1, 2]
# [3, 4, 5]
# [6, 7, 8]
# [9]

# Enable progress output to stderr
# (process is a placeholder for your own per-batch handler)
for chunk in batch(range(100), size=25, progress=True):
    process(chunk)
# batch 1: 25 items
# batch 2: 25 items
# ...
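
For readers curious how fixed-size batching over an arbitrary iterable can work, here is a minimal sketch built on `itertools.islice`. The name `batch_sketch` is hypothetical and this is not the library's source; it only reproduces the chunking shown above, without the progress option.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch_sketch(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical stand-in for batch(); not the library's implementation."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(iterable)
    # islice pulls at most `size` items per pass; an empty list means
    # the underlying iterator is exhausted, which ends the loop.
    while chunk := list(islice(it, size)):
        yield chunk


chunks = list(batch_sketch(range(10), size=3))
print(chunks)
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

Because the sketch only ever holds one chunk at a time, it also works on generators and other one-shot iterables.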

Batch map

from philiprehberger_batch_iter import batch_map

# Process items in batches and collect flattened results
results = batch_map(range(10), size=3, fn=lambda chunk: [x * 2 for x in chunk])
print(results)
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
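
The flattening step may be easier to see in isolation. The sketch below assumes that each call to `fn` returns an iterable and that the results are concatenated in batch order; `batch_map_sketch` is a hypothetical name, not the library's code.

```python
from itertools import islice
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batch_map_sketch(
    iterable: Iterable[T], size: int, fn: Callable[[List[T]], Iterable[R]]
) -> List[R]:
    """Hypothetical stand-in for batch_map(); assumes fn returns an iterable."""
    it = iter(iterable)
    results: List[R] = []
    while chunk := list(islice(it, size)):
        # extend (not append) is what flattens the per-batch results
        results.extend(fn(chunk))
    return results


doubled = batch_map_sketch(range(10), size=3, fn=lambda chunk: [x * 2 for x in chunk])
batch_sums = batch_map_sketch(range(10), size=3, fn=lambda chunk: [sum(chunk)])
print(doubled)     # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(batch_sums)  # [3, 12, 21, 9]
```

The second call shows that, under this assumption, `fn` does not have to return one output per input item; it can just as well return one summary value per batch.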

Error collection

from philiprehberger_batch_iter import collect_errors

def process_batch(items):
    for item in items:
        if item < 0:
            raise ValueError(f"negative value: {item}")

result = collect_errors([1, 2, -3, 4, 5, -6], size=2, fn=process_batch)
print(result.processed)     # 6
print(len(result.errors))   # 2
print(result.duration_ms)   # e.g. 0.12 (elapsed milliseconds; varies per run)
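
One way to read the output above is that every item in a batch counts as processed even when `fn` raises, and that each failing batch contributes one error. The sketch below implements that reading as an assumption; `BatchResultSketch` and `collect_errors_sketch` are hypothetical names, and the library's actual accounting may differ.

```python
import time
from dataclasses import dataclass, field
from itertools import islice
from typing import Callable, Iterable, List


@dataclass
class BatchResultSketch:
    """Hypothetical mirror of BatchResult's three documented fields."""
    processed: int = 0
    errors: List[Exception] = field(default_factory=list)
    duration_ms: float = 0.0


def collect_errors_sketch(
    iterable: Iterable, size: int, fn: Callable[[list], object]
) -> BatchResultSketch:
    result = BatchResultSketch()
    it = iter(iterable)
    start = time.perf_counter()
    while chunk := list(islice(it, size)):
        try:
            fn(chunk)
        except Exception as exc:
            # Record the failure and keep going with the next batch
            result.errors.append(exc)
        # Assumption: items in a failing batch still count as processed
        result.processed += len(chunk)
    result.duration_ms = (time.perf_counter() - start) * 1000
    return result


def reject_negatives(items):
    for item in items:
        if item < 0:
            raise ValueError(f"negative value: {item}")


outcome = collect_errors_sketch([1, 2, -3, 4, 5, -6], size=2, fn=reject_negatives)
print(outcome.processed)                 # 6
print([str(e) for e in outcome.errors])  # ['negative value: -3', 'negative value: -6']
```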

Filtering and reducing in batches

from philiprehberger_batch_iter import batch_filter, batch_reduce

# Stream items through a predicate, yielding batches of matches
for chunk in batch_filter(range(10), size=3, predicate=lambda x: x % 2 == 0):
    print(chunk)
# [0, 2, 4]
# [6, 8]

# Reduce over batches without holding the full sequence in memory
total = batch_reduce(
    range(10),
    size=3,
    fn=lambda acc, chunk: acc + sum(chunk),
    initial=0,
)
print(total)  # 45
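
The outputs above suggest that filtering happens before chunking (matches are regrouped into full batches) and that the reducer sees one batch at a time. A minimal sketch of both ideas, using the hypothetical names `batch_filter_sketch` and `batch_reduce_sketch` rather than the library's own code:

```python
from itertools import islice


def batch_filter_sketch(iterable, size, predicate):
    """Hypothetical: filter lazily, then chunk the surviving items."""
    matches = (x for x in iterable if predicate(x))
    while chunk := list(islice(matches, size)):
        yield chunk


def batch_reduce_sketch(iterable, size, fn, initial):
    """Hypothetical: fold fn(acc, chunk) over successive chunks."""
    it = iter(iterable)
    acc = initial
    while chunk := list(islice(it, size)):
        acc = fn(acc, chunk)
    return acc


evens = list(batch_filter_sketch(range(10), size=3, predicate=lambda x: x % 2 == 0))
largest = batch_reduce_sketch(
    range(10),
    size=3,
    fn=lambda acc, chunk: max(acc, max(chunk)),
    initial=float("-inf"),
)
print(evens)    # [[0, 2, 4], [6, 8]]
print(largest)  # 9
```

Only one chunk is materialized at a time in either function, which is what keeps memory use bounded by `size` rather than by the length of the input.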

Async batching

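from philiprehberger_batch_iter import batch_async

# async_data_source() and handle() are placeholders for your own
# async iterable and per-batch coroutine
async def process():
    async for chunk in batch_async(async_data_source(), size=50):
        await handle(chunk)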
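
Since the snippet above relies on placeholders, a fully runnable illustration of async batching may help. It uses only `asyncio` and a hand-written async generator; `batch_async_sketch` and `numbers` are hypothetical names and the code is a sketch of the pattern, not the library's implementation.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List


async def batch_async_sketch(source: AsyncIterable, size: int) -> AsyncIterator[List]:
    """Hypothetical stand-in for batch_async()."""
    chunk: List = []
    async for item in source:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        # Emit the final, possibly shorter, batch
        yield chunk


async def numbers(n: int):
    """Toy async data source that yields 0..n-1."""
    for i in range(n):
        await asyncio.sleep(0)  # give control back to the event loop
        yield i


async def main():
    return [chunk async for chunk in batch_async_sketch(numbers(7), size=3)]


async_chunks = asyncio.run(main())
print(async_chunks)
# [[0, 1, 2], [3, 4, 5], [6]]
```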

Async batch map

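from philiprehberger_batch_iter import batch_async_map

# api and items_aiter are placeholders for your own client and async iterable
async def upload(chunk):
    return await api.upload_many(chunk)

# await is only valid inside a coroutine, so wrap the call
async def main():
    results = await batch_async_map(items_aiter, size=100, fn=upload)
    return results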
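
A self-contained version of the same pattern, for experimenting without a real API client. It assumes that batches are awaited one after another and that the per-batch results are flattened the way `batch_map` flattens them; `batch_async_map_sketch`, `fake_upload`, and `letters` are hypothetical names.

```python
import asyncio


async def batch_async_map_sketch(source, size, fn):
    """Hypothetical stand-in for batch_async_map(); awaits batches sequentially."""
    results = []
    chunk = []
    async for item in source:
        chunk.append(item)
        if len(chunk) == size:
            results.extend(await fn(chunk))
            chunk = []
    if chunk:
        # Handle the final, possibly shorter, batch
        results.extend(await fn(chunk))
    return results


async def fake_upload(chunk):
    """Toy coroutine standing in for a network call."""
    await asyncio.sleep(0)
    return [f"id-{x}" for x in chunk]


async def letters():
    for ch in "abcde":
        yield ch


uploaded = asyncio.run(batch_async_map_sketch(letters(), size=2, fn=fake_upload))
print(uploaded)
# ['id-a', 'id-b', 'id-c', 'id-d', 'id-e']
```

Awaiting each batch before reading the next keeps at most one request in flight, which is the simplest behavior to reason about; running batches concurrently would need an explicit limit on in-flight tasks.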

API

| Function / Class | Description |
| --- | --- |
| `batch(iterable, size, progress=False)` | Yield fixed-size batches from an iterable |
| `batch_map(iterable, size, fn)` | Process batches with `fn` and return a flat result list |
| `batch_filter(iterable, size, predicate)` | Yield batches of items matching `predicate` |
| `batch_reduce(iterable, size, fn, initial)` | Reduce over batches, calling `fn(acc, batch)` per batch |
| `batch_async(async_iterable, size)` | Async generator yielding fixed-size batches |
| `batch_async_map(async_iterable, size, fn)` | Async counterpart to `batch_map`, awaits each batch |
| `collect_errors(iterable, size, fn)` | Process batches and collect errors into a result |
| `BatchResult` | Dataclass with `processed`, `errors`, `duration_ms` |

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT
