Skip to main content

Una librería modular para construir data pipelines con arquitectura medallion

Project description

Medallion ETL

Una librería modular para construir data pipelines con arquitectura medallion (Bronze-Silver-Gold).

🚀 Características

  • Arquitectura Medallion: Implementación completa del patrón Bronze-Silver-Gold
  • CLI Integrado: Comandos para inicializar proyectos y crear pipelines
  • Modular y Extensible: Componentes reutilizables para cada capa del proceso
  • Validación de Datos: Esquemas con Pydantic para garantizar calidad de datos
  • Procesamiento Eficiente: Powered by Polars para manejo de grandes volúmenes
  • Orquestación: Integración nativa con Prefect para workflows complejos
  • Conectores: Soporte para múltiples fuentes de datos (CSV, JSON, SQL, APIs)
  • Logging Avanzado: Sistema de logging estructurado con Rich

📋 Requisitos

  • Python 3.11+
  • polars >= 1.30
  • pydantic >= 2.7
  • sqlalchemy >= 2.0
  • prefect >= 3.0
  • requests >= 2.25.0
  • rich >= 14.0.0

📦 Instalación

pip install medallion-etl

O desde el código fuente:

git clone https://github.com/JuanManiglia/medallion-etl.git
cd medallion-etl
pip install -e .

🛠️ Comandos CLI

Inicializar un nuevo proyecto

medallion-etl init

O especificar un directorio:

medallion-etl init --project-dir mi_proyecto

Esto creará la siguiente estructura:

mi_proyecto/
├── config.py              # Configuración del proyecto
├── main.py                 # Script principal
├── README.md              # Documentación del proyecto
├── data/                  # Directorio para datos
│   ├── bronze/           # Datos crudos (raw)
│   ├── silver/           # Datos validados y limpios
│   └── gold/             # Datos transformados y agregados
├── logs/                  # Logs del proyecto
├── pipelines/             # Definiciones de pipelines
└── schemas/               # Esquemas de datos (Pydantic)

Crear un nuevo pipeline

medallion-etl create-pipeline MiPipeline

Esto generará:

  • pipelines/mipipeline_pipeline.py - Definición del pipeline
  • schemas/mipipeline_schema.py - Esquema de datos con Pydantic

🏗️ Arquitectura Medallion

🥉 Bronze Layer (Datos Crudos)

  • Propósito: Ingesta de datos en su formato original
  • Extractores disponibles:
    • CSVExtractor - Archivos CSV
    • JSONExtractor - Archivos JSON
    • SQLExtractor - Bases de datos SQL
    • APIExtractor - APIs REST

🥈 Silver Layer (Datos Validados)

  • Propósito: Validación, limpieza y normalización
  • Componentes:
    • SchemaValidator - Validación con esquemas Pydantic
    • DataCleaner - Limpieza de datos (duplicados, nulos)
    • DataNormalizer - Normalización de formatos

🥇 Gold Layer (Datos Transformados)

  • Propósito: Agregaciones y transformaciones para análisis
  • Transformadores:
    • Aggregator - Agregaciones (sum, mean, count, etc.)
    • DataJoiner - Unión de datasets
    • FeatureEngineer - Creación de nuevas características

🔧 Uso Básico

1. Crear un proyecto

medallion-etl init --project-dir mi_etl_project
cd mi_etl_project

2. Crear un pipeline personalizado

medallion-etl create-pipeline Ventas

3. Configurar el esquema de datos

Edita schemas/ventas_schema.py:

from datetime import datetime
from typing import Optional
from medallion_etl.schemas import BaseSchema

class VentasSchema(BaseSchema):
    id: int
    producto: str
    cantidad: int
    precio: float
    fecha: datetime
    cliente: Optional[str] = None

4. Personalizar el pipeline

Edita pipelines/ventas_pipeline.py según tus necesidades.

5. Ejecutar el pipeline

python main.py --pipeline ventas --input data/ventas.csv

📊 Ejemplo de Pipeline Completo

from medallion_etl.core import Pipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator, DataCleaner
from medallion_etl.gold import Aggregator
from schemas.ventas_schema import VentasSchema

def create_sales_pipeline():
    pipeline = Pipeline(name="SalesPipeline")
    
    # Bronze: Extraer datos
    extractor = CSVExtractor(
        name="SalesExtractor",
        output_path=config.bronze_dir / "sales"
    )
    pipeline.add_task(extractor)
    
    # Silver: Validar y limpiar
    validator = SchemaValidator(
        schema_model=VentasSchema,
        name="SalesValidator"
    )
    pipeline.add_task(validator)
    
    cleaner = DataCleaner(
        name="SalesCleaner",
        drop_na=True,
        drop_duplicates=True
    )
    pipeline.add_task(cleaner)
    
    # Gold: Agregar datos
    aggregator = Aggregator(
        group_by=["producto"],
        aggregations={
            "cantidad": "sum",
            "precio": "mean"
        },
        name="SalesAggregator"
    )
    pipeline.add_task(aggregator)
    
    return pipeline

# Ejecutar pipeline
pipeline = create_sales_pipeline()
result = pipeline.run("data/ventas.csv")

🔌 Conectores Disponibles

Extractores (Bronze)

  • CSVExtractor: Archivos CSV con configuración flexible
  • JSONExtractor: Archivos JSON y JSONL
  • SQLExtractor: Bases de datos relacionales
  • APIExtractor: APIs REST con autenticación
  • FileExtractor: Extractor base para otros formatos

Validadores (Silver)

  • SchemaValidator: Validación con esquemas Pydantic
  • DataCleaner: Limpieza automática de datos
  • DataNormalizer: Normalización de tipos y formatos

Transformadores (Gold)

  • Aggregator: Agregaciones grupales
  • DataJoiner: Unión de múltiples datasets
  • FeatureEngineer: Creación de características derivadas

🔧 Configuración

La configuración se maneja a través de la clase MedallionConfig:

from medallion_etl.config import MedallionConfig

config = MedallionConfig(
    bronze_dir="data/bronze",
    silver_dir="data/silver", 
    gold_dir="data/gold",
    log_dir="logs",
    log_level="INFO"
)

🚀 Integración con Prefect

Convierte cualquier pipeline en un flow de Prefect:

from prefect import serve

pipeline = create_sales_pipeline()
flow = pipeline.as_prefect_flow(name="sales-etl")

# Desplegar como servicio
serve(flow)

📝 Logging

Sistema de logging estructurado con Rich:

from medallion_etl.utils import logger

logger.info("Pipeline iniciado", extra={"pipeline": "sales"})
logger.error("Error en validación", extra={"records_failed": 10})

🤝 Contribuir

  1. Fork el repositorio
  2. Crea una rama para tu feature (git checkout -b feature/nueva-funcionalidad)
  3. Commit tus cambios (git commit -am 'Agregar nueva funcionalidad')
  4. Push a la rama (git push origin feature/nueva-funcionalidad)
  5. Crea un Pull Request

📄 Licencia

Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.

🔗 Enlaces


Medallion ETL - Construye pipelines de datos robustos y escalables con arquitectura medallion 🏅

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

medallion_etl-0.1.20.tar.gz (28.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

medallion_etl-0.1.20-py3-none-any.whl (28.6 kB view details)

Uploaded Python 3

File details

Details for the file medallion_etl-0.1.20.tar.gz.

File metadata

  • Download URL: medallion_etl-0.1.20.tar.gz
  • Upload date:
  • Size: 28.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.20.tar.gz
Algorithm Hash digest
SHA256 932fabfb0e60cf8b8f1c63e6bff74a281d7522b09f601fa172be99aab366befa
MD5 f02b4bcbb2d6d886a3f65399bf94ca6a
BLAKE2b-256 d22e78445bb7b2469d5e8d220850b8dff44bd263a94ee5696faaf3bdd6014a61

See more details on using hashes here.

File details

Details for the file medallion_etl-0.1.20-py3-none-any.whl.

File metadata

  • Download URL: medallion_etl-0.1.20-py3-none-any.whl
  • Upload date:
  • Size: 28.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.20-py3-none-any.whl
Algorithm Hash digest
SHA256 47edd5b278f2a00e8fc8d1242935aa6a3e1e2e0abefc0e540914f48e15a987b0
MD5 f9fb1ad3b790e99c2220304e1805d5bd
BLAKE2b-256 32c33fd7ce00c73a094e40708c756f81a95f5d20a76cb41d29c4ee9156ea8d62

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page