Skip to main content

Lakehouse automation framework for DataOps, DevOps, QA, and Medallion architecture on Databricks

Project description

DKOps

Framework de gobierno de tablas Delta y orquestación de pipelines Spark para entornos híbridos local ↔ Databricks.

Python PySpark Delta Lake License: MIT PRs Welcome

El mismo código corre en tu PC y en Databricks — sin cambios.


¿Qué es DKOps?

DKOps es un framework Python que profesionaliza la construcción de pipelines de datos sobre Spark + Delta Lake. Resuelve los problemas que aparecen cuando un equipo crece más allá de "scripts sueltos":

  • Contratos de tabla — el schema, los permisos, el particionado y los metadatos viven en JSON versionado, no enterrados en código.
  • TableWriter — API unificada: overwrite, append, upsert, overwrite_partition, delete. Un solo objeto, sin ruido.
  • merge_schema — declara "merge_schema": true en el contrato y Delta añade columnas nuevas automáticamente al hacer append, sin recrear la tabla.
  • Enmascaramiento de columnas — declara "mask": "security.fn" en una columna y el framework aplica ALTER TABLE … SET MASK post-escritura en Unity Catalog.
  • Migraciones segurasSafeMigrator compara contrato vs estado real y genera un plan de cambios sin pérdida de datos.
  • Runtime-agnóstico — el mismo pipeline corre en local PC (Spark + Delta) y en Databricks (Connect o cluster nativo). El framework detecta el entorno y se adapta.
  • Configuración por entorno — placeholders {catalog.bronze}, {path.silver} se resuelven contra dev/prod desde un único config.json.
from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter

launcher = Launcher("config/config.json")
contract = load_contract("tables/fact_ventas.json")

TableWriter(contract).overwrite(df)                         # full load
TableWriter(contract).upsert(df_nuevo, keys=["venta_id"])   # SCD1
TableWriter(contract).append(df_evolucionado)               # schema evolution automática

Tabla de contenidos


🏗️ Arquitectura

DKOps/
├── launcher.py                  # punto de entrada — detecta runtime y crea SparkSession
├── environment_config.py        # resuelve catalogs/paths/secrets según workspace activo
├── logger_config.py             # logging estructurado (loguru) con contexto
└── table_governance/
    ├── contracts/
    │   ├── loader.py            # carga JSON → TableContract tipado (merge_schema, mask)
    │   └── validator.py         # valida DataFrame contra contrato (tipos, nulls)
    ├── writers/
    │   ├── table_writer.py      # ★ fachada pública: overwrite/append/upsert/delete
    │   ├── base_writer.py       # bridge local PC ↔ Databricks + merge_schema + masks
    │   ├── create_writer.py     # CREATE OR REPLACE TABLE
    │   ├── append_writer.py     # INSERT INTO
    │   ├── upsert_writer.py     # MERGE INTO (SCD1)
    │   ├── partition_writer.py  # overwrite de partición específica
    │   └── delete_writer.py     # DELETE WHERE
    └── migrations/
        └── safe_migrator.py     # compara contrato vs tabla real → plan de migración

Filosofía: pasar spark y env a cada componente es ruido. El Launcher se auto-registra como singleton del proceso; los writers, loaders y migrator obtienen lo que necesitan vía Launcher.current(). La API queda mínima: TableWriter(contract).overwrite(df).


📦 Instalación

Requisitos

  • Python 3.10+ (3.11 recomendado)
  • Java 11 o 17 (requerido por Spark)
  • Git

DKOps se distribuye con pyproject.toml. Recomendamos dos virtual environments separados — uno para correr localmente con Spark, otro para Databricks Connect — porque tienen dependencias incompatibles entre sí (PySpark vanilla vs databricks-connect).

Entorno local PC (.venv-local)

Para desarrollo y tests en tu máquina con Spark + Delta Lake configurados desde cero.

# 1. Clonar el repo
git clone https://github.com/brrsanchezfi/BigDataFrameworkSpark.git
cd <NOMBRE_REPO>

# 2. Crear el venv local
python3 -m venv .venv-local
source .venv-local/bin/activate          # Linux/Mac/WSL
# .venv-local\Scripts\activate           # Windows PowerShell

# 3. Instalar el framework + dependencias locales
pip install --upgrade pip
pip install -e ".[local]"

Esto instala:

  • pyspark 3.5.x (con Delta Lake configurado vía JARs en runtime)
  • loguru para logging estructurado
  • pytest para tests
  • DKOps en modo editable (-e) — los cambios al código se reflejan al instante

Verificación:

python -c "from DKOps.launcher import Launcher; print('OK')"

Entorno Databricks Connect (.venv-databricks)

Para conectarte desde tu máquina a un cluster Databricks remoto. No mezcles este venv con el local — las versiones de PySpark son incompatibles.

# 1. Crear el venv (asegúrate de NO tener el local activo)
deactivate 2>/dev/null
python3 -m venv .venv-databricks
source .venv-databricks/bin/activate

# 2. Instalar el framework + extras de Databricks
pip install --upgrade pip
pip install -e ".[databricks]"

Esto instala:

  • databricks-connect (versión que coincida con el runtime de tu cluster)
  • databricks-sdk
  • loguru, pytest
  • DKOps en modo editable

Configurar credenciales (PAT o OAuth):

# Opción A: Personal Access Token (rápido para desarrollo)
export DATABRICKS_HOST="https://<workspace>.azuredatabricks.net"
export DATABRICKS_TOKEN="<tu-pat>"

# Opción B: OAuth via Databricks CLI (recomendado para uso prolongado)
databricks auth login

Luego edita tu config.json:

{
  "EXECUTION_ENVIRONMENT": "databricks",
  "CLUSTER_ID": "<tu-cluster-id>"
}

Verificación:

python -c "from databricks.connect import DatabricksSession; \
           DatabricksSession.builder.getOrCreate().sql('SELECT 1').show()"

Cuál venv activar

Estás haciendo... Activa
Desarrollo del framework, tests unitarios, demos en local .venv-local
Ejecutar contra un cluster Databricks remoto desde la PC .venv-databricks
Notebook dentro del workspace Databricks Ninguno — usa el del cluster

⚙️ Configuración

DKOps lee un config.json que define:

  • El runtime (local o databricks).
  • Los environments del proyecto (dev, prod) con sus catálogos, paths y secrets scopes.
  • Configuración de logging.

Estructura mínima:

{
  "EXECUTION_ENVIRONMENT": "local",
  "SPARK_APP_NAME": "miPipeline",
  "SPARK_WAREHOUSE_DIR": "/tmp/spark-warehouse",
  "DELTA_VERSION": "3.2.0",

  "environments": {
    "<workspace_id>": {
      "env": "dev",
      "env_short": "d",
      "catalogs": {
        "bronze": "bronze_dev",
        "silver": "silver_dev",
        "gold":   "gold_dev"
      },
      "paths": {
        "bronze": "abfss://bronze@<storage>.dfs.core.windows.net",
        "silver": "abfss://silver@<storage>.dfs.core.windows.net"
      }
    }
  }
}

DKOps busca el config en este orden:

  1. Argumento explícito: Launcher("ruta/config.json")
  2. Variable de entorno: PATH_CONFIG_LAUNCHER=ruta/config.json

🚀 Quickstart

from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter

# 1. Inicializa el Launcher (auto-detecta runtime, crea SparkSession)
launcher = Launcher("config/config.json")

# 2. Carga un contrato JSON — los placeholders {catalog.silver} se resuelven solos
contract = load_contract("tables/fact_ventas.json")

# 3. Construye tu DataFrame (de un source, una transformación, lo que sea)
df = launcher.spark.read.parquet("source/ventas.parquet")

# 4. Escribe — full load inicial
TableWriter(contract).overwrite(df)

# 5. Día siguiente — solo añadir lo nuevo
TableWriter(contract).upsert(df_delta, keys=["venta_id", "fecha"])

# 6. Schema evolution — el contrato tiene merge_schema: true
df_nuevo_campo = df_delta.withColumn("canal", lit("web"))
TableWriter(contract).append(df_nuevo_campo)   # Delta añade la columna automáticamente

Para ejemplos completos con varias capas y tests, ver la carpeta demos/.


📚 Demos

Cada demo es independiente y autocontenido, pensado como referencia de uso.

Demo Tema Qué demuestra
demos/demo_1 Contratos y writers gobernados Bootstrap, append, upsert, partition overwrite, delete y migración con SafeMigrator. Dominio: aeronáutica.
demos/demo_2 Transformaciones testeables y Data Quality Pipeline bronze → silver → gold con funciones puras de transformación, tests pytest y motor de DQ declarativo. Dominio: manufactura de aseo.
demos/demo_3 merge_schema y enmascaramiento Schema evolution con merge_schema: true y column masking con mask en contratos. Dominio: e-commerce.

Para correr un demo:

source .venv-local/bin/activate
cd demos/demo_1
python pipeline_aeronautica.py

🔨 Build

DKOps usa pyproject.toml (PEP 517/621). Para construir el wheel distribuible:

source .venv-local/bin/activate
pip install --upgrade build
python -m build

Esto genera en dist/:

  • dkops-X.Y.Z-py3-none-any.whl — wheel para instalar en Databricks o cualquier entorno
  • dkops-X.Y.Z.tar.gz — sdist

Subir a Databricks como librería del cluster:

databricks libraries install --cluster-id <id> --whl dist/dkops-X.Y.Z-py3-none-any.whl

Versionado: DKOps sigue Semantic Versioning. La versión vive en pyproject.toml.


📊 Estado del proyecto

Componente Estado
Launcher (multi-runtime) ✅ Estable
Contratos + ContractLoader ✅ Estable
TableWriter (fachada unificada) ✅ Estable
Writers individuales (Create, Append, Upsert, Partition, Delete) ✅ Estables
merge_schema (schema evolution) ✅ Disponible
Enmascaramiento de columnas (mask) ✅ Disponible (Databricks / Unity Catalog)
SafeMigrator (esquema seguro) ✅ Estable
Demos (1, 2, 3) ✅ Disponibles
Tests del framework (36 tests) ✅ Disponibles
Documentación de API 🚧 En desarrollo
Soporte SCD2 📋 Backlog
Módulo de Data Quality nativo 📋 Backlog (existe prototipo en demo_2)

🤝 Contribuir

¿Te interesa lo que estamos construyendo? Las contribuciones son bienvenidas y muy apreciadas.

Issues abiertos PRs abiertos Last commit

Áreas donde nos vendría especialmente bien ayuda:

  • 🧪 Más tests — la suite cubre contratos, writers y migrator; faltan tests de integración y cobertura de casos extremos.
  • 📖 Documentación — guías de uso, referencia de API, casos reales.
  • 🎨 Más demos — dominios distintos, patrones distintos.
  • 🐛 Reportar bugs — abre un issue con un caso reproducible.
  • 💡 Discutir ideas — el módulo de Data Quality, soporte SCD2, integración con Great Expectations son temas abiertos.

Cómo contribuir

  1. Haz fork del repo y crea una rama: git checkout -b feature/mi-mejora
  2. Activa el venv local: source .venv-local/bin/activate
  3. Haz tus cambios siguiendo el estilo del código existente.
  4. Si añades funcionalidad, añade un test o un demo que la demuestre.
  5. Verifica que los demos siguen pasando: cd demos/demo_2 && pytest
  6. Abre un Pull Request describiendo el cambio y por qué es útil.

¿Primera vez contribuyendo a un proyecto open source? Consulta esta guía de GitHub.


📄 Licencia

DKOps se distribuye bajo licencia MIT. Ver LICENSE para los términos completos.


Hecho con ☕ y ❤️ por el equipo de Data Engineering.

Si DKOps te resulta útil, considera darle una ⭐ al repo — ayuda a que otros lo encuentren.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dkops-0.2.0.tar.gz (58.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dkops-0.2.0-py3-none-any.whl (52.8 kB view details)

Uploaded Python 3

File details

Details for the file dkops-0.2.0.tar.gz.

File metadata

  • Download URL: dkops-0.2.0.tar.gz
  • Upload date:
  • Size: 58.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dkops-0.2.0.tar.gz
Algorithm Hash digest
SHA256 0e057ed938152ba3e9c113514bc777c93dc5f052cc4300d9f7034aed33987585
MD5 9e11aa285f1e558186d4a4257b42a735
BLAKE2b-256 005c60584561ea984ab527b7d1642aa4d8d25fd1444c4f513d3fd75cf6f4bce0

See more details on using hashes here.

Provenance

The following attestation bundles were made for dkops-0.2.0.tar.gz:

Publisher: publish.yml on brrsanchezfi/DKOps

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dkops-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: dkops-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 52.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dkops-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6838a9a0546cf45e45eed36f132c80609ffeb8d8d6e89ac30144275b17069bda
MD5 922bb189c17a9e6560711512d22d97ac
BLAKE2b-256 e9bcb8965af37812a21ecd5d8cd81e1e23cc8ea4948b14dd579a7002e4757674

See more details on using hashes here.

Provenance

The following attestation bundles were made for dkops-0.2.0-py3-none-any.whl:

Publisher: publish.yml on brrsanchezfi/DKOps

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page