
A DataOps framework for building a lakehouse

Project description

Laktory


An open-source DataOps and dataframe-centric ETL framework for building lakehouses.


Laktory is your all-in-one solution for defining both data transformations and Databricks resources. Imagine if Terraform, Databricks Asset Bundles, and dbt combined forces—that’s essentially Laktory.

This open-source framework simplifies the creation, deployment, and execution of data pipelines while adhering to essential DevOps practices like version control, code reviews, and CI/CD integration. With Apache Spark and Polars driving its data transformations, Laktory ensures reliable and scalable data processing. Its modular, flexible approach allows you to seamlessly combine SQL statements with DataFrame operations, as the example below demonstrates.

[Diagram: what is Laktory]

Since Laktory pipelines are built on top of Spark and Polars, they can run in any environment that supports Python, from your local machine to a Kubernetes cluster. They can also be deployed and orchestrated as Databricks Jobs or Delta Live Tables, offering a simple, fully managed, and low-maintenance solution.
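
A pipeline can be bound to an orchestrator directly in its definition. Here is a minimal sketch, assuming the orchestrator field accepts the "DATABRICKS_JOB" and "DLT" literals (check the Pipeline model documentation for the exact schema and related options):

from laktory import models

# A sketch only: the orchestrator literal below is an assumption; see the
# Pipeline model documentation for supported values.
pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[],  # pipeline nodes, as in the example further down
    orchestrator="DATABRICKS_JOB",  # or "DLT" for Delta Live Tables
)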

But Laktory goes beyond data pipelines. It empowers you to define and deploy your entire Databricks data platform—from Unity Catalog and access grants to compute and quality monitoring—providing a complete, modern solution for data platform management. This empowers your data team to take full ownership of the solution, eliminating the need to juggle multiple technologies. Say goodbye to relying on external Terraform experts to handle compute, workspace configuration, and Unity Catalog, while your data engineers and analysts try to combine Databricks Asset Bundles and dbt to build data pipelines. Laktory consolidates these functions, simplifying the entire process and reducing the overall cost.
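
As a rough sketch of what declaring such resources can look like (the import path and the grants schema below are assumptions; refer to the documentation for the exact resource models):

from laktory.models.resources.databricks import Catalog  # assumed import path

# A sketch only: the Catalog model name and grants schema are assumptions.
catalog = Catalog(
    name="prod",
    grants=[
        {"principal": "account users", "privileges": ["USE_CATALOG"]},
    ],
)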

[Diagram: DataOps]

Help

See the documentation for more details.

Installation

Install using

pip install laktory

For more installation options, see the Install section in the documentation.

A Basic Example

from laktory import models


node_brz = models.PipelineNode(
    name="brz_stock_prices",
    source={
        "format": "PARQUET",
        "path": "./data/brz_stock_prices/"
    },
    # Raw data is ingested as-is: no transformations at the bronze layer
    transformer={
        "nodes": []
    }
)

node_slv = models.PipelineNode(
    name="slv_stock_prices",
    source={
        "node_name": "brz_stock_prices"
    },
    sink={
        "path": "./data/slv_stock_prices",
        "mode": "OVERWRITE",
        "format": "PARQUET",
    },
    transformer={
        "nodes": [
            
# SQL Transformation: extract fields from the nested "data" column
            {
                "sql_expr": """
                    SELECT
                      data.created_at AS created_at,
                      data.symbol AS symbol,
                      data.open AS open,
                      data.close AS close,
                      data.high AS high,
                      data.low AS low,
                      data.volume AS volume
                    FROM
                      {df}
                """   
            },
            
# Spark Transformation: de-duplicate on created_at and symbol
            {
                "func_name": "drop_duplicates",
                "func_kwargs": {
                    "subset": ["created_at", "symbol"]
                }
            },
        ]
    }
)

pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[node_brz, node_slv],
)

print(pipeline)
#> resource_name_=None options=ResourceOptions(variables={}, depends_on=[], provider=None, aliases=None, delete_before_replace=True, ignore_changes=None, import_=None, parent=None, replace_on_changes=None) variables={} databricks_job=None dlt=None name='stock_prices' nodes=[PipelineNode(...), PipelineNode(...)] orchestrator=None udfs=[]

# Running locally requires an active Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

pipeline.execute(spark=spark)
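
Pipelines are not limited to Python definitions. Like all Laktory models, they can also be declared in YAML and loaded with the model_validate_yaml helper. A minimal sketch, assuming a pipeline.yaml file mirroring the model above:

from laktory import models

# A sketch only: assumes pipeline.yaml mirrors the Python definition above
with open("pipeline.yaml") as fp:
    pipeline = models.Pipeline.model_validate_yaml(fp)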

To get started with a more useful example, jump into the Quickstart.

A Lakehouse DataOps Template

A comprehensive template showing how to deploy a lakehouse as code with Laktory is maintained here: https://github.com/okube-ai/lakehouse-as-code.

In this template, four Pulumi projects are used to:

  • {cloud_provider}_infra: Deploy the required resources on your cloud provider
  • unity-catalog: Set up users, groups, catalogs, and schemas, and manage grants
  • workspace: Set up secrets, clusters, warehouses, and common files/notebooks
  • workflows: Deploy the data workflows that build your lakehouse
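
Each of these is a standard Pulumi project, so deployment follows the usual Pulumi workflow. For example (the stack name is a placeholder):

cd unity-catalog
pulumi up --stack <org>/<stack>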

Okube Company


Okube is dedicated to building open-source frameworks, known as the kubes, that empower businesses to build, deploy, and operate highly scalable data platforms and AI models.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages in the Python Packaging User Guide.

Source Distribution

laktory-0.5.0.tar.gz (6.2 MB)


Built Distribution

laktory-0.5.0-py3-none-any.whl (472.7 kB)


File details

Details for the file laktory-0.5.0.tar.gz.

File metadata

  • Download URL: laktory-0.5.0.tar.gz
  • Upload date:
  • Size: 6.2 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.15

File hashes

Hashes for laktory-0.5.0.tar.gz
  • SHA256: d2b38e883dfba6c67034b8aa13ec1a198d9726fa529bac787ae8b6cc7cba1f85
  • MD5: 8d685df2e3f6b7789c71b85658cc6b6f
  • BLAKE2b-256: d05a4ae5f20cd8773dedd00d53b92c4cfb9e472729ccd9327682a06ac402477a


File details

Details for the file laktory-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: laktory-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 472.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.15

File hashes

Hashes for laktory-0.5.0-py3-none-any.whl
  • SHA256: 6c3fe458e84a43b464594573ca7c49aa8250dd40481f46337a29445f57e7003e
  • MD5: 3ffae3f6b9cf31c6a9bd3fdfc5c1b7ed
  • BLAKE2b-256: c6f47227d9fe590f5d61ab904f5f5da9e406d7dd5aa1919dce3bbd3459767bb6

