Lakehouse automation framework for DataOps, DevOps, QA, and Medallion architecture on Databricks
Project description
DKOps
Framework de gobierno de tablas Delta y orquestación de pipelines Spark para entornos híbridos local ↔ Databricks.
El mismo código corre en tu PC y en Databricks — sin cambios.
¿Qué es DKOps?
DKOps es un framework Python que profesionaliza la construcción de pipelines de datos sobre Spark + Delta Lake. Resuelve los problemas que aparecen cuando un equipo crece más allá de "scripts sueltos":
- Contratos de tabla — el schema, los permisos, el particionado y los metadatos viven en JSON versionado, no enterrados en código.
TableWriter— API unificada:overwrite,append,upsert,overwrite_partition,delete. Un solo objeto, sin ruido.- merge_schema — declara
"merge_schema": trueen el contrato y Delta añade columnas nuevas automáticamente al hacer append, sin recrear la tabla. - Enmascaramiento de columnas — declara
"mask": "security.fn"en una columna y el framework aplicaALTER TABLE … SET MASKpost-escritura en Unity Catalog. - Migraciones seguras —
SafeMigratorcompara contrato vs estado real y genera un plan de cambios sin pérdida de datos. - Runtime-agnóstico — el mismo pipeline corre en local PC (Spark + Delta) y en Databricks (Connect o cluster nativo). El framework detecta el entorno y se adapta.
- Configuración por entorno — placeholders
{catalog.bronze},{path.silver}se resuelven contradev/proddesde un únicoconfig.json.
from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter
launcher = Launcher("config/config.json")
contract = load_contract("tables/fact_ventas.json")
TableWriter(contract).overwrite(df) # full load
TableWriter(contract).upsert(df_nuevo, keys=["venta_id"]) # SCD1
TableWriter(contract).append(df_evolucionado) # schema evolution automática
Tabla de contenidos
- Arquitectura
- Instalación
- Configuración
- Quickstart
- Demos
- Build
- Estado del proyecto
- Contribuir
- Licencia
🏗️ Arquitectura
DKOps/
├── launcher.py # punto de entrada — detecta runtime y crea SparkSession
├── environment_config.py # resuelve catalogs/paths/secrets según workspace activo
├── logger_config.py # logging estructurado (loguru) con contexto
└── table_governance/
├── contracts/
│ ├── loader.py # carga JSON → TableContract tipado (merge_schema, mask)
│ └── validator.py # valida DataFrame contra contrato (tipos, nulls)
├── writers/
│ ├── table_writer.py # ★ fachada pública: overwrite/append/upsert/delete
│ ├── base_writer.py # bridge local PC ↔ Databricks + merge_schema + masks
│ ├── create_writer.py # CREATE OR REPLACE TABLE
│ ├── append_writer.py # INSERT INTO
│ ├── upsert_writer.py # MERGE INTO (SCD1)
│ ├── partition_writer.py # overwrite de partición específica
│ └── delete_writer.py # DELETE WHERE
└── migrations/
└── safe_migrator.py # compara contrato vs tabla real → plan de migración
Filosofía: pasar spark y env a cada componente es ruido. El Launcher se auto-registra como singleton del proceso; los writers, loaders y migrator obtienen lo que necesitan vía Launcher.current(). La API queda mínima: TableWriter(contract).overwrite(df).
📦 Instalación
Requisitos
- Python 3.10+ (3.11 recomendado)
- Java 11 o 17 (requerido por Spark)
- Git
DKOps se distribuye con pyproject.toml. Recomendamos dos virtual environments separados — uno para correr localmente con Spark, otro para Databricks Connect — porque tienen dependencias incompatibles entre sí (PySpark vanilla vs databricks-connect).
Entorno local PC (.venv-local)
Para desarrollo y tests en tu máquina con Spark + Delta Lake configurados desde cero.
# 1. Clonar el repo
git clone https://github.com/brrsanchezfi/BigDataFrameworkSpark.git
cd <NOMBRE_REPO>
# 2. Crear el venv local
python3 -m venv .venv-local
source .venv-local/bin/activate # Linux/Mac/WSL
# .venv-local\Scripts\activate # Windows PowerShell
# 3. Instalar el framework + dependencias locales
pip install --upgrade pip
pip install -e ".[local]"
Esto instala:
pyspark3.5.x (con Delta Lake configurado vía JARs en runtime)logurupara logging estructuradopytestpara tests- DKOps en modo editable (
-e) — los cambios al código se reflejan al instante
Verificación:
python -c "from DKOps.launcher import Launcher; print('OK')"
Entorno Databricks Connect (.venv-databricks)
Para conectarte desde tu máquina a un cluster Databricks remoto. No mezcles este venv con el local — las versiones de PySpark son incompatibles.
# 1. Crear el venv (asegúrate de NO tener el local activo)
deactivate 2>/dev/null
python3 -m venv .venv-databricks
source .venv-databricks/bin/activate
# 2. Instalar el framework + extras de Databricks
pip install --upgrade pip
pip install -e ".[databricks]"
Esto instala:
databricks-connect(versión que coincida con el runtime de tu cluster)databricks-sdkloguru,pytest- DKOps en modo editable
Configurar credenciales (PAT o OAuth):
# Opción A: Personal Access Token (rápido para desarrollo)
export DATABRICKS_HOST="https://<workspace>.azuredatabricks.net"
export DATABRICKS_TOKEN="<tu-pat>"
# Opción B: OAuth via Databricks CLI (recomendado para uso prolongado)
databricks auth login
Luego edita tu config.json:
{
"EXECUTION_ENVIRONMENT": "databricks",
"CLUSTER_ID": "<tu-cluster-id>"
}
Verificación:
python -c "from databricks.connect import DatabricksSession; \
DatabricksSession.builder.getOrCreate().sql('SELECT 1').show()"
Cuál venv activar
| Estás haciendo... | Activa |
|---|---|
| Desarrollo del framework, tests unitarios, demos en local | .venv-local |
| Ejecutar contra un cluster Databricks remoto desde la PC | .venv-databricks |
| Notebook dentro del workspace Databricks | Ninguno — usa el del cluster |
⚙️ Configuración
DKOps lee un config.json que define:
- El runtime (
localodatabricks). - Los environments del proyecto (
dev,prod) con sus catálogos, paths y secrets scopes. - Configuración de logging.
Estructura mínima:
{
"EXECUTION_ENVIRONMENT": "local",
"SPARK_APP_NAME": "miPipeline",
"SPARK_WAREHOUSE_DIR": "/tmp/spark-warehouse",
"DELTA_VERSION": "3.2.0",
"environments": {
"<workspace_id>": {
"env": "dev",
"env_short": "d",
"catalogs": {
"bronze": "bronze_dev",
"silver": "silver_dev",
"gold": "gold_dev"
},
"paths": {
"bronze": "abfss://bronze@<storage>.dfs.core.windows.net",
"silver": "abfss://silver@<storage>.dfs.core.windows.net"
}
}
}
}
DKOps busca el config en este orden:
- Argumento explícito:
Launcher("ruta/config.json") - Variable de entorno:
PATH_CONFIG_LAUNCHER=ruta/config.json
🚀 Quickstart
from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter
# 1. Inicializa el Launcher (auto-detecta runtime, crea SparkSession)
launcher = Launcher("config/config.json")
# 2. Carga un contrato JSON — los placeholders {catalog.silver} se resuelven solos
contract = load_contract("tables/fact_ventas.json")
# 3. Construye tu DataFrame (de un source, una transformación, lo que sea)
df = launcher.spark.read.parquet("source/ventas.parquet")
# 4. Escribe — full load inicial
TableWriter(contract).overwrite(df)
# 5. Día siguiente — solo añadir lo nuevo
TableWriter(contract).upsert(df_delta, keys=["venta_id", "fecha"])
# 6. Schema evolution — el contrato tiene merge_schema: true
df_nuevo_campo = df_delta.withColumn("canal", lit("web"))
TableWriter(contract).append(df_nuevo_campo) # Delta añade la columna automáticamente
Para ejemplos completos con varias capas y tests, ver la carpeta demos/.
📚 Demos
Cada demo es independiente y autocontenido, pensado como referencia de uso.
| Demo | Tema | Qué demuestra |
|---|---|---|
demos/demo_1 |
Contratos y writers gobernados | Bootstrap, append, upsert, partition overwrite, delete y migración con SafeMigrator. Dominio: aeronáutica. |
demos/demo_2 |
Transformaciones testeables y Data Quality | Pipeline bronze → silver → gold con funciones puras de transformación, tests pytest y motor de DQ declarativo. Dominio: manufactura de aseo. |
demos/demo_3 |
merge_schema y enmascaramiento | Schema evolution con merge_schema: true y column masking con mask en contratos. Dominio: e-commerce. |
Para correr un demo:
source .venv-local/bin/activate
cd demos/demo_1
python pipeline_aeronautica.py
🔨 Build
DKOps usa pyproject.toml (PEP 517/621). Para construir el wheel distribuible:
source .venv-local/bin/activate
pip install --upgrade build
python -m build
Esto genera en dist/:
dkops-X.Y.Z-py3-none-any.whl— wheel para instalar en Databricks o cualquier entornodkops-X.Y.Z.tar.gz— sdist
Subir a Databricks como librería del cluster:
databricks libraries install --cluster-id <id> --whl dist/dkops-X.Y.Z-py3-none-any.whl
Versionado: DKOps sigue Semantic Versioning. La versión vive en pyproject.toml.
📊 Estado del proyecto
| Componente | Estado |
|---|---|
Launcher (multi-runtime) |
✅ Estable |
Contratos + ContractLoader |
✅ Estable |
TableWriter (fachada unificada) |
✅ Estable |
Writers individuales (Create, Append, Upsert, Partition, Delete) |
✅ Estables |
merge_schema (schema evolution) |
✅ Disponible |
Enmascaramiento de columnas (mask) |
✅ Disponible (Databricks / Unity Catalog) |
SafeMigrator (esquema seguro) |
✅ Estable |
| Demos (1, 2, 3) | ✅ Disponibles |
| Tests del framework (36 tests) | ✅ Disponibles |
| Documentación de API | 🚧 En desarrollo |
| Soporte SCD2 | 📋 Backlog |
| Módulo de Data Quality nativo | 📋 Backlog (existe prototipo en demo_2) |
🤝 Contribuir
¿Te interesa lo que estamos construyendo? Las contribuciones son bienvenidas y muy apreciadas.
Áreas donde nos vendría especialmente bien ayuda:
- 🧪 Más tests — la suite cubre contratos, writers y migrator; faltan tests de integración y cobertura de casos extremos.
- 📖 Documentación — guías de uso, referencia de API, casos reales.
- 🎨 Más demos — dominios distintos, patrones distintos.
- 🐛 Reportar bugs — abre un issue con un caso reproducible.
- 💡 Discutir ideas — el módulo de Data Quality, soporte SCD2, integración con Great Expectations son temas abiertos.
Cómo contribuir
- Haz fork del repo y crea una rama:
git checkout -b feature/mi-mejora - Activa el venv local:
source .venv-local/bin/activate - Haz tus cambios siguiendo el estilo del código existente.
- Si añades funcionalidad, añade un test o un demo que la demuestre.
- Verifica que los demos siguen pasando:
cd demos/demo_2 && pytest - Abre un Pull Request describiendo el cambio y por qué es útil.
¿Primera vez contribuyendo a un proyecto open source? Consulta esta guía de GitHub.
📄 Licencia
DKOps se distribuye bajo licencia MIT. Ver LICENSE para los términos completos.
Hecho con ☕ y ❤️ por el equipo de Data Engineering.
Si DKOps te resulta útil, considera darle una ⭐ al repo — ayuda a que otros lo encuentren.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dkops-0.2.4.tar.gz.
File metadata
- Download URL: dkops-0.2.4.tar.gz
- Upload date:
- Size: 60.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5ad3d12934989eaf9f68093081ac2f6160999c5584269513747301c3433370dd
|
|
| MD5 |
08e62b79fff019ac5ea02160f9b7e291
|
|
| BLAKE2b-256 |
c8161c63ecc29e0666ee7c85ea20a549344d29daf1bb33ec87562fc07a83db37
|
Provenance
The following attestation bundles were made for dkops-0.2.4.tar.gz:
Publisher:
publish.yml on brrsanchezfi/DKOps
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dkops-0.2.4.tar.gz -
Subject digest:
5ad3d12934989eaf9f68093081ac2f6160999c5584269513747301c3433370dd - Sigstore transparency entry: 1597678027
- Sigstore integration time:
-
Permalink:
brrsanchezfi/DKOps@df55e3b27ffd5f9311e2f6cc5870aeee6c718a0d -
Branch / Tag:
refs/tags/v0.2.4 - Owner: https://github.com/brrsanchezfi
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@df55e3b27ffd5f9311e2f6cc5870aeee6c718a0d -
Trigger Event:
release
-
Statement type:
File details
Details for the file dkops-0.2.4-py3-none-any.whl.
File metadata
- Download URL: dkops-0.2.4-py3-none-any.whl
- Upload date:
- Size: 54.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ebe0b790adcaaa9de4e7443abaa95faf998a3a911480df0dee7591821a57489c
|
|
| MD5 |
4937c82b748562439477cca2ec040aa1
|
|
| BLAKE2b-256 |
700aa68c410011e4f940d304b77f032dc50cc1de5fc5c673562a2c839f4be10e
|
Provenance
The following attestation bundles were made for dkops-0.2.4-py3-none-any.whl:
Publisher:
publish.yml on brrsanchezfi/DKOps
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dkops-0.2.4-py3-none-any.whl -
Subject digest:
ebe0b790adcaaa9de4e7443abaa95faf998a3a911480df0dee7591821a57489c - Sigstore transparency entry: 1597678302
- Sigstore integration time:
-
Permalink:
brrsanchezfi/DKOps@df55e3b27ffd5f9311e2f6cc5870aeee6c718a0d -
Branch / Tag:
refs/tags/v0.2.4 - Owner: https://github.com/brrsanchezfi
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@df55e3b27ffd5f9311e2f6cc5870aeee6c718a0d -
Trigger Event:
release
-
Statement type: