Framework to organize data transformation flow.
Nodalize
Nodalize is a flexible framework for Python projects to easily create and maintain data pipelines. The dependency orchestration is automated and all accesses to the data store are abstracted. Developers can focus on their business logic.
Nodalize supports multiple data storage technologies and makes it easy to plug in bespoke ones.
Finally, Nodalize handles multiple dataframe-based calculation frameworks, so that each pipeline can be built with the most convenient tool (Pandas, Polars, PySpark, etc.).
Illustration
Let's start with an example. We assume that we have a data set called "BaseDataSet" with 3 columns: "Id", "Numerator" and "Denominator". We can create a derived data set by applying a transformation, using Python code such as the following.
class EnrichedDataSet(DataNode):
    @property
    def schema(self):
        """Define storage format."""
        return {
            "Id": (int, ColumnCategory.KEY),
            "Ratio": (float, ColumnCategory.VALUE),
        }

    @property
    def dependencies(self):
        """Define parent nodes."""
        return {
            "base_data": "BaseDataSet"  # Key matches the "base_data" argument of 'compute'.
        }

    @property
    def calculator_type(self):
        """Define type of calculation framework. The inputs and output of the 'compute' function below will be of the format defined here."""
        return "pandas"

    def compute(self, parameters, base_data):
        """Load data and enrich."""
        df = base_data()  # "base_data" is a function returning the data frame on demand. It can be called asynchronously.
        df["Ratio"] = df["Numerator"] / df["Denominator"]
        return df


coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)  # Various DataManager classes available: files, KDB, DeltaLake, and more to come.
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet())
coordinator.set_up()
coordinator.compute_and_save("EnrichedDataSet")  # Will compute and save the data.
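The on-demand loading mentioned in the comment on "base_data" can be pictured with a small standard-library sketch. It is not Nodalize code: `make_lazy_loader`, `RatioNode`, `run_node` and the in-memory `store` are hypothetical names, and the sketch assumes that each key of `dependencies` is passed to `compute` as a keyword argument holding a loader function.

```python
from typing import Any, Callable, Dict


def make_lazy_loader(load: Callable[[], Any]) -> Callable[[], Any]:
    """Wrap a loading function so the data is fetched once, on first call."""
    cache: Dict[str, Any] = {}

    def loader() -> Any:
        if "value" not in cache:
            cache["value"] = load()
        return cache["value"]

    return loader


class RatioNode:
    """Hypothetical stand-in for a data node (not the Nodalize DataNode class)."""

    dependencies = {"base_data": "BaseDataSet"}

    def compute(self, parameters, base_data):
        rows = base_data()  # The parent data is only loaded here, when needed.
        return [
            {"Id": row["Id"], "Ratio": row["Numerator"] / row["Denominator"]}
            for row in rows
        ]


def run_node(node, store: Dict[str, Any], parameters=None):
    """Build one lazy loader per dependency and pass it to compute by keyword."""
    loaders = {
        arg_name: make_lazy_loader(lambda name=parent: store[name])
        for arg_name, parent in node.dependencies.items()
    }
    return node.compute(parameters or {}, **loaders)


# Hypothetical in-memory store standing in for a real data manager.
store = {
    "BaseDataSet": [
        {"Id": 1, "Numerator": 1.0, "Denominator": 4.0},
        {"Id": 2, "Numerator": 3.0, "Denominator": 2.0},
    ]
}
result = run_node(RatioNode(), store)
```

Under that assumption, the names used as keys in `dependencies` and the argument names of `compute` have to agree, and a parent that is never called is never loaded.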
The code above is specific to Pandas, but we can rewrite it against the node's calculator so that it works with any of the supported calculation frameworks (Pandas, Dask, PyArrow, PySpark, Polars).
class EnrichedDataSet(DataNode):
    def set_calculator_type(self, calc_type):
        """Set calculation framework at runtime."""
        self.calc_type = calc_type

    @property
    def schema(self):
        """Define storage format."""
        return {
            "Id": (int, ColumnCategory.KEY),
            "Ratio": (float, ColumnCategory.VALUE),
        }

    @property
    def dependencies(self):
        """Define parent nodes."""
        return {
            "base_data": "BaseDataSet"
        }

    @property
    def calculator_type(self):
        """Define type of calculation framework. The inputs and output of the 'compute' function below will be of the format defined here."""
        return self.calc_type

    def compute(self, parameters, base_data):
        """Compute data."""
        df = base_data()  # "base_data" is a function returning the data frame on demand. It can be called asynchronously.
        # Go through the calculator rather than the dataframe API, so the same code runs on any framework.
        numerator = self.calculator.get_column(df, "Numerator")
        denominator = self.calculator.get_column(df, "Denominator")
        df = self.calculator.add_column(df, "Ratio", numerator / denominator)
        return df


coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)  # Various DataManager classes available: files, KDB, DeltaLake, and more to come.
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet()).set_calculator_type("pyarrow")  # Set framework as PyArrow for this run.
coordinator.set_up()
coordinator.compute_and_save("EnrichedDataSet")  # Will compute and save the data.
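To show why routing column operations through a calculator makes the node framework-agnostic, here is a minimal standard-library sketch of the same idea. It does not reproduce Nodalize's calculator classes: `Column`, `RowCalculator`, `ColumnarCalculator` and `add_ratio` are hypothetical, and the two toy "frameworks" are plain Python containers.

```python
from typing import Dict, List


class Column:
    """Tiny column wrapper supporting element-wise division."""

    def __init__(self, values: List[float]):
        self.values = list(values)

    def __truediv__(self, other: "Column") -> "Column":
        return Column([a / b for a, b in zip(self.values, other.values)])


class RowCalculator:
    """Toy backend storing a table as a list of row dictionaries."""

    def get_column(self, table: List[Dict], name: str) -> Column:
        return Column([row[name] for row in table])

    def add_column(self, table: List[Dict], name: str, column: Column) -> List[Dict]:
        return [{**row, name: value} for row, value in zip(table, column.values)]


class ColumnarCalculator:
    """Toy backend storing a table as a dictionary of column lists."""

    def get_column(self, table: Dict[str, List], name: str) -> Column:
        return Column(table[name])

    def add_column(self, table: Dict[str, List], name: str, column: Column) -> Dict[str, List]:
        return {**table, name: list(column.values)}


def add_ratio(calculator, table):
    """Business logic that only talks to the calculator interface."""
    numerator = calculator.get_column(table, "Numerator")
    denominator = calculator.get_column(table, "Denominator")
    return calculator.add_column(table, "Ratio", numerator / denominator)


rows = [{"Id": 1, "Numerator": 1.0, "Denominator": 4.0}]
columns = {"Id": [1], "Numerator": [1.0], "Denominator": [4.0]}

rows_out = add_ratio(RowCalculator(), rows)
columns_out = add_ratio(ColumnarCalculator(), columns)
```

The function `add_ratio` never inspects the table type, so switching backend only means passing a different calculator object.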
Dependency Management
Nodalize is able to generate a dependency graph and to update the downstream nodes automatically. The code below will generate the data for "BaseDataSet" and then update the data for "EnrichedDataSet".
coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet())
coordinator.set_up()
coordinator.run_recursively(node_identifiers=["BaseDataSet"])  # Will update "BaseDataSet", then "EnrichedDataSet".
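The ordering problem behind a recursive run can be sketched with Python's standard `graphlib` module (Python 3.9+). This is only an illustration of the general technique, not a description of how Nodalize implements it; the `parents` mapping, the extra node names and `downstream_order` are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, Iterable, List, Set

# Hypothetical graph: each node maps to the set of its parent nodes.
parents: Dict[str, Set[str]] = {
    "BaseDataSet": set(),
    "EnrichedDataSet": {"BaseDataSet"},
    "RankedDataSet": {"EnrichedDataSet"},
    "UnrelatedDataSet": set(),
}


def downstream_order(parents: Dict[str, Set[str]], start: Iterable[str]) -> List[str]:
    """Return the start nodes and all their descendants, parents first."""
    # Invert the graph so that each node knows its children.
    children: Dict[str, Set[str]] = {name: set() for name in parents}
    for node, node_parents in parents.items():
        for parent in node_parents:
            children[parent].add(node)

    # Collect every node reachable from the start nodes.
    affected: Set[str] = set()
    stack = list(start)
    while stack:
        node = stack.pop()
        if node not in affected:
            affected.add(node)
            stack.extend(children[node])

    # Keep only the affected sub-graph and sort it topologically.
    sub_graph = {node: parents[node] & affected for node in affected}
    return list(TopologicalSorter(sub_graph).static_order())


order = downstream_order(parents, ["BaseDataSet"])
```

Nodes that do not depend on the starting node, such as the hypothetical "UnrelatedDataSet", are left out of the run.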
Calculation Frameworks
Nodalize relies on popular dataframe-based frameworks for the data manipulations. The packages supported so far are:
- Pandas
- PyArrow
- Dask
- PySpark
- Polars
The architecture of Nodalize makes it easy to integrate new frameworks and more will follow. Users of Nodalize can also integrate their own dataframe-based framework.
Data Storage
Nodalize was designed to support multiple types of storage.
- Local repository using parquet files
- Relational and non-relational databases (KDB, DeltaLake, SQLite)
- AWS S3 buckets using parquet files
More types of data storage will be added over time. Users of Nodalize can add their favourite storage solution as well.
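As a rough picture of what a pluggable storage layer involves, the sketch below defines a small storage interface, an in-memory implementation and a registry with a default entry, loosely mirroring the `set_data_manager(name, manager, default=True)` call used earlier. All names here (`StorageBackend`, `InMemoryBackend`, `BackendRegistry`) are hypothetical and are not the base classes a real Nodalize data manager would extend.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

Row = Dict[str, object]


class StorageBackend(ABC):
    """Hypothetical storage interface: save and load tables by name."""

    @abstractmethod
    def save(self, table_name: str, rows: List[Row]) -> None:
        ...

    @abstractmethod
    def load(self, table_name: str) -> List[Row]:
        ...


class InMemoryBackend(StorageBackend):
    """Simplest possible backend, keeping tables in a dictionary."""

    def __init__(self) -> None:
        self._tables: Dict[str, List[Row]] = {}

    def save(self, table_name: str, rows: List[Row]) -> None:
        # Copy the rows so later changes by the caller do not alter stored data.
        self._tables[table_name] = [dict(row) for row in rows]

    def load(self, table_name: str) -> List[Row]:
        return [dict(row) for row in self._tables.get(table_name, [])]


class BackendRegistry:
    """Keeps named backends and remembers which one is the default."""

    def __init__(self) -> None:
        self._backends: Dict[str, StorageBackend] = {}
        self._default: Optional[str] = None

    def register(self, name: str, backend: StorageBackend, default: bool = False) -> None:
        self._backends[name] = backend
        if default or self._default is None:
            self._default = name

    def get(self, name: Optional[str] = None) -> StorageBackend:
        return self._backends[name or self._default]


registry = BackendRegistry()
registry.register("memory", InMemoryBackend(), default=True)
registry.get().save("BaseDataSet", [{"Id": 1, "Numerator": 1.0, "Denominator": 4.0}])
loaded = registry.get("memory").load("BaseDataSet")
```

Because callers only depend on `save` and `load`, a new storage technology can be added by writing one more class and registering it under a name.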
Demos
- Load company data from Yahoo Finance API and compute accounting ratios before ranking by values
- Load S&P stock prices from Yahoo Finance API and compute market betas for all stocks
Usages
API
Customization
Development