
Laktory

An open-source, dataframe-centric ETL framework for building lakehouses with a DataOps approach.

Laktory, the lakehouse factory, is an open-source framework for building, deploying, and executing data pipelines while adhering to essential DevOps best practices such as source control, code reviews, and CI/CD. It uses Apache Spark as its core data transformation engine, with Polars support under development, for robust and scalable data processing.

While a Laktory data pipeline can run locally for small datasets or prototyping, it really shines when deployed and orchestrated on a cloud data platform such as Databricks. Combined with Delta Live Tables, it provides a simple, low-maintenance, fully managed solution.

Beyond data pipelines, Laktory lets you define and deploy your entire data platform, from cloud infrastructure to data tables, security, and quality monitoring, making it an all-in-one solution for modern data platform management.

[Diagram: what is Laktory]

Help

See documentation for more details.

Installation

Install using

pip install laktory[{cloud_provider}]

where {cloud_provider} is azure, aws, or gcp.
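
To verify the install, you can print the installed version using only the Python standard library (a minimal check that makes no assumptions about Laktory's own API):

from importlib.metadata import version

print(version("laktory"))  # e.g. 0.3.3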

For more installation options, see the Install section in the documentation.

A Basic Example

from laktory import models

# Bronze node: read raw parquet files and keep a subset of columns
node_brz = models.PipelineNode(
    name="brz_stock_prices",
    layer="BRONZE",
    source={
        "format": "PARQUET",
        "path": "./data/brz_stock_prices/"
    },
    transformer={
        "nodes": [
            {
                "spark_func_name": "select",
                "spark_func_args": [
                    "symbol",
                    "timestamp",
                    "open",
                    "close",
                ],
            },
        ]
    }
)

# Silver node: read the bronze node's output, drop duplicate rows, write parquet
node_slv = models.PipelineNode(
    name="slv_stock_prices",
    layer="SILVER",
    source={
        "node_name": "brz_stock_prices"
    },
    sink={
        "path": "./data/slv_stock_prices",
        "mode": "OVERWRITE",
        "format": "PARQUET",
    },
    transformer={
        "nodes": [
            {
                "spark_func_name": "drop_duplicates",
                "spark_func_kwargs": {
                    "subset": ["timestamp", "symbol"]
                }
            },
        ]
    }
)

# Assemble the nodes into a pipeline (dependencies are inferred from node sources)
pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[node_brz, node_slv],
)

print(pipeline)
#> resource_name_=None options=ResourceOptions(...) variables={} databricks_job=None dlt=None name='stock_prices' nodes=[PipelineNode(..., layer='BRONZE', name='brz_stock_prices', sink=None, source=FileDataSource(..., format='PARQUET', path='./data/brz_stock_prices/', ...), ...), PipelineNode(..., layer='SILVER', name='slv_stock_prices', sink=FileDataSink(mode='OVERWRITE', format='PARQUET', path='./data/slv_stock_prices', ...), source=PipelineNodeDataSource(..., node_name='brz_stock_prices', ...), ...)] orchestrator=None udfs=[]
#> (full model dump truncated for readability)

# Execute the pipeline; assumes an active SparkSession is available as spark
pipeline.execute(spark=spark)
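
Because a pipeline model is plain, serializable data, the same definition can be kept under source control as configuration. Below is a minimal sketch of loading a pipeline from YAML, assuming the pydantic-style models shown above accept the same nested dictionaries (PyYAML is required, and only the bronze node is included for brevity):

import yaml  # requires PyYAML

from laktory import models

pipeline_yaml = """
name: stock_prices
nodes:
  - name: brz_stock_prices
    layer: BRONZE
    source:
      format: PARQUET
      path: ./data/brz_stock_prices/
    transformer:
      nodes:
        - spark_func_name: select
          spark_func_args: [symbol, timestamp, open, close]
"""

pipeline = models.Pipeline(**yaml.safe_load(pipeline_yaml))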

To get started with a more useful example, jump into the Quickstart.

A Lakehouse DataOps Template

A comprehensive template showing how to deploy a lakehouse as code using Laktory is maintained at https://github.com/okube-ai/lakehouse-as-code.

In this template, four Pulumi projects are used to (a deployment sketch follows the list):

  • {cloud_provider}_infra: deploy the required resources on your cloud provider
  • unity-catalog: set up users, groups, catalogs, schemas, and lakehouse grants
  • workspace-conf: set up secrets, clusters, and warehouses
  • workspace: deploy the data workflows that build your lakehouse
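
A typical deployment runs the projects in order with the Pulumi CLI (a sketch only; directory names follow the list above and may differ from the repository's actual layout):

pulumi up -C {cloud_provider}_infra   # cloud resources
pulumi up -C unity-catalog            # users, groups, catalogs, schemas, grants
pulumi up -C workspace-conf           # secrets, clusters, warehouses
pulumi up -C workspace                # data workflows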

Okube Company

Okube is dedicated to building open-source frameworks, known as the kubes, that empower businesses to build, deploy, and operate highly scalable data platforms and AI models.
