
Incremental S3 File Processor is a Python package that enables efficient, incremental file processing from an Amazon S3 bucket. It keeps track of processed files using a checkpointing mechanism, ensuring that only new or modified files are processed in subsequent runs. The package supports batch processing, allows resetting checkpoints for reprocessing all data, and correctly handles edge cases like files with identical timestamps.


S3 Incremental File Processor

A Python package for fetching new or modified files from an S3 bucket incrementally. A checkpoint records what has already been processed, so the next run returns only files that have not yet been handled. The package also provides a method to reset the checkpoint, which lets you reprocess all data if needed.

Features

  • Incremental File Processing: Fetch only new or modified files from an S3 bucket.
  • Checkpointing: Keeps track of processed files to prevent duplicate processing.
  • Batch Processing: Process files in configurable batch sizes.
  • Storage Class Filtering: Fetch files based on their storage class (e.g., STANDARD).
  • Reset Checkpoint: Delete the checkpoint to reprocess all files.
  • Handles Edge Cases: Ensures correct file ordering when timestamps are identical.
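The checkpointing, storage class filtering, and identical-timestamp handling listed above could be modelled roughly as follows. This is an illustrative sketch only, not the package's actual implementation: the function names `select_new_objects` and `advance_checkpoint` and the checkpoint layout (`last_modified` plus `keys_at_last_modified`) are assumptions made for the example. The object dicts mimic the `Key`, `LastModified`, and `StorageClass` fields found in S3 object listings.

```python
from datetime import datetime, timezone


def select_new_objects(objects, checkpoint, batch_size, storage_class="STANDARD"):
    """Return the next batch of unprocessed objects, oldest first (hypothetical helper)."""
    last_ts = checkpoint.get("last_modified")
    seen_keys = set(checkpoint.get("keys_at_last_modified", []))

    candidates = []
    for obj in objects:
        if obj.get("StorageClass", "STANDARD") != storage_class:
            continue  # wrong storage class
        ts = obj["LastModified"]
        if last_ts is not None:
            if ts < last_ts:
                continue  # strictly older than the checkpoint
            if ts == last_ts and obj["Key"] in seen_keys:
                continue  # same timestamp, already processed
        candidates.append(obj)

    # Sort by (timestamp, key) so objects with identical timestamps
    # always come back in the same order.
    candidates.sort(key=lambda o: (o["LastModified"], o["Key"]))
    return candidates[:batch_size]


def advance_checkpoint(checkpoint, batch):
    """Return a new checkpoint covering a processed batch (hypothetical helper)."""
    if not batch:
        return dict(checkpoint)
    newest = batch[-1]["LastModified"]
    keys = {o["Key"] for o in batch if o["LastModified"] == newest}
    if checkpoint.get("last_modified") == newest:
        # Still inside the same timestamp group: keep the earlier keys too.
        keys |= set(checkpoint.get("keys_at_last_modified", []))
    return {"last_modified": newest, "keys_at_last_modified": sorted(keys)}


if __name__ == "__main__":
    # Three objects sharing one timestamp, read in batches of two.
    t = datetime(2024, 1, 1, tzinfo=timezone.utc)
    listing = [
        {"Key": k, "LastModified": t, "StorageClass": "STANDARD"}
        for k in ("b", "a", "c")
    ]
    checkpoint = {}
    while True:
        batch = select_new_objects(listing, checkpoint, batch_size=2)
        if not batch:
            break
        print([o["Key"] for o in batch])
        checkpoint = advance_checkpoint(checkpoint, batch)
```

Storing the keys seen at the newest timestamp, rather than the timestamp alone, is what lets a batch boundary fall in the middle of a group of files with identical timestamps without skipping or repeating any of them.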

Installation

pip install S3IncrementalProcessor

Usage

from S3IncrementalProcessor import S3IncrementalProcessor

# Initialize the processor with the source prefix and the checkpoint file location
processor = S3IncrementalProcessor(
    "s3://your-bucket/path/to/files/",
    "s3://your-bucket/checkpoints/checkpoint.json"
)

# Fetch new files in batches
new_files = processor.get_new_files(batch_size=5)

if new_files:
    print(f"Processing {len(new_files)} files:")
    for file in new_files:
        print(f"- {file}")
        # Add your processing logic here

    # Commit the checkpoint after processing
    processor.commit_checkpoint()
else:
    print("No new or modified files found.")

# To reset checkpoint and reprocess all files
# processor.reset_checkpoint()
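A single call to get_new_files returns at most one batch, so working through a backlog means repeating the fetch, process, commit cycle until nothing is returned. The sketch below wraps that cycle in a loop. It uses only the `get_new_files(batch_size=...)` and `commit_checkpoint()` calls shown above; `process_all`, `handle_file`, and `FakeProcessor` are hypothetical names introduced for illustration, and the stand-in class assumes that an uncommitted batch is returned again on the next call, which the package does not document here.

```python
def process_all(processor, handle_file, batch_size=5):
    """Fetch, handle, and commit batches until no new files are left."""
    processed = []
    while True:
        batch = processor.get_new_files(batch_size=batch_size)
        if not batch:
            return processed
        for path in batch:
            handle_file(path)  # an exception here skips the commit below
            processed.append(path)
        # Commit only after every file in the batch was handled.
        processor.commit_checkpoint()


class FakeProcessor:
    """Hypothetical in-memory stand-in so the sketch runs without AWS access."""

    def __init__(self, files):
        self._files = list(files)
        self._committed = 0  # number of files covered by the checkpoint
        self._pending = 0    # size of the last batch handed out

    def get_new_files(self, batch_size):
        batch = self._files[self._committed:self._committed + batch_size]
        self._pending = len(batch)
        return batch

    def commit_checkpoint(self):
        self._committed += self._pending
        self._pending = 0


if __name__ == "__main__":
    fake = FakeProcessor(f"s3://your-bucket/path/to/files/{i}.csv" for i in range(10))
    done = process_all(fake, handle_file=print, batch_size=5)
    print(f"Handled {len(done)} files")
```

Committing after the whole batch, rather than after each file, keeps the checkpoint aligned with what get_new_files handed out; the trade-off is that a failure midway means the batch may be seen again, so the processing step should tolerate repeats.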

Test Cases

This package has been tested with the following scenarios:

1. 10 Files, Batch Size 5

  • Expected: Two runs to process all files. A third run should return no new files.

2. 10 Files, Batch Size 100

  • Expected: One run should process all 10 files.

3. Files with Identical Timestamps

  • Ensures: Files uploaded simultaneously using threading are correctly ordered, and only new files are processed.
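The run counts expected in the first two scenarios follow from simple arithmetic: each run consumes at most one batch, so the number of non-empty runs is the file count divided by the batch size, rounded up. A minimal sketch of that reasoning, where `runs_until_empty` is a hypothetical helper and not part of the package:

```python
import math


def runs_until_empty(total_files, batch_size):
    """Count the non-empty runs needed when each run takes at most batch_size files."""
    remaining, runs = total_files, 0
    while remaining > 0:
        remaining -= min(batch_size, remaining)
        runs += 1
    return runs


if __name__ == "__main__":
    for total, size in ((10, 5), (10, 100)):
        runs = runs_until_empty(total, size)
        # The simulated count matches the closed-form ceiling division.
        assert runs == math.ceil(total / size)
        print(f"{total} files, batch size {size}: {runs} run(s)")
```

The run after the last non-empty one returns no files, which is the "third run" in the first scenario.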

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

