
Lightweight batch processing engine.


batchwise




Getting Started

Installation

pip install batchwise

Basic engine and processor

The core of batchwise consists of an Engine that manages one or more Processor functions. Each processor operates on time-based windows determined by cron expressions.

import datetime
from batchwise import Engine, Window

# Initialize the engine (defaults are shown here)
engine = Engine(
    checkpoint_path="./batchwise_checkpoints",  # path or uri to checkpoint directory
    logger_name="batchwise",                    # name of logger instance
    timezone=datetime.timezone.utc,             # timezone (datetime.tzinfo object)
)

# Register a processor using the decorator
@engine.processor(
    interval="*/30 * * * *",   # Run every 30 minutes
    delay="1h",                # Wait 1 hour before considering window complete
    lookback="1d",             # Look back 1 day for processing windows
    include_incomplete=False,  # Only process complete windows
)
def my_processor(window: Window):
    print(f"Processing window: {window.start} to {window.end}")
    print(f"Window complete: {window.complete}")
    # Your processing logic here

# Run the engine
engine()

The Window object provides:

  • window_id: Unique identifier for the window
  • start: Start time of the window (datetime.datetime)
  • end: End time of the window (datetime.datetime)
  • complete: Boolean indicating if the window is complete
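To make the interplay of interval, delay, lookback and include_incomplete more concrete, here is a minimal standard-library sketch of how such windows could be enumerated. It is an illustration under stated assumptions, not batchwise's implementation: parse_duration, SketchWindow and list_windows are hypothetical names, the interval is modelled as a fixed step (as for "*/30 * * * *") rather than a parsed cron expression, and a window is assumed to count as complete once delay has elapsed after its end.

```python
import datetime as dt
from dataclasses import dataclass

# Hypothetical duration parser for strings such as "30m", "1h", "1d".
_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> dt.timedelta:
    return dt.timedelta(**{_UNITS[text[-1]]: int(text[:-1])})


@dataclass(frozen=True)
class SketchWindow:  # stand-in for illustration, not batchwise.Window
    start: dt.datetime
    end: dt.datetime
    complete: bool


def list_windows(now, step, delay, lookback, include_incomplete=False):
    """List windows of length `step` that start within `lookback` of `now`."""
    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    earliest = now - lookback
    # Align the first start to a multiple of `step`, as "*/30 * * * *" would.
    start = epoch + ((earliest - epoch) // step) * step
    if start < earliest:
        start += step
    windows = []
    while start < now:
        end = start + step
        # Assumption: complete once `delay` has passed since the window's end.
        complete = end + delay <= now
        if complete or include_incomplete:
            windows.append(SketchWindow(start, end, complete))
        start = end
    return windows


now = dt.datetime(2024, 1, 1, 12, 10, tzinfo=dt.timezone.utc)
for w in list_windows(now, parse_duration("30m"), parse_duration("1h"), parse_duration("2h")):
    print(w.start.isoformat(), w.end.isoformat(), w.complete)
```

The epoch-based alignment only matches a cron schedule when the step divides a day evenly; a real cron parser would be needed for anything else.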

Using an fsspec-compatible filesystem

batchwise supports any fsspec-compatible filesystem for checkpoint storage, enabling cloud storage integration:

from pathlib import Path
from fsspec.implementations.dirfs import DirFileSystem
from fsspec.implementations.local import LocalFileSystem
from batchwise import Engine, Window

# For example, construct a DirFileSystem to wrap a base filesystem
fs = LocalFileSystem()
dir_fs = DirFileSystem(path=str(Path.cwd()), fs=fs)

engine = Engine(
    checkpoint_path="batchwise_checkpoints",  # sub-path on filesystem
    fs=dir_fs,
)

@engine.processor(
    interval="0 * * * *",
    delay="2h",
    lookback="1d",
)
def my_processor(window: Window):
    # Your processing logic here
    pass

engine()

For cloud storage, use the appropriate fsspec implementation (e.g., s3fs, adlfs, gcsfs).
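The way DirFileSystem scopes relative paths can be tried out without any cloud credentials by wrapping fsspec's in-memory filesystem. The sketch below only exercises fsspec itself; the root "/project-root" and the file name example.txt are made up for illustration and say nothing about the files batchwise actually writes.

```python
from fsspec.implementations.dirfs import DirFileSystem
from fsspec.implementations.memory import MemoryFileSystem

# In-memory base filesystem, standing in for S3, Azure or GCS.
base = MemoryFileSystem()

# Every path used through `scoped` is resolved relative to "/project-root".
scoped = DirFileSystem(path="/project-root", fs=base)

# Write through the scoped view using a relative sub-path ...
scoped.pipe_file("batchwise_checkpoints/example.txt", b"hello")

# ... and read it back from the base filesystem under the full path.
print(base.cat_file("/project-root/batchwise_checkpoints/example.txt"))
```

Swapping MemoryFileSystem for a cloud implementation would leave the relative checkpoint_path unchanged; only the base filesystem and its root differ.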

Additional context

You can pass additional context to your processors:

from batchwise import Engine, Window

engine = Engine()

# Define a dictionary to be passed to the processor
config = {
    "database_url": "postgresql://...",
    "threshold": 100,
}

@engine.processor(
    interval="0 0 * * *",
    delay="1d",
    lookback="7d",
    context=config,
)
def processor_with_context(window: Window, context: dict):
    # Access context parameters
    database_url = context["database_url"]
    threshold = context["threshold"]
    # Your processing logic here
    pass

engine()

Important: When you pass context to the decorator, your processor function must accept a context parameter in addition to window.
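The requirement above follows naturally from how a registering decorator might forward the context. The following is a hypothetical sketch, not batchwise's code: SketchRegistry is an invented stand-in for the engine, and passing context as a keyword argument is an assumption.

```python
import inspect
from typing import Any, Callable, Optional


class SketchRegistry:
    """Invented stand-in for an engine, for illustration only."""

    def __init__(self) -> None:
        self._entries: list[tuple[Callable[..., Any], Optional[dict]]] = []

    def processor(self, context: Optional[dict] = None):
        def register(func: Callable[..., Any]) -> Callable[..., Any]:
            # Fail at registration time if context cannot be delivered.
            accepts_context = "context" in inspect.signature(func).parameters
            if context is not None and not accepts_context:
                raise TypeError(f"{func.__name__} must accept a 'context' parameter")
            self._entries.append((func, context))
            return func

        return register

    def run(self, window: Any) -> list:
        results = []
        for func, context in self._entries:
            if context is None:
                results.append(func(window))
            else:
                # Assumption: context is forwarded as a keyword argument.
                results.append(func(window, context=context))
        return results


registry = SketchRegistry()


@registry.processor()
def plain(window):
    return f"plain:{window}"


@registry.processor(context={"threshold": 100})
def with_context(window, context):
    return f"ctx:{window}:{context['threshold']}"


print(registry.run("w1"))
```

Checking the signature when the function is registered surfaces a missing context parameter immediately, rather than when the first window is processed.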

Parallel and/or continuous processing

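By default, processors run sequentially. Pass num_processes to run them in parallel using multiprocessing, and every_seconds to keep the engine running continuously: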

from batchwise import Engine, Window

engine = Engine()

@engine.processor(interval="*/15 * * * *", delay="30m", lookback="2h")
def processor_1(window: Window):
    pass

@engine.processor(interval="*/30 * * * *", delay="1h", lookback="4h")
def processor_2(window: Window):
    pass

@engine.processor(interval="0 * * * *", delay="2h", lookback="12h")
def processor_3(window: Window):
    pass

# Choose one of the following invocations.

# Run sequentially (default)
engine()

# Run with 3 parallel processes
engine(num_processes=3)

# Or run continuously with a minimum time between full cycles
engine(num_processes=3, every_seconds=60)  # At least 60 seconds between full cycles
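One plausible reading of "a minimum time between full cycles" is that the engine sleeps only for whatever remains of every_seconds after a cycle has finished. The sketch below illustrates that pacing idea with the standard library. The function names are hypothetical, and measuring the interval from the start of each cycle is an assumption rather than documented batchwise behaviour.

```python
import time
from typing import Callable


def seconds_until_next_cycle(started: float, finished: float, every_seconds: float) -> float:
    """Remaining wait so that cycle starts are at least `every_seconds` apart."""
    return max(0.0, every_seconds - (finished - started))


def run_continuously(
    run_cycle: Callable[[], None],
    every_seconds: float,
    max_cycles: int,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    # `max_cycles` keeps the sketch finite; a real service would loop until stopped.
    for _ in range(max_cycles):
        started = clock()
        run_cycle()
        sleep(seconds_until_next_cycle(started, clock(), every_seconds))


# A cycle that took 10 seconds leaves 50 seconds to wait;
# a cycle that overran the interval leaves nothing to wait.
print(seconds_until_next_cycle(0.0, 10.0, 60.0))
print(seconds_until_next_cycle(0.0, 90.0, 60.0))
```

Injecting sleep and clock keeps the loop testable without real waiting.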

License

batchwise is distributed under the terms of the MIT license.
