Incremental S3 File Processor is a Python package that enables efficient, incremental file processing from an Amazon S3 bucket. It keeps track of processed files using a checkpointing mechanism, ensuring that only new or modified files are processed in subsequent runs. The package supports batch processing, allows resetting checkpoints for reprocessing all data, and correctly handles edge cases like files with identical timestamps.

Project description

S3 Incremental File Processor

A Python package that fetches files from an S3 bucket incrementally. It records progress in a checkpoint so that the next run returns only files that are new or modified since that checkpoint. It also provides a method to reset the checkpoint so that all data can be reprocessed when needed.

Features

  • Incremental File Processing: Fetch only new or modified files from an S3 bucket.
  • Checkpointing: Keeps track of processed files to prevent duplicate processing.
  • Batch Processing: Process files in configurable batch sizes.
  • Storage Class Filtering: Fetch files based on their storage class (e.g., STANDARD).
  • Reset Checkpoint: Delete the checkpoint to reprocess all files.
  • Handles Edge Cases: Ensures correct file ordering when timestamps are identical.
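
The incremental selection and tie-breaking listed above could work roughly as follows. This is a minimal sketch, not the package's actual implementation: `S3Object`, `select_new_files`, and the `(last_modified, key)` checkpoint format are hypothetical names and assumptions chosen for illustration, and the objects are plain in-memory records rather than real S3 listings.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class S3Object(NamedTuple):
    """Hypothetical in-memory record standing in for one S3 listing entry."""
    key: str
    last_modified: datetime
    storage_class: str = "STANDARD"


def select_new_files(objects, checkpoint=None, batch_size=None, storage_class=None):
    """Return (batch, new_checkpoint) for one incremental run.

    checkpoint is assumed to be a (last_modified, key) tuple, or None
    when nothing has been processed yet.
    """
    # Optional storage class filter (e.g. "STANDARD").
    if storage_class is not None:
        objects = [o for o in objects if o.storage_class == storage_class]

    # Sort by timestamp, then by key, so identical timestamps
    # still have a single well-defined order.
    ordered = sorted(objects, key=lambda o: (o.last_modified, o.key))

    # Keep only entries strictly after the checkpoint position.
    if checkpoint is not None:
        ordered = [o for o in ordered if (o.last_modified, o.key) > checkpoint]

    batch = ordered if batch_size is None else ordered[:batch_size]

    # The checkpoint only advances when the batch is non-empty.
    new_checkpoint = (batch[-1].last_modified, batch[-1].key) if batch else checkpoint
    return batch, new_checkpoint


if __name__ == "__main__":
    t = datetime(2024, 1, 1, tzinfo=timezone.utc)
    listing = [S3Object("b.csv", t), S3Object("a.csv", t), S3Object("c.csv", t)]
    first, cp = select_new_files(listing, batch_size=2)
    second, cp = select_new_files(listing, checkpoint=cp, batch_size=2)
    print([o.key for o in first], [o.key for o in second])
```

Comparing on the pair (last_modified, key) lets a batch boundary fall between files that share a timestamp without skipping or repeating any of them. One limitation of this particular sketch: a file that arrives later with the same timestamp but a smaller key than the checkpoint would be missed.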

Installation

pip install S3IncrementalProcessor

Usage

from S3IncrementalProcessor import S3IncrementalProcessor

# Initialize the processor with S3 paths
processor = S3IncrementalProcessor(
    "s3://your-bucket/path/to/files/",
    "s3://your-bucket/checkpoints/checkpoint.json"
)

# Fetch new files in batches
new_files = processor.get_new_files(batch_size=5)

if new_files:
    print(f"Processing {len(new_files)} files:")
    for file in new_files:
        print(f"- {file}")
        # Add your processing logic here

    # Commit the checkpoint after processing
    processor.commit_checkpoint()
else:
    print("No new or modified files found.")

# To reset checkpoint and reprocess all files
# processor.reset_checkpoint()
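
To work through a backlog in one go, the fetch, process, and commit steps above can be wrapped in a loop. The sketch below uses only the `get_new_files(batch_size=...)` and `commit_checkpoint()` calls shown in the usage example. `process_all` and `handle` are hypothetical names, and the loop assumes that calling `get_new_files` again on the same instance after a commit returns the next batch and eventually an empty result, which has not been verified against the package.

```python
def process_all(processor, handle, batch_size=5):
    """Process every pending file in batches and return how many were handled.

    processor: any object exposing get_new_files(batch_size=...) and
               commit_checkpoint(), such as an S3IncrementalProcessor.
    handle:    hypothetical callback invoked once per file.
    """
    total = 0
    while True:
        new_files = processor.get_new_files(batch_size=batch_size)
        if not new_files:
            break  # nothing new or modified is left

        for file in new_files:
            handle(file)

        # Commit only after the whole batch succeeded, so a failure
        # in handle() leaves the checkpoint at the previous batch.
        processor.commit_checkpoint()
        total += len(new_files)
    return total
```

Committing after each batch rather than once at the end means that an interrupted run has to repeat at most one batch.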

Test Cases

This package has been tested with the following scenarios:

1. 10 Files, Batch Size 5

  • Expected: Two runs to process all files. A third run should return no new files.

2. 10 Files, Batch Size 100

  • Expected: One run should process all 10 files.

3. Files with Identical Timestamps

  • Expected: Files uploaded simultaneously from multiple threads are correctly ordered, and only new files are processed.
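
The batch-size scenarios above can be reproduced without an S3 bucket by using an in-memory stand-in. This is an illustrative sketch only: `InMemoryProcessor` and `count_runs` are hypothetical, they mimic the method names from the usage example, and they are not the package's own test suite.

```python
class InMemoryProcessor:
    """Hypothetical stand-in that mimics the processor's method names."""

    def __init__(self, keys):
        self._keys = sorted(keys)
        self._committed = 0  # number of files covered by the checkpoint
        self._pending = 0    # size of the last batch not yet committed

    def get_new_files(self, batch_size=5):
        batch = self._keys[self._committed:self._committed + batch_size]
        self._pending = len(batch)
        return batch

    def commit_checkpoint(self):
        self._committed += self._pending
        self._pending = 0

    def reset_checkpoint(self):
        self._committed = 0
        self._pending = 0


def count_runs(processor, batch_size):
    """Count how many non-empty runs are needed to drain the processor."""
    runs = 0
    while processor.get_new_files(batch_size=batch_size):
        processor.commit_checkpoint()
        runs += 1
    return runs


if __name__ == "__main__":
    keys = [f"file_{i:02d}.csv" for i in range(10)]
    print(count_runs(InMemoryProcessor(keys), batch_size=5))
    print(count_runs(InMemoryProcessor(keys), batch_size=100))
```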

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

License

Download files

Source Distribution

s3incrementalprocessor-0.1.2.tar.gz (3.8 kB view details)

Uploaded Source

Built Distribution

S3IncrementalProcessor-0.1.2-py3-none-any.whl (4.2 kB view details)

Uploaded Python 3

File details

Details for the file s3incrementalprocessor-0.1.2.tar.gz.

File metadata

  • Download URL: s3incrementalprocessor-0.1.2.tar.gz
  • Upload date:
  • Size: 3.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.7

File hashes

Hashes for s3incrementalprocessor-0.1.2.tar.gz
Algorithm    Hash digest
SHA256       bad95b3045fc88f29d1b168779860ff86389a27d4155d5458e642887a7f29ed8
MD5          086c6903b3fdbcb4d269779fb84230a7
BLAKE2b-256  0c7f3fec79b233482416e4728094d9c70542841cc84b446fcf24f6159cb6040c

File details

Details for the file S3IncrementalProcessor-0.1.2-py3-none-any.whl.

File hashes

Hashes for S3IncrementalProcessor-0.1.2-py3-none-any.whl
Algorithm    Hash digest
SHA256       845d0e212c4168629a7c88c9115068eb54de193273b3526d851d6c304fbea52f
MD5          ea79480cda8664ece066493486988021
BLAKE2b-256  56f57726b419ff38bdfd713d6c19273ee8ff1344583b8f81a01ef22329055fd7
