Incremental S3 File Processor is a Python package that enables efficient, incremental file processing from an Amazon S3 bucket. It keeps track of processed files using a checkpointing mechanism, ensuring that only new or modified files are processed in subsequent runs. The package supports batch processing, allows resetting checkpoints for reprocessing all data, and correctly handles edge cases like files with identical timestamps.
Project description
S3 Incremental File Processor
A Python package that allows users to fetch new files from an S3 bucket in an incremental fashion. It supports checkpointing, ensuring that only new files are accessed in the next run. The package also provides a method to reset the checkpoint, enabling the reprocessing of all data if needed.
Features
- Incremental File Processing: Fetch only new or modified files from an S3 bucket.
- Checkpointing: Keeps track of processed files to prevent duplicate processing.
- Batch Processing: Process files in configurable batch sizes.
- Storage Class Filtering: Fetch files based on their storage class (e.g., STANDARD).
- Reset Checkpoint: Delete the checkpoint to reprocess all files.
- Handles Edge Cases: Ensures correct file ordering when timestamps are identical.
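The general technique behind this kind of incremental fetch is an S3 listing ordered by LastModified, with the object key as a tie-breaker for identical timestamps, filtered against a persisted checkpoint. The sketch below illustrates that idea with boto3; it is not this package's actual implementation, and the checkpoint fields shown (last_modified, last_key) are assumptions made for illustration only.

import boto3

s3 = boto3.client("s3")

def list_new_objects(bucket, prefix, checkpoint=None):
    # List every object currently under the prefix.
    paginator = s3.get_paginator("list_objects_v2")
    objects = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects.extend(page.get("Contents", []))

    # Order by (LastModified, Key) so files with identical timestamps
    # still get a stable, deterministic order.
    objects.sort(key=lambda o: (o["LastModified"], o["Key"]))

    if checkpoint is None:
        return objects

    # checkpoint is an assumed dict: {"last_modified": datetime, "last_key": str}
    marker = (checkpoint["last_modified"], checkpoint["last_key"])
    return [o for o in objects if (o["LastModified"], o["Key"]) > marker]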
Installation
pip install S3IncrementalProcessor
Usage
from S3IncrementalProcessor import S3IncrementalProcessor

# Initialize the processor with S3 paths
processor = S3IncrementalProcessor(
    "s3://your-bucket/path/to/files/",
    "s3://your-bucket/checkpoints/checkpoint.json"
)

# Fetch new files in batches
new_files = processor.get_new_files(batch_size=5)

if new_files:
    print(f"Processing {len(new_files)} files:")
    for file in new_files:
        print(f"- {file}")
        # Add your processing logic here

    # Commit the checkpoint after processing
    processor.commit_checkpoint()
else:
    print("No new or modified files found.")

# To reset checkpoint and reprocess all files
# processor.reset_checkpoint()
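When more files are pending than a single batch can hold, the same two calls can simply be repeated until nothing new comes back. A minimal sketch using only the methods shown above:

# Drain everything that is pending, committing after each successful batch.
while True:
    batch = processor.get_new_files(batch_size=5)
    if not batch:
        break
    for file in batch:
        print(f"- {file}")  # replace with real processing
    processor.commit_checkpoint()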
Test Cases
This package has been tested with the following scenarios:
1. 10 Files, Batch Size 5
- Expected: Two runs to process all files. A third run should return no new files.
2. 10 Files, Batch Size 100
- Expected: One run should process all 10 files.
3. Files with Identical Timestamps
- Ensures: Files uploaded simultaneously using threading are correctly ordered, and only new files are processed.
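Scenario 3 can be reproduced by uploading several small objects in parallel so that their LastModified timestamps collide. The snippet below is one way to set that up with boto3 and a thread pool; the bucket and prefix are placeholders, the prefix is assumed to start empty, and processor is the instance created in the Usage section.

from concurrent.futures import ThreadPoolExecutor
import boto3

s3 = boto3.client("s3")
bucket, prefix = "your-bucket", "path/to/files/"  # placeholders

def upload(i):
    # Small objects uploaded concurrently often share the same LastModified second.
    s3.put_object(Bucket=bucket, Key=f"{prefix}file_{i:02d}.txt", Body=b"payload")

with ThreadPoolExecutor(max_workers=10) as pool:
    list(pool.map(upload, range(10)))

# With a batch size larger than the file count, everything should arrive in one run,
# and a second run should find nothing new.
first = processor.get_new_files(batch_size=100)
processor.commit_checkpoint()
second = processor.get_new_files(batch_size=100)
assert len(first) == 10 and not second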
Contributing
Contributions are welcome! Please open an issue or submit a pull request.
License
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file s3incrementalprocessor-0.1.0.tar.gz.
File metadata
- Download URL: s3incrementalprocessor-0.1.0.tar.gz
- Upload date:
- Size: 2.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ebd7b9f0acca63aaf35179d9d1f8720823eb87c7aaf006233fd79ae60a284331 |
| MD5 | 4745e11e7a1d5ae0467509499e9d317b |
| BLAKE2b-256 | ea9770278b1fd6481cc1fcd265ca6a4acac0fd1ad24f066168c228f7f46b3b40 |
File details
Details for the file S3IncrementalProcessor-0.1.0-py3-none-any.whl.
File metadata
- Download URL: S3IncrementalProcessor-0.1.0-py3-none-any.whl
- Upload date:
- Size: 2.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9806d8254d599ffc0eeb1c4f06e42a7c15fc9880be1f64617466bd2fee156e8d |
| MD5 | dff94c33d78203f00e361d66caa977c1 |
| BLAKE2b-256 | 420da422799100515de9145f913b638593f1c04eff23d554791b13b667ea470a |