Skip to main content

Una librería modular para construir data pipelines con arquitectura medallion

Project description

Medallion ETL

Una librería modular para construir data pipelines con arquitectura medallion (Bronze-Silver-Gold).

🚀 Características

  • Arquitectura Medallion: Implementación completa del patrón Bronze-Silver-Gold
  • CLI Integrado: Comandos para inicializar proyectos y crear pipelines
  • Modular y Extensible: Componentes reutilizables para cada capa del proceso
  • Validación de Datos: Esquemas con Pydantic para garantizar calidad de datos
  • Procesamiento Eficiente: Powered by Polars para manejo de grandes volúmenes
  • Orquestación: Integración nativa con Prefect para workflows complejos
  • Conectores: Soporte para múltiples fuentes de datos (CSV, JSON, SQL, APIs)
  • Logging Avanzado: Sistema de logging estructurado con Rich

📋 Requisitos

  • Python 3.11+
  • polars >= 1.30
  • pydantic >= 2.7
  • sqlalchemy >= 2.0
  • prefect >= 3.0
  • requests >= 2.25.0
  • rich >= 14.0.0

📦 Instalación

pip install medallion-etl

O desde el código fuente:

git clone https://github.com/JuanManiglia/medallion-etl.git
cd medallion-etl
pip install -e .

🛠️ Comandos CLI

Inicializar un nuevo proyecto

medallion-etl init

O especificar un directorio:

medallion-etl init --project-dir mi_proyecto

Esto creará la siguiente estructura:

mi_proyecto/
├── config.py              # Configuración del proyecto
├── main.py                 # Script principal
├── README.md              # Documentación del proyecto
├── data/                  # Directorio para datos
│   ├── bronze/           # Datos crudos (raw)
│   ├── silver/           # Datos validados y limpios
│   └── gold/             # Datos transformados y agregados
├── logs/                  # Logs del proyecto
├── pipelines/             # Definiciones de pipelines
└── schemas/               # Esquemas de datos (Pydantic)

Crear un nuevo pipeline

medallion-etl create-pipeline MiPipeline

Esto generará:

  • pipelines/mipipeline_pipeline.py - Definición del pipeline
  • schemas/mipipeline_schema.py - Esquema de datos con Pydantic

🏗️ Arquitectura Medallion

🥉 Bronze Layer (Datos Crudos)

  • Propósito: Ingesta de datos en su formato original
  • Extractores disponibles:
    • CSVExtractor - Archivos CSV
    • JSONExtractor - Archivos JSON
    • SQLExtractor - Bases de datos SQL
    • APIExtractor - APIs REST

🥈 Silver Layer (Datos Validados)

  • Propósito: Validación, limpieza y normalización
  • Componentes:
    • SchemaValidator - Validación con esquemas Pydantic
    • DataCleaner - Limpieza de datos (duplicados, nulos)
    • DataNormalizer - Normalización de formatos

🥇 Gold Layer (Datos Transformados)

  • Propósito: Agregaciones y transformaciones para análisis
  • Transformadores:
    • Aggregator - Agregaciones (sum, mean, count, etc.)
    • DataJoiner - Unión de datasets
    • FeatureEngineer - Creación de nuevas características

🔧 Uso Básico

1. Crear un proyecto

medallion-etl init --project-dir mi_etl_project
cd mi_etl_project

2. Crear un pipeline personalizado

medallion-etl create-pipeline Ventas

3. Configurar el esquema de datos

Edita schemas/ventas_schema.py:

from datetime import datetime
from typing import Optional
from medallion_etl.schemas import BaseSchema

class VentasSchema(BaseSchema):
    id: int
    producto: str
    cantidad: int
    precio: float
    fecha: datetime
    cliente: Optional[str] = None

4. Personalizar el pipeline

Edita pipelines/ventas_pipeline.py según tus necesidades.

5. Ejecutar el pipeline

python main.py --pipeline ventas --input data/ventas.csv

📊 Ejemplo de Pipeline Completo

from medallion_etl.core import Pipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator, DataCleaner
from medallion_etl.gold import Aggregator
from schemas.ventas_schema import VentasSchema

def create_sales_pipeline():
    pipeline = Pipeline(name="SalesPipeline")
    
    # Bronze: Extraer datos
    extractor = CSVExtractor(
        name="SalesExtractor",
        output_path=config.bronze_dir / "sales"
    )
    pipeline.add_task(extractor)
    
    # Silver: Validar y limpiar
    validator = SchemaValidator(
        schema_model=VentasSchema,
        name="SalesValidator"
    )
    pipeline.add_task(validator)
    
    cleaner = DataCleaner(
        name="SalesCleaner",
        drop_na=True,
        drop_duplicates=True
    )
    pipeline.add_task(cleaner)
    
    # Gold: Agregar datos
    aggregator = Aggregator(
        group_by=["producto"],
        aggregations={
            "cantidad": "sum",
            "precio": "mean"
        },
        name="SalesAggregator"
    )
    pipeline.add_task(aggregator)
    
    return pipeline

# Ejecutar pipeline
pipeline = create_sales_pipeline()
result = pipeline.run("data/ventas.csv")

🔌 Conectores Disponibles

Extractores (Bronze)

  • CSVExtractor: Archivos CSV con configuración flexible
  • JSONExtractor: Archivos JSON y JSONL
  • SQLExtractor: Bases de datos relacionales
  • APIExtractor: APIs REST con autenticación
  • FileExtractor: Extractor base para otros formatos

Validadores (Silver)

  • SchemaValidator: Validación con esquemas Pydantic
  • DataCleaner: Limpieza automática de datos
  • DataNormalizer: Normalización de tipos y formatos

Transformadores (Gold)

  • Aggregator: Agregaciones grupales
  • DataJoiner: Unión de múltiples datasets
  • FeatureEngineer: Creación de características derivadas

🔧 Configuración

La configuración se maneja a través de la clase MedallionConfig:

from medallion_etl.config import MedallionConfig

config = MedallionConfig(
    bronze_dir="data/bronze",
    silver_dir="data/silver", 
    gold_dir="data/gold",
    log_dir="logs",
    log_level="INFO"
)

🚀 Integración con Prefect

Convierte cualquier pipeline en un flow de Prefect:

from prefect import serve

pipeline = create_sales_pipeline()
flow = pipeline.as_prefect_flow(name="sales-etl")

# Desplegar como servicio
serve(flow)

📝 Logging

Sistema de logging estructurado con Rich:

from medallion_etl.utils import logger

logger.info("Pipeline iniciado", extra={"pipeline": "sales"})
logger.error("Error en validación", extra={"records_failed": 10})

🤝 Contribuir

  1. Fork el repositorio
  2. Crea una rama para tu feature (git checkout -b feature/nueva-funcionalidad)
  3. Commit tus cambios (git commit -am 'Agregar nueva funcionalidad')
  4. Push a la rama (git push origin feature/nueva-funcionalidad)
  5. Crea un Pull Request

📄 Licencia

Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.

🔗 Enlaces


Medallion ETL - Construye pipelines de datos robustos y escalables con arquitectura medallion 🏅

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

medallion_etl-0.1.26.tar.gz (30.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

medallion_etl-0.1.26-py3-none-any.whl (29.9 kB view details)

Uploaded Python 3

File details

Details for the file medallion_etl-0.1.26.tar.gz.

File metadata

  • Download URL: medallion_etl-0.1.26.tar.gz
  • Upload date:
  • Size: 30.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.26.tar.gz
Algorithm Hash digest
SHA256 a33b33e038f377bfc65b3d4dafc11f9370a4733d7f429770e25765cb4a8d0b64
MD5 c31b5150152b865e1c0eaa6bbad464d7
BLAKE2b-256 9d1eea9fee9eea167ab6d79d8c84c3823055062b604227a1b2804e57f07b0128

See more details on using hashes here.

File details

Details for the file medallion_etl-0.1.26-py3-none-any.whl.

File metadata

  • Download URL: medallion_etl-0.1.26-py3-none-any.whl
  • Upload date:
  • Size: 29.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.26-py3-none-any.whl
Algorithm Hash digest
SHA256 93f9ca2fe1321500c23c4bae476b8a95cfd2904f32cc29f6916b4a19f1c069d4
MD5 9522695e1e42f0985223ecf5008687d2
BLAKE2b-256 0f27bd3a3a9a56b64d117fef2b8c8bd57dffd4afdff56e37b06bae5e9937f96f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page