Lakehouse automation framework — IngestionEngine, Medallion architecture and Delta governance for Spark and Databricks
DKOps
Delta table governance and Spark pipeline orchestration framework for hybrid local/Databricks environments.
The same code runs on your PC and on Databricks, without changes.
What is DKOps?
DKOps is a Python framework for building production-grade data pipelines on Apache Spark + Delta Lake, following the Lakehouse Medallion architecture.
It addresses the problems that appear once a team outgrows "loose scripts":
- Table contracts: schema, permissions, partitioning and metadata live in versioned JSON, not buried in code.
- Ingestion engine: moves data from Landing to Bronze to Silver with declarative strategies: full_merge, cdc_merge, incremental_replace, append_dedup.
- TableWriter: unified write API: overwrite, append, upsert, overwrite_partition, delete.
- TableReader: governed reads: read(), read_partition(), read_stream(), read_cdf().
- Safe migrations: SafeMigrator compares the contract against the actual table state and generates a change plan without data loss.
- Runtime-agnostic: the same pipeline runs on a local PC and on Databricks. The framework detects the environment.
- Per-environment configuration: placeholders such as {catalog.bronze} and {path.silver} are resolved from config.json.
Lakehouse architecture: Landing -> Bronze -> Silver -> Gold
DKOps implements the Medallion architecture in 4 layers:
[External sources]
      |
LANDING   Raw files: JSON, CSV, Avro, Parquet
      |   (delivered by Data Factory, Kafka, FTP, etc.)
[IngestionEngine]
      |
BRONZE    Untransformed data + ingestion metadata
      |   _ingested_at, _ingested_date (partition), _source_file
[SilverPromoter]
      |
SILVER    Clean, deduplicated data with business keys
      |   full_merge | cdc_merge | incremental_replace | append_dedup
[TableWriter/SQL]
      |
GOLD      Aggregations, KPIs and business metrics
          Computed via Spark SQL over Silver
Module 1: ingestion (Landing -> Silver)
The IngestionEngine is the single entry point for moving data from Landing to Silver.
Bronze ingestion (Landing -> Bronze):
- Reads JSON, CSV, Parquet or Kafka input from the Landing zone.
- Enriches each row with _ingested_at, _ingested_date and _source_file.
- Writes to Delta with an idempotent partition overwrite (on _ingested_date).
- Supports batch (incremental/full/CDC) and streaming (Structured Streaming).
Bronze watermark pattern: the _ingested_date field serves as the ingestion partition. Each run replaces only the current day's partition (overwrite_partition). This guarantees idempotency: running the pipeline twice on the same day over the same input produces the same result.
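The idempotency argument can be illustrated without Spark. What follows is a toy, pure-Python model and not DKOps code: the table is a dict keyed by partition value, and the helper names enrich_rows and overwrite_partition are hypothetical. It only shows why replacing a whole partition, rather than appending to it, makes a rerun harmless.

```python
from datetime import datetime, timezone


def enrich_rows(rows, source_file, now):
    """Add the three ingestion metadata columns to each row (toy model)."""
    return [
        {
            **row,
            "_ingested_at": now.isoformat(),
            "_ingested_date": now.date().isoformat(),
            "_source_file": source_file,
        }
        for row in rows
    ]


def overwrite_partition(table, rows):
    """Replace only the partitions present in `rows`; other partitions stay untouched."""
    by_partition = {}
    for row in rows:
        by_partition.setdefault(row["_ingested_date"], []).append(row)
    # Swapping whole partitions (instead of appending) is what makes reruns idempotent.
    table.update(by_partition)
    return table


# A table that already holds yesterday's partition.
table = {"2024-01-14": [{"vuelo_id": "AV100", "_ingested_date": "2024-01-14"}]}
run_time = datetime(2024, 1, 15, 8, 30, tzinfo=timezone.utc)
batch = enrich_rows(
    [{"vuelo_id": "AV200"}, {"vuelo_id": "AV201"}],
    "vuelos_2024-01-15.json",
    run_time,
)

overwrite_partition(table, batch)
first_run = {k: list(v) for k, v in table.items()}
overwrite_partition(table, batch)  # second run on the same day

print(table == first_run)        # the rerun changed nothing
print(len(table["2024-01-15"]))  # the partition was replaced, not appended to
```

In a real run the _ingested_at timestamp would differ between executions; what stays stable is the set of business rows in the day's partition. On Delta Lake this kind of write is commonly expressed with the replaceWhere option, though whether DKOps uses it internally is not stated here.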
Silver promotion (Bronze -> Silver) via declarative strategies:
| Strategy | When to use it |
|---|---|
| full_merge | Full snapshot that may update or insert |
| cdc_merge | CDC with op_type: I/U/D from transactional systems |
| incremental_replace | Incremental without CDC; replaces by key |
| append_dedup | Append with deduplication, for events and clickstream |
Module 2: table_governance (Silver -> Gold)
The governance module provides:
- TableWriter: writes DataFrames while honoring the table contract.
- TableReader: reads Delta tables with CDF, streaming and declarative filters.
- SafeMigrator: plans and applies schema migrations without data loss.
- ContractLoader: loads and resolves JSON contracts with environment placeholders.
Load types and strategies
Landing -> Bronze load types
| Type | Description | Example use |
|---|---|---|
| incremental | Only the day's new files | Daily sales, flights |
| full | Full snapshot; replaces the previous one | Product catalog |
| cdc | Change events with op_type: I/U/D | Customer orders, ERP orders |
| streaming | Continuous reading via Structured Streaming | Clickstream, IoT alerts |
Bronze -> Silver promotion strategies
full_merge: for full snapshots. Runs MERGE INTO on all business keys. If the record exists, it is updated; otherwise it is inserted. Useful for catalogs that arrive complete every day.
cdc_merge: for CDC data. Applies inserts, updates and deletes (soft delete as is_deleted=true) according to the op_type field. Keeps the latest state of each entity.
incremental_replace: for incremental data without CDC. Inserts new records and updates existing ones by primary key. Does not generate soft deletes.
append_dedup: for events and logs. Appends new records, excluding duplicates by key (merge_keys). Useful for clickstream, event metrics and IoT alerts.
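The row-level semantics of cdc_merge and append_dedup can be sketched in plain Python. This is an illustrative model under stated assumptions, not the DKOps implementation: the Silver table is a dict or list in memory, events are applied in arrival order, and the function signatures are hypothetical. In this simplified view full_merge and incremental_replace both reduce to an upsert by key, so they are not shown separately.

```python
def cdc_merge(target, events, key):
    """Apply I/U/D change events in order; deletes become soft deletes."""
    for event in events:
        row = {k: v for k, v in event.items() if k != "op_type"}
        if event["op_type"] in ("I", "U"):
            target[row[key]] = {**row, "is_deleted": False}
        elif event["op_type"] == "D" and row[key] in target:
            # Keep the last known state and only flag the row as deleted.
            target[row[key]]["is_deleted"] = True
    return target


def append_dedup(target, rows, merge_keys):
    """Append only rows whose merge-key tuple is not already in the target list."""
    seen = {tuple(r[k] for k in merge_keys) for r in target}
    for row in rows:
        identity = tuple(row[k] for k in merge_keys)
        if identity not in seen:
            seen.add(identity)
            target.append(row)
    return target


orders = cdc_merge(
    {},
    [
        {"op_type": "I", "order_id": 1, "status": "NEW"},
        {"op_type": "U", "order_id": 1, "status": "PAID"},
        {"op_type": "I", "order_id": 2, "status": "NEW"},
        {"op_type": "D", "order_id": 2},
    ],
    key="order_id",
)
print(orders[1]["status"], orders[2]["is_deleted"])

clicks = append_dedup(
    [{"event_id": "a", "page": "/home"}],
    [{"event_id": "a", "page": "/home"}, {"event_id": "b", "page": "/cart"}],
    merge_keys=["event_id"],
)
print([c["event_id"] for c in clicks])
```

The soft delete keeps the last known status of order 2 and only flips is_deleted, which matches the "latest state of each entity" description above.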
Batch vs. Streaming
Batch: reads all files available in the Landing path and ingests them in a single transactional operation. Ideal for daily or periodic loads.
Streaming: uses Spark Structured Streaming with the availableNow trigger. It processes all pending files and then stops automatically. On Databricks it uses Auto Loader for scalability; locally it uses FileStreamReader.
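The "process everything pending, then stop" behavior of the availableNow trigger can be pictured with a small model. The sketch below is plain Python, not Spark or DKOps code: the checkpoint is just a set of already-processed file names, and run_available_now is a hypothetical helper.

```python
def run_available_now(landing_files, checkpoint, process):
    """Process every file not yet recorded in the checkpoint, then stop."""
    pending = sorted(f for f in landing_files if f not in checkpoint)
    for path in pending:
        process(path)
        checkpoint.add(path)  # remember progress so a rerun skips this file
    return pending


checkpoint = set()
processed = []

first = run_available_now(["a.json", "b.json"], checkpoint, processed.append)
# A new file arrives before the next scheduled run.
second = run_available_now(["a.json", "b.json", "c.json"], checkpoint, processed.append)
# Nothing new arrived: the run finds no pending work and ends immediately.
third = run_available_now(["a.json", "b.json", "c.json"], checkpoint, processed.append)

print(first, second, third)
```

This is why a streaming job with this trigger can be scheduled like a batch job: each run picks up only what arrived since the previous one.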
Catalog integration (Unity Catalog / local)
On Databricks (Unity Catalog): tables are created as catalog.schema.name. Contracts use {catalog.bronze} placeholders, which resolve to the corresponding Unity catalog (e.g. ct_bronze_dlsuraanaliticadev).
Locally (development PC): the catalog is omitted and tables are created as schema.name in the Spark warehouse. Placeholders resolve to the simple name (e.g. bronze).
The framework determines the environment from the EXECUTION_ENVIRONMENT key in config.json (e.g. EXECUTION_ENVIRONMENT: "local").
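The naming rule above fits in a few lines. This is a minimal sketch, assuming that any environment value other than "local" means Unity Catalog; the function name and the "databricks" value are hypothetical, since only "local" appears in this README.

```python
def qualified_table_name(environment, catalog, schema, name):
    """Return catalog.schema.name on Databricks and schema.name locally."""
    if environment == "local":
        return f"{schema}.{name}"
    return f"{catalog}.{schema}.{name}"


local_name = qualified_table_name("local", "bronze", "aeronautica", "vuelos_raw")
# "databricks" is an assumed environment value, used only for illustration.
uc_name = qualified_table_name(
    "databricks", "ct_bronze_dlsuraanaliticadev", "aeronautica", "vuelos_raw"
)

print(local_name)
print(uc_name)
```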
Structure of config.json
{
  "EXECUTION_ENVIRONMENT": "local",
  "SPARK_APP_NAME": "DKOps-Demo1",
  "SPARK_WAREHOUSE_DIR": "/tmp/dkops_demo1/warehouse",
  "DATABRICKS_TARGET": "local",
  "DELTA_VERSION": "3.2.0",
  "environments": {
    "local": {
      "env": "local",
      "env_short": "l",
      "catalogs": {
        "bronze": "bronze",
        "silver": "silver",
        "gold": "gold"
      },
      "paths": {
        "landing": "/tmp/dkops_demo1/landing",
        "bronze": "/tmp/dkops_demo1/bronze",
        "silver": "/tmp/dkops_demo1/silver",
        "gold": "/tmp/dkops_demo1/gold",
        "checkpoint": "/tmp/dkops_demo1/checkpoints",
        "ops": "/tmp/dkops_demo1/ops"
      }
    }
  }
}
The placeholders {catalog.bronze}, {path.landing}, {env} and {env_short} are resolved in every JSON contract file (both table contracts and ingestion contracts).
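To make the substitution concrete, here is a minimal sketch of a placeholder resolver. It is not the ContractLoader implementation: the function name, the regular expression and the choice to fail on unknown names are all assumptions. It only relies on the config.json layout shown above.

```python
import json
import re

CONFIG = json.loads("""
{
  "EXECUTION_ENVIRONMENT": "local",
  "environments": {
    "local": {
      "env": "local",
      "env_short": "l",
      "catalogs": {"bronze": "bronze", "silver": "silver", "gold": "gold"},
      "paths": {"landing": "/tmp/dkops_demo1/landing"}
    }
  }
}
""")

# Matches {catalog.<name>}, {path.<name>}, {env} and {env_short}.
PLACEHOLDER = re.compile(r"\{(catalog|path)\.(\w+)\}|\{(env|env_short)\}")


def resolve_placeholders(text, config):
    """Substitute placeholders using the environment named by EXECUTION_ENVIRONMENT."""
    env = config["environments"][config["EXECUTION_ENVIRONMENT"]]

    def replace(match):
        kind, key, scalar = match.groups()
        if scalar:
            return env[scalar]
        section = "catalogs" if kind == "catalog" else "paths"
        return env[section][key]  # a KeyError on an unknown name is deliberate

    return PLACEHOLDER.sub(replace, text)


contract_text = (
    '{"catalog": "{catalog.silver}", '
    '"path": "{path.landing}/vuelos_diarios", '
    '"tag": "{env_short}"}'
)
resolved = json.loads(resolve_placeholders(contract_text, CONFIG))
print(resolved)
```

Resolving on the raw text before parsing keeps the resolver independent of the contract's structure, at the cost of requiring that resolved values do not themselves contain characters that need JSON escaping.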
Installation
# Clone the repository
git clone https://github.com/brrsanchezfi/DKOps
cd DKOps
# Install for local development (includes PySpark + Delta)
pip install -e ".[local]"
# Install for Databricks Connect
pip install -e ".[databricks-connect]"
Demos
| Demo | Topic | Silver strategies | Special feature |
|---|---|---|---|
| Demo 1 | Aeronautics | full_merge (flights, airports) | SafeMigrator dry_run, INTEGER column |
| Demo 2 | Manufacturing | incremental_replace, cdc_merge, full_merge | DQ engine, transformations/, CSV landing |
| Demo 3 | E-commerce | full_merge, cdc_merge, append_dedup | merge_schema, column masking, streaming |
| Demo 4 | Retail/Inventory | full_merge, append_dedup, append_dedup | read_cdf(), read_stream(), SafeMigrator |
| Demo 5 | Marketplace | cdc_merge, full_merge, streaming | Gold layer with revenue and engagement |
Each demo follows the full flow: Landing -> Bronze -> Silver -> Gold.
To run any demo:
# Demo 1: Aeronautics
python demos/demo_1/pipeline.py
# Demo 2: Manufacturing
python demos/demo_2/pipeline.py
# Demo 3: E-commerce
python demos/demo_3/pipeline.py
# Demo 4: Retail/Inventory
python demos/demo_4/pipeline.py
# Demo 5: Marketplace
python demos/demo_5/pipeline.py
Quick API reference
IngestionEngine
from DKOps.launcher import Launcher
from DKOps.ingestion.engine import IngestionEngine

launcher = Launcher("config/config.json")
engine = IngestionEngine.from_spark(
    spark=launcher.spark,
    env=launcher.env,
    bronze_contracts_dir="ingestion/batch",
    streaming_contracts_dir="ingestion/streaming",
    silver_contracts_dir="ingestion/silver",
    tables_base_dir=".",
    ops_path="/tmp/ops/control",
)
# Landing -> Bronze (batch)
engine.ingest_bronze()
# Landing -> Bronze (streaming, availableNow)
engine.run_streaming()
# Bronze -> Silver
engine.promote_silver()
# Table status
engine.status()
TableWriter
from DKOps.table_governance import load_contract, TableWriter
contract = load_contract("tables/gold/mi_tabla.json")
writer = TableWriter(contract)
writer.overwrite(df) # CREATE OR REPLACE
writer.append(df) # INSERT INTO
writer.upsert(df, keys=["id"]) # MERGE INTO
writer.overwrite_partition(df, {"fecha": "2024-01-15"})
writer.delete("distancia_km = 0")
TableReader
from DKOps.table_governance import load_contract, TableReader
contract = load_contract("tables/silver/productos_current.json")
reader = TableReader(contract)
df = reader.read()                                 # Full table
df = reader.read(filter="activo = true")           # With a SQL filter
df = reader.read_partition({"categoria": "ROPA"})  # By partition
df = reader.read_stream()                          # Streaming DataFrame
df = reader.read_cdf(starting_version=5)           # Change Data Feed
SafeMigrator
from DKOps.table_governance import load_contract, SafeMigrator
contract = load_contract("tables/silver/vuelos_current.json")
# Plan only (nothing is executed)
SafeMigrator(contract, dry_run=True).apply()
# Apply the changes
SafeMigrator(contract, dry_run=False).apply()
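This README does not describe how SafeMigrator builds its plan. As a rough intuition for what "contract vs. actual state, without data loss" could mean, here is a hypothetical planner in plain Python. The action names and the policy (add missing columns, flag type changes, never drop) are assumptions made for illustration, not the library's behavior.

```python
def plan_migration(contract_columns, actual_columns):
    """Diff contract vs. actual schema and return a list of non-destructive actions."""
    plan = []
    for name, col_type in contract_columns.items():
        if name not in actual_columns:
            plan.append(("ADD COLUMN", name, col_type))
        elif actual_columns[name] != col_type:
            # Type changes can lose data, so this sketch only flags them for review.
            plan.append(("REVIEW TYPE CHANGE", name, f"{actual_columns[name]} -> {col_type}"))
    for name in actual_columns:
        if name not in contract_columns:
            # Never drop automatically: report the extra column instead.
            plan.append(("KEEP EXTRA COLUMN", name, actual_columns[name]))
    return plan


contract_cols = {"vuelo_id": "STRING", "estado": "STRING", "retraso_min": "INTEGER"}
actual_cols = {"vuelo_id": "STRING", "retraso_min": "STRING", "legacy_flag": "BOOLEAN"}

plan = plan_migration(contract_cols, actual_cols)
for action in plan:
    print(action)
```

A dry run corresponds to stopping after the plan is printed; applying it would translate each action into the matching ALTER TABLE statement.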
Table contracts
A JSON contract fully defines a Delta table:
{
  "catalog": "{catalog.silver}",
  "schema": "aeronautica",
  "name": "vuelos_current",
  "type": "MANAGED",
  "format": "DELTA",
  "columns": [
    { "name": "vuelo_id", "type": "STRING", "nullable": false },
    { "name": "estado", "type": "STRING", "nullable": true },
    { "name": "retraso_min", "type": "INTEGER", "nullable": true },
    { "name": "email", "type": "STRING", "nullable": true, "mask": "security.mask_email" }
  ],
  "partitions": ["iata_aerolinea"],
  "properties": {
    "merge_schema": true,
    "change_data_feed": true,
    "quality": "curated",
    "layer": "silver"
  }
}
Special flags in properties:
- merge_schema: true enables mergeSchema=true on append/overwrite_partition.
- change_data_feed: true enables delta.enableChangeDataFeed=true in TBLPROPERTIES.
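One way to picture how a contract maps onto Delta DDL is a small renderer. The sketch below is an assumption about the general shape of such a translation, not DKOps code: create_table_ddl is a hypothetical helper, it expects placeholders to be already resolved, and it handles only columns, partitions and the change_data_feed flag.

```python
def create_table_ddl(contract):
    """Render a Delta CREATE TABLE statement from an already-resolved contract dict."""
    columns = ",\n  ".join(
        f"{c['name']} {c['type']}" + ("" if c.get("nullable", True) else " NOT NULL")
        for c in contract["columns"]
    )
    # Locally the catalog is omitted, so only non-empty parts are joined.
    parts = (contract.get("catalog"), contract["schema"], contract["name"])
    table = ".".join(p for p in parts if p)
    ddl = f"CREATE TABLE IF NOT EXISTS {table} (\n  {columns}\n) USING DELTA"
    if contract.get("partitions"):
        ddl += f"\nPARTITIONED BY ({', '.join(contract['partitions'])})"
    if contract.get("properties", {}).get("change_data_feed"):
        ddl += "\nTBLPROPERTIES (delta.enableChangeDataFeed = true)"
    return ddl


example_contract = {
    "catalog": None,  # local environment: no catalog prefix
    "schema": "aeronautica",
    "name": "vuelos_current",
    "columns": [
        {"name": "vuelo_id", "type": "STRING", "nullable": False},
        {"name": "iata_aerolinea", "type": "STRING", "nullable": True},
    ],
    "partitions": ["iata_aerolinea"],
    "properties": {"change_data_feed": True},
}
ddl = create_table_ddl(example_contract)
print(ddl)
```

The merge_schema flag does not appear in the DDL because it is a write-time option rather than a table property.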
Ingestion contracts
Batch ingestion contract (Landing -> Bronze):
{
  "name": "vuelos_diarios",
  "ingest_type": "batch",
  "load_type": "incremental",
  "enabled": true,
  "source": {
    "format": "json",
    "path": "{path.landing}/vuelos_diarios"
  },
  "destination_contract": "../../tables/bronze/vuelos_raw.json",
  "metadata": {
    "add_ingested_at": true,
    "add_ingested_date": true,
    "add_source_file": true
  }
}
Silver promotion contract (Bronze -> Silver):
{
  "name": "vuelos_current",
  "ingest_type": "batch",
  "strategy": "full_merge",
  "enabled": true,
  "source": { "format": "delta" },
  "source_contract": "../../tables/bronze/vuelos_raw.json",
  "destination_contract": "../../tables/silver/vuelos_current.json",
  "merge_keys": ["vuelo_id"],
  "watermark_col": "updated_at",
  "metadata": { "add_silver_timestamps": true }
}
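Because these contracts are plain JSON, they can be checked before a pipeline runs. The following is a hypothetical pre-flight check, not part of DKOps. It assumes that every promotion contract needs a known strategy, at least one merge key and both contract paths; the real framework may apply different rules.

```python
import json

ALLOWED_STRATEGIES = {"full_merge", "cdc_merge", "incremental_replace", "append_dedup"}


def validate_promotion_contract(contract):
    """Return a list of problems; an empty list means the contract looks usable."""
    problems = []
    if contract.get("strategy") not in ALLOWED_STRATEGIES:
        problems.append(f"unknown strategy: {contract.get('strategy')!r}")
    if not contract.get("merge_keys"):
        problems.append("merge_keys must list at least one column")
    for field in ("source_contract", "destination_contract"):
        if not contract.get(field):
            problems.append(f"missing {field}")
    return problems


good = json.loads("""
{
  "name": "vuelos_current",
  "strategy": "full_merge",
  "source_contract": "../../tables/bronze/vuelos_raw.json",
  "destination_contract": "../../tables/silver/vuelos_current.json",
  "merge_keys": ["vuelo_id"]
}
""")
bad = {"name": "broken", "strategy": "upsert_all", "merge_keys": []}

good_problems = validate_promotion_contract(good)
bad_problems = validate_promotion_contract(bad)
print(good_problems)
print(bad_problems)
```

Returning a list instead of raising on the first error lets a CI job report every problem in a contract at once.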
Project status: v0.3.0
| Module | Status | Description |
|---|---|---|
| table_governance | Stable | TableWriter, TableReader, SafeMigrator, ContractLoader |
| ingestion | Stable | IngestionEngine, BronzeIngestor, SilverPromoter |
| Silver strategies | Stable | full_merge, cdc_merge, incremental_replace, append_dedup |
| Streaming | Stable | FileStreamReader, auto-schema inference, availableNow |
| Demo 1: Aeronautics | Complete | Governed writers + SafeMigrator |
| Demo 2: Manufacturing | Complete | IngestionEngine + declarative DQ + pytest tests |
| Demo 3: E-commerce | Complete | merge_schema + column masking + streaming |
| Demo 4: Inventory | Complete | read_cdf() + read_stream() + SafeMigrator |
| Demo 5: Marketplace | Complete | cdc_merge + full_merge + Gold revenue/engagement |
| Unit tests | 147 tests | 0 failures |
Tests
# Full suite (excludes test_luncher, which requires a Databricks cluster)
python -m pytest tests/ --ignore=tests/test_luncher.py -v
# Contract tests only
python -m pytest tests/test_contracts.py -v
# Ingestion engine tests
python -m pytest tests/ingestion/ -v
# Demo 2 tests (no Delta, in-memory DataFrames)
cd demos/demo_2 && pytest tests/ -v
License
MIT: see LICENSE.