dtflw / dataflow
dtflw is a Python framework for building modular data pipelines based on the Databricks dbutils.notebook API. It was conceived to facilitate the development and maintenance of Databricks data pipelines.
Why dtflw?
Databricks offers everything necessary to organize and manage the code of data pipelines according to different requirements and tastes. It does not impose any specific structure on a pipeline's repo, nor does it regulate the relationships between data (tables) and the code that transforms them (notebooks).
In general, such freedom is an advantage, but with a growing number of notebooks, variety of data and complexity of analysis logic,
it gets laborious to work with a pipeline's codebase while debugging, extending or refactoring it.
Among dozens of notebooks and thousands of lines of code,
it is difficult to keep in mind which tables a notebook requires and which tables it produces.
On the other hand, when exploring tables (files) on a storage (e.g. Azure Blob, AWS S3),
it is unclear which code produced them.
The complexity rises even further when a team needs to
maintain numerous pipelines, each structured in its own way.
How does dtflw work?
This project identifies the implicit relationships between data and code in a pipeline as the main source of this growing complexity.
Therefore, dtflw makes the relationships between tables and notebooks explicit by building around a simple dataflow model:
Each notebook of a pipeline
- consumes input tables (possibly none),
- produces output tables (possibly none),
- and may require additional arguments to run.
Thus, a pipeline is a sequence of notebooks chained by input-output tables.
Here is an example of a Databricks pipeline built using dtflw:
# Notebook
# /Repos/user@company.com/project/main

from dtflw import init_flow
from dtflw.storage.azure import init_storage

storage = init_storage("account", "container", root_dir="analysis")
flow = init_flow(storage)

is_lazy = True

(
    flow.notebook("ingest_data")
        .input("SalesRecordsRaw", file_path="raw_data/sales_records_*.csv")
        # Bronze
        .output("SalesRecords")
        .run(is_lazy)
)

(
    flow.notebook("import_data")
        .input("SalesRecords")
        # Silver
        .output("SalesOrders")
        .output("Products")
        .output("Customers")
        .run(is_lazy)
)

(
    flow.notebook("calculate_sales_stats")
        .args({
            "year": "2022"
        })
        .input("SalesOrders")
        .input("Customers")
        .input("Products")
        # Gold
        .output("SalesStatsPerProduct")
        .output("SalesStatsPerCustomer")
        .run(is_lazy)
)

storage.read_table(flow["SalesStatsPerProduct"]).display()
File paths of input and output tables are passed to a callee notebook as arguments. The dbutils.widgets API is used to fetch the values passed at runtime.
# Notebook
# /Repos/user@company.com/project/calculate_sales_stats

from dtflw import init_args, init_inputs, init_outputs

args = init_args("year")
inputs = init_inputs("SalesOrders", "Customers", "Products")
outputs = init_outputs("SalesStatsPerProduct", "SalesStatsPerCustomer")

# Load inputs
sales_orders_df = spark.read.parquet(inputs["SalesOrders"].value)
# ...

# Save outputs
sales_stats_per_product_df.write.mode("overwrite")\
    .parquet(outputs["SalesStatsPerProduct"].value)
# ...
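For intuition, the values returned by these helpers correspond to notebook widgets set by the calling notebook. The following sketch shows roughly what that resolution looks like when done by hand with the dbutils.widgets API; the widget names here are an assumption for illustration, not necessarily how dtflw wires them internally.

# Rough sketch, assuming each input path and argument arrives as a
# notebook widget named after the table or argument (an assumption for
# illustration, not dtflw's actual implementation).
sales_orders_path = dbutils.widgets.get("SalesOrders")  # e.g. ".../import_data/SalesOrders.parquet"
year = dbutils.widgets.get("year")                       # argument passed via .args({...})

sales_orders_df = spark.read.parquet(sales_orders_path)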
Additionally, dtflw takes care of constructing file paths for output tables.
It derives the file path of each output from the path of the notebook which saves it.
For the example above, the Azure blob container would look something like this:
https://account.blob.core.windows.net/container/
    raw_data/
        sales_records_2020.csv
        sales_records_2021.csv
        sales_records_2022.csv
    analysis/
        project/
            ingest_data/
                SalesRecords.parquet/
            import_data/
                SalesOrders.parquet/
                Products.parquet/
                Customers.parquet/
            calculate_sales_stats/
                SalesStatsPerProduct.parquet/
                SalesStatsPerCustomer.parquet/
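In plain Python, the convention behind these paths can be sketched as follows. The helper below is a hypothetical illustration of the naming scheme, not part of the dtflw API:

# Hypothetical illustration of the layout above:
# <container>/<root_dir>/<notebook path within the project>/<table name>.parquet
def derive_output_path(container_url, root_dir, notebook_rel_path, table_name):
    return f"{container_url}/{root_dir}/{notebook_rel_path}/{table_name}.parquet"

derive_output_path(
    "https://account.blob.core.windows.net/container",
    "analysis",
    "project/calculate_sales_stats",
    "SalesStatsPerProduct"
)
# -> 'https://account.blob.core.windows.net/container/analysis/project/calculate_sales_stats/SalesStatsPerProduct.parquet'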
Getting Started
Clone the repo: git clone https://github.com/SoleyIo/dtflw.git
Prerequisites
dtflw is tested with Python 3.8.*.
Install dependencies from the install_requires section in setup.py. You may want to do that in a virtual environment and can use virtualenv for that.
Building
Build a .whl Python package: python setup.py sdist bdist_wheel
Installing
Once you have a .whl package, install it on a Databricks cluster, and dtflw is ready to be used.
Changes
Please refer to the log of changes, where we record all notable changes made to the codebase in each version.
Built With
dtflw is implemented using dbutils.
Contributing
Please refer to our code of conduct and to our contributing guidelines for details.
License
This project is licensed under the BSD 3-Clause License.
Download files
File details
Details for the file dtflw-0.5.1.tar.gz.
File metadata
- Download URL: dtflw-0.5.1.tar.gz
- Upload date:
- Size: 25.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.6
File hashes
| Algorithm   | Hash digest                                                      |
|-------------|------------------------------------------------------------------|
| SHA256      | cd9b2e5ea52d3597bf62f0a6ca194c519e5cc76befe7edc5e46fc3ca8366526f |
| MD5         | 4ae19c8752b5086407a2e84357879b6a                                 |
| BLAKE2b-256 | 9279925dc97933389529b66176569a331ce70e1bf66214d1106f7c50801e337e |
File details
Details for the file dtflw-0.5.1-py3-none-any.whl.
File metadata
- Download URL: dtflw-0.5.1-py3-none-any.whl
- Upload date:
- Size: 22.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.6
File hashes
| Algorithm   | Hash digest                                                      |
|-------------|------------------------------------------------------------------|
| SHA256      | de9946cee7d06e5ebfddda70803778d6bb59b31bc9a46d09129d5a331c415043 |
| MD5         | e4a9ee1d9569b368d26b365c3654418a                                 |
| BLAKE2b-256 | bda73df23e08086c8b642269e651ccabad650f2d8c4bdec2a96a5c454a843aaf |