A DataOps framework for building a lakehouse
Laktory
An open-source DataOps and dataframe-centric ETL framework for building lakehouses.
Laktory is your all-in-one solution for defining both data transformations and Databricks resources. Imagine if Terraform, Databricks Asset Bundles, and dbt combined forces—that’s essentially Laktory.
This open-source framework simplifies the creation, deployment, and execution of data pipelines while adhering to essential DevOps practices like version control, code reviews, and CI/CD integration. With Apache Spark and Polars driving its data transformation, Laktory ensures reliable and scalable data processing. Its modular, flexible approach allows you to seamlessly combine SQL statements with DataFrame operations.
Since Laktory pipelines are built on top of Spark and Polars, they can run in any environment that supports Python, from your local machine to a Kubernetes cluster. They can also be deployed and orchestrated as Databricks Jobs or Delta Live Tables, offering a simple, fully managed, and low-maintenance solution.
But Laktory goes beyond data pipelines. It empowers you to define and deploy your entire Databricks data platform—from Unity Catalog and access grants to compute and quality monitoring—providing a complete, modern solution for data platform management.
Help
See documentation for more details.
Installation
Install using
pip install laktory[{cloud_provider}]
where {cloud_provider} is azure, aws or gcp.
For more installation options, see the Install section in the documentation.
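After installing, one quick way to confirm that the package is visible to the current interpreter is to query its metadata. The snippet below is a minimal sketch that relies only on the Python standard library; the helper name `installed_version` is hypothetical and is not part of Laktory.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "laktory" is the distribution name used in the pip command above.
    version = installed_version("laktory")
    if version is None:
        print("laktory is not installed in this environment")
    else:
        print(f"laktory {version} is installed")
```

If the helper reports that the package is missing, the most likely cause is that pip installed it into a different environment than the one running the script.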
A Basic Example
from laktory import models

node_brz = models.PipelineNode(
    name="brz_stock_prices",
    layer="BRONZE",
    source={
        "format": "PARQUET",
        "path": "./data/brz_stock_prices/"
    },
    transformer={
        "nodes": [
            {
                "func_name": "select",
                "func_args": [
                    "symbol",
                    "timestamp",
                    "open",
                    "close",
                ],
            },
        ]
    }
)

node_slv = models.PipelineNode(
    name="slv_stock_prices",
    layer="SILVER",
    source={
        "node_name": "brz_stock_prices"
    },
    sink={
        "path": "./data/slv_stock_prices",
        "mode": "OVERWRITE",
        "format": "PARQUET",
    },
    transformer={
        "nodes": [
            {
                "func_name": "drop_duplicates",
                "func_kwargs": {
                    "subset": ["timestamp", "symbol"]
                }
            },
        ]
    }
)

pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[node_brz, node_slv],
)

print(pipeline)
#> resource_name_=None options=ResourceOptions(variables={}, depends_on=[], provider=None, aliases=None, delete_before_replace=True, ignore_changes=None, import_=None, parent=None, replace_on_changes=None) variables={} databricks_job=None dlt=None name='stock_prices' nodes=[PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=False, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[SparkFuncArg(variables={}, value='symbol'), SparkFuncArg(variables={}, value='timestamp'), SparkFuncArg(variables={}, value='open'), SparkFuncArg(variables={}, value='close')], spark_func_kwargs={}, spark_func_name='select', sql_expression=None)]), expectations=[], layer='BRONZE', name='brz_stock_prices', primary_key=None, sink=None, source=FileDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, format='PARQUET', header=True, multiline=False, path='./data/brz_stock_prices/', read_options={}, schema_location=None), timestamp_key=None), PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=True, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[], spark_func_kwargs={'subset': SparkFuncArg(variables={}, value=['timestamp', 'symbol'])}, spark_func_name='drop_duplicates', sql_expression=None)]), expectations=[], layer='SILVER', name='slv_stock_prices', primary_key=None, sink=FileDataSink(variables={}, mode='OVERWRITE', checkpoint_location=None, format='PARQUET', path='./data/slv_stock_prices', write_options={}), source=PipelineNodeDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, 
filter=None, mock_df=None, renames=None, selects=None, watermark=None, node_name='brz_stock_prices', node=PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=False, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[SparkFuncArg(variables={}, value='symbol'), SparkFuncArg(variables={}, value='timestamp'), SparkFuncArg(variables={}, value='open'), SparkFuncArg(variables={}, value='close')], spark_func_kwargs={}, spark_func_name='select', sql_expression=None)]), expectations=[], layer='BRONZE', name='brz_stock_prices', primary_key=None, sink=None, source=FileDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, format='PARQUET', header=True, multiline=False, path='./data/brz_stock_prices/', read_options={}, schema_location=None), timestamp_key=None)), timestamp_key=None)] orchestrator=None udfs=[]
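# Assumes pyspark is installed; reuse the active SparkSession or create a local one
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pipeline.execute(spark=spark)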
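The transformer entries in the example name a DataFrame method (`func_name`) together with its positional and keyword arguments (`func_args`, `func_kwargs`). The following sketch illustrates that idea in plain Python. It is not Laktory's implementation: `TinyFrame` and `apply_chain` are hypothetical stand-ins, and the sample rows are made up for illustration.

```python
class TinyFrame:
    """Hypothetical stand-in for a DataFrame: a list of row dictionaries."""

    def __init__(self, rows):
        self.rows = [dict(r) for r in rows]

    def select(self, *cols):
        # Keep only the requested columns.
        return TinyFrame({c: r[c] for c in cols} for r in self.rows)

    def drop_duplicates(self, subset):
        # Keep the first row seen for each combination of `subset` values.
        seen, kept = set(), []
        for r in self.rows:
            key = tuple(r[c] for c in subset)
            if key not in seen:
                seen.add(key)
                kept.append(r)
        return TinyFrame(kept)


def apply_chain(df, nodes):
    """Apply each node in order by looking up the named method on the frame."""
    for node in nodes:
        method = getattr(df, node["func_name"])
        df = method(*node.get("func_args", []), **node.get("func_kwargs", {}))
    return df


# Made-up rows; the "volume" column is dropped by the select step.
rows = [
    {"symbol": "AAA", "timestamp": "t1", "open": 1.0, "close": 2.0, "volume": 10},
    {"symbol": "AAA", "timestamp": "t1", "open": 1.0, "close": 2.0, "volume": 10},
    {"symbol": "BBB", "timestamp": "t1", "open": 3.0, "close": 4.0, "volume": 20},
]

# Same node structure as the transformers in the example above.
nodes = [
    {"func_name": "select", "func_args": ["symbol", "timestamp", "open", "close"]},
    {"func_name": "drop_duplicates", "func_kwargs": {"subset": ["timestamp", "symbol"]}},
]

result = apply_chain(TinyFrame(rows), nodes)
print(result.rows)
```

Describing each step as data rather than as code is what lets the same pipeline definition be written in Python or in a configuration file.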
To get started with a more useful example, jump into the Quickstart.
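In the basic example, the silver node reads from the bronze node through `node_name`, so the bronze node has to run first. The sketch below shows how such references imply an execution order, using the standard library's `graphlib` (Python 3.9+). It only mirrors the structure of the example with plain dictionaries; `execution_order` is a hypothetical helper and says nothing about how Laktory resolves dependencies internally.

```python
from graphlib import TopologicalSorter

# Plain-dict mirror of the two nodes above, deliberately listed out of order.
nodes = [
    {"name": "slv_stock_prices", "source": {"node_name": "brz_stock_prices"}},
    {"name": "brz_stock_prices", "source": {"path": "./data/brz_stock_prices/"}},
]


def execution_order(nodes):
    """Order node names so that every node comes after the node it reads from."""
    graph = {}
    for node in nodes:
        upstream = node["source"].get("node_name")
        # Map each node to the set of nodes it depends on.
        graph[node["name"]] = {upstream} if upstream else set()
    return list(TopologicalSorter(graph).static_order())


order = execution_order(nodes)
print(order)  # ['brz_stock_prices', 'slv_stock_prices']
```

A cycle between nodes would make `static_order` raise `graphlib.CycleError`, which is a convenient way to catch a misconfigured pipeline early.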
A Lakehouse DataOps Template
A comprehensive template on how to deploy a lakehouse as code using Laktory is maintained here: https://github.com/okube-ai/lakehouse-as-code.
In this template, four Pulumi projects are used:
{cloud_provider}_infra: Deploy the required resources on your cloud provider
unity-catalog: Set up users, groups, catalogs and schemas, and manage grants
workspace: Set up secrets, clusters, warehouses and common files/notebooks
workflows: The data workflows to build your lakehouse
Okube Company
Okube is dedicated to building open source frameworks, known as the kubes, empowering businesses to build, deploy and operate highly scalable data platforms and AI models.