Skip to main content

Una librería modular para construir data pipelines con arquitectura medallion

Project description

Medallion ETL

Una librería modular para construir data pipelines con arquitectura medallion (Bronze-Silver-Gold).

🚀 Características

  • Arquitectura Medallion: Implementación completa del patrón Bronze-Silver-Gold
  • CLI Integrado: Comandos para inicializar proyectos y crear pipelines
  • Modular y Extensible: Componentes reutilizables para cada capa del proceso
  • Validación de Datos: Esquemas con Pydantic para garantizar calidad de datos
  • Procesamiento Eficiente: Powered by Polars para manejo de grandes volúmenes
  • Orquestación: Integración nativa con Prefect para workflows complejos
  • Conectores: Soporte para múltiples fuentes de datos (CSV, JSON, SQL, APIs)
  • Logging Avanzado: Sistema de logging estructurado con Rich

📋 Requisitos

  • Python 3.11+
  • polars >= 1.30
  • pydantic >= 2.7
  • sqlalchemy >= 2.0
  • prefect >= 3.0
  • requests >= 2.25.0
  • rich >= 14.0.0

📦 Instalación

pip install medallion-etl

O desde el código fuente:

git clone https://github.com/JuanManiglia/medallion-etl.git
cd medallion-etl
pip install -e .

🛠️ Comandos CLI

Inicializar un nuevo proyecto

medallion-etl init

O especificar un directorio:

medallion-etl init --project-dir mi_proyecto

Esto creará la siguiente estructura:

mi_proyecto/
├── config.py              # Configuración del proyecto
├── main.py                 # Script principal
├── README.md              # Documentación del proyecto
├── data/                  # Directorio para datos
│   ├── bronze/           # Datos crudos (raw)
│   ├── silver/           # Datos validados y limpios
│   └── gold/             # Datos transformados y agregados
├── logs/                  # Logs del proyecto
├── pipelines/             # Definiciones de pipelines
└── schemas/               # Esquemas de datos (Pydantic)

Crear un nuevo pipeline

medallion-etl create-pipeline MiPipeline

Esto generará:

  • pipelines/mipipeline_pipeline.py - Definición del pipeline
  • schemas/mipipeline_schema.py - Esquema de datos con Pydantic

🏗️ Arquitectura Medallion

🥉 Bronze Layer (Datos Crudos)

  • Propósito: Ingesta de datos en su formato original
  • Extractores disponibles:
    • CSVExtractor - Archivos CSV
    • JSONExtractor - Archivos JSON
    • SQLExtractor - Bases de datos SQL
    • APIExtractor - APIs REST

🥈 Silver Layer (Datos Validados)

  • Propósito: Validación, limpieza y normalización
  • Componentes:
    • SchemaValidator - Validación con esquemas Pydantic
    • DataCleaner - Limpieza de datos (duplicados, nulos)
    • DataNormalizer - Normalización de formatos

🥇 Gold Layer (Datos Transformados)

  • Propósito: Agregaciones y transformaciones para análisis
  • Transformadores:
    • Aggregator - Agregaciones (sum, mean, count, etc.)
    • DataJoiner - Unión de datasets
    • FeatureEngineer - Creación de nuevas características

🔧 Uso Básico

1. Crear un proyecto

medallion-etl init --project-dir mi_etl_project
cd mi_etl_project

2. Crear un pipeline personalizado

medallion-etl create-pipeline Ventas

3. Configurar el esquema de datos

Edita schemas/ventas_schema.py:

from datetime import datetime
from typing import Optional
from medallion_etl.schemas import BaseSchema

class VentasSchema(BaseSchema):
    id: int
    producto: str
    cantidad: int
    precio: float
    fecha: datetime
    cliente: Optional[str] = None

4. Personalizar el pipeline

Edita pipelines/ventas_pipeline.py según tus necesidades.

5. Ejecutar el pipeline

python main.py --pipeline ventas --input data/ventas.csv

📊 Ejemplo de Pipeline Completo

from medallion_etl.core import Pipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator, DataCleaner
from medallion_etl.gold import Aggregator
from schemas.ventas_schema import VentasSchema

def create_sales_pipeline():
    pipeline = Pipeline(name="SalesPipeline")
    
    # Bronze: Extraer datos
    extractor = CSVExtractor(
        name="SalesExtractor",
        output_path=config.bronze_dir / "sales"
    )
    pipeline.add_task(extractor)
    
    # Silver: Validar y limpiar
    validator = SchemaValidator(
        schema_model=VentasSchema,
        name="SalesValidator"
    )
    pipeline.add_task(validator)
    
    cleaner = DataCleaner(
        name="SalesCleaner",
        drop_na=True,
        drop_duplicates=True
    )
    pipeline.add_task(cleaner)
    
    # Gold: Agregar datos
    aggregator = Aggregator(
        group_by=["producto"],
        aggregations={
            "cantidad": "sum",
            "precio": "mean"
        },
        name="SalesAggregator"
    )
    pipeline.add_task(aggregator)
    
    return pipeline

# Ejecutar pipeline
pipeline = create_sales_pipeline()
result = pipeline.run("data/ventas.csv")

🔌 Conectores Disponibles

Extractores (Bronze)

  • CSVExtractor: Archivos CSV con configuración flexible
  • JSONExtractor: Archivos JSON y JSONL
  • SQLExtractor: Bases de datos relacionales
  • APIExtractor: APIs REST con autenticación
  • FileExtractor: Extractor base para otros formatos

Validadores (Silver)

  • SchemaValidator: Validación con esquemas Pydantic
  • DataCleaner: Limpieza automática de datos
  • DataNormalizer: Normalización de tipos y formatos

Transformadores (Gold)

  • Aggregator: Agregaciones grupales
  • DataJoiner: Unión de múltiples datasets
  • FeatureEngineer: Creación de características derivadas

🔧 Configuración

La configuración se maneja a través de la clase MedallionConfig:

from medallion_etl.config import MedallionConfig

config = MedallionConfig(
    bronze_dir="data/bronze",
    silver_dir="data/silver", 
    gold_dir="data/gold",
    log_dir="logs",
    log_level="INFO"
)

🚀 Integración con Prefect

Convierte cualquier pipeline en un flow de Prefect:

from prefect import serve

pipeline = create_sales_pipeline()
flow = pipeline.as_prefect_flow(name="sales-etl")

# Desplegar como servicio
serve(flow)

📝 Logging

Sistema de logging estructurado con Rich:

from medallion_etl.utils import logger

logger.info("Pipeline iniciado", extra={"pipeline": "sales"})
logger.error("Error en validación", extra={"records_failed": 10})

🤝 Contribuir

  1. Fork el repositorio
  2. Crea una rama para tu feature (git checkout -b feature/nueva-funcionalidad)
  3. Commit tus cambios (git commit -am 'Agregar nueva funcionalidad')
  4. Push a la rama (git push origin feature/nueva-funcionalidad)
  5. Crea un Pull Request

📄 Licencia

Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.

🔗 Enlaces


Medallion ETL - Construye pipelines de datos robustos y escalables con arquitectura medallion 🏅

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

medallion_etl-0.1.17.tar.gz (27.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

medallion_etl-0.1.17-py3-none-any.whl (27.5 kB view details)

Uploaded Python 3

File details

Details for the file medallion_etl-0.1.17.tar.gz.

File metadata

  • Download URL: medallion_etl-0.1.17.tar.gz
  • Upload date:
  • Size: 27.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.17.tar.gz
Algorithm Hash digest
SHA256 8e3ce5a5ad7ed8cca8666678e0bd0d9ea1db6eb0748dcebae04b32384c65f687
MD5 0449bb128ac245cd16e84915154e6ae5
BLAKE2b-256 a21384851bffc440246038574d1291e6c0f22d0c64065021732f12cd521977ca

See more details on using hashes here.

File details

Details for the file medallion_etl-0.1.17-py3-none-any.whl.

File metadata

  • Download URL: medallion_etl-0.1.17-py3-none-any.whl
  • Upload date:
  • Size: 27.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.17-py3-none-any.whl
Algorithm Hash digest
SHA256 ea0d3e64d974b4cd21dead3c3a8410955326df1c19a86e4b41b12dfced631684
MD5 4b3cd11f667b1e8177bb1ca81957add1
BLAKE2b-256 951558ea2dba08b6c8faade9fdb7cc81d7c82bb7e7d9bc238ca201e0610aff34

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page