A DataOps framework for building a lakehouse
Laktory
An open-source, dataframe-centric ETL framework for building lakehouses with a DataOps approach.
Laktory, the lakehouse factory, is an open-source framework designed for building, deploying, and executing data pipelines while adhering to essential DevOps best practices such as source control, code reviews, and CI/CD. Leveraging Apache Spark and Polars as its core data transformation engines, Laktory ensures robust and scalable data processing capabilities.
While a Laktory data pipeline can run locally for small datasets or prototyping, it really shines when deployed and orchestrated on a cloud data platform such as Databricks. Combined with Delta Live Tables, it provides a simple, low-maintenance, fully managed solution.
Beyond just data pipelines, Laktory allows for the comprehensive definition and deployment of your entire data platform. This includes everything from cloud infrastructure to data tables, security, and quality monitoring systems, providing an all-in-one solution for modern data platform management.
Help
See documentation for more details.
Installation
Install using

pip install laktory[{cloud_provider}]

where {cloud_provider} is azure, aws or gcp.
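For example, to install the Azure flavor (quote the argument if your shell, such as zsh, expands square brackets):

pip install "laktory[azure]"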
For more installation options, see the Install section in the documentation.
A Basic Example
from laktory import models

node_brz = models.PipelineNode(
    name="brz_stock_prices",
    layer="BRONZE",
    source={
        "format": "PARQUET",
        "path": "./data/brz_stock_prices/"
    },
    transformer={
        "nodes": [
            {
                "func_name": "select",
                "func_args": [
                    "symbol",
                    "timestamp",
                    "open",
                    "close",
                ],
            },
        ]
    }
)

node_slv = models.PipelineNode(
    name="slv_stock_prices",
    layer="SILVER",
    source={
        "node_name": "brz_stock_prices"
    },
    sink={
        "path": "./data/slv_stock_prices",
        "mode": "OVERWRITE",
        "format": "PARQUET",
    },
    transformer={
        "nodes": [
            {
                "func_name": "drop_duplicates",
                "func_kwargs": {
                    "subset": ["timestamp", "symbol"]
                }
            },
        ]
    }
)

pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[node_brz, node_slv],
)

print(pipeline)
#> resource_name_=None options=ResourceOptions(variables={}, depends_on=[], provider=None, aliases=None, delete_before_replace=True, ignore_changes=None, import_=None, parent=None, replace_on_changes=None) variables={} databricks_job=None dlt=None name='stock_prices' nodes=[PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=False, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[SparkFuncArg(variables={}, value='symbol'), SparkFuncArg(variables={}, value='timestamp'), SparkFuncArg(variables={}, value='open'), SparkFuncArg(variables={}, value='close')], spark_func_kwargs={}, spark_func_name='select', sql_expression=None)]), expectations=[], layer='BRONZE', name='brz_stock_prices', primary_key=None, sink=None, source=FileDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, format='PARQUET', header=True, multiline=False, path='./data/brz_stock_prices/', read_options={}, schema_location=None), timestamp_key=None), PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=True, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[], spark_func_kwargs={'subset': SparkFuncArg(variables={}, value=['timestamp', 'symbol'])}, spark_func_name='drop_duplicates', sql_expression=None)]), expectations=[], layer='SILVER', name='slv_stock_prices', primary_key=None, sink=FileDataSink(variables={}, mode='OVERWRITE', checkpoint_location=None, format='PARQUET', path='./data/slv_stock_prices', write_options={}), source=PipelineNodeDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, node_name='brz_stock_prices', node=PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=False, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[SparkFuncArg(variables={}, value='symbol'), SparkFuncArg(variables={}, value='timestamp'), SparkFuncArg(variables={}, value='open'), SparkFuncArg(variables={}, value='close')], spark_func_kwargs={}, spark_func_name='select', sql_expression=None)]), expectations=[], layer='BRONZE', name='brz_stock_prices', primary_key=None, sink=None, source=FileDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, format='PARQUET', header=True, multiline=False, path='./data/brz_stock_prices/', read_options={}, schema_location=None), timestamp_key=None)), timestamp_key=None)] orchestrator=None udfs=[]
pipeline.execute(spark=spark)
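The example ends with pipeline.execute(spark=spark), which assumes an active Spark session named spark. A minimal local session could be created beforehand as sketched here (an assumption, not part of Laktory itself; pyspark must be installed, and on Databricks a session is already provided):

from pyspark.sql import SparkSession

# Minimal local session for prototyping (assumption: pyspark is installed;
# on Databricks or another managed platform, `spark` is already available).
spark = (
    SparkSession.builder
    .appName("laktory-quickstart")
    .master("local[*]")
    .getOrCreate()
)

When executed, the bronze node reads the parquet files from ./data/brz_stock_prices/ and the silver node drops duplicate (timestamp, symbol) rows before writing the result to ./data/slv_stock_prices.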
To get started with a more useful example, jump into the Quickstart.
A Lakehouse DataOps Template
A comprehensive template on how to deploy a lakehouse as code using Laktory is maintained here: https://github.com/okube-ai/lakehouse-as-code.
In this template, four Pulumi projects are used to:

- {cloud_provider}_infra: Deploy the required resources on your cloud provider
- unity-catalog: Setup users, groups, catalogs, schemas and Lakehouse grants (a sketch of this project follows the list)
- workspace-conf: Setup secrets, clusters and warehouses
- workspace: The data workflows to build your lakehouse
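As an illustration, the unity-catalog project could declare Unity Catalog resources with the pulumi_databricks provider, along the lines of the sketch below. This is a hypothetical example with illustrative names and privileges; the actual template may organize things differently.

import pulumi
import pulumi_databricks as databricks

# Hypothetical sketch of a unity-catalog style project (assumption: the
# pulumi_databricks provider is configured; names are illustrative only).
catalog = databricks.Catalog(
    "lakehouse",
    name="lakehouse",
    comment="Main lakehouse catalog",
)

schema = databricks.Schema(
    "finance",
    catalog_name=catalog.name,
    name="finance",
)

# Grant read access on the schema to a hypothetical group of analysts.
grants = databricks.Grants(
    "finance-grants",
    schema=pulumi.Output.concat(catalog.name, ".", schema.name),
    grants=[
        databricks.GrantsGrantArgs(
            principal="analysts",
            privileges=["USE_SCHEMA", "SELECT"],
        )
    ],
)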
Okube Company
Okube is dedicated to building open source frameworks, known as the kubes, empowering businesses to build, deploy and operate highly scalable data platforms and AI models.