Skip to main content

Una librería modular para construir data pipelines con arquitectura medallion

Project description

Medallion ETL

Una librería modular para construir data pipelines con arquitectura medallion (Bronze-Silver-Gold).

🚀 Características

  • Arquitectura Medallion: Implementación completa del patrón Bronze-Silver-Gold
  • CLI Integrado: Comandos para inicializar proyectos y crear pipelines
  • Modular y Extensible: Componentes reutilizables para cada capa del proceso
  • Validación de Datos: Esquemas con Pydantic para garantizar calidad de datos
  • Procesamiento Eficiente: Powered by Polars para manejo de grandes volúmenes
  • Orquestación: Integración nativa con Prefect para workflows complejos
  • Conectores: Soporte para múltiples fuentes de datos (CSV, JSON, SQL, APIs)
  • Logging Avanzado: Sistema de logging estructurado con Rich

📋 Requisitos

  • Python 3.11+
  • polars >= 1.30
  • pydantic >= 2.7
  • sqlalchemy >= 2.0
  • prefect >= 3.0
  • requests >= 2.25.0
  • rich >= 14.0.0

📦 Instalación

pip install medallion-etl

O desde el código fuente:

git clone https://github.com/JuanManiglia/medallion-etl.git
cd medallion-etl
pip install -e .

🛠️ Comandos CLI

Inicializar un nuevo proyecto

medallion-etl init

O especificar un directorio:

medallion-etl init --project-dir mi_proyecto

Esto creará la siguiente estructura:

mi_proyecto/
├── config.py              # Configuración del proyecto
├── main.py                 # Script principal
├── README.md              # Documentación del proyecto
├── data/                  # Directorio para datos
│   ├── bronze/           # Datos crudos (raw)
│   ├── silver/           # Datos validados y limpios
│   └── gold/             # Datos transformados y agregados
├── logs/                  # Logs del proyecto
├── pipelines/             # Definiciones de pipelines
└── schemas/               # Esquemas de datos (Pydantic)

Crear un nuevo pipeline

medallion-etl create-pipeline MiPipeline

Esto generará:

  • pipelines/mipipeline_pipeline.py - Definición del pipeline
  • schemas/mipipeline_schema.py - Esquema de datos con Pydantic

🏗️ Arquitectura Medallion

🥉 Bronze Layer (Datos Crudos)

  • Propósito: Ingesta de datos en su formato original
  • Extractores disponibles:
    • CSVExtractor - Archivos CSV
    • JSONExtractor - Archivos JSON
    • SQLExtractor - Bases de datos SQL
    • APIExtractor - APIs REST

🥈 Silver Layer (Datos Validados)

  • Propósito: Validación, limpieza y normalización
  • Componentes:
    • SchemaValidator - Validación con esquemas Pydantic
    • DataCleaner - Limpieza de datos (duplicados, nulos)
    • DataNormalizer - Normalización de formatos

🥇 Gold Layer (Datos Transformados)

  • Propósito: Agregaciones y transformaciones para análisis
  • Transformadores:
    • Aggregator - Agregaciones (sum, mean, count, etc.)
    • DataJoiner - Unión de datasets
    • FeatureEngineer - Creación de nuevas características

🔧 Uso Básico

1. Crear un proyecto

medallion-etl init --project-dir mi_etl_project
cd mi_etl_project

2. Crear un pipeline personalizado

medallion-etl create-pipeline Ventas

3. Configurar el esquema de datos

Edita schemas/ventas_schema.py:

from datetime import datetime
from typing import Optional
from medallion_etl.schemas import BaseSchema

class VentasSchema(BaseSchema):
    id: int
    producto: str
    cantidad: int
    precio: float
    fecha: datetime
    cliente: Optional[str] = None

4. Personalizar el pipeline

Edita pipelines/ventas_pipeline.py según tus necesidades.

5. Ejecutar el pipeline

python main.py --pipeline ventas --input data/ventas.csv

📊 Ejemplo de Pipeline Completo

from medallion_etl.core import Pipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator, DataCleaner
from medallion_etl.gold import Aggregator
from schemas.ventas_schema import VentasSchema

def create_sales_pipeline():
    pipeline = Pipeline(name="SalesPipeline")
    
    # Bronze: Extraer datos
    extractor = CSVExtractor(
        name="SalesExtractor",
        output_path=config.bronze_dir / "sales"
    )
    pipeline.add_task(extractor)
    
    # Silver: Validar y limpiar
    validator = SchemaValidator(
        schema_model=VentasSchema,
        name="SalesValidator"
    )
    pipeline.add_task(validator)
    
    cleaner = DataCleaner(
        name="SalesCleaner",
        drop_na=True,
        drop_duplicates=True
    )
    pipeline.add_task(cleaner)
    
    # Gold: Agregar datos
    aggregator = Aggregator(
        group_by=["producto"],
        aggregations={
            "cantidad": "sum",
            "precio": "mean"
        },
        name="SalesAggregator"
    )
    pipeline.add_task(aggregator)
    
    return pipeline

# Ejecutar pipeline
pipeline = create_sales_pipeline()
result = pipeline.run("data/ventas.csv")

🔌 Conectores Disponibles

Extractores (Bronze)

  • CSVExtractor: Archivos CSV con configuración flexible
  • JSONExtractor: Archivos JSON y JSONL
  • SQLExtractor: Bases de datos relacionales
  • APIExtractor: APIs REST con autenticación
  • FileExtractor: Extractor base para otros formatos

Validadores (Silver)

  • SchemaValidator: Validación con esquemas Pydantic
  • DataCleaner: Limpieza automática de datos
  • DataNormalizer: Normalización de tipos y formatos

Transformadores (Gold)

  • Aggregator: Agregaciones grupales
  • DataJoiner: Unión de múltiples datasets
  • FeatureEngineer: Creación de características derivadas

🔧 Configuración

La configuración se maneja a través de la clase MedallionConfig:

from medallion_etl.config import MedallionConfig

config = MedallionConfig(
    bronze_dir="data/bronze",
    silver_dir="data/silver", 
    gold_dir="data/gold",
    log_dir="logs",
    log_level="INFO"
)

🚀 Integración con Prefect

Convierte cualquier pipeline en un flow de Prefect:

from prefect import serve

pipeline = create_sales_pipeline()
flow = pipeline.as_prefect_flow(name="sales-etl")

# Desplegar como servicio
serve(flow)

📝 Logging

Sistema de logging estructurado con Rich:

from medallion_etl.utils import logger

logger.info("Pipeline iniciado", extra={"pipeline": "sales"})
logger.error("Error en validación", extra={"records_failed": 10})

🤝 Contribuir

  1. Fork el repositorio
  2. Crea una rama para tu feature (git checkout -b feature/nueva-funcionalidad)
  3. Commit tus cambios (git commit -am 'Agregar nueva funcionalidad')
  4. Push a la rama (git push origin feature/nueva-funcionalidad)
  5. Crea un Pull Request

📄 Licencia

Este proyecto está bajo la Licencia MIT. Ver el archivo LICENSE para más detalles.

🔗 Enlaces


Medallion ETL - Construye pipelines de datos robustos y escalables con arquitectura medallion 🏅

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

medallion_etl-0.1.27.tar.gz (30.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

medallion_etl-0.1.27-py3-none-any.whl (29.9 kB view details)

Uploaded Python 3

File details

Details for the file medallion_etl-0.1.27.tar.gz.

File metadata

  • Download URL: medallion_etl-0.1.27.tar.gz
  • Upload date:
  • Size: 30.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.27.tar.gz
Algorithm Hash digest
SHA256 4c38a98bc421b530fd1474585a339290c4bd68a5484dd4b972e39be7f1014f88
MD5 784d606ff9168bd5ab7b3c167530139c
BLAKE2b-256 b04826cd449c55ac28d2a1907da1f430137b7593a666538161e487220d15d462

See more details on using hashes here.

File details

Details for the file medallion_etl-0.1.27-py3-none-any.whl.

File metadata

  • Download URL: medallion_etl-0.1.27-py3-none-any.whl
  • Upload date:
  • Size: 29.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.10 Windows/11

File hashes

Hashes for medallion_etl-0.1.27-py3-none-any.whl
Algorithm Hash digest
SHA256 f51798b8539deaa41119217b7b2c012741e84cdccad0cb92c61847977189f599
MD5 3580bc2eccc8a337814515ec896bda81
BLAKE2b-256 c5eb2c9d010eb05584d6df1bea34ecef496ba22b25fd9ddfdea6a2806282c9d5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page