# DCPR PDF Processing Pipeline

Distributed, scalable GCS PDF processing pipeline with Gemini OCR, Redis, and API endpoints.

## Overview

This service processes PDFs from a Google Cloud Storage (GCS) bucket, performs OCR with the Gemini API, and uploads clean, text-based PDFs to a destination GCS folder. It features robust error handling, logging, and monitoring, and is designed for scalable production use.
## Setup & Environment

- Clone the repository and navigate to the project root:

  ```bash
  git clone <repo-url>
  cd nest-starters
  ```

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  pip install -e .
  ```
- Environment variables: place your `.env` file in the `secrets/` directory at the project root. Example `.env` variables:

  ```
  GCS_BUCKET=your-bucket-name
  GCS_SOURCE_PREFIX=source-folder
  GCS_DEST_PREFIX=dest-folder
  GEMINI_API_KEY=your-gemini-api-key
  SUPABASE_URL=https://your-supabase-url
  SUPABASE_API_KEY=your-supabase-api-key
  MAX_RETRIES=3
  GEMINI_GLOBAL_CONCURRENCY=10
  MAX_CONCURRENT_FILES=3
  PAGE_MAX_WORKERS=5
  DOC_BATCH_SIZE=10
  MAX_QUEUE=100
  POLL_INTERVAL=30
  G_MESSAGES_DEBUG=none
  G_DEBUG=fatal-warnings
  ```
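The tunables above are plain environment variables. As a minimal sketch, the worker could read them with typed defaults via a small helper (the `env_int` function here is hypothetical, not part of the package):

```python
import os

def env_int(name: str, default: int) -> int:
    """Read an integer tunable from the environment, falling back to a default."""
    return int(os.environ.get(name, default))

# Hypothetical helpers mirroring the tunables listed above.
MAX_RETRIES = env_int("MAX_RETRIES", 3)
GEMINI_GLOBAL_CONCURRENCY = env_int("GEMINI_GLOBAL_CONCURRENCY", 10)
MAX_CONCURRENT_FILES = env_int("MAX_CONCURRENT_FILES", 3)
```

Reading every tunable through one helper keeps the defaults in a single place and makes misconfigured values fail loudly at startup rather than mid-run.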
## Running the Worker & API

You can run the background worker or the FastAPI server.

Run the worker (background processing only):

```bash
python src/worker.py
```

Run the FastAPI app (with all endpoints):

```bash
uvicorn src.main:app --reload
```

- The API exposes health, status, logs, metrics, config, and file-processing endpoints.
- The worker starts automatically in the background when the API starts.
## Logging & Monitoring

- Logs:
  - Human-readable logs: `/logs/worker.log` (daily rotation)
  - JSON logs: `/logs/json/YYYY-MM-DD.json`
  - Dead-letter logs: `/logs/dead_letter/dead_letter.log`
- Supabase:
  - Persistent errors are logged to the `Activity_Error_Log` table for monitoring.
- Suppressing GTK/GLib output:
  - Set `G_MESSAGES_DEBUG` and `G_DEBUG` in `.env` and at the top of `main.py` and `worker.py`.
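The daily-rotating human-readable log could be built with the standard library's `TimedRotatingFileHandler`; this is a sketch only, and the format string and retention settings are assumptions, not the package's actual configuration:

```python
import logging
from logging.handlers import TimedRotatingFileHandler

def build_worker_logger(log_path: str = "logs/worker.log") -> logging.Logger:
    """Hypothetical sketch: a daily-rotating, human-readable log
    mirroring the /logs/worker.log behaviour described above."""
    logger = logging.getLogger("worker")
    handler = TimedRotatingFileHandler(log_path, when="midnight", backupCount=200)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s [%(name)s] %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

`when="midnight"` rolls the file once per day, and `backupCount` caps how many rotated files are kept, which lines up with the retention-based cleanup described later.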
## Error Handling

- Retries for transient errors (network, quota, etc.) with configurable limits.
- Per-page retries: each page is retried up to `MAX_RETRIES` times before being skipped.
- Per-file retries: if a file fails (e.g., a page-count mismatch), the whole file is retried up to `MAX_RETRIES` times.
- All persistent errors are logged to file, JSON, the dead-letter log, and Supabase.
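The retry behaviour described above can be sketched as a generic retry loop with jittered exponential backoff. This is an illustration, not the package's actual implementation; `with_retries` is a hypothetical helper:

```python
import random
import time

def with_retries(fn, max_retries: int = 3, base_delay: float = 1.0):
    """Retry a callable up to max_retries times for transient failures,
    sleeping with jittered exponential backoff between attempts, then
    re-raise so the caller can dead-letter the item."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))
```

The same loop works at both granularities: wrap a single page's OCR call for per-page retries, or the whole split/OCR/merge pipeline for per-file retries.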
## Scalability, Concurrency & Throttling

- Rolling concurrency model:
  - The worker always keeps up to `MAX_CONCURRENT_FILES` files in progress.
  - As soon as a file finishes, the next available file is picked up, until all are processed.
  - This ensures maximum throughput and efficient resource usage.
- Per-page concurrency:
  - Each file's pages are OCRed in parallel, up to `PAGE_MAX_WORKERS` at a time.
- Global Gemini API throttling:
  - All Gemini API requests (across all files and pages) are globally throttled by `GEMINI_GLOBAL_CONCURRENCY`, so you never exceed your API quota or rate limits.
- Backpressure:
  - If too many files are queued (more than `MAX_QUEUE`), the worker pauses and logs a warning.
- Horizontal scaling:
  - Run multiple stateless worker instances on different machines/VMs for even more throughput.
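The interplay of the rolling file window and the global API throttle can be sketched with two `asyncio` semaphores. This is an asyncio analogue for illustration, not the package's actual implementation, and the sleep stands in for the real Gemini request:

```python
import asyncio

MAX_CONCURRENT_FILES = 3
GEMINI_GLOBAL_CONCURRENCY = 10

async def run(files):
    # Hypothetical sketch: a file-level semaphore keeps up to
    # MAX_CONCURRENT_FILES files in flight, while one global semaphore
    # throttles every Gemini call across all files and pages.
    file_slots = asyncio.Semaphore(MAX_CONCURRENT_FILES)
    gemini_slots = asyncio.Semaphore(GEMINI_GLOBAL_CONCURRENCY)

    async def ocr_page(page: int) -> str:
        async with gemini_slots:       # global API throttle
            await asyncio.sleep(0)     # stand-in for the real Gemini request
            return f"page-{page}"

    async def process_file(name: str, pages: int) -> int:
        async with file_slots:         # rolling window: freed slot -> next file starts
            texts = await asyncio.gather(*(ocr_page(p) for p in range(pages)))
            return len(texts)

    return await asyncio.gather(*(process_file(n, p) for n, p in files))
```

Because every task is queued up front but gated by the file semaphore, the concurrency window refills itself the instant any file finishes, which is exactly the rolling behaviour described above.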
## Temp/Log Cleanup

- Files in `logs`, `logs/json`, `logs/dead_letter`, `staging`, and `processed` older than 200 days are deleted before the worker starts.
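A retention sweep like the one above could look like the following sketch, which walks a directory and removes files whose modification time is past the cutoff (`purge_old_files` is a hypothetical helper, not the package's code):

```python
import os
import time

RETENTION_DAYS = 200

def purge_old_files(root: str, retention_days: int = RETENTION_DAYS) -> int:
    """Delete files under root whose mtime is older than the retention
    window; returns the number of files removed."""
    cutoff = time.time() - retention_days * 86400
    removed = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed += 1
    return removed
```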
## Tests

- Unit and integration tests are located in `/tests`.
- Tests cover:
  - PDF splitting/merging
  - Per-page and per-file retry logic
  - File-level rolling concurrency (ensuring the concurrency window is always full)
  - Global Gemini API throttling
  - Trace-ID propagation in logs
- To run tests:

  ```bash
  pytest
  ```
## CI/CD

- A GitHub Actions workflow runs linting and tests on every push.
- Example workflow file: `.github/workflows/ci.yml`.
## Additional Notes

- All print/log statements are also written to log files.
- Trace/request IDs are used for end-to-end traceability.
- For any persistent errors, check Supabase and the dead-letter log for details.
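Trace-ID propagation of the kind noted above is commonly done with a context variable that is set once per file and stamped onto every log line. A minimal sketch (the helper names are assumptions, not the package's API):

```python
import contextvars
import uuid

# One trace ID per unit of work, visible to everything running in that context.
trace_id = contextvars.ContextVar("trace_id", default="-")

def new_trace() -> str:
    """Start a new trace, e.g. when a file is picked up for processing."""
    tid = uuid.uuid4().hex[:12]
    trace_id.set(tid)
    return tid

def log_line(message: str) -> str:
    """Prefix a log message with the current trace ID."""
    return f"[{trace_id.get()}] {message}"
```

Because `ContextVar` values follow the task or thread context, every log line emitted while a file is in flight carries the same ID, which is what makes the dead-letter and Supabase entries correlatable end to end.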
## Project Structure

```
project-root/
├── src/              # All main code (import as src.module)
├── tests/            # All tests (import as: from src.module import ...)
├── logs/             # Log output
├── secrets/          # Secrets and credentials
│   ├── your-service-account.json
│   └── .env
├── requirements.txt  # Python dependencies
├── setup.py          # For pip install -e .
├── Dockerfile
└── README.md
```
## Local Development

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  pip install -e .
  ```

- Run the worker:

  ```bash
  python -m src.worker
  # or, if you have an entrypoint script, use that
  ```

- Run tests:

  ```bash
  pytest --import-mode=importlib tests/
  # or, if you have trouble with imports:
  PYTHONPATH=. pytest tests/
  ```
## Docker Usage

- Build the Docker image:

  ```bash
  docker build -t nest-starters .
  ```

- Run the container (worker only):

  ```bash
  docker run --rm -it -v $PWD/logs:/app/logs nest-starters python src/worker.py
  ```

- Run the container (API server):

  ```bash
  docker run --rm -it -v $PWD/logs:/app/logs -p 8000:8000 nest-starters uvicorn src.main:app --host 0.0.0.0 --port 8000
  ```

- The Dockerfile can be overridden to run either the worker or the API server.
- The `.dockerignore` file keeps your build context clean and fast.
- You can override the CMD to run tests or other scripts as needed:

  ```bash
  docker run --rm -it nest-starters python -m pytest --import-mode=importlib tests/
  ```
## Continuous Integration (CI)

- Use GitHub Actions or a similar CI system to run:

  ```yaml
  - name: Install deps
    run: |
      pip install -r requirements.txt
      pip install -e .
  - name: Run tests
    run: pytest --import-mode=importlib tests/
  ```
## Secrets and Environment Variables

- Place your GCP credentials JSON file in a `secrets/` directory at the project root (not tracked by git).
- In your `.env` file (in the `secrets/` directory), set:

  ```
  GOOGLE_APPLICATION_CREDENTIALS=secrets/your-service-account.json
  ```

- The worker automatically loads `.env` from `secrets/`.
- For Docker/CI, mount the `secrets/` directory and ensure the `.env` file and credentials are present.
- Never commit secrets or credentials to version control!
## Installation

You can install the package from a GitHub Release:

```bash
pip install https://github.com/youruser/dist-gcs-pdf-processing/releases/download/v0.1.0/dist_gcs_pdf_processing-0.1.0-py3-none-any.whl
```

## CLI Usage

After installation, you can run:

```bash
dist-gcs-worker  # Start the background worker
dist-gcs-api     # Start the FastAPI API server (with all endpoints)
```

Or, for advanced usage:

```bash
python -m dist_gcs_pdf_processing.worker
python -m dist_gcs_pdf_processing.main
```
## File Details

### dist_gcs_pdf_processing-0.1.0.tar.gz

- Size: 22.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0, CPython/3.11.9

| Algorithm | Hash digest |
|---|---|
| SHA256 | `555217765e8e1e6a0f3218eb2d217a1e249f60c1e5fb9b36ec27ebf282ecbd10` |
| MD5 | `821ac95841a8f462977042a71fcb5aae` |
| BLAKE2b-256 | `9d96e6fe79e6705380ba45adfd52df082969188ee210be92ea468e148b9eb431` |

### dist_gcs_pdf_processing-0.1.0-py3-none-any.whl

- Size: 18.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0, CPython/3.11.9

| Algorithm | Hash digest |
|---|---|
| SHA256 | `66ac194e96bbc0dd43b884da450d5c9811db71b442cf20df879ae0110db734bf` |
| MD5 | `9e96c42b109c8774115b5ca8013f7b56` |
| BLAKE2b-256 | `6d3ed1f645d8f51561d316a112a6dad254ce477999790fa6ebfcf1b955ee07c9` |