Skip to main content

Una librería modular para construir data pipelines con arquitectura medallion

Project description

Medallion ETL

Una librería modular para construir data pipelines con arquitectura medallion (Bronze-Silver-Gold).

🚀 Características

  • Arquitectura Medallion: Implementación completa del patrón Bronze-Silver-Gold
  • CLI Integrado: Comandos para inicializar proyectos y crear pipelines
  • Modular y Extensible: Componentes reutilizables para cada capa del proceso
  • Validación de Datos: Esquemas con Pydantic para garantizar calidad de datos
  • Procesamiento Eficiente: Powered by Polars para manejo de grandes volúmenes
  • Orquestación: Integración nativa con Prefect para workflows complejos
  • Conectores: Soporte para múltiples fuentes de datos (CSV, JSON, SQL, APIs)
  • Logging Avanzado: Sistema de logging estructurado con Rich

📋 Requisitos

  • Python 3.11+
  • polars >= 1.30
  • pydantic >= 2.7
  • sqlalchemy >= 2.0
  • prefect >= 3.0
  • requests >= 2.25.0
  • rich >= 14.0.0

📦 Instalación

pip install medallion-etl

O desde el código fuente:

git clone https://github.com/JuanManiglia/medallion-etl.git
cd medallion-etl
pip install -e .

🛠️ Comandos CLI

Inicializar un nuevo proyecto

medallion-etl init

O especificar un directorio:

medallion-etl init --project-dir mi_proyecto

Esto creará la siguiente estructura:

mi_proyecto/
├── config.py              # Configuración del proyecto
├── main.py                 # Script principal
├── README.md              # Documentación del proyecto
├── data/                  # Directorio para datos
│   ├── bronze/           # Datos crudos (raw)
│   ├── silver/           # Datos validados y limpios
│   └── gold/             # Datos transformados y agregados
├── logs/                  # Logs del proyecto
├── pipelines/             # Definiciones de pipelines
└── schemas/               # Esquemas de datos (Pydantic)

Crear un nuevo pipeline

medallion-etl create-pipeline MiPipeline

Esto generará:

  • pipelines/mipipeline_pipeline.py - Definición del pipeline
  • schemas/mipipeline_schema.py - Esquema de datos con Pydantic

🏗️ Arquitectura Medallion

🥉 Bronze Layer (Datos Crudos)

  • Propósito: Ingesta de datos en su formato original
  • Extractores disponibles:
    • CSVExtractor - Archivos CSV
    • JSONExtractor - Archivos JSON
    • SQLExtractor - Bases de datos SQL
    • APIExtractor - APIs REST

🥈 Silver Layer (Datos Validados)

  • Propósito: Validación, limpieza y normalización
  • Componentes:
    • SchemaValidator - Validación con esquemas Pydantic
    • DataCleaner - Limpieza de datos (duplicados, nulos)
    • DataNormalizer - Normalización de formatos

🥇 Gold Layer (Datos Transformados)

  • Propósito: Agregaciones y transformaciones para análisis
  • Transformadores:
    • Aggregator - Agregaciones (sum, mean, count, etc.)
    • DataJoiner - Unión de datasets
    • FeatureEngineer - Creación de nuevas características

🔧 Uso Básico

1. Crear un proyecto

medallion-etl init --project-dir mi_etl_project
cd mi_etl_project

2. Crear un pipeline personalizado

medallion-etl create-pipeline Ventas

3. Configurar el esquema de datos

Edita schemas/ventas_schema.py:

from datetime import datetime
from typing import Optional
from medallion_etl.schemas import BaseSchema

class VentasSchema(BaseSchema):
    id: int
    producto: str
    cantidad: int
    precio: float
    fecha: datetime
    cliente: Optional[str] = None

4. Personalizar el pipeline

Edita pipelines/ventas_pipeline.py según tus necesidades.

5. Ejecutar el pipeline

python main.py --pipeline ventas --input data/ventas.csv

📊 Ejemplo de Pipeline Completo

from medallion_etl.core import Pipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator, DataCleaner
from medallion_etl.gold import Aggregator
from schemas.ventas_schema import VentasSchema

def create_sales_pipeline():
    pipeline = Pipeline(name="SalesPipeline")
    
    # Bronze: Extraer datos
    extractor = CSVExtractor(
        name="SalesExtractor",
        output_path=config.bronze_dir / "sales"
    )
    pipeline.add_task(extractor)
    
    # Silver: Validar y limpiar
    validator = SchemaValidator(
        schema_model=VentasSchema,
        name="SalesValidator"
    )
    pipeline.add_task(validator)
    
    cleaner = DataCleaner(
        name="SalesCleaner",
        drop_na=True,
        drop_duplicates=True
    )
    pipeline.add_task(cleaner)
    
    # Gold: Agregar datos
    aggregator = Aggregator(
        group_by=["producto"],
        aggregations={
            "cantidad": "sum",
            "precio": "mean"
        },
        name="SalesAggregator"
    )
    pipeline.add_task(aggregator)
    
    return pipeline

# Ejecutar pipeline
pipeline = create_sales_pipeline()
result = pipeline.run("data/ventas.csv")

🔌 Conectores Disponibles

Extractores (Bronze)

  • CSVExtractor: Archivos CSV con configuración flexible
  • JSONExtractor: Archivos JSON y JSONL
  • SQLExtractor: Bases de datos relacionales
  • APIExtractor: APIs REST con autenticación
  • FileExtractor: Extractor base para otros formatos

Validadores (Silver)

  • SchemaValidator: Validación con esquemas Pydantic
  • DataCleaner: Limpieza automática de datos
  • DataNormalizer: Normalización de tipos y formatos

Transformadores (Gold)

  • Aggregator: Agregaciones grupales
  • DataJoiner: Unión de múltiples datasets
  • FeatureEngineer: Creación de características derivadas

🔧 Configuración

La configuración se maneja a través de la clase MedallionConfig:

from medallion_etl.config import MedallionConfig

config = MedallionConfig(
    bronze_dir="data/bronze",
    silver_dir="data/silver", 
    gold_dir="data/gold",
    log_dir="logs",
    log_level="INFO"
)

🚀 Integración con Prefect

Convierte cualquier pipeline en un flow de Prefect:

from prefect import serve

pipeline = create_sales_pipeline()
flow = pipeline.as_prefect_flow(name="sales-etl")

# Desplegar como servicio
serve(flow)

📝 Logging

Sistema de logging estructurado con Rich:

from medallion_etl.utils import logger

logger.info("Pipeline iniciado", extra={"pipeline": "sales"})
logger.error("Error en validación", extra={"records_failed": 10})

🤝 Contribuir

  1. Fork el repositorio
  2. Crea una rama para tu feature (git checkout -b feature/nueva-funcionalidad)
  3. Commit tus cambios (git commit -am 'Agregar nueva funcionalidad')
  4. Push a la rama (git push origin feature/nueva-funcionalidad)
  5. Crea un Pull Request

📄 Licencia

Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.

🔗 Enlaces


Medallion ETL - Construye pipelines de datos robustos y escalables con arquitectura medallion 🏅

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

medallion_etl-0.1.25.tar.gz (30.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

medallion_etl-0.1.25-py3-none-any.whl (29.9 kB view details)

Uploaded Python 3

File details

Details for the file medallion_etl-0.1.25.tar.gz.

File metadata

  • Download URL: medallion_etl-0.1.25.tar.gz
  • Upload date:
  • Size: 30.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.25.tar.gz
Algorithm Hash digest
SHA256 9a1af46d1c873777f40ca57c88f4a92aa05cbd02f7e7a178c5cba3347cddefdb
MD5 b251c12c1b08ca76d1d3f3cb51c13a84
BLAKE2b-256 2dababdf5199ad7a84cc17e724e705c11eab0cdc75324eb7a2f1faa52276db51

See more details on using hashes here.

File details

Details for the file medallion_etl-0.1.25-py3-none-any.whl.

File metadata

  • Download URL: medallion_etl-0.1.25-py3-none-any.whl
  • Upload date:
  • Size: 29.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.25-py3-none-any.whl
Algorithm Hash digest
SHA256 205c528b8cec0267d6a725397a6fc5785f8e53ba74df8992868ad19d3595a3b4
MD5 9ff6be2a6ab87b1e24ed278429192244
BLAKE2b-256 0e999f6a8b106b48340338400f0181d7f22c6be4ea7ca45b2b641dfd6725a968

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page