Skip to main content

Lakehouse automation framework for DataOps, DevOps, QA, and Medallion architecture on Databricks

Project description

DKOps

Framework de gobierno de tablas Delta y orquestación de pipelines Spark para entornos híbridos local ↔ Databricks.

Python PySpark Delta Lake License: MIT PRs Welcome

El mismo código corre en tu PC y en Databricks — sin cambios.


¿Qué es DKOps?

DKOps es un framework Python que profesionaliza la construcción de pipelines de datos sobre Spark + Delta Lake. Resuelve los problemas que aparecen cuando un equipo crece más allá de "scripts sueltos":

  • Contratos de tabla — el schema, los permisos, el particionado y los metadatos viven en JSON versionado, no enterrados en código.
  • TableWriter — API unificada: overwrite, append, upsert, overwrite_partition, delete. Un solo objeto, sin ruido.
  • merge_schema — declara "merge_schema": true en el contrato y Delta añade columnas nuevas automáticamente al hacer append, sin recrear la tabla.
  • Enmascaramiento de columnas — declara "mask": "security.fn" en una columna y el framework aplica ALTER TABLE … SET MASK post-escritura en Unity Catalog.
  • Migraciones segurasSafeMigrator compara contrato vs estado real y genera un plan de cambios sin pérdida de datos.
  • Runtime-agnóstico — el mismo pipeline corre en local PC (Spark + Delta) y en Databricks (Connect o cluster nativo). El framework detecta el entorno y se adapta.
  • Configuración por entorno — placeholders {catalog.bronze}, {path.silver} se resuelven contra dev/prod desde un único config.json.
from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter

launcher = Launcher("config/config.json")
contract = load_contract("tables/fact_ventas.json")

TableWriter(contract).overwrite(df)                         # full load
TableWriter(contract).upsert(df_nuevo, keys=["venta_id"])   # SCD1
TableWriter(contract).append(df_evolucionado)               # schema evolution automática

Tabla de contenidos


🏗️ Arquitectura

DKOps/
├── launcher.py                  # punto de entrada — detecta runtime y crea SparkSession
├── environment_config.py        # resuelve catalogs/paths/secrets según workspace activo
├── logger_config.py             # logging estructurado (loguru) con contexto
└── table_governance/
    ├── contracts/
    │   ├── loader.py            # carga JSON → TableContract tipado (merge_schema, mask)
    │   └── validator.py         # valida DataFrame contra contrato (tipos, nulls)
    ├── writers/
    │   ├── table_writer.py      # ★ fachada pública: overwrite/append/upsert/delete
    │   ├── base_writer.py       # bridge local PC ↔ Databricks + merge_schema + masks
    │   ├── create_writer.py     # CREATE OR REPLACE TABLE
    │   ├── append_writer.py     # INSERT INTO
    │   ├── upsert_writer.py     # MERGE INTO (SCD1)
    │   ├── partition_writer.py  # overwrite de partición específica
    │   └── delete_writer.py     # DELETE WHERE
    └── migrations/
        └── safe_migrator.py     # compara contrato vs tabla real → plan de migración

Filosofía: pasar spark y env a cada componente es ruido. El Launcher se auto-registra como singleton del proceso; los writers, loaders y migrator obtienen lo que necesitan vía Launcher.current(). La API queda mínima: TableWriter(contract).overwrite(df).


📦 Instalación

Requisitos

  • Python 3.10+ (3.11 recomendado)
  • Java 11 o 17 (requerido por Spark)
  • Git

DKOps se distribuye con pyproject.toml. Recomendamos dos virtual environments separados — uno para correr localmente con Spark, otro para Databricks Connect — porque tienen dependencias incompatibles entre sí (PySpark vanilla vs databricks-connect).

Entorno local PC (.venv-local)

Para desarrollo y tests en tu máquina con Spark + Delta Lake configurados desde cero.

# 1. Clonar el repo
git clone https://github.com/brrsanchezfi/BigDataFrameworkSpark.git
cd <NOMBRE_REPO>

# 2. Crear el venv local
python3 -m venv .venv-local
source .venv-local/bin/activate          # Linux/Mac/WSL
# .venv-local\Scripts\activate           # Windows PowerShell

# 3. Instalar el framework + dependencias locales
pip install --upgrade pip
pip install -e ".[local]"

Esto instala:

  • pyspark 3.5.x (con Delta Lake configurado vía JARs en runtime)
  • loguru para logging estructurado
  • pytest para tests
  • DKOps en modo editable (-e) — los cambios al código se reflejan al instante

Verificación:

python -c "from DKOps.launcher import Launcher; print('OK')"

Entorno Databricks Connect (.venv-databricks)

Para conectarte desde tu máquina a un cluster Databricks remoto. No mezcles este venv con el local — las versiones de PySpark son incompatibles.

# 1. Crear el venv (asegúrate de NO tener el local activo)
deactivate 2>/dev/null
python3 -m venv .venv-databricks
source .venv-databricks/bin/activate

# 2. Instalar el framework + extras de Databricks
pip install --upgrade pip
pip install -e ".[databricks]"

Esto instala:

  • databricks-connect (versión que coincida con el runtime de tu cluster)
  • databricks-sdk
  • loguru, pytest
  • DKOps en modo editable

Configurar credenciales (PAT o OAuth):

# Opción A: Personal Access Token (rápido para desarrollo)
export DATABRICKS_HOST="https://<workspace>.azuredatabricks.net"
export DATABRICKS_TOKEN="<tu-pat>"

# Opción B: OAuth via Databricks CLI (recomendado para uso prolongado)
databricks auth login

Luego edita tu config.json:

{
  "EXECUTION_ENVIRONMENT": "databricks",
  "CLUSTER_ID": "<tu-cluster-id>"
}

Verificación:

python -c "from databricks.connect import DatabricksSession; \
           DatabricksSession.builder.getOrCreate().sql('SELECT 1').show()"

Cuál venv activar

Estás haciendo... Activa
Desarrollo del framework, tests unitarios, demos en local .venv-local
Ejecutar contra un cluster Databricks remoto desde la PC .venv-databricks
Notebook dentro del workspace Databricks Ninguno — usa el del cluster

⚙️ Configuración

DKOps lee un config.json que define:

  • El runtime (local o databricks).
  • Los environments del proyecto (dev, prod) con sus catálogos, paths y secrets scopes.
  • Configuración de logging.

Estructura mínima:

{
  "EXECUTION_ENVIRONMENT": "local",
  "SPARK_APP_NAME": "miPipeline",
  "SPARK_WAREHOUSE_DIR": "/tmp/spark-warehouse",
  "DELTA_VERSION": "3.2.0",

  "environments": {
    "<workspace_id>": {
      "env": "dev",
      "env_short": "d",
      "catalogs": {
        "bronze": "bronze_dev",
        "silver": "silver_dev",
        "gold":   "gold_dev"
      },
      "paths": {
        "bronze": "abfss://bronze@<storage>.dfs.core.windows.net",
        "silver": "abfss://silver@<storage>.dfs.core.windows.net"
      }
    }
  }
}

DKOps busca el config en este orden:

  1. Argumento explícito: Launcher("ruta/config.json")
  2. Variable de entorno: PATH_CONFIG_LAUNCHER=ruta/config.json

🚀 Quickstart

from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter

# 1. Inicializa el Launcher (auto-detecta runtime, crea SparkSession)
launcher = Launcher("config/config.json")

# 2. Carga un contrato JSON — los placeholders {catalog.silver} se resuelven solos
contract = load_contract("tables/fact_ventas.json")

# 3. Construye tu DataFrame (de un source, una transformación, lo que sea)
df = launcher.spark.read.parquet("source/ventas.parquet")

# 4. Escribe — full load inicial
TableWriter(contract).overwrite(df)

# 5. Día siguiente — solo añadir lo nuevo
TableWriter(contract).upsert(df_delta, keys=["venta_id", "fecha"])

# 6. Schema evolution — el contrato tiene merge_schema: true
df_nuevo_campo = df_delta.withColumn("canal", lit("web"))
TableWriter(contract).append(df_nuevo_campo)   # Delta añade la columna automáticamente

Para ejemplos completos con varias capas y tests, ver la carpeta demos/.


📚 Demos

Cada demo es independiente y autocontenido, pensado como referencia de uso.

Demo Tema Qué demuestra
demos/demo_1 Contratos y writers gobernados Bootstrap, append, upsert, partition overwrite, delete y migración con SafeMigrator. Dominio: aeronáutica.
demos/demo_2 Transformaciones testeables y Data Quality Pipeline bronze → silver → gold con funciones puras de transformación, tests pytest y motor de DQ declarativo. Dominio: manufactura de aseo.
demos/demo_3 merge_schema y enmascaramiento Schema evolution con merge_schema: true y column masking con mask en contratos. Dominio: e-commerce.

Para correr un demo:

source .venv-local/bin/activate
cd demos/demo_1
python pipeline_aeronautica.py

🔨 Build

DKOps usa pyproject.toml (PEP 517/621). Para construir el wheel distribuible:

source .venv-local/bin/activate
pip install --upgrade build
python -m build

Esto genera en dist/:

  • dkops-X.Y.Z-py3-none-any.whl — wheel para instalar en Databricks o cualquier entorno
  • dkops-X.Y.Z.tar.gz — sdist

Subir a Databricks como librería del cluster:

databricks libraries install --cluster-id <id> --whl dist/dkops-X.Y.Z-py3-none-any.whl

Versionado: DKOps sigue Semantic Versioning. La versión vive en pyproject.toml.


📊 Estado del proyecto

Componente Estado
Launcher (multi-runtime) ✅ Estable
Contratos + ContractLoader ✅ Estable
TableWriter (fachada unificada) ✅ Estable
Writers individuales (Create, Append, Upsert, Partition, Delete) ✅ Estables
merge_schema (schema evolution) ✅ Disponible
Enmascaramiento de columnas (mask) ✅ Disponible (Databricks / Unity Catalog)
SafeMigrator (esquema seguro) ✅ Estable
Demos (1, 2, 3) ✅ Disponibles
Tests del framework (36 tests) ✅ Disponibles
Documentación de API 🚧 En desarrollo
Soporte SCD2 📋 Backlog
Módulo de Data Quality nativo 📋 Backlog (existe prototipo en demo_2)

🤝 Contribuir

¿Te interesa lo que estamos construyendo? Las contribuciones son bienvenidas y muy apreciadas.

Issues abiertos PRs abiertos Last commit

Áreas donde nos vendría especialmente bien ayuda:

  • 🧪 Más tests — la suite cubre contratos, writers y migrator; faltan tests de integración y cobertura de casos extremos.
  • 📖 Documentación — guías de uso, referencia de API, casos reales.
  • 🎨 Más demos — dominios distintos, patrones distintos.
  • 🐛 Reportar bugs — abre un issue con un caso reproducible.
  • 💡 Discutir ideas — el módulo de Data Quality, soporte SCD2, integración con Great Expectations son temas abiertos.

Cómo contribuir

  1. Haz fork del repo y crea una rama: git checkout -b feature/mi-mejora
  2. Activa el venv local: source .venv-local/bin/activate
  3. Haz tus cambios siguiendo el estilo del código existente.
  4. Si añades funcionalidad, añade un test o un demo que la demuestre.
  5. Verifica que los demos siguen pasando: cd demos/demo_2 && pytest
  6. Abre un Pull Request describiendo el cambio y por qué es útil.

¿Primera vez contribuyendo a un proyecto open source? Consulta esta guía de GitHub.


📄 Licencia

DKOps se distribuye bajo licencia MIT. Ver LICENSE para los términos completos.


Hecho con ☕ y ❤️ por el equipo de Data Engineering.

Si DKOps te resulta útil, considera darle una ⭐ al repo — ayuda a que otros lo encuentren.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dkops-0.2.3.tar.gz (59.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dkops-0.2.3-py3-none-any.whl (54.1 kB view details)

Uploaded Python 3

File details

Details for the file dkops-0.2.3.tar.gz.

File metadata

  • Download URL: dkops-0.2.3.tar.gz
  • Upload date:
  • Size: 59.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dkops-0.2.3.tar.gz
Algorithm Hash digest
SHA256 bd96860523350f1ab70da3a1a316d4f3e1c9effc748df4c48d2ac21f666fe727
MD5 01f4f04fe0bf8a2b4bc55cb0a82ccb2a
BLAKE2b-256 153407971abfc860a5a03667449ceb92356a4cd2a58a839be754375409e063e7

See more details on using hashes here.

Provenance

The following attestation bundles were made for dkops-0.2.3.tar.gz:

Publisher: publish.yml on brrsanchezfi/DKOps

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dkops-0.2.3-py3-none-any.whl.

File metadata

  • Download URL: dkops-0.2.3-py3-none-any.whl
  • Upload date:
  • Size: 54.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dkops-0.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 bdf4e4367890e2df21b4c0ce38ac67fa262f89c62c56e7bacd1dbaa5458bed90
MD5 c7296fa9340aa1f9d0a8d9e6a55a96d4
BLAKE2b-256 b623856a5a26858d68a204f40f9f63aeeba6ae213a36aae5da77ad8526806a89

See more details on using hashes here.

Provenance

The following attestation bundles were made for dkops-0.2.3-py3-none-any.whl:

Publisher: publish.yml on brrsanchezfi/DKOps

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page