
Distributed, scalable GCS PDF processing pipeline with Gemini OCR, Redis, and API endpoints.


DCPR PDF Processing Pipeline

Overview

This service processes PDFs from a Google Cloud Storage (GCS) bucket, performs OCR using the Gemini API, and uploads clean, text-based PDFs to a destination GCS folder. It features robust error handling, logging, monitoring, and is designed for scalable, production use.


Setup & Environment

  1. Clone the repository and navigate to the project root:

    git clone <repo-url>
    cd nest-starters
    
  2. Install dependencies and the package:

    pip install -r requirements.txt
    pip install .
    
  3. Environment Variables:

    • Place your .env file in the secrets/ directory at the project root.
    • Example .env variables:
      GCS_BUCKET=your-bucket-name
      GCS_SOURCE_PREFIX=source-folder
      GCS_DEST_PREFIX=dest-folder
      GEMINI_API_KEY=your-gemini-api-key
      SUPABASE_URL=https://your-supabase-url
      SUPABASE_API_KEY=your-supabase-api-key
      MAX_RETRIES=3
      GEMINI_GLOBAL_CONCURRENCY=10
      MAX_CONCURRENT_FILES=3
      PAGE_MAX_WORKERS=5
      DOC_BATCH_SIZE=10
      MAX_QUEUE=100
      POLL_INTERVAL=30
      G_MESSAGES_DEBUG=none
      G_DEBUG=fatal-warnings
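
As an illustrative sketch (not the package's actual code), numeric settings like MAX_RETRIES can be read from the environment with a small helper; the load_int name is hypothetical:

```python
import os

def load_int(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to a default."""
    try:
        return int(os.environ.get(name, default))
    except (TypeError, ValueError):
        return default

# Example: tuning knobs from the .env above
MAX_RETRIES = load_int("MAX_RETRIES", 3)
PAGE_MAX_WORKERS = load_int("PAGE_MAX_WORKERS", 5)
```

Falling back to a default on a malformed value keeps the worker bootable even with an incomplete .env.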
      

Running as a pip-installed Package

After installing the package with pip install ., you can use the following console scripts from anywhere:

Run the Worker (background processing)

dist-gcs-worker
  • This will start the background worker that processes files from GCS.

Run the FastAPI API Server

dist_gcs_pdf_processing
  • This will start the FastAPI server (with the worker running in the background).
  • You can override the port (default 8000):
    dist_gcs_pdf_processing 8080
    

Run the API app directly (ASGI app, for advanced users)

uvicorn dist_gcs_pdf_processing.main:app --reload
  • This runs the FastAPI app directly (no background worker thread). The ASGI entry point is the app object in main.py.

Logging & Monitoring

  • Logs:
    • Human-readable logs: /logs/worker.log (daily rotation)
    • JSON logs: /logs/json/YYYY-MM-DD.json
    • Dead letter logs: /logs/dead_letter/dead_letter.log
  • Supabase:
    • Persistent errors are logged to the Activity_Error_Log table for monitoring.
  • Suppressing GTK/GLib output:
    • G_MESSAGES_DEBUG and G_DEBUG are set both in the .env file and at the top of main.py and worker.py to silence GTK/GLib debug noise.
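
The per-day JSON log files can be produced with a formatter along these lines; this is a minimal sketch of the idea, not the service's actual logging code:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })
```

For the human-readable log with daily rotation, such a formatter would typically be attached to a logging.handlers.TimedRotatingFileHandler with when="midnight".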

Error Handling

  • Retries for transient errors (network, quota, etc.) with configurable limits.
  • Per-page retries: Each page is retried up to MAX_RETRIES times before being skipped.
  • Per-file retries: If a file fails (e.g., page count mismatch), the whole file is retried up to MAX_RETRIES times.
  • All persistent errors are logged to file, JSON, dead letter, and Supabase.
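
A retry loop of this shape matches the per-page and per-file behavior described above; with_retries is a hypothetical helper, not the package's API:

```python
import time

def with_retries(fn, max_retries: int = 3, base_delay: float = 0.0,
                 transient=(TimeoutError, ConnectionError)):
    """Call fn(), retrying transient errors up to max_retries attempts."""
    for attempt in range(1, max_retries + 1):
        try:
            return fn()
        except transient:
            if attempt == max_retries:
                raise  # persistent: caller logs to file, dead letter, Supabase
            # exponential backoff between attempts
            time.sleep(base_delay * 2 ** (attempt - 1))
```

Non-transient exceptions propagate immediately; only the listed transient classes are retried.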

Scalability, Concurrency & Throttling

  • Rolling Concurrency Model:
    • The worker always keeps up to MAX_CONCURRENT_FILES files in progress.
    • As soon as a file finishes, the next available file is picked up, until all are processed.
    • This keeps the concurrency window full, for high throughput and efficient resource use.
  • Per-Page Concurrency:
    • Each file's pages are OCRed in parallel, up to PAGE_MAX_WORKERS at a time.
  • Global Gemini API Throttling:
    • All Gemini API requests (across all files and pages) are globally throttled by GEMINI_GLOBAL_CONCURRENCY.
    • This keeps total request volume within your Gemini API quota and rate limits.
  • Backpressure:
    • If too many files are queued (MAX_QUEUE), the worker will pause and log a warning.
  • Horizontal scaling:
    • Run multiple stateless worker instances on different machines/VMs for even more throughput.
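
Assuming an asyncio-based worker (an assumption; the real implementation may differ), the three limits compose naturally as nested semaphores; process_all and its arguments are hypothetical names:

```python
import asyncio

async def process_all(files, pages_per_file=2,
                      max_files=3, page_workers=5, gemini_limit=10):
    """Rolling concurrency: keep up to max_files in flight, throttle pages globally."""
    file_sem = asyncio.Semaphore(max_files)       # MAX_CONCURRENT_FILES
    gemini_sem = asyncio.Semaphore(gemini_limit)  # GEMINI_GLOBAL_CONCURRENCY

    async def ocr_page(name, page):
        async with gemini_sem:                    # global Gemini API throttle
            await asyncio.sleep(0)                # stand-in for the API call
            return f"{name}:p{page}"

    async def process_file(name):
        page_sem = asyncio.Semaphore(page_workers)  # PAGE_MAX_WORKERS
        async def one_page(p):
            async with page_sem:
                return await ocr_page(name, p)
        async with file_sem:  # next file starts as soon as a slot frees up
            return await asyncio.gather(*(one_page(p) for p in range(pages_per_file)))

    return await asyncio.gather(*(process_file(f) for f in files))
```

Because all tasks are created up front but gated by file_sem, the window refills automatically: finishing one file releases the semaphore and the next waiting file proceeds.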

Temp/Log Cleanup

  • Files in logs, logs/json, logs/dead_letter, staging, and processed older than 200 days are deleted before the worker starts.
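
The cleanup step amounts to deleting files older than a cutoff; a stdlib-only sketch (purge_old_files is a hypothetical name, not the package's API):

```python
import time
from pathlib import Path

def purge_old_files(directory: str, max_age_days: int = 200) -> list:
    """Delete files older than max_age_days under directory; return removed paths."""
    cutoff = time.time() - max_age_days * 86400
    removed = []
    root = Path(directory)
    if not root.exists():
        return removed
    for path in list(root.rglob("*")):  # snapshot before deleting
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(str(path))
    return removed
```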

Tests

  • Unit and integration tests are located in /tests.
  • Tests cover:
    • PDF splitting/merging
    • Per-page and per-file retry logic
    • File-level rolling concurrency (ensuring the concurrency window is always full)
    • Global Gemini API throttling
    • Trace ID propagation in logs
  • To run tests:
    pytest
    

CI/CD

  • GitHub Actions workflow runs linting and tests on every push.
  • Example workflow file: .github/workflows/ci.yml.

Additional Notes

  • All print/log statements are also written to log files.
  • Trace/request IDs are used for end-to-end traceability.
  • For any persistent errors, check Supabase and the dead letter log for details.
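
Trace ID propagation of this kind is commonly done with contextvars, so every log line along one request's path carries the same ID; this is a sketch of the pattern, not the package's actual mechanism:

```python
import contextvars
import uuid

# One trace ID per request/file, isolated across concurrent tasks
trace_id = contextvars.ContextVar("trace_id", default="-")

def new_trace() -> str:
    """Assign a fresh trace ID for the current request or file."""
    tid = uuid.uuid4().hex[:12]
    trace_id.set(tid)
    return tid

def log_line(message: str) -> str:
    """Prefix a log message with the active trace ID."""
    return f"[{trace_id.get()}] {message}"
```

ContextVar values follow asyncio tasks automatically, so concurrent files each keep their own ID without explicit plumbing.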

Project Structure

project-root/
├── src/                # All main code (import as src.<module>)
├── tests/              # All tests (import as: from src.<module> import ...)
├── logs/               # Log output
├── secrets/            # Secrets and credentials (not tracked by git)
│   ├── your-service-account.json
│   └── .env
├── requirements.txt    # Python dependencies
├── setup.py            # For pip install -e .
├── Dockerfile
└── README.md

Local Development

  1. Install dependencies:

    pip install -r requirements.txt
    pip install .
    
  2. Run the worker:

    dist-gcs-worker
    # or, for the API server:
    dist_gcs_pdf_processing
    
  3. Run tests:

    pytest --import-mode=importlib tests/
    # or, if you have trouble with imports:
    PYTHONPATH=. pytest tests/
    

Docker Usage

  1. Build the Docker image:

    docker build -t nest-starters .
    
  2. Run the container (worker only):

    docker run --rm -it -v $PWD/logs:/app/logs nest-starters dist-gcs-worker
    
  3. Run the container (API server):

    docker run --rm -it -v $PWD/logs:/app/logs -p 8000:8000 nest-starters dist_gcs_pdf_processing
    
  • The Dockerfile's default CMD can be overridden to run either the worker or the API server.
  • The .dockerignore file ensures your build context is clean and fast.
  • You can override the CMD to run tests or other scripts as needed:
    docker run --rm -it nest-starters python -m pytest --import-mode=importlib tests/
    

Continuous Integration (CI)

  • Use GitHub Actions or similar CI to run:
    - name: Install deps
      run: |
        pip install -r requirements.txt
        pip install .
    - name: Run tests
      run: pytest
    

Secrets and Environment Variables

  • Place your GCP credentials JSON file in a secrets/ directory at the project root (not tracked by git).
  • In your .env file (in the secrets/ directory), set:
    GOOGLE_APPLICATION_CREDENTIALS=secrets/your-service-account.json
    
  • The worker will automatically load .env from secrets/.
  • For Docker/CI, mount the secrets/ directory and ensure the .env file and credentials are present.
  • Never commit secrets or credentials to version control!
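
Loading .env from secrets/ can be sketched with a stdlib-only parser (real deployments typically use python-dotenv; load_env is a hypothetical name):

```python
import os
from pathlib import Path

def load_env(path: str = "secrets/.env") -> dict:
    """Parse KEY=VALUE lines from a .env file into os.environ (without overwriting)."""
    loaded = {}
    env_file = Path(path)
    if not env_file.exists():
        return loaded
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        loaded[key.strip()] = value.strip()
        os.environ.setdefault(key.strip(), value.strip())
    return loaded
```

Using setdefault means variables already present in the environment (e.g. injected by Docker or CI) take precedence over the file.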

Installation

You can install the package from a GitHub Release:

pip install https://github.com/youruser/dist-gcs-pdf-processing/releases/download/v0.1.0/dist_gcs_pdf_processing-0.1.0-py3-none-any.whl

CLI Usage

After installation, you can run:

dist-gcs-worker  # Start the background worker

dist-gcs-api     # Start the FastAPI API server (with all endpoints)

Or, for advanced usage:

python -m dist_gcs_pdf_processing.worker
python -m dist_gcs_pdf_processing.main
