Lightweight batch processing engine.
Project description
batchwise
Lightweight batch processing engine.
Table of Contents
Getting Started
Installation
pip install batchwise
Basic engine and processor
The core of batchwise consists of an Engine that manages one or more Processor functions. Each processor operates on time-based windows determined by cron expressions.
import datetime
from batchwise import Engine, Window
# Initialize the engine (defaults are shown here)
engine = Engine(
checkpoint_path="./batchwise_checkpoints", # path or uri to checkpoint directory
logger_name="batchwise", # name of logger instance
timezone=datetime.timezone.utc, # timezone (datetime.tzinfo object)
)
# Register a processor using the decorator
@engine.processor(
interval="*/30 * * * *", # Run every 30 minutes
delay="1h", # Wait 1 hour before considering window complete
lookback="1d", # Look back 1 day for processing windows
include_incomplete=False, # Only process complete windows
)
def my_processor(window: Window):
print(f"Processing window: {window.start} to {window.end}")
print(f"Window complete: {window.complete}")
# Your processing logic here
pass
# Run the engine
engine()
The Window object provides:
window_id: Unique identifier for the windowstart: Start time of the window (datetime.datetime)end: End time of the window (datetime.datetime)complete: Boolean indicating if the window is complete
Using an fsspec-compatible filesystem
batchwise supports any fsspec-compatible filesystem for checkpoint storage, enabling cloud storage integration:
from pathlib import Path
from fsspec.implementations.dirfs import DirFileSystem
from fsspec.implementations.local import LocalFileSystem
from batchwise import Engine, Window
# For example, construct a DirFileSystem to wrap a base filesystem
fs = LocalFileSystem()
dir_fs = DirFileSystem(path=str(Path.cwd()), fs=fs)
engine = Engine(
checkpoint_path="batchwise_checkpoints", # sub-path on filesystem
fs=dir_fs,
)
@engine.processor(
interval="0 * * * *",
delay="2h",
lookback="1d",
)
def my_processor(window: Window):
# Your processing logic here
pass
engine()
For cloud storage, use the appropriate fsspec implementation (e.g., s3fs, adlfs, gcsfs).
Additional context
You can pass additional context to your processors:
from batchwise import Engine, Window
engine = Engine()
# Define a dictionary to be passed to the processor
config = {
"database_url": "postgresql://...",
"threshold": 100,
}
@engine.processor(
interval="0 0 * * *",
delay="1d",
lookback="7d",
context=config
)
def processor_with_context(window: Window, context: dict):
# Access context parameters
database_url = context["database_url"]
threshold = context["threshold"]
# Your processing logic here
pass
engine()
Important: When using context, your processor function must accept a context parameter.
Parallel and/or continuous processing
Run multiple processors in parallel using multiprocessing:
from batchwise import Engine, Window
engine = Engine()
@engine.processor(interval="*/15 * * * *", delay="30m", lookback="2h")
def processor_1(window: Window):
pass
@engine.processor(interval="*/30 * * * *", delay="1h", lookback="4h")
def processor_2(window: Window):
pass
@engine.processor(interval="0 * * * *", delay="2h", lookback="12h")
def processor_3(window: Window):
pass
# Run sequentially (default)
engine()
# Run with 3 parallel processes
engine(num_processes=3)
# Or run continuously with a minimum time between full cycles
engine(num_processes=3, every_seconds=60) # Check and run every 60 seconds
License
batchwise is distributed under the terms of the MIT license.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file batchwise-0.2.0.tar.gz.
File metadata
- Download URL: batchwise-0.2.0.tar.gz
- Upload date:
- Size: 16.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9778ccd48249bfa40fea2d696e24e768da7f6d261218b881b7c2b8b4f6f5f760
|
|
| MD5 |
23dceedd7ef096c6bf719d71615b109e
|
|
| BLAKE2b-256 |
2325f5f54e72b8c2829e5f3722811c5530d58e84a3ae38fb924073524ff75051
|
Provenance
The following attestation bundles were made for batchwise-0.2.0.tar.gz:
Publisher:
build.yml on manuelkonrad/batchwise
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
batchwise-0.2.0.tar.gz -
Subject digest:
9778ccd48249bfa40fea2d696e24e768da7f6d261218b881b7c2b8b4f6f5f760 - Sigstore transparency entry: 714164859
- Sigstore integration time:
-
Permalink:
manuelkonrad/batchwise@421cd393fb005f8fab89febdff1066b74c0c81e2 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/manuelkonrad
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build.yml@421cd393fb005f8fab89febdff1066b74c0c81e2 -
Trigger Event:
push
-
Statement type:
File details
Details for the file batchwise-0.2.0-py3-none-any.whl.
File metadata
- Download URL: batchwise-0.2.0-py3-none-any.whl
- Upload date:
- Size: 10.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2a9e33b298bfb24abb625f02814c37d8ea49a5d09f3fa9049f798aa129b07fb8
|
|
| MD5 |
c44e0c058c9834c9def915cbd53cc039
|
|
| BLAKE2b-256 |
95b2811d15f3be016855b57092430adcab0866889c7e976ba8ccaf89cd307b90
|
Provenance
The following attestation bundles were made for batchwise-0.2.0-py3-none-any.whl:
Publisher:
build.yml on manuelkonrad/batchwise
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
batchwise-0.2.0-py3-none-any.whl -
Subject digest:
2a9e33b298bfb24abb625f02814c37d8ea49a5d09f3fa9049f798aa129b07fb8 - Sigstore transparency entry: 714164864
- Sigstore integration time:
-
Permalink:
manuelkonrad/batchwise@421cd393fb005f8fab89febdff1066b74c0c81e2 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/manuelkonrad
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build.yml@421cd393fb005f8fab89febdff1066b74c0c81e2 -
Trigger Event:
push
-
Statement type: