Incremental S3 File Processor is a Python package that enables efficient, incremental file processing from an Amazon S3 bucket. It keeps track of processed files using a checkpointing mechanism, ensuring that only new or modified files are processed in subsequent runs. The package supports batch processing, allows resetting checkpoints for reprocessing all data, and correctly handles edge cases like files with identical timestamps.

Project description

S3 Incremental File Processor

A Python package that lets users fetch new files from an S3 bucket incrementally. It supports checkpointing, ensuring that only new or modified files are fetched on subsequent runs. The package also provides a method to reset the checkpoint, enabling the reprocessing of all data if needed.

Features

  • Incremental File Processing: Fetch only new or modified files from an S3 bucket.
  • Checkpointing: Keeps track of processed files to prevent duplicate processing.
  • Batch Processing: Process files in configurable batch sizes.
  • Storage Class Filtering: Fetch files based on their storage class (e.g., STANDARD).
  • Reset Checkpoint: Delete the checkpoint to reprocess all files.
  • Handles Edge Cases: Ensures correct file ordering when timestamps are identical.
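As a rough illustration of the checkpoint mechanism behind these features: the package stores its checkpoint at an S3 path, but the same idea can be sketched with a local JSON file. All names below are illustrative, not the package's actual internals.

```python
import json
import os

# Illustrative local-file checkpoint; the real package keeps its
# checkpoint at an S3 path (e.g. a checkpoint.json object).

CHECKPOINT_PATH = "checkpoint.json"

def load_checkpoint():
    """Return the saved (last_modified, key) pair, or None on the first run."""
    if not os.path.exists(CHECKPOINT_PATH):
        return None
    with open(CHECKPOINT_PATH) as f:
        data = json.load(f)
    return (data["last_modified"], data["key"])

def save_checkpoint(last_modified, key):
    """Record the last processed object so the next run skips older files."""
    with open(CHECKPOINT_PATH, "w") as f:
        json.dump({"last_modified": last_modified, "key": key}, f)

def reset_checkpoint():
    """Delete the checkpoint so the next run reprocesses everything."""
    if os.path.exists(CHECKPOINT_PATH):
        os.remove(CHECKPOINT_PATH)
```

Storing the key alongside the timestamp is what allows deterministic resumption even when several objects share the same `LastModified` value.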

Installation

pip install S3IncrementalProcessor

Usage

from S3IncrementalProcessor import S3IncrementalProcessor

# Initialize the processor with S3 paths
processor = S3IncrementalProcessor(
    "s3://your-bucket/path/to/files/",
    "s3://your-bucket/checkpoints/checkpoint.json"
)

# Fetch new files in batches
new_files = processor.get_new_files(batch_size=5)

if new_files:
    print(f"Processing {len(new_files)} files:")
    for file in new_files:
        print(f"- {file}")
        # Add your processing logic here

    # Commit the checkpoint after processing
    processor.commit_checkpoint()
else:
    print("No new or modified files found.")

# To reset checkpoint and reprocess all files
# processor.reset_checkpoint()
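In a real pipeline you typically drain all pending files rather than a single batch. A minimal sketch of that loop follows, using a stub class in place of `S3IncrementalProcessor` so it runs without AWS credentials; the method names mirror the usage above, but the stub's internals are hypothetical.

```python
# Drain-all-batches pattern: call get_new_files until it returns nothing,
# committing the checkpoint only after each batch is fully processed.
# StubProcessor stands in for S3IncrementalProcessor for illustration.

class StubProcessor:
    def __init__(self, files):
        self._files = list(files)
        self._cursor = 0       # committed checkpoint position
        self._pending = 0      # position after the last fetched batch

    def get_new_files(self, batch_size=5):
        batch = self._files[self._cursor:self._cursor + batch_size]
        self._pending = self._cursor + len(batch)
        return batch

    def commit_checkpoint(self):
        self._cursor = self._pending

processor = StubProcessor([f"file_{i}.csv" for i in range(12)])

processed = []
while True:
    batch = processor.get_new_files(batch_size=5)
    if not batch:
        break
    for file in batch:
        processed.append(file)      # your processing logic here
    processor.commit_checkpoint()   # only advance after the batch succeeds

print(f"Processed {len(processed)} files")  # 12 files across three batches
```

Committing only after a batch succeeds means a crash mid-batch leaves the checkpoint untouched, so the next run refetches that batch instead of silently skipping it.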

Test Cases

This package has been tested with the following scenarios:

1. 10 Files, Batch Size 5

  • Expected: Two runs process all 10 files (5 per run). A third run returns no new files.

2. 10 Files, Batch Size 100

  • Expected: One run should process all 10 files.

3. Files with Identical Timestamps

  • Ensures: Files uploaded simultaneously using threading are correctly ordered, and only new files are processed.
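The identical-timestamp scenario is worth a closer look: a checkpoint that records only the last timestamp would skip a second file sharing that timestamp. Comparing `(timestamp, key)` tuples is one common fix; the sketch below shows the idea and is not necessarily the package's actual implementation.

```python
# Two files uploaded at the same instant; breaking ties by key makes the
# ordering deterministic, so nothing is skipped or double-processed.

objects = [
    (1700000000, "b.csv"),
    (1700000000, "a.csv"),
    (1700000100, "c.csv"),
]

ordered = sorted(objects)  # sorts by timestamp, then key on ties

# After processing a.csv, a timestamp-only checkpoint would also skip
# b.csv (same timestamp). Tuple comparison keeps it pending:
checkpoint = (1700000000, "a.csv")
remaining = [obj for obj in ordered if obj > checkpoint]
print(remaining)  # b.csv and c.csv are still pending
```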

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

License

Project details


Download files

Download the file for your platform.

Source Distribution

s3incrementalprocessor-0.1.1.tar.gz (2.0 kB)

Uploaded Source

Built Distribution

S3IncrementalProcessor-0.1.1-py3-none-any.whl (2.2 kB)

Uploaded Python 3

File details

Details for the file s3incrementalprocessor-0.1.1.tar.gz.

File metadata

  • Download URL: s3incrementalprocessor-0.1.1.tar.gz
  • Upload date:
  • Size: 2.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.7

File hashes

Hashes for s3incrementalprocessor-0.1.1.tar.gz

  • SHA256: c6c3ca270b19466a74a0f10260033dd56afa2c54348e7f3ca5169a2734733643
  • MD5: 77d04338d6dfbf797dd73c00fab1e74c
  • BLAKE2b-256: 17d4e4ec3b2f0a6864267fffc062f9a6a32005794f957c1434a80c95148638c6

File details

Details for the file S3IncrementalProcessor-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for S3IncrementalProcessor-0.1.1-py3-none-any.whl

  • SHA256: 8f52eb1c611212273571aa16a916a6a7ba880a1aecaa4233a9d17dfac736d88c
  • MD5: 2d5c964ed2c604963011c5ff96fc452d
  • BLAKE2b-256: ba684d5dbff38ad86e0f60522666ac637fc4d841539c7e99283eabf1a399b9a6
