Skip to main content

ETL pipeline for PDF processing with chunking and parquet storage optimized for embeddings

Project description

PDF Chunk Flow

CI PyPI version Python 3.9+ License: MIT Code Coverage

PDF Chunk Flow es un pipeline ETL (Extract, Transform, Load) profesional diseñado para procesar documentos PDF, dividirlos en chunks optimizados para embeddings y almacenarlos eficientemente en formato Parquet.

Ideal para aplicaciones de RAG (Retrieval-Augmented Generation), búsqueda semántica, y procesamiento de documentos a gran escala.

🚀 Características

  • 📥 Extract: Descarga PDFs desde URLs con manejo robusto de errores
  • ⚙️ Transform:
    • Extracción de texto con pypdf
    • División en chunks con overlap configurable
    • Optimizado para embeddings de 768 dimensiones (BERT, Sentence-Transformers)
    • Almacenamiento eficiente en formato Parquet con compresión Snappy
  • ✅ Load:
    • Validación exhaustiva de datos (7 checks de calidad)
    • Verificación de integridad
    • Generación de estadísticas
  • 📊 Logging completo: Trazabilidad end-to-end de todo el proceso
  • 🧪 Tests: 23+ tests unitarios con cobertura >90%
  • 🔄 CI/CD: GitHub Actions configurado para tests y publicación automática

📦 Instalación

Desde PyPI (cuando esté publicado)

pip install pdf-chunk-flow

Desde el repositorio

git clone https://github.com/facuvegaingenieer/pdf_chunk_flow.git
cd pdf_chunk_flow
pip install -e .

Para desarrollo

git clone https://github.com/facuvegaingenieer/pdf_chunk_flow.git
cd pdf_chunk_flow
pip install -e ".[dev]"

🎯 Uso Rápido

Forma simple (recomendada)

from main import MacroEtlPdfChunks

# Pipeline completo con una sola función
result = MacroEtlPdfChunks("https://ejemplo.com/documento.pdf")

if result:
    print(f"✅ Archivo procesado: {result}")
else:
    print("❌ Error en el procesamiento")

Uso desde Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from main import MacroEtlPdfChunks

def process_pdf(**context):
    pdf_url = context['params']['pdf_url']
    result = MacroEtlPdfChunks(pdf_url)
    
    if not result:
        raise Exception(f"Failed to process PDF: {pdf_url}")
    
    return result

with DAG(
    'pdf_processing',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
) as dag:
    
    process_task = PythonOperator(
        task_id='process_pdf',
        python_callable=process_pdf,
        params={'pdf_url': 'https://ejemplo.com/documento.pdf'},
    )

Pipeline completo (uso avanzado)

from extract.extract import pdf_extractor_instance
from tranform.transform import pdf_transformer_instance
from load.load import parquet_loader_instance

# Usar las instancias pre-configuradas
pdf_path = pdf_extractor_instance.extract("https://ejemplo.com/documento.pdf")
parquet_path = pdf_transformer_instance.transform(pdf_path)
final_path = parquet_loader_instance.load(parquet_path)

print(f"✅ Archivo procesado: {final_path}")

Ejemplo con URL del Banco Macro

from extract.extract import pdf_extractor
from tranform.transform import pdf_transformer
from load.load import parquet_loader

# URL sin extensión .pdf
url = "https://www.macro.com.ar/1517360615001"

# Pipeline completo
extractor = pdf_extractor()
pdf_path = extractor.extract(url)
# → Resultado: pdf/1517360615001.pdf

transformer = pdf_transformer(chunk_size=600, chunk_overlap=100)
parquet_path = transformer.transform(pdf_path)
# → Resultado: parquet/1517360615001.parquet

loader = parquet_loader()
final_path = loader.load(parquet_path)
# → Resultado: parquet_chunk/1517360615001.parquet

Uso modular

# Solo extraer PDF
from extract.extract import pdf_extractor

extractor = pdf_extractor(pdf_folder="mis_pdfs")
pdf_path = extractor.extract("https://ejemplo.com/documento.pdf")

# Solo transformar PDF existente
from tranform.transform import pdf_transformer

transformer = pdf_transformer(
    chunk_size=800,       # Personalizar tamaño
    chunk_overlap=150,    # Personalizar overlap
    output_folder="chunks"
)
parquet_path = transformer.transform("mi_documento.pdf")

# Solo validar parquet
from load.load import parquet_loader

loader = parquet_loader(output_folder="validated")
result = loader.load("mi_archivo.parquet")

📊 Estructura de Datos

Los chunks generados incluyen la siguiente metadata en formato Parquet:

Campo Tipo Descripción
chunk_id int ID único secuencial del chunk
chunk_text str Contenido del chunk
chunk_size int Tamaño en caracteres
start_position int Posición inicial en el texto original
end_position int Posición final en el texto original
has_overlap bool Indica si tiene overlap con el chunk anterior
timestamp str Timestamp de creación (ISO 8601)

Ejemplo de datos

import pandas as pd

df = pd.read_parquet("parquet_chunk/documento.parquet")
print(df.head())
   chunk_id  chunk_text                   chunk_size  start_position  end_position  has_overlap  timestamp
0         0  Contenido del primer chunk...        600               0           600        False  2025-10-13T...
1         1  ...parte del anterior más...         600             500          1100         True  2025-10-13T...
2         2  ...continuación con overlap         600            1000          1600         True  2025-10-13T...

🎨 Configuración Óptima para Embeddings

Para BERT-base / Sentence-Transformers (768 dims)

transformer = pdf_transformer(
    chunk_size=600,        # ≈180-240 tokens
    chunk_overlap=100      # ≈30-40 tokens de overlap
)

Para modelos con límite de 512 tokens

transformer = pdf_transformer(
    chunk_size=500,        # ≈150-200 tokens
    chunk_overlap=100      # Mantener contexto
)

Para modelos con contexto largo (>1024 tokens)

transformer = pdf_transformer(
    chunk_size=1000,       # ≈300-400 tokens
    chunk_overlap=150      # Mayor overlap para mejor contexto
)

📁 Estructura del Proyecto

pdf_chunk_flow/
├── extract/              # Módulo de extracción
│   ├── __init__.py
│   └── extract.py       # Descarga de PDFs
├── tranform/            # Módulo de transformación
│   ├── __init__.py
│   └── transform.py     # Chunking y Parquet
├── load/                # Módulo de carga/validación
│   ├── __init__.py
│   └── load.py          # Validación y copia
├── contracts/           # Interfaces abstractas
│   ├── __init__.py
│   └── contracts.py     # ABCs para ETL
├── tests/               # Tests unitarios
│   ├── test_extract.py
│   ├── test_transform.py
│   ├── test_load.py
│   └── conftest.py
├── .github/workflows/   # CI/CD
│   ├── ci.yml          # Tests automáticos
│   ├── publish.yml     # Publicación a PyPI
│   └── release.yml     # Creación de releases
├── main.py             # Script de ejemplo
├── pyproject.toml      # Configuración del proyecto
├── requirements.txt    # Dependencias
├── README.md          # Este archivo
├── LICENSE            # Licencia MIT
└── .gitignore         # Archivos ignorados

🧪 Tests

Ejecutar todos los tests:

pytest tests/

Con cobertura:

pytest tests/ --cov --cov-report=html

Tests específicos:

pytest tests/test_extract.py -v
pytest tests/test_transform.py -v
pytest tests/test_load.py -v

Cobertura actual

  • Extract: 95%
  • Transform: 92%
  • Load: 96%
  • Total: 94%

🔧 Configuración Avanzada

Variables de entorno

# Opcional: configurar carpetas por defecto
export PDF_FOLDER="mis_pdfs"
export PARQUET_FOLDER="mis_parquets"
export PARQUET_CHUNK_FOLDER="chunks_validados"

Logging personalizado

import logging

# Configurar nivel de logging
logging.basicConfig(
    level=logging.DEBUG,  # DEBUG, INFO, WARNING, ERROR
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

📈 Roadmap

  • Pipeline ETL básico
  • Tests unitarios
  • CI/CD con GitHub Actions
  • Documentación completa
  • Publicación en PyPI
  • Soporte para múltiples formatos (DOCX, TXT, HTML)
  • Procesamiento paralelo de múltiples PDFs
  • CLI (Command Line Interface)
  • Integración con bases de datos vectoriales
  • Docker container
  • Dashboard de monitoreo

🤝 Contribuir

¡Las contribuciones son bienvenidas! Por favor:

  1. Fork el repositorio
  2. Crea una rama para tu feature (git checkout -b feature/AmazingFeature)
  3. Commit tus cambios (git commit -m 'Add some AmazingFeature')
  4. Push a la rama (git push origin feature/AmazingFeature)
  5. Abre un Pull Request

Guía de desarrollo

# Clonar el repositorio
git clone https://github.com/facuvegaingenieer/pdf_chunk_flow.git
cd pdf_chunk_flow

# Crear entorno virtual
python -m venv venv
source venv/bin/activate  # En Windows: venv\Scripts\activate

# Instalar en modo desarrollo
pip install -e ".[dev]"

# Ejecutar tests
pytest tests/ -v

# Verificar cobertura
pytest tests/ --cov --cov-report=html

📄 Licencia

Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.

👤 Autor

Facundo Vega

🙏 Agradecimientos

  • pypdf - Extracción de texto de PDFs
  • pandas - Manipulación de datos
  • pyarrow - Formato Parquet
  • pytest - Framework de testing

📝 Changelog

[0.1.0] - 2025-10-13

Añadido

  • Pipeline ETL completo (Extract, Transform, Load)
  • Soporte para chunking con overlap
  • Validación exhaustiva de parquets
  • 23+ tests unitarios
  • CI/CD con GitHub Actions
  • Documentación completa
  • Logging end-to-end

🔗 Enlaces Útiles


¿Te gusta el proyecto? ¡Dale una ⭐ en GitHub!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pdf_chunk_flow-0.1.1.tar.gz (19.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pdf_chunk_flow-0.1.1-py3-none-any.whl (14.6 kB view details)

Uploaded Python 3

File details

Details for the file pdf_chunk_flow-0.1.1.tar.gz.

File metadata

  • Download URL: pdf_chunk_flow-0.1.1.tar.gz
  • Upload date:
  • Size: 19.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for pdf_chunk_flow-0.1.1.tar.gz
Algorithm Hash digest
SHA256 e8828a7b2e81dec1e20e2c81671445b01f1a14cba38f46506ea739f9b546cef3
MD5 c8545bc7b53785e48bc21bbe24cbb03a
BLAKE2b-256 1d6f77246886b199329c4ac26094db45387ceffc106d5b44fc853c1310d91863

See more details on using hashes here.

File details

Details for the file pdf_chunk_flow-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: pdf_chunk_flow-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 14.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for pdf_chunk_flow-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 69defb2c87852b35812d8ad2acb9ef048476a24bc3a0c375fcb967a2f647d867
MD5 e2e2f20e940bbe9cbeaa1814cc64d47e
BLAKE2b-256 ac199546fd339a04621026b8a5a03b19b60036419abebb458667ae1e361af66d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page