Leveraging graph data structures for complex feature engineering pipelines.
Project description
GraphReduce
Description
GraphReduce is an abstraction for building machine learning feature engineering pipelines that involve many tables in a composable way. The library is intended to help bridge the gap between research feature definitions and production deployment without the overhead of a full feature store. Underneath the hood, GraphReduce uses graph data structures to represent tables/files as nodes and foreign keys as edges.
GraphReduce allows for a unified feature engineering interface
to plug & play with multiple backends: dask
, pandas
, and spark
are currently supported
Installation
# from pypi
pip install graphreduce
# from github
pip install 'graphreduce@git+https://github.com/wesmadrigal/graphreduce.git'
# install from source
git clone https://github.com/wesmadrigal/graphreduce && cd graphreduce && python setup.py install
Motivation
Machine learning requires vectors of data, but our tabular datasets
are disconnected. They can be represented as a graph, where tables
are nodes and join keys are edges. In many model building scenarios
there isn't a nice ML-ready vector waiting for us, so we must curate
the data by joining many tables together to flatten them into a vector.
This is the problem graphreduce
sets out to solve.
An example dataset might look like the following:
data granularity and time travel
But we need to flatten this to a specific granularity.
To further complicate things we need to handle orientation in time to prevent
data leakage and properly frame our train/test datasets. All of this
is controlled in graphreduce
from top-level parameters.
example of granularity and time travel parameters
cut_date
controls the date around which we orient the data in the graphcompute_period_val
controls the amount of time back in history we consider during compute over the graphcompute_period_unit
tells us what unit of time we're usingparent_node
specifies the parent-most node in the graph and, typically, the granularity to which to reduce the data
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enums import PeriodUnit
gr = GraphReduce(
cut_date=datetime.datetime(2023, 2, 1),
compute_period_val=365,
compute_period_unit=PeriodUnit.day,
parent_node=customer
)
Node definition and parameterization
GraphReduce takes convention over configuration, so the user is required to define a number of methods on each node class:
do_annotate
annotation definitions (e.g., split a string column into a new column)do_filters
filter the data on column(s)do_clip_cols
clip anomalies like exceedingly large values and do normalizationpost_join_annotate
annotations on current node after relations are merged in and we have access to their columns, toodo_reduce
the most import node function, reduction operations: group bys, sum, min, max, etc.do_labels
label definitions if any At the instance level we need to parameterize a few things, such as where the data is coming from, the date key, the primary key, prefixes for preserving where the data originated after compute, and a few other optional parameters.
from graphreduce.node import GraphReduceNode
# define the customer node
class CustomerNode(GraphReduceNode):
def do_annotate(self):
# use the `self.colabbr` function to use prefixes
self.df[self.colabbr('is_big_spender')] = self.df[self.colabbr('total_revenue')].apply(
lambda x: x > 1000.00 then 1 else 0
)
def do_filters(self):
self.df = self.df[self.df[self.colabbr('some_bool_col')] == 0]
def do_clip_cols(self):
self.df[self.colabbr('high_variance_column')] = self.df[self.colabbr('high_variance_column')].apply(
lambda col: 1000 if col > 1000 else col
)
def post_join_annotate(self):
# filters after children are joined
pass
def do_reduce(self, reduce_key):
pass
def do_labels(self, reduce_key):
pass
cust = CustomerNode(
fpath='s3://somebucket/some/path/customer.parquet',
fmt='parquet',
prefix='cust',
date_key='last_login',
pk='customer_id'
)
Usage
Pandas
from graphreduce.node import GraphReduceNode
from graphreduce.graph_reduce import GraphReduce
class NodeA(GraphReduceNode):
def do_annotate(self):
pass
def do_filters(self):
pass
def do_clip_cols(self):
pass
def do_slice_data(self):
pass
def do_post_join_annotate(self):
import uuid
self.df[self.colabbr('uuid')] = self.df[self.colabbr(self.pk)].apply(lambda x: str(uuid.uuid4()))
def do_reduce(self, key):
pass
def do_labels(self, key):
pass
class NodeB(GraphReduce):
def do_annotate(self):
pass
def do_filters(self):
pass
def do_clip_cols(self):
pass
def do_slice_data(self):
pass
def do_post_join_annotate(self):
import uuid
self.df[self.colabbr('uuid')] = self.df[self.colabbr(self.pk)].apply(lambda x: str(uuid.uuid4()))
def do_reduce(self, key):
return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
**{
self.colabbr(f'{self.pk}_counts') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),
self.colabbr(f'{self.pk}_min') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='min'),
self.colabbr(f'{self.pk}_min'): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='max'),
self.colabbr(f'{self.date_key}_min') : pd.NamedAgg(column=self.colabbr(self.date_key), aggfunc='min'),
self.colabbr(f'{self.date_key}_max') : pd.NamedAgg(column=self.colabbr(self.date_key), aggfunc='max')
}
).reset_index()
def do_labels(self, key):
pass
nodea = NodeA(fpath='nodea.parquet', fmt='parquet', date_key='ts', prefix='nodea', pk='id')
nodeb = NodeB(fpath='nodeb.parquet', fmt='parquet', date_key='created_at', prefix='nodeb', pk='id')
gr = GraphReduce(
cut_date=datetime.datetime(2023,5,1),
parent_node=nodea,
compute_layer=ComputeLayerEnum.pandas
)
gr.add_entity_edge(
parent_node=nodea,
relation_node=nodeb,
parent_key='id',
relation_key='nodea_foreign_key_id',
relation_type='parent_child',
reduce=True
)
# plot the graph to see what compute graph will run
# note, you may have to open this file in a browser
gr.plot_graph(fname='demo_graph.html')
# perform all transformations
gr.do_transformations()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.