Distributed, scalable GCS PDF processing pipeline with Gemini OCR, Redis, and API endpoints.

DCPR PDF Processing Pipeline

Overview

This service processes PDFs from a Google Cloud Storage (GCS) bucket, performs OCR using the Gemini API, and uploads clean, text-based PDFs to a destination GCS folder. It features robust error handling, logging, and monitoring, and is designed for scalable production use.


Setup & Environment

  1. Clone the repository and navigate to the project root:

    git clone <repo-url>
    cd nest-starters
    
  2. Install dependencies:

    pip install -r requirements.txt
    pip install -e .
    
  3. Environment Variables:

    • Place your .env file in the secrets/ directory at the project root.
    • Example .env variables:
      GCS_BUCKET=your-bucket-name
      GCS_SOURCE_PREFIX=source-folder
      GCS_DEST_PREFIX=dest-folder
      GEMINI_API_KEY=your-gemini-api-key
      SUPABASE_URL=https://your-supabase-url
      SUPABASE_API_KEY=your-supabase-api-key
      MAX_RETRIES=3
      GEMINI_GLOBAL_CONCURRENCY=10
      MAX_CONCURRENT_FILES=3
      PAGE_MAX_WORKERS=5
      DOC_BATCH_SIZE=10
      MAX_QUEUE=100
      POLL_INTERVAL=30
      G_MESSAGES_DEBUG=none
      G_DEBUG=fatal-warnings
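
How these settings reach the worker can be sketched with a minimal, stdlib-only loader (the real code may use python-dotenv instead; `load_env` and `int_setting` are illustrative names, not the actual API):

```python
import os

def load_env(path="secrets/.env"):
    """Parse simple KEY=value lines from a .env file into os.environ.

    Values already present in the environment take precedence.
    """
    if not os.path.exists(path):
        return
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip())

def int_setting(name, default):
    """Read an integer setting such as MAX_RETRIES, with a fallback."""
    return int(os.environ.get(name, default))
```

With this pattern, numeric knobs like MAX_CONCURRENT_FILES fall back to sane defaults when the variable is unset.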
      

Running the Worker & API

You can run the background worker or the FastAPI API server:

Run the Worker (background processing only)

python src/worker.py

Run the FastAPI API (with all endpoints)

uvicorn src.main:app --reload
  • The API exposes health, status, logs, metrics, config, and file processing endpoints.
  • The worker will start automatically in the background when the API starts.
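
At its core, the worker is a poll loop: every POLL_INTERVAL seconds it lists pending source files and processes each one. A minimal async sketch (the `list_pending` and `process` callables are illustrative stand-ins for the real GCS listing and OCR steps):

```python
import asyncio

POLL_INTERVAL = 30  # seconds between GCS polls (from .env)

async def poll_once(list_pending, process):
    """One cycle: list pending source files, then process each one."""
    for blob_name in await list_pending():
        await process(blob_name)

async def worker_loop(list_pending, process, cycles=None):
    """Poll forever (or for a fixed number of cycles, e.g. in tests)."""
    done = 0
    while cycles is None or done < cycles:
        await poll_once(list_pending, process)
        done += 1
        if cycles is None or done < cycles:
            await asyncio.sleep(POLL_INTERVAL)
```

The `cycles` parameter exists only so the loop can be exercised deterministically in tests; in production it runs unbounded.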

Logging & Monitoring

  • Logs:
    • Human-readable logs: logs/worker.log (daily rotation)
    • JSON logs: logs/json/YYYY-MM-DD.json
    • Dead-letter logs: logs/dead_letter/dead_letter.log
  • Supabase:
    • Persistent errors are logged to the Activity_Error_Log table for monitoring.
  • Suppressing GTK/GLib output:
    • G_MESSAGES_DEBUG and G_DEBUG are set both in .env and at the top of main.py and worker.py, so GTK/GLib warnings are silenced before any library is imported.
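
The dual text/JSON logging described above can be sketched as follows (handler setup and JSON fields are illustrative; the actual logging module may differ):

```python
import json
import logging
from logging.handlers import TimedRotatingFileHandler

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

def build_logger(log_dir="logs"):
    logger = logging.getLogger("worker")
    logger.setLevel(logging.INFO)
    # Human-readable log, rotated daily at midnight -> logs/worker.log
    text = TimedRotatingFileHandler(f"{log_dir}/worker.log", when="midnight")
    text.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(text)
    return logger
```

A second handler with `JsonFormatter` pointed at logs/json/ gives the machine-readable stream.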

Error Handling

  • Retries for transient errors (network, quota, etc.) with configurable limits.
  • Per-page retries: Each page is retried up to MAX_RETRIES times before being skipped.
  • Per-file retries: If a file fails (e.g., page count mismatch), the whole file is retried up to MAX_RETRIES times.
  • All persistent errors are logged to file, JSON, dead letter, and Supabase.
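
The per-page and per-file retry behavior both follow the same shape, sketched here as a generic helper with exponential backoff (`with_retries` is an illustrative name, not the actual API):

```python
import time

def with_retries(fn, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fn, retrying transient failures with exponential backoff.

    Re-raises the last error after max_retries attempts so the caller
    can route it to the dead-letter log and Supabase.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return fn()
        except Exception as exc:  # real code would catch network/quota errors only
            last_error = exc
            if attempt < max_retries:
                sleep(base_delay * (2 ** (attempt - 1)))
    raise last_error
```

Wrapping a single page's OCR call gives per-page retries; wrapping the whole-file pipeline gives per-file retries.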

Scalability, Concurrency & Throttling

  • Rolling Concurrency Model:
    • The worker always keeps up to MAX_CONCURRENT_FILES files in progress.
    • As soon as a file finishes, the next available file is picked up, until all are processed.
    • This keeps the concurrency window full, so throughput stays high and resources stay busy.
  • Per-Page Concurrency:
    • Each file's pages are OCRed in parallel, up to PAGE_MAX_WORKERS at a time.
  • Global Gemini API Throttling:
    • All Gemini API requests (across all files and pages) are globally throttled by GEMINI_GLOBAL_CONCURRENCY.
    • This keeps total request concurrency within your API quota and rate limits.
  • Backpressure:
    • If too many files are queued (MAX_QUEUE), the worker will pause and log a warning.
  • Horizontal scaling:
    • Run multiple stateless worker instances on different machines/VMs for even more throughput.
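
How the three limits compose can be shown with a thread-pool sketch: a file-level pool gives the rolling window, a per-file pool parallelizes pages, and a shared semaphore throttles all Gemini calls globally (the `ocr_page` stub stands in for the real API call; the constants mirror the .env settings):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

GEMINI_GLOBAL_CONCURRENCY = 10   # global cap on in-flight Gemini calls
MAX_CONCURRENT_FILES = 3         # rolling window of files in progress
PAGE_MAX_WORKERS = 5             # pages OCRed in parallel per file

gemini_slots = threading.BoundedSemaphore(GEMINI_GLOBAL_CONCURRENCY)

def ocr_page(page):
    """One Gemini call; the semaphore enforces the global throttle."""
    with gemini_slots:
        return f"text:{page}"

def process_file(pages):
    """OCR one file's pages in parallel, up to PAGE_MAX_WORKERS at once."""
    with ThreadPoolExecutor(max_workers=PAGE_MAX_WORKERS) as pool:
        return list(pool.map(ocr_page, pages))

def run_worker(files):
    # The file-level pool keeps up to MAX_CONCURRENT_FILES files in
    # progress; as soon as one finishes, the next queued file starts.
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_FILES) as pool:
        return list(pool.map(process_file, files))
```

Because the semaphore is shared across every page worker in every file, at most GEMINI_GLOBAL_CONCURRENCY requests are ever in flight, regardless of how the other two knobs are tuned.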

Temp/Log Cleanup

  • Files in logs, logs/json, logs/dead_letter, staging, and processed older than 200 days are deleted before the worker starts.
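
A sketch of that startup cleanup under the stated 200-day retention (`cleanup_old_files` is an illustrative name):

```python
import os
import time

RETENTION_DAYS = 200

def cleanup_old_files(directories, now=None, max_age_days=RETENTION_DAYS):
    """Delete files older than max_age_days from the given directories."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    removed = []
    for directory in directories:
        if not os.path.isdir(directory):
            continue
        for name in os.listdir(directory):
            path = os.path.join(directory, name)
            if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed.append(path)
    return removed
```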

Tests

  • Unit and integration tests are located in /tests.
  • Tests cover:
    • PDF splitting/merging
    • Per-page and per-file retry logic
    • File-level rolling concurrency (ensuring the concurrency window is always full)
    • Global Gemini API throttling
    • Trace ID propagation in logs
  • To run tests:
    pytest
    

CI/CD

  • GitHub Actions workflow runs linting and tests on every push.
  • Example workflow file: .github/workflows/ci.yml.

Additional Notes

  • All print/log statements are also written to log files.
  • Trace/request IDs are used for end-to-end traceability.
  • For any persistent errors, check Supabase and the dead letter log for details.
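
One common way to propagate a trace ID into every log line is a logging.Filter backed by a ContextVar; a sketch of the pattern (names here are illustrative, not necessarily those used in src/):

```python
import logging
import uuid
from contextvars import ContextVar

trace_id_var = ContextVar("trace_id", default="-")

class TraceIdFilter(logging.Filter):
    """Attach the current trace ID to every log record."""
    def filter(self, record):
        record.trace_id = trace_id_var.get()
        return True

def new_trace_id():
    """Mint a trace ID at the start of a request or file job."""
    tid = uuid.uuid4().hex
    trace_id_var.set(tid)
    return tid
```

With the filter installed, a format string like "%(asctime)s %(trace_id)s %(message)s" stamps every line, so one ID can be followed from API request to dead-letter entry.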

Project Structure

project-root/
├── src/                # All main code (import as src.module)
├── tests/              # All tests (import as from src.module import ...)
├── logs/               # Log output
├── secrets/            # Secrets and credentials
│   ├── your-service-account.json
│   └── .env
├── requirements.txt    # Python dependencies
├── setup.py            # For pip install -e .
├── Dockerfile
├── README.md

Local Development

  1. Install dependencies:

    pip install -r requirements.txt
    pip install -e .
    
  2. Run the worker:

    python -m src.worker
    # or, if you have an entrypoint script, use that
    
  3. Run tests:

    pytest --import-mode=importlib tests/
    # or, if you have trouble with imports:
    PYTHONPATH=. pytest tests/
    

Docker Usage

  1. Build the Docker image:

    docker build -t nest-starters .
    
  2. Run the container (worker only):

    docker run --rm -it -v $PWD/logs:/app/logs nest-starters python src/worker.py
    
  3. Run the container (API server):

    docker run --rm -it -v $PWD/logs:/app/logs -p 8000:8000 nest-starters uvicorn src.main:app --host 0.0.0.0 --port 8000
    
  • The Dockerfile's default CMD can be overridden to run either the worker or the API server.
  • The .dockerignore file ensures your build context is clean and fast.
  • You can override the CMD to run tests or other scripts as needed:
    docker run --rm -it nest-starters python -m pytest --import-mode=importlib tests/
    

Continuous Integration (CI)

  • Use GitHub Actions or similar CI. A minimal workflow:
    name: CI
    on: [push]
    jobs:
      test:
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          - uses: actions/setup-python@v5
            with:
              python-version: "3.11"
          - name: Install deps
            run: |
              pip install -r requirements.txt
              pip install -e .
          - name: Run tests
            run: pytest --import-mode=importlib tests/
    

Secrets and Environment Variables

  • Place your GCP credentials JSON file in a secrets/ directory at the project root (not tracked by git).
  • In your .env file (in the secrets/ directory), set:
    GOOGLE_APPLICATION_CREDENTIALS=secrets/your-service-account.json
    
  • The worker will automatically load .env from secrets/.
  • For Docker/CI, mount the secrets/ directory and ensure the .env file and credentials are present.
  • Never commit secrets or credentials to version control!

Installation

You can install the package from a GitHub Release:

pip install https://github.com/youruser/dist-gcs-pdf-processing/releases/download/v0.1.0/dist_gcs_pdf_processing-0.1.0-py3-none-any.whl

CLI Usage

After installation, you can run:

dist-gcs-worker  # Start the background worker

dist-gcs-api     # Start the FastAPI API server (with all endpoints)

Or, for advanced usage:

python -m dist_gcs_pdf_processing.worker
python -m dist_gcs_pdf_processing.main
