
A framework for organizing data transformation flows.


Nodalize

Nodalize is a flexible framework for creating and maintaining data pipelines in Python projects. Dependency orchestration is automated and all access to the data store is abstracted away, so developers can focus on their business logic.

Nodalize supports multiple data storage technologies and makes it easy to plug in bespoke ones.

Finally, Nodalize handles multiple dataframe-based calculation frameworks, so that each pipeline can be built with the most convenient tool (Pandas, Polars, PySpark, etc.).

Illustration

Let's start with an example. Assume we have a data set called "BaseDataSet" with three columns: "Id", "Numerator" and "Denominator". We can create a derived data set that applies some transformations, using Python code such as the following.

class EnrichedDataSet(DataNode):
    @property
    def schema(self):
        """Define storage format."""
        return {
            "Id": (int, ColumnCategory.KEY),
            "Ratio": (float, ColumnCategory.VALUE),
        }

    @property
    def dependencies(self):
        """Define parent nodes."""
        return {
            "df": "BaseDataSet"
        }

    @property
    def calculator_type(self):
        """Define type of calculation framework. The inputs and output of the 'compute' function below will be of the format defined here."""
        return "pandas"

    def compute(self, parameters, base_data):
        """Load data and enrich."""
        df = base_data()  # "base_data" is a function returning the data frame on demand. It can be called asynchronously.
        df["Ratio"] = df["Numerator"] / df["Denominator"]
        return df

coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)  # Various DataManager classes available: files, KDB, DeltaLake, and more to come.
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet())
coordinator.set_up()
coordinator.compute_and_save("EnrichedDataSet")  # Will compute and save the data.

The code above is specific to Pandas, but with a few changes the same node can support any of the available calculation frameworks (Pandas, Dask, PyArrow, PySpark, Polars).

class EnrichedDataSet(DataNode):
    def set_calculator_type(self, calc_type):
        """Set calculation framework at runtime."""
        self.calc_type = calc_type

    @property
    def schema(self):
        """Define storage format."""
        return {
            "Id": (int, ColumnCategory.KEY),
            "Ratio": (float, ColumnCategory.VALUE),
        }

    @property
    def dependencies(self):
        """Define parent nodes."""
        return {
            "base_data": "BaseDataSet"
        }

    @property
    def calculator_type(self):
        """Define type of calculation framework. The inputs and output of the 'compute' function below will be of the format defined here."""
        return self.calc_type

    def compute(self, parameters, base_data):
        """Compute data."""
        df = base_data()  # "base_data" is a function returning the data frame on demand. It can be called asynchronously.
        df = self.calculator.add_column(df, "Ratio", self.calculator.get_column(df, "Numerator") / self.calculator.get_column(df, "Denominator"))
        return df

coordinator = Coordinator("myApplication", LocalFileDataManager("somelocation"))
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)  # Various DataManager classes available: files, KDB, DeltaLake, and more to come.
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet()).set_calculator_type("pyarrow")  # Set framework as PyArrow for this run.
coordinator.set_up()
coordinator.compute_and_save("EnrichedDataSet")  # Will compute and save the data.

Dependency Management

Nodalize can generate a dependency graph and update the downstream nodes automatically. The code below generates the data for "BaseDataSet" and then updates the data for "EnrichedDataSet".

coordinator = Coordinator("myApplication")
coordinator.set_data_manager("file", LocalFileDataManager("somelocation"), default=True)
coordinator.create_data_node(BaseDataSet())
coordinator.create_data_node(EnrichedDataSet())
coordinator.set_up()
coordinator.run_recursively(node_identifiers=["BaseDataSet"])
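
Conceptually, the recursive update amounts to walking the dependency graph in topological order, so each node is recomputed only after its parents. Here is a minimal standalone sketch of that idea using the standard library; it is an illustration of the principle, not Nodalize's actual implementation.

```python
from graphlib import TopologicalSorter

# Hypothetical graph mirroring the example above:
# EnrichedDataSet depends on BaseDataSet.
dependencies = {
    "BaseDataSet": set(),
    "EnrichedDataSet": {"BaseDataSet"},
}

def update_order(graph):
    """Return node names so that every parent precedes its children."""
    return list(TopologicalSorter(graph).static_order())

print(update_order(dependencies))  # ['BaseDataSet', 'EnrichedDataSet']
```

Running "BaseDataSet" recursively therefore refreshes it first, then every node downstream of it, in an order that respects all declared dependencies.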

Calculation Frameworks

Nodalize relies on popular dataframe-based frameworks for the data manipulations. The packages supported so far are:

  • Pandas
  • PyArrow
  • Dask
  • PySpark
  • Polars

The architecture of Nodalize makes it easy to integrate new frameworks, and more will follow. Users of Nodalize can also integrate their own dataframe-based frameworks.
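
To give a feel for what such an integration involves, here is a toy calculator working over a plain dict-of-lists "dataframe". The class and method names (`get_column`, `add_column`) echo the second example above, but this is an illustrative sketch only; Nodalize's real calculator base class and its required interface may differ.

```python
class DictCalculator:
    """Toy calculator adapter over a dict-of-lists 'dataframe'.

    Illustrative only: shows the kind of column-level operations a
    calculator abstracts, not Nodalize's actual base class.
    """

    def get_column(self, df, name):
        """Return one column as a list of values."""
        return df[name]

    def add_column(self, df, name, values):
        """Return a copy of the frame with an extra column."""
        out = dict(df)
        out[name] = list(values)
        return out


calc = DictCalculator()
df = {"Numerator": [1.0, 9.0], "Denominator": [2.0, 3.0]}
ratio = [n / d for n, d in zip(calc.get_column(df, "Numerator"),
                               calc.get_column(df, "Denominator"))]
df = calc.add_column(df, "Ratio", ratio)
print(df["Ratio"])  # [0.5, 3.0]
```

Because node code only talks to the calculator, swapping the backing dataframe library means swapping the adapter, not rewriting the pipeline.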

Data Storage

Nodalize was designed to support multiple types of storage.

  • Local repository using parquet files
  • Relational and non-relational databases (KDB, DeltaLake, SQLite)
  • AWS S3 buckets using parquet files

More types of data storage will be added over time. Users of Nodalize can add their favourite storage solution as well.
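
The shape of such a storage adapter can be sketched with an in-memory store that saves and loads data keyed by node name. The class and method names here are hypothetical; the actual DataManager interface in Nodalize may expose different methods and signatures.

```python
class InMemoryDataManager:
    """Toy data manager keeping each node's data in a dict.

    Illustrative only: a real implementation would write to its
    backing store (files, a database, S3, ...) instead of a dict.
    """

    def __init__(self):
        self._store = {}

    def save(self, node_name, data):
        """Persist the computed data for a node."""
        self._store[node_name] = data

    def load(self, node_name):
        """Retrieve previously saved data for a node."""
        return self._store[node_name]


manager = InMemoryDataManager()
manager.save("EnrichedDataSet", {"Id": [1], "Ratio": [0.5]})
print(manager.load("EnrichedDataSet")["Ratio"])  # [0.5]
```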

Demos

  • Load company data from the Yahoo Finance API and compute accounting ratios before ranking by values
  • Load S&P stock prices from the Yahoo Finance API and compute market betas for all stocks

Usages

  • Join data sets
  • Working with time series
  • Loading data with lookback
  • Cascading calculations
  • Retry and fallback

API

  • Compute nodes
  • Define node

Customization

  • Manage your own data store
  • Create your own calculator

Development

  • How to contribute
