dtflw / dataflow

dtflw is a Python framework for building modular data pipelines based on the Databricks dbutils.notebook API. It was conceived to facilitate the development and maintenance of Databricks data pipelines.

Why dtflw?

Databricks offers everything necessary to organize and manage the code of data pipelines according to different requirements and tastes. It does not impose any specific structure on a pipeline's repo, nor does it regulate the relationships between data (tables) and the code transforming it (notebooks).

In general, such freedom is an advantage, but as the number of notebooks, the variety of data, and the complexity of analysis logic grow, it becomes laborious to work with a pipeline's codebase while debugging, extending, or refactoring it.

Among dozens of notebooks and thousands of lines of code, it is difficult to keep in mind which tables a notebook requires and which tables it produces. Conversely, when exploring tables (files) on a storage (e.g. Azure Blob, AWS S3), it is unclear which code produced them. The complexity rises even more when a team needs to maintain numerous pipelines, each structured in its own way.

How does dtflw work?

This project identifies implicit relationships between data and code in a pipeline as the main reason for this growing complexity.

Therefore, dtflw makes relationships between tables and notebooks explicit by building around a simple dataflow model:

Each notebook of a pipeline

  1. consumes input tables (possibly none),
  2. produces output tables (possibly none),
  3. and may require additional arguments to run.

Thus, a pipeline is a sequence of notebooks chained by input-output tables.

Here is an example of a Databricks pipeline built using dtflw:

# Notebook:
# /Repos/user@company.com/project/main

from dtflw import init_flow
from dtflw.storage.azure import init_storage

storage = init_storage("account", "container", root_dir="analysis")
flow = init_flow(storage)
is_lazy = True

(
    flow.notebook("ingest_data")
        .input("SalesRecordsRaw", file_path="raw_data/sales_records_*.csv")
        # Bronze
        .output("SalesRecords")
        .run(is_lazy)
)

(
    flow.notebook("import_data")
        .input("SalesRecords")
        # Silver
        .output("SalesOrders")
        .output("Products")
        .output("Customers")
        .run(is_lazy)
)

(
    flow.notebook("calculate_sales_stats")
        .args({
            "year" : "2022"
        })
        .input("SalesOrders")
        .input("Customers")
        .input("Products")
        # Gold
        .output("SalesStatsPerProduct")
        .output("SalesStatsPerCustomer")
        .run(is_lazy)
)

storage.read_table(flow["SalesStatsPerProduct"]).display()

File paths of input and output tables are passed to a callee notebook as arguments. The dbutils.widgets API is used to fetch the values passed at runtime.

# Notebook:
# /Repos/user@company.com/project/calculate_sales_stats
from dtflw import init_args, init_inputs, init_outputs

args = init_args("year")
inputs = init_inputs("SalesOrders", "Customers", "Products")
outputs = init_outputs("SalesStatsPerProduct", "SalesStatsPerCustomer")

# Load inputs
sales_orders_df = spark.read.parquet(inputs["SalesOrders"].value)
# ...

# Save outputs
sales_stats_per_product_df.write.mode("overwrite")\
    .parquet(outputs["SalesStatsPerProduct"].value)
# ...
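
For reference, init_args, init_inputs, and init_outputs are convenience wrappers around that widgets API. Without dtflw, the same values could be fetched directly, as in this sketch (the widget names shown are assumptions; dtflw may register them differently):

# Fetching the same values directly via dbutils.widgets
# (illustrative only; actual widget names may differ).
year = dbutils.widgets.get("year")
sales_orders_path = dbutils.widgets.get("SalesOrders")
sales_orders_df = spark.read.parquet(sales_orders_path)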

Additionally, dtflw takes care of constructing file paths for output tables: it derives the file path of each output from the path of the notebook which saves it.

For the example above, an Azure blob container would look something like this:

https://account.blob.core.windows.net/container/

    raw_data/
        sales_records_2020.csv
        sales_records_2021.csv
        sales_records_2022.csv

    analysis/
        project/
            ingest_data/
                SalesRecords.parquet/
            import_data/
                SalesOrders.parquet/
                Products.parquet/
                Customers.parquet/
            calculate_sales_stats/
                SalesStatsPerProduct.parquet/
                SalesStatsPerCustomer.parquet/
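
Given that layout, any output table can also be read back directly by its derived path, e.g. with plain Spark (a sketch; the wasbs:// URL scheme and root shown are assumptions that depend on your storage configuration):

# Reading a derived output path directly
# (illustrative; adjust the URL scheme and root to your setup).
df = spark.read.parquet(
    "wasbs://container@account.blob.core.windows.net/"
    "analysis/project/import_data/SalesOrders.parquet"
)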

Getting Started

Clone the repo: git clone https://github.com/SoleyIo/dtflw.git

Prerequisites

dtflw is tested with Python 3.8.*.

Install dependencies from the install_requires section in setup.py. You may want to do that in a virtual environment; virtualenv works well for that.

Building

Build a .whl Python package: python setup.py sdist bdist_wheel

Installing

As soon as you have a .whl Python package, install it on a Databricks cluster.
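
For example, from a notebook cell you can install the built wheel with the %pip magic (the wheel location below is an assumption; point it at wherever you uploaded the package):

# Install the wheel on the cluster from a notebook cell
# (the path is illustrative).
%pip install /dbfs/FileStore/wheels/dtflw-0.5.1-py3-none-any.whl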

dtflw is ready to be used.

Changes

Please refer to the log of changes, where we record all notable changes made to the codebase in every version.

Built With

dtflw is implemented using dbutils.

Contributing

Please refer to our code of conduct and to our contributing guidelines for details.

License

This project is licensed under the BSD 3-Clause License.
