Medallion ETL

A modular library for building data pipelines with the medallion architecture (Bronze-Silver-Gold).

Features

  • Medallion architecture (Bronze-Silver-Gold) for data processing
  • Simple interface for defining new pipelines
  • Reusable functions for each layer of the process
  • Clear separation between extraction, validation, and loading
  • SQLAlchemy compatibility for database persistence
  • Prefect integration for flow orchestration
  • Data validation with Pydantic
  • Efficient processing with Polars

Requirements

  • Python 3.11+
  • polars>=1.30
  • pydantic>=2.7
  • sqlalchemy>=2.0
  • prefect>=2.0
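
To confirm that an environment meets these minimums, a small check along the following lines may help. This is a sketch and not part of medallion-etl: the `MINIMUMS` table and the `parse_version` and `unmet_requirements` helpers are hypothetical names, and the comparison only looks at the leading numeric components of each version string.

```python
import re
import sys
from importlib import metadata

# Minimum versions copied from the requirements list above.
MINIMUMS = {
    "polars": (1, 30),
    "pydantic": (2, 7),
    "sqlalchemy": (2, 0),
    "prefect": (2, 0),
}


def parse_version(text):
    """Return the leading numeric components of a version string as a tuple."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    # Pad to major.minor so that "3" compares equal to "3.0".
    while len(parts) < 2:
        parts.append(0)
    return tuple(parts)


def unmet_requirements(minimums=MINIMUMS, python_minimum=(3, 11)):
    """List human-readable problems; an empty list means every minimum is met."""
    problems = []
    if sys.version_info[:2] < python_minimum:
        problems.append("Python %d.%d+ is required" % python_minimum)
    for name, minimum in minimums.items():
        try:
            installed = parse_version(metadata.version(name))
        except metadata.PackageNotFoundError:
            problems.append(f"{name} is not installed")
            continue
        if installed < minimum:
            wanted = ".".join(map(str, minimum))
            problems.append(f"{name}>={wanted} is required")
    return problems


if __name__ == "__main__":
    for problem in unmet_requirements():
        print(problem)
```

Installing the package with pip should resolve these dependencies automatically, so a check like this is mainly useful when diagnosing an existing environment.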

Installation

pip install medallion-etl

Or from source:

git clone https://github.com/usuario/medallion-etl.git
cd medallion-etl
pip install -e .

Library structure

medallion_etl/
├── bronze/            # Raw data ingestion layer
├── silver/            # Validation and cleaning layer
├── gold/              # Transformation and aggregation layer
├── core/              # Core library components
├── pipelines/         # Definitions of complete flows
├── schemas/           # Pydantic models for validation
├── connectors/        # Connectors for different sources/destinations
├── utils/             # General utilities
├── config/            # Configuration
└── templates/         # Templates for new pipelines

Basic usage

Create a simple pipeline

from medallion_etl.core import MedallionPipeline
from medallion_etl.bronze import CSVExtractor
from medallion_etl.silver import SchemaValidator
from medallion_etl.gold import Aggregator
from medallion_etl.schemas import BaseSchema

# Define the data schema
class UserSchema(BaseSchema):
    id: int
    name: str
    age: int
    email: str

# Create the pipeline
pipeline = MedallionPipeline(name="UserPipeline")

# Add one task per layer
pipeline.add_bronze_task(CSVExtractor(name="UserExtractor"))
pipeline.add_silver_task(SchemaValidator(schema_model=UserSchema))
pipeline.add_gold_task(Aggregator(group_by=["age"], aggregations={"id": "count"}))

# Run the pipeline
result = pipeline.run("data/users.csv")
print(result.metadata)
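
For readers new to the medallion pattern, the three stages configured above can be pictured with the standard library alone. The sketch below is not how medallion-etl is implemented (the library relies on Polars and Pydantic). `bronze_extract`, `silver_validate`, and `gold_count_by` are hypothetical helpers, and the in-memory CSV stands in for `data/users.csv`.

```python
import csv
import io
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class User:
    """Plain-Python stand-in for the UserSchema fields."""
    id: int
    name: str
    age: int
    email: str


def bronze_extract(source):
    """Bronze: read raw rows as dictionaries of strings, untouched."""
    return list(csv.DictReader(source))


def silver_validate(rows):
    """Silver: coerce types and set aside rows that do not fit the schema."""
    valid, rejected = [], []
    for row in rows:
        try:
            valid.append(
                User(int(row["id"]), row["name"], int(row["age"]), row["email"])
            )
        except (KeyError, TypeError, ValueError):
            rejected.append(row)
    return valid, rejected


def gold_count_by(users, field):
    """Gold: count records per distinct value of one field."""
    return dict(Counter(getattr(user, field) for user in users))


# Hypothetical sample data; the last row has a non-numeric age.
RAW = """id,name,age,email
1,Ana,30,ana@example.com
2,Luis,30,luis@example.com
3,Eva,41,eva@example.com
4,Bad,unknown,bad@example.com
"""

raw_rows = bronze_extract(io.StringIO(RAW))
users, rejected = silver_validate(raw_rows)
counts = gold_count_by(users, "age")
print(counts)
print(len(rejected), "row(s) rejected")
```

Keeping rejected rows instead of raising on the first bad record is one possible design choice for the silver layer; whether `SchemaValidator` behaves this way is not stated in this README.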

Using with Prefect

from medallion_etl.core import MedallionPipeline
from medallion_etl.bronze import CSVExtractor

# Create the pipeline
pipeline = MedallionPipeline(name="SimplePipeline")
pipeline.add_bronze_task(CSVExtractor())

# Convert it to a Prefect flow
flow = pipeline.as_prefect_flow()

# Run the flow
flow("data/sample.csv")
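
The library's internals are not shown in this README, so the following is only a guess at the general idea behind a method like `as_prefect_flow()`: chaining the stage functions into one callable and handing it to Prefect's `flow` decorator. `compose_flow`, `split_lines`, and `drop_blank` are hypothetical, and the fallback decorator exists only so the sketch runs where Prefect is not installed.

```python
try:
    from prefect import flow  # real Prefect decorator, used when available
except ImportError:
    def flow(name=None):
        """Stand-in so the sketch runs without Prefect; adds no orchestration."""
        def decorator(fn):
            return fn
        return decorator


def compose_flow(name, stages):
    """Chain stage callables in order and wrap the chain as a single flow."""
    @flow(name=name)
    def run(data: str):
        result = data
        for stage in stages:
            result = stage(result)
        return result
    return run


def split_lines(text):
    """Hypothetical first stage: break raw text into lines."""
    return text.splitlines()


def drop_blank(lines):
    """Hypothetical second stage: discard empty lines."""
    return [line for line in lines if line.strip()]


line_flow = compose_flow("SimplePipeline", [split_lines, drop_blank])
cleaned = line_flow("a,b\n\n1,2\n")
print(cleaned)
```

With Prefect installed, calling `line_flow(...)` starts a tracked flow run; with the stand-in, it is an ordinary function call.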

Examples

See the examples/ folder for complete pipeline examples:

  • weather_pipeline.py: Pipeline for processing weather data
  • sales_etl_pipeline.py: ETL pipeline for sales data

Contributing

Contributions are welcome! Feel free to submit a pull request.

License

MIT
