Framework to organize data transformation flow.

Nodalize

Nodalize is a flexible framework for Python projects to easily create and maintain data pipelines. The dependency orchestration is automated and all accesses to the data store are abstracted. Developers can focus on their business logic.

Nodalize supports multiple data storage technologies and makes it easy to plug in bespoke ones.

Finally, Nodalize handles multiple dataframe-based calculation frameworks, so each pipeline can be built with the most convenient tool (Pandas, Polars, PySpark, etc.).

Illustration

Let's start with an example. Assume we have a data set called "BaseDataSet" with three columns: "Id", "Numerator" and "Denominator". We can create a derived data set by applying transformations in Python code such as the following.

class EnrichedDataSet(DataNode):
    @property
    def schema(self):
        """Define storage format."""
        return {
            "Id": (int, ColumnCategory.KEY),
            "Ratio": (float, ColumnCategory.VALUE),
        }

    @property
    def dependencies(self):
        """Define parent nodes."""
        return {
            "base_data": "BaseDataSet"  # Key matches the "base_data" argument of "compute".
        }

    @property
    def calculator_type(self):
        """Define type of calculation framework. The inputs and output of the 'compute' function below will be of the format defined here."""
        return "pandas"

    def compute(self, parameters, base_data):
        """Load data and enrich."""
        df = base_data()  # "base_data" is a function returning the data frame on demand. It can be called asynchronously.
        df["Ratio"] = df["Numerator"] / df["Denominator"]
        return df

coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)  # Various DataManager classes available: files, KDB, DeltaLake, and more to come.
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet())
coordinator.set_up()
coordinator.compute_and_save("EnrichedDataSet")  # Will compute and save the data.
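
The link between "dependencies" and "compute" can be pictured with a small standard-library sketch. This is not Nodalize's implementation: "run_node", "load" and the in-memory "store" are hypothetical names used only for illustration. In this sketch, each key of the dependency dictionary becomes a keyword argument of "compute", and each argument is a zero-argument function that loads the parent data only when it is called.

```python
from functools import partial


def load(store, name):
    """Return a copy of the rows stored under `name` (hypothetical in-memory store)."""
    return [dict(row) for row in store[name]]


def run_node(store, dependencies, compute, parameters=None):
    """Call `compute` with one lazy loader per dependency key."""
    loaders = {arg: partial(load, store, parent) for arg, parent in dependencies.items()}
    return compute(parameters or {}, **loaders)


def compute(parameters, base_data):
    """Same logic as EnrichedDataSet.compute, written against plain dictionaries."""
    rows = base_data()  # The parent data is only loaded at this point.
    return [{"Id": r["Id"], "Ratio": r["Numerator"] / r["Denominator"]} for r in rows]


store = {"BaseDataSet": [{"Id": 1, "Numerator": 1.0, "Denominator": 4.0}]}
result = run_node(store, {"base_data": "BaseDataSet"}, compute)
print(result)  # [{'Id': 1, 'Ratio': 0.25}]
```

With this design, a dependency key that does not match an argument name of "compute" fails immediately with a TypeError instead of silently loading the wrong data.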

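The code above is specific to Pandas, but the node can be written to support any of the available calculation frameworks (Pandas, Dask, PyArrow, PySpark, Polars) by going through the node's calculator instead of calling the dataframe API directly.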

class EnrichedDataSet(DataNode):
    def set_calculator_type(self, calc_type):
        """Set calculation framework at runtime."""
        self.calc_type = calc_type

    @property
    def schema(self):
        """Define storage format."""
        return {
            "Id": (int, ColumnCategory.KEY),
            "Ratio": (float, ColumnCategory.VALUE),
        }

    @property
    def dependencies(self):
        """Define parent nodes."""
        return {
            "base_data": "BaseDataSet"
        }

    @property
    def calculator_type(self):
        """Define type of calculation framework. The inputs and output of the 'compute' function below will be of the format defined here."""
        return self.calc_type

    def compute(self, parameters, base_data):
        """Compute data."""
        df = base_data()  # "base_data" is a function returning the data frame on demand. It can be called asynchronously.
        df = self.calculator.add_column(df, "Ratio", self.calculator.get_column(df, "Numerator") / self.calculator.get_column(df, "Denominator"))
        return df

coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)  # Various DataManager classes available: files, KDB, DeltaLake, and more to come.
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet()).set_calculator_type("pyarrow")  # Set framework as PyArrow for this run.
coordinator.set_up()
coordinator.compute_and_save("EnrichedDataSet")  # Will compute and save the data.
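
To see why routing every operation through a calculator makes the business logic portable, here is a minimal standard-library sketch. The classes "ListColumn", "DictOfListsCalculator" and "ListOfRowsCalculator" are hypothetical stand-ins for real dataframe back ends; only the method names "get_column" and "add_column" are taken from the example above, and their real signatures in Nodalize may differ.

```python
class ListColumn:
    """Minimal column type supporting element-wise division."""

    def __init__(self, values):
        self.values = list(values)

    def __truediv__(self, other):
        return ListColumn(a / b for a, b in zip(self.values, other.values))


class DictOfListsCalculator:
    """Hypothetical calculator for frames stored as {column name: list of values}."""

    def get_column(self, frame, name):
        return ListColumn(frame[name])

    def add_column(self, frame, name, column):
        new_frame = dict(frame)  # Leave the input frame untouched.
        new_frame[name] = column.values
        return new_frame


class ListOfRowsCalculator:
    """Hypothetical calculator for frames stored as a list of row dictionaries."""

    def get_column(self, frame, name):
        return ListColumn(row[name] for row in frame)

    def add_column(self, frame, name, column):
        return [{**row, name: value} for row, value in zip(frame, column.values)]


def add_ratio(calculator, frame):
    """Framework-agnostic logic: only the calculator touches the frame."""
    ratio = calculator.get_column(frame, "Numerator") / calculator.get_column(frame, "Denominator")
    return calculator.add_column(frame, "Ratio", ratio)


columnar = add_ratio(DictOfListsCalculator(), {"Numerator": [1.0, 3.0], "Denominator": [2.0, 4.0]})
row_based = add_ratio(ListOfRowsCalculator(), [{"Numerator": 1.0, "Denominator": 2.0}])
print(columnar["Ratio"])  # [0.5, 0.75]
print(row_based)  # [{'Numerator': 1.0, 'Denominator': 2.0, 'Ratio': 0.5}]
```

The function "add_ratio" never inspects the frame itself, so switching back end only means passing a different calculator.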

Dependency Management

Nodalize generates a dependency graph and updates downstream nodes automatically. The code below generates the data for "BaseDataSet" and then updates the data for "EnrichedDataSet".

coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet())
coordinator.set_up()
coordinator.run_recursively(node_identifiers=["BaseDataSet"])
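
The idea behind a recursive run can be sketched with the standard library's "graphlib" module (Python 3.9+). This illustrates the general technique, not Nodalize's internal algorithm. The function "downstream_order" and the nodes "Report" and "Unrelated" are hypothetical and exist only for this example.

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each node maps to the set of its parents.
parents = {
    "BaseDataSet": set(),
    "EnrichedDataSet": {"BaseDataSet"},
    "Report": {"EnrichedDataSet"},
    "Unrelated": set(),
}


def downstream_order(parents, start_nodes):
    """Return the start nodes and everything depending on them, parents first."""
    affected = set(start_nodes)
    changed = True
    while changed:  # Expand until no new descendant is found.
        changed = False
        for node, node_parents in parents.items():
            if node not in affected and node_parents & affected:
                affected.add(node)
                changed = True
    # Restrict the graph to the affected nodes, then sort it topologically.
    subgraph = {node: parents[node] & affected for node in affected}
    return list(TopologicalSorter(subgraph).static_order())


print(downstream_order(parents, ["BaseDataSet"]))
# ['BaseDataSet', 'EnrichedDataSet', 'Report']
```

Nodes that do not depend on the starting node, such as "Unrelated" here, are left out of the run.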

Calculation Frameworks

Nodalize relies on popular dataframe-based frameworks for data manipulation. The packages supported so far are:

  • Pandas
  • PyArrow
  • Dask
  • PySpark
  • Polars

The architecture of Nodalize makes it easy to integrate new frameworks and more will follow. Users of Nodalize can also integrate their own dataframe-based framework.

Data Storage

Nodalize was designed to support multiple types of storage.

  • Local repository using parquet files
  • Relational and non-relational databases (KDB, DeltaLake, SQLite)
  • AWS S3 buckets using parquet files

More types of data storage will be added over time. Users of Nodalize can add their favourite storage solution as well.
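
As a rough picture of what a pluggable storage layer involves, the sketch below defines a small storage interface, an in-memory back end, and a registry with a default entry, loosely mirroring the "set_data_manager(..., default=True)" call shown earlier. All class and method names here ("Storage", "InMemoryStorage", "StorageRegistry", "save", "load", "register", "get") are assumptions made for this example; the actual base class to derive from is described under "Manage your own data store".

```python
from abc import ABC, abstractmethod


class Storage(ABC):
    """Hypothetical storage interface; method names are assumptions for this sketch."""

    @abstractmethod
    def save(self, name, rows):
        """Persist `rows` under `name`."""

    @abstractmethod
    def load(self, name):
        """Return the rows previously saved under `name`."""


class InMemoryStorage(Storage):
    """Simplest possible back end: keeps every table in a dictionary."""

    def __init__(self):
        self._tables = {}

    def save(self, name, rows):
        self._tables[name] = [dict(row) for row in rows]

    def load(self, name):
        return [dict(row) for row in self._tables[name]]


class StorageRegistry:
    """Keeps named back ends and remembers which one is the default."""

    def __init__(self):
        self._storages = {}
        self._default = None

    def register(self, key, storage, default=False):
        self._storages[key] = storage
        if default or self._default is None:
            self._default = key

    def get(self, key=None):
        return self._storages[self._default if key is None else key]


registry = StorageRegistry()
registry.register("memory", InMemoryStorage(), default=True)

rows = [{"Id": 1, "Numerator": 1.0, "Denominator": 4.0}]
registry.get().save("BaseDataSet", rows)
print(registry.get("memory").load("BaseDataSet") == rows)  # True
```

Because callers only depend on the "save" and "load" methods, a new back end can be added without touching the code that computes the data.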

Demos

Load company data from Yahoo Finance API and compute accounting ratios before ranking by values

Load S&P stock prices from Yahoo Finance API and compute market betas for all stocks

Usages

Join data sets

Working with time series

Loading data with lookback

Cascading calculations

Retry and fallback

API

Compute nodes

Define node

Customization

Manage your own data store

Create your own calculator

Development

How to contribute

Download files

Source Distributions

No source distribution files available for this release.

Built Distribution

nodalize-1.0.3-py3-none-any.whl (70.4 kB)
