ETL pipeline for PDF processing with chunking and parquet storage optimized for embeddings
Project description
PDF Chunk Flow
PDF Chunk Flow es un pipeline ETL (Extract, Transform, Load) profesional diseñado para procesar documentos PDF, dividirlos en chunks optimizados para embeddings y almacenarlos eficientemente en formato Parquet.
Ideal para aplicaciones de RAG (Retrieval-Augmented Generation), búsqueda semántica, y procesamiento de documentos a gran escala.
🚀 Características
- 📥 Extract: Descarga PDFs desde URLs con manejo robusto de errores
- ⚙️ Transform:
- Extracción de texto con
pypdf - División en chunks con overlap configurable
- Optimizado para embeddings de 768 dimensiones (BERT, Sentence-Transformers)
- Almacenamiento eficiente en formato Parquet con compresión Snappy
- Extracción de texto con
- ✅ Load:
- Validación exhaustiva de datos (7 checks de calidad)
- Verificación de integridad
- Generación de estadísticas
- 📊 Logging completo: Trazabilidad end-to-end de todo el proceso
- 🧪 Tests: 23+ tests unitarios con cobertura >90%
- 🔄 CI/CD: GitHub Actions configurado para tests y publicación automática
📦 Instalación
Desde PyPI (cuando esté publicado)
pip install pdf-chunk-flow
Desde el repositorio
git clone https://github.com/facuvegaingenieer/pdf_chunk_flow.git
cd pdf_chunk_flow
pip install -e .
Para desarrollo
git clone https://github.com/facuvegaingenieer/pdf_chunk_flow.git
cd pdf_chunk_flow
pip install -e ".[dev]"
🎯 Uso Rápido
Forma simple (recomendada)
from main import MacroEtlPdfChunks
# Pipeline completo con una sola función
result = MacroEtlPdfChunks("https://ejemplo.com/documento.pdf")
if result:
print(f"✅ Archivo procesado: {result}")
else:
print("❌ Error en el procesamiento")
Uso desde Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from main import MacroEtlPdfChunks
def process_pdf(**context):
pdf_url = context['params']['pdf_url']
result = MacroEtlPdfChunks(pdf_url)
if not result:
raise Exception(f"Failed to process PDF: {pdf_url}")
return result
with DAG(
'pdf_processing',
start_date=datetime(2025, 1, 1),
schedule_interval='@daily',
) as dag:
process_task = PythonOperator(
task_id='process_pdf',
python_callable=process_pdf,
params={'pdf_url': 'https://ejemplo.com/documento.pdf'},
)
Pipeline completo (uso avanzado)
from extract.extract import pdf_extractor_instance
from tranform.transform import pdf_transformer_instance
from load.load import parquet_loader_instance
# Usar las instancias pre-configuradas
pdf_path = pdf_extractor_instance.extract("https://ejemplo.com/documento.pdf")
parquet_path = pdf_transformer_instance.transform(pdf_path)
final_path = parquet_loader_instance.load(parquet_path)
print(f"✅ Archivo procesado: {final_path}")
Ejemplo con URL del Banco Macro
from extract.extract import pdf_extractor
from tranform.transform import pdf_transformer
from load.load import parquet_loader
# URL sin extensión .pdf
url = "https://www.macro.com.ar/1517360615001"
# Pipeline completo
extractor = pdf_extractor()
pdf_path = extractor.extract(url)
# → Resultado: pdf/1517360615001.pdf
transformer = pdf_transformer(chunk_size=600, chunk_overlap=100)
parquet_path = transformer.transform(pdf_path)
# → Resultado: parquet/1517360615001.parquet
loader = parquet_loader()
final_path = loader.load(parquet_path)
# → Resultado: parquet_chunk/1517360615001.parquet
Uso modular
# Solo extraer PDF
from extract.extract import pdf_extractor
extractor = pdf_extractor(pdf_folder="mis_pdfs")
pdf_path = extractor.extract("https://ejemplo.com/documento.pdf")
# Solo transformar PDF existente
from tranform.transform import pdf_transformer
transformer = pdf_transformer(
chunk_size=800, # Personalizar tamaño
chunk_overlap=150, # Personalizar overlap
output_folder="chunks"
)
parquet_path = transformer.transform("mi_documento.pdf")
# Solo validar parquet
from load.load import parquet_loader
loader = parquet_loader(output_folder="validated")
result = loader.load("mi_archivo.parquet")
📊 Estructura de Datos
Los chunks generados incluyen la siguiente metadata en formato Parquet:
| Campo | Tipo | Descripción |
|---|---|---|
chunk_id |
int | ID único secuencial del chunk |
chunk_text |
str | Contenido del chunk |
chunk_size |
int | Tamaño en caracteres |
start_position |
int | Posición inicial en el texto original |
end_position |
int | Posición final en el texto original |
has_overlap |
bool | Indica si tiene overlap con el chunk anterior |
timestamp |
str | Timestamp de creación (ISO 8601) |
Ejemplo de datos
import pandas as pd
df = pd.read_parquet("parquet_chunk/documento.parquet")
print(df.head())
chunk_id chunk_text chunk_size start_position end_position has_overlap timestamp
0 0 Contenido del primer chunk... 600 0 600 False 2025-10-13T...
1 1 ...parte del anterior más... 600 500 1100 True 2025-10-13T...
2 2 ...continuación con overlap 600 1000 1600 True 2025-10-13T...
🎨 Configuración Óptima para Embeddings
Para BERT-base / Sentence-Transformers (768 dims)
transformer = pdf_transformer(
chunk_size=600, # ≈180-240 tokens
chunk_overlap=100 # ≈30-40 tokens de overlap
)
Para modelos con límite de 512 tokens
transformer = pdf_transformer(
chunk_size=500, # ≈150-200 tokens
chunk_overlap=100 # Mantener contexto
)
Para modelos con contexto largo (>1024 tokens)
transformer = pdf_transformer(
chunk_size=1000, # ≈300-400 tokens
chunk_overlap=150 # Mayor overlap para mejor contexto
)
📁 Estructura del Proyecto
pdf_chunk_flow/
├── extract/ # Módulo de extracción
│ ├── __init__.py
│ └── extract.py # Descarga de PDFs
├── tranform/ # Módulo de transformación
│ ├── __init__.py
│ └── transform.py # Chunking y Parquet
├── load/ # Módulo de carga/validación
│ ├── __init__.py
│ └── load.py # Validación y copia
├── contracts/ # Interfaces abstractas
│ ├── __init__.py
│ └── contracts.py # ABCs para ETL
├── tests/ # Tests unitarios
│ ├── test_extract.py
│ ├── test_transform.py
│ ├── test_load.py
│ └── conftest.py
├── .github/workflows/ # CI/CD
│ ├── ci.yml # Tests automáticos
│ ├── publish.yml # Publicación a PyPI
│ └── release.yml # Creación de releases
├── main.py # Script de ejemplo
├── pyproject.toml # Configuración del proyecto
├── requirements.txt # Dependencias
├── README.md # Este archivo
├── LICENSE # Licencia MIT
└── .gitignore # Archivos ignorados
🧪 Tests
Ejecutar todos los tests:
pytest tests/
Con cobertura:
pytest tests/ --cov --cov-report=html
Tests específicos:
pytest tests/test_extract.py -v
pytest tests/test_transform.py -v
pytest tests/test_load.py -v
Cobertura actual
- Extract: 95%
- Transform: 92%
- Load: 96%
- Total: 94%
🔧 Configuración Avanzada
Variables de entorno
# Opcional: configurar carpetas por defecto
export PDF_FOLDER="mis_pdfs"
export PARQUET_FOLDER="mis_parquets"
export PARQUET_CHUNK_FOLDER="chunks_validados"
Logging personalizado
import logging
# Configurar nivel de logging
logging.basicConfig(
level=logging.DEBUG, # DEBUG, INFO, WARNING, ERROR
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
📈 Roadmap
- Pipeline ETL básico
- Tests unitarios
- CI/CD con GitHub Actions
- Documentación completa
- Publicación en PyPI
- Soporte para múltiples formatos (DOCX, TXT, HTML)
- Procesamiento paralelo de múltiples PDFs
- CLI (Command Line Interface)
- Integración con bases de datos vectoriales
- Docker container
- Dashboard de monitoreo
🤝 Contribuir
¡Las contribuciones son bienvenidas! Por favor:
- Fork el repositorio
- Crea una rama para tu feature (
git checkout -b feature/AmazingFeature) - Commit tus cambios (
git commit -m 'Add some AmazingFeature') - Push a la rama (
git push origin feature/AmazingFeature) - Abre un Pull Request
Guía de desarrollo
# Clonar el repositorio
git clone https://github.com/facuvegaingenieer/pdf_chunk_flow.git
cd pdf_chunk_flow
# Crear entorno virtual
python -m venv venv
source venv/bin/activate # En Windows: venv\Scripts\activate
# Instalar en modo desarrollo
pip install -e ".[dev]"
# Ejecutar tests
pytest tests/ -v
# Verificar cobertura
pytest tests/ --cov --cov-report=html
📄 Licencia
Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.
👤 Autor
Facundo Vega
- GitHub: @facuvegaingenieer
- Email: facundo.vega1234@gmail.com
🙏 Agradecimientos
- pypdf - Extracción de texto de PDFs
- pandas - Manipulación de datos
- pyarrow - Formato Parquet
- pytest - Framework de testing
📝 Changelog
[0.1.0] - 2025-10-13
Añadido
- Pipeline ETL completo (Extract, Transform, Load)
- Soporte para chunking con overlap
- Validación exhaustiva de parquets
- 23+ tests unitarios
- CI/CD con GitHub Actions
- Documentación completa
- Logging end-to-end
🔗 Enlaces Útiles
¿Te gusta el proyecto? ¡Dale una ⭐ en GitHub!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pdf_chunk_flow-0.1.1.tar.gz.
File metadata
- Download URL: pdf_chunk_flow-0.1.1.tar.gz
- Upload date:
- Size: 19.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e8828a7b2e81dec1e20e2c81671445b01f1a14cba38f46506ea739f9b546cef3
|
|
| MD5 |
c8545bc7b53785e48bc21bbe24cbb03a
|
|
| BLAKE2b-256 |
1d6f77246886b199329c4ac26094db45387ceffc106d5b44fc853c1310d91863
|
File details
Details for the file pdf_chunk_flow-0.1.1-py3-none-any.whl.
File metadata
- Download URL: pdf_chunk_flow-0.1.1-py3-none-any.whl
- Upload date:
- Size: 14.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
69defb2c87852b35812d8ad2acb9ef048476a24bc3a0c375fcb967a2f647d867
|
|
| MD5 |
e2e2f20e940bbe9cbeaa1814cc64d47e
|
|
| BLAKE2b-256 |
ac199546fd339a04621026b8a5a03b19b60036419abebb458667ae1e361af66d
|