Una librería modular para construir data pipelines con arquitectura medallion
Project description
Medallion ETL
Una librería modular para construir data pipelines con arquitectura medallion (Bronze-Silver-Gold).
🚀 Características
- Arquitectura Medallion: Implementación completa del patrón Bronze-Silver-Gold
- CLI Integrado: Comandos para inicializar proyectos y crear pipelines
- Modular y Extensible: Componentes reutilizables para cada capa del proceso
- Validación de Datos: Esquemas con Pydantic para garantizar calidad de datos
- Procesamiento Eficiente: Powered by Polars para manejo de grandes volúmenes
- Orquestación: Integración nativa con Prefect para workflows complejos
- Conectores: Soporte para múltiples fuentes de datos (CSV, JSON, SQL, APIs)
- Logging Avanzado: Sistema de logging estructurado con Rich
📋 Requisitos
- Python 3.11+
- polars >= 1.30
- pydantic >= 2.7
- sqlalchemy >= 2.0
- prefect >= 3.0
- requests >= 2.25.0
- rich >= 14.0.0
📦 Instalación
pip install medallion-etl
O desde el código fuente:
git clone https://github.com/JuanManiglia/medallion-etl.git
cd medallion-etl
pip install -e .
🛠️ Comandos CLI
Inicializar un nuevo proyecto
medallion-etl init
O especificar un directorio:
medallion-etl init --project-dir mi_proyecto
Esto creará la siguiente estructura:
mi_proyecto/
├── config.py # Configuración del proyecto
├── main.py # Script principal
├── README.md # Documentación del proyecto
├── data/ # Directorio para datos
│ ├── bronze/ # Datos crudos (raw)
│ ├── silver/ # Datos validados y limpios
│ └── gold/ # Datos transformados y agregados
├── logs/ # Logs del proyecto
├── pipelines/ # Definiciones de pipelines
└── schemas/ # Esquemas de datos (Pydantic)
Crear un nuevo pipeline
medallion-etl create-pipeline MiPipeline
Esto generará:
pipelines/mipipeline_pipeline.py- Definición del pipelineschemas/mipipeline_schema.py- Esquema de datos con Pydantic
🏗️ Arquitectura Medallion
🥉 Bronze Layer (Datos Crudos)
- Propósito: Ingesta de datos en su formato original
- Extractores disponibles:
CSVExtractor- Archivos CSVJSONExtractor- Archivos JSONSQLExtractor- Bases de datos SQLAPIExtractor- APIs REST
🥈 Silver Layer (Datos Validados)
- Propósito: Validación, limpieza y normalización
- Componentes:
SchemaValidator- Validación con esquemas PydanticDataCleaner- Limpieza de datos (duplicados, nulos)DataNormalizer- Normalización de formatos
🥇 Gold Layer (Datos Transformados)
- Propósito: Agregaciones y transformaciones para análisis
- Transformadores:
Aggregator- Agregaciones (sum, mean, count, etc.)DataJoiner- Unión de datasetsFeatureEngineer- Creación de nuevas características
🔧 Uso Básico
1. Crear un proyecto
medallion-etl init --project-dir mi_etl_project
cd mi_etl_project
2. Crear un pipeline personalizado
medallion-etl create-pipeline Ventas
3. Configurar el esquema de datos
Edita schemas/ventas_schema.py:
from datetime import datetime
from typing import Optional
from medallion_etl.schemas import BaseSchema
class VentasSchema(BaseSchema):
id: int
producto: str
cantidad: int
precio: float
fecha: datetime
cliente: Optional[str] = None
4. Personalizar el pipeline
Edita pipelines/ventas_pipeline.py según tus necesidades.
5. Ejecutar el pipeline
python main.py --pipeline ventas --input data/ventas.csv
📊 Ejemplo de Pipeline Completo
from medallion_etl.core import Pipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator, DataCleaner
from medallion_etl.gold import Aggregator
from schemas.ventas_schema import VentasSchema
def create_sales_pipeline():
pipeline = Pipeline(name="SalesPipeline")
# Bronze: Extraer datos
extractor = CSVExtractor(
name="SalesExtractor",
output_path=config.bronze_dir / "sales"
)
pipeline.add_task(extractor)
# Silver: Validar y limpiar
validator = SchemaValidator(
schema_model=VentasSchema,
name="SalesValidator"
)
pipeline.add_task(validator)
cleaner = DataCleaner(
name="SalesCleaner",
drop_na=True,
drop_duplicates=True
)
pipeline.add_task(cleaner)
# Gold: Agregar datos
aggregator = Aggregator(
group_by=["producto"],
aggregations={
"cantidad": "sum",
"precio": "mean"
},
name="SalesAggregator"
)
pipeline.add_task(aggregator)
return pipeline
# Ejecutar pipeline
pipeline = create_sales_pipeline()
result = pipeline.run("data/ventas.csv")
🔌 Conectores Disponibles
Extractores (Bronze)
- CSVExtractor: Archivos CSV con configuración flexible
- JSONExtractor: Archivos JSON y JSONL
- SQLExtractor: Bases de datos relacionales
- APIExtractor: APIs REST con autenticación
- FileExtractor: Extractor base para otros formatos
Validadores (Silver)
- SchemaValidator: Validación con esquemas Pydantic
- DataCleaner: Limpieza automática de datos
- DataNormalizer: Normalización de tipos y formatos
Transformadores (Gold)
- Aggregator: Agregaciones grupales
- DataJoiner: Unión de múltiples datasets
- FeatureEngineer: Creación de características derivadas
🔧 Configuración
La configuración se maneja a través de la clase MedallionConfig:
from medallion_etl.config import MedallionConfig
config = MedallionConfig(
bronze_dir="data/bronze",
silver_dir="data/silver",
gold_dir="data/gold",
log_dir="logs",
log_level="INFO"
)
🚀 Integración con Prefect
Convierte cualquier pipeline en un flow de Prefect:
from prefect import serve
pipeline = create_sales_pipeline()
flow = pipeline.as_prefect_flow(name="sales-etl")
# Desplegar como servicio
serve(flow)
📝 Logging
Sistema de logging estructurado con Rich:
from medallion_etl.utils import logger
logger.info("Pipeline iniciado", extra={"pipeline": "sales"})
logger.error("Error en validación", extra={"records_failed": 10})
🤝 Contribuir
- Fork el repositorio
- Crea una rama para tu feature (
git checkout -b feature/nueva-funcionalidad) - Commit tus cambios (
git commit -am 'Agregar nueva funcionalidad') - Push a la rama (
git push origin feature/nueva-funcionalidad) - Crea un Pull Request
📄 Licencia
Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.
🔗 Enlaces
- Repositorio: https://github.com/JuanManiglia/medallion-etl
- Documentación: [En desarrollo]
- Issues: https://github.com/JuanManiglia/medallion-etl/issues
Medallion ETL - Construye pipelines de datos robustos y escalables con arquitectura medallion 🏅
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file medallion_etl-0.1.29.tar.gz.
File metadata
- Download URL: medallion_etl-0.1.29.tar.gz
- Upload date:
- Size: 31.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a04e6b3b1eb3236092ffc65525ee936bca6ef082be276a164403adb5b080e6fb
|
|
| MD5 |
87d91325b2386fee91502fe22573bb11
|
|
| BLAKE2b-256 |
88c6d08c59a80ef76016218f72ebaacbffebe66ec9c98282e8c50c3c0ee35e2e
|
File details
Details for the file medallion_etl-0.1.29-py3-none-any.whl.
File metadata
- Download URL: medallion_etl-0.1.29-py3-none-any.whl
- Upload date:
- Size: 31.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
72ffb75eb6d2ead19df697a55ab3d22884a0b1211b72620529a2d47aab8f2938
|
|
| MD5 |
73e69c46dd2f9ae3f38ac0a88e0dd3d1
|
|
| BLAKE2b-256 |
8d178d18ff844b3ac3e85a868a1f123eac3e1b8ead46ecd286896b3cd94a1077
|