Leveraging graph data structures for complex feature engineering pipelines.
GraphReduce
Description
GraphReduce is an abstraction for building machine learning feature engineering pipelines that involve many tables in a composable way. The library is intended to help bridge the gap between research feature definitions and production deployment without the overhead of a full feature store. Underneath the hood, GraphReduce uses graph data structures to represent tables/files as nodes and foreign keys as edges.
Compute backends supported: pandas, dask, spark, AWS Athena, Redshift, Snowflake, postgresql, MySQL
Compute backends coming soon: ray
Installation
# from pypi
pip install graphreduce
# from github
pip install 'graphreduce@git+https://github.com/wesmadrigal/graphreduce.git'
# install from source
git clone https://github.com/wesmadrigal/graphreduce && cd graphreduce && python setup.py install
Motivation
Machine learning requires vectors of data, but our tabular datasets
are disconnected. They can be represented as a graph, where tables
are nodes and join keys are edges. In many model building scenarios
there isn't a nice ML-ready vector waiting for us, so we must curate
the data by joining many tables together to flatten them into a vector.
This is the problem graphreduce sets out to solve.
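As a rough illustration of the idea (plain Python, not graphreduce's internal representation), the tables from the end-to-end example below can be written as nodes with join keys on the edges. The key names are inferred from the prefixed columns in that example's output and should be read as assumptions; `build_graph` and `reduce_order` are hypothetical helpers, and the lookup table `notification_interaction_types` is left out because its join direction is not shown.

```python
from collections import defaultdict

# (parent_table, parent_key, child_table, child_key): one edge per join key.
# Key names are assumptions inferred from the example output columns.
EDGES = [
    ("cust", "id", "notifications", "customer_id"),
    ("cust", "id", "orders", "customer_id"),
    ("notifications", "id", "notification_interactions", "notification_id"),
    ("orders", "id", "order_products", "order_id"),
]

def build_graph(edges):
    """Adjacency list: parent table -> list of (child table, parent key, child key)."""
    graph = defaultdict(list)
    for parent, parent_key, child, child_key in edges:
        graph[parent].append((child, parent_key, child_key))
    return graph

def reduce_order(graph, root):
    """Depth-first, post-order walk: every child is listed before its parent."""
    order = []

    def visit(table):
        for child, _, _ in graph.get(table, []):
            visit(child)
            order.append((child, table))

    visit(root)
    return order

graph = build_graph(EDGES)
for child, parent in reduce_order(graph, "cust"):
    print(f"reduce {child} -> join to {parent}")
```

Walking the graph this way flattens the deepest tables first, so each parent only ever joins children that have already been reduced to its own granularity.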
Prior work
- Deep Feature Synthesis
- One Button Machine (IBM)
- autofeat (BASF)
- featuretools (inspired by Deep Feature Synthesis)
Shortcomings of prior work
- point in time correctness is not always handled well
- Deep Feature Synthesis and featuretools are limited to pandas and a couple of SQL databases
- One Button Machine from IBM uses spark but their implementation outside of the paper could not be found
- none of the prior implementations allow for custom computational graphs or additional third party libraries
We extend prior works and add the following functionality:
- point in time correctness on arbitrarily large computational graphs
- extensible computational layers, with support currently spanning: pandas, dask, spark, AWS Athena, AWS Redshift, Snowflake, postgresql, mysql, and more coming
- customizable node implementations for a mix of dynamic and custom feature engineering with the ability to use third party libraries for portions (e.g., cleanlab for cleaning)
An example dataset might look like the following:
To get this example schema ready for an ML model we need to do the following:
- define the node-level interface and operations for filtering, annotating, normalizing, and reducing
- select the granularity to which we'll reduce our data: in this example, customer
- specify how much historical data will be included and what holdout period will be used (e.g., 365 days of historical data and 1 month of holdout data for labels)
- filter all data entities to include only the specified amount of history, which prevents data leakage
- perform depth-first, bottom-up group by / aggregation operations to reduce the data
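The steps above can be sketched by hand for two tables with plain pandas. This is not GraphReduce's implementation: the toy rows, the column names, and the 365-day history / 30-day holdout windows are invented for illustration, and the half-open window boundaries are an assumption.

```python
import datetime
import pandas as pd

# Toy data; the rows and column names are invented for illustration.
cust = pd.DataFrame({"id": [1, 2], "name": ["wes", "john"]})
orders = pd.DataFrame({
    "id": [10, 11, 12, 13],
    "customer_id": [1, 1, 2, 1],
    "amount": [20.0, 30.0, 15.0, 50.0],
    "ts": pd.to_datetime(["2023-05-12", "2023-06-01", "2023-01-01", "2023-09-10"]),
})

cut_date = datetime.datetime(2023, 9, 1)
history = datetime.timedelta(days=365)  # how far back features may look
holdout = datetime.timedelta(days=30)   # label window after the cut date

# Point-in-time filter: features only see rows before the cut date.
feat_rows = orders[(orders["ts"] >= cut_date - history) & (orders["ts"] < cut_date)]
# Labels only see rows inside the holdout window after the cut date.
label_rows = orders[(orders["ts"] >= cut_date) & (orders["ts"] < cut_date + holdout)]

# Reduce the child table to the parent's granularity (one row per customer).
features = feat_rows.groupby("customer_id").agg(
    ord_id_count=("id", "count"),
    ord_amount_sum=("amount", "sum"),
).reset_index()
labels = label_rows.groupby("customer_id").agg(
    ord_id_label=("id", "count"),
).reset_index()

# Join the reduced child back onto the parent.
flat = (
    cust.merge(features, left_on="id", right_on="customer_id", how="left")
        .drop(columns="customer_id")
        .merge(labels, left_on="id", right_on="customer_id", how="left")
        .drop(columns="customer_id")
)
print(flat)
```

Because the order dated after the cut date is kept out of the feature rows and only counted in the label, the resulting row per customer never mixes future information into its features.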
- End to end example:
import datetime
import pandas as pd
from graphreduce.node import GraphReduceNode, DynamicNode
from graphreduce.enum import ComputeLayerEnum as GraphReduceComputeLayerEnum, PeriodUnit
from graphreduce.graph_reduce import GraphReduce
# source from a csv file with the relationships
# using the file at: https://github.com/wesmadrigal/GraphReduce/blob/master/examples/cust_graph_labels.csv
reldf = pd.read_csv('cust_graph_labels.csv')
# using the data from: https://github.com/wesmadrigal/GraphReduce/tree/master/tests/data/cust_data
files = {
    'cust.csv': {'prefix': 'cu'},
    'orders.csv': {'prefix': 'ord'},
    'order_products.csv': {'prefix': 'op'},
    'notifications.csv': {'prefix': 'notif'},
    'notification_interactions.csv': {'prefix': 'ni'},
    'notification_interaction_types.csv': {'prefix': 'nit'}
}
# create graph reduce nodes
gr_nodes = {
    f.split('/')[-1]: DynamicNode(
        fpath=f,
        fmt='csv',
        pk='id',
        prefix=files[f]['prefix'],
        date_key=None,
        compute_layer=GraphReduceComputeLayerEnum.pandas,
        compute_period_val=730,
        compute_period_unit=PeriodUnit.day,
    )
    for f in files.keys()
}
gr = GraphReduce(
    name='cust_dynamic_graph',
    parent_node=gr_nodes['cust.csv'],
    fmt='csv',
    cut_date=datetime.datetime(2023,9,1),
    compute_layer=GraphReduceComputeLayerEnum.pandas,
    auto_features=True,
    auto_feature_hops_front=1,
    auto_feature_hops_back=2,
    label_node=gr_nodes['orders.csv'],
    label_operation='count',
    label_field='id',
    label_period_val=60,
    label_period_unit=PeriodUnit.day
)
# Add graph edges
for ix, row in reldf.iterrows():
    gr.add_entity_edge(
        parent_node=gr_nodes[row['to_name']],
        relation_node=gr_nodes[row['from_name']],
        parent_key=row['to_key'],
        relation_key=row['from_key'],
        reduce=True
    )
gr.do_transformations()
2024-04-23 13:49:41 [info ] hydrating graph attributes
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating attributes for DynamicNode
2024-04-23 13:49:41 [info ] hydrating graph data
2024-04-23 13:49:41 [info ] checking for prefix uniqueness
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=notification_interaction_types.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=notification_interactions.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=order_products.csv fmt=csv>
2024-04-23 13:49:41 [info ] running filters, normalize, and annotations for <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] depth-first traversal through the graph from source: <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=notification_interactions.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=notification_interactions.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=notification_interactions.csv fmt=csv> to <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=notifications.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=notifications.csv fmt=csv> to <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=order_products.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=order_products.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=order_products.csv fmt=csv> to <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] reducing relation <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] performing auto_features on node <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] joining <GraphReduceNode: fpath=orders.csv fmt=csv> to <GraphReduceNode: fpath=cust.csv fmt=csv>
2024-04-23 13:49:41 [info ] Had label node <GraphReduceNode: fpath=orders.csv fmt=csv>
2024-04-23 13:49:41 [info ] computed labels for <GraphReduceNode: fpath=orders.csv fmt=csv>
gr.parent_node.df
cu_id cu_name notif_customer_id notif_id_count notif_customer_id_count notif_ts_first notif_ts_min notif_ts_max ni_notification_id_min ni_notification_id_max ni_notification_id_sum ni_id_count_min ni_id_count_max ni_id_count_sum ni_notification_id_count_min ni_notification_id_count_max ni_notification_id_count_sum ni_interaction_type_id_count_min ni_interaction_type_id_count_max ni_interaction_type_id_count_sum ni_ts_first_first ni_ts_first_min ni_ts_first_max ni_ts_min_first ni_ts_min_min ni_ts_min_max ni_ts_max_first ni_ts_max_min ni_ts_max_max ord_customer_id ord_id_count ord_customer_id_count ord_ts_first ord_ts_min ord_ts_max op_order_id_min op_order_id_max op_order_id_sum op_id_count_min op_id_count_max op_id_count_sum op_order_id_count_min op_order_id_count_max op_order_id_count_sum op_product_id_count_min op_product_id_count_max op_product_id_count_sum ord_customer_id_dupe ord_id_label
0 1 wes 1 6 6 2022-08-05 2022-08-05 2023-06-23 101.0 106.0 621.0 1.0 3.0 14.0 1.0 3.0 14.0 1.0 3.0 14.0 2022-08-06 2022-08-06 2023-05-15 2022-08-06 2022-08-06 2023-05-15 2022-08-08 2022-08-08 2023-05-15 1.0 2.0 2.0 2023-05-12 2023-05-12 2023-06-01 1.0 2.0 3.0 4.0 4.0 8.0 4.0 4.0 8.0 4.0 4.0 8.0 1.0 1.0
1 2 john 2 7 7 2022-09-05 2022-09-05 2023-05-22 107.0 110.0 434.0 1.0 1.0 4.0 1.0 1.0 4.0 1.0 1.0 4.0 2023-06-01 2023-06-01 2023-06-04 2023-06-01 2023-06-01 2023-06-04 2023-06-01 2023-06-01 2023-06-04 2.0 1.0 1.0 2023-01-01 2023-01-01 2023-01-01 3.0 3.0 3.0 4.0 4.0 4.0 4.0 4.0 4.0 4.0 4.0 4.0 NaN NaN
2 3 ryan 3 2 2 2023-06-12 2023-06-12 2023-09-01 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0 NaT NaT NaT NaT NaT NaT NaT NaT NaT 3.0 1.0 1.0 2023-06-01 2023-06-01 2023-06-01 5.0 5.0 5.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 NaN NaN
3 4 tianji 4 2 2 2024-02-01 2024-02-01 2024-02-15 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0 NaN NaN 0.0
- Plot the graph reduce compute graph.
gr.plot_graph('my_graph_reduce.html')
- Use materialized dataframe for ML / analytics
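from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
# label: orders counted in the label period (label_operation='count')
Y = 'ord_id_label'
df = gr.parent_node.df
# numeric columns only, excluding the label itself so it does not leak into the features
X = [c for c, t in df.dtypes.items() if str(t).startswith(('int', 'float')) and c != Y]
# LinearRegression rejects NaN; filling with 0 is one simple choice
train, test = train_test_split(df.fillna({c: 0 for c in X + [Y]}))
mdl = LinearRegression()
mdl.fit(train[X], train[Y])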
order of operations
API definition
GraphReduce instantiation and parameters
graphreduce.graph_reduce.GraphReduce
- cut_date: controls the date around which we orient the data in the graph
- compute_period_val: controls the amount of time back in history we consider during compute over the graph
- compute_period_unit: tells us what unit of time we're using
- parent_node: specifies the parent-most node in the graph and, typically, the granularity to which to reduce the data
import datetime
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import PeriodUnit
# `customer` is a node defined beforehand (e.g., a GraphReduceNode subclass or a DynamicNode)
gr = GraphReduce(
    cut_date=datetime.datetime(2023, 2, 1),
    compute_period_val=365,
    compute_period_unit=PeriodUnit.day,
    parent_node=customer
)
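To make the time parameters concrete, here is a small sketch of the two windows that a cut date implies: a feature window ending at the cut date and a label window starting at it. The helper name `windows`, the `label_period_days` argument, and the half-open boundary convention are assumptions for illustration; GraphReduce's own boundary handling may differ.

```python
import datetime

def windows(cut_date, compute_period_days, label_period_days):
    """Return (feature_window, label_window) as (start, end) tuples.

    Hypothetical helper: boundaries are treated as half-open [start, end).
    """
    feature_window = (cut_date - datetime.timedelta(days=compute_period_days), cut_date)
    label_window = (cut_date, cut_date + datetime.timedelta(days=label_period_days))
    return feature_window, label_window

feat, lab = windows(datetime.datetime(2023, 2, 1), 365, 60)
print("features:", feat[0].date(), "to", feat[1].date())
print("labels:  ", lab[0].date(), "to", lab[1].date())
```

With a cut date of 2023-02-01 and a 365-day compute period, features are drawn from 2022-02-01 onward, and nothing on or after the cut date is visible to them.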
GraphReduce commonly used functions
- do_transformations: perform all data transformations
- plot_graph: plot the graph
- add_entity_edge: add an edge
- add_node: add a node
Node definition and parameters
graphreduce.node.GraphReduceNode
- do_annotate: annotation definitions (e.g., split a string column into a new column)
- do_filters: filter the data on column(s)
- do_normalize: clip anomalies like exceedingly large values and do normalization
- post_join_annotate: annotations on the current node after relations are merged in and we have access to their columns, too
- do_reduce: the most important node function, reduction operations: group bys, sum, min, max, etc.
- do_labels: label definitions, if any
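The hook names above suggest a template-method pattern in which a node subclass overrides only the steps it needs. The stand-in base class below sketches that pattern in plain Python over lists of dicts. It does not import graphreduce: `SketchNode`, `OrderNode`, `run`, and the method signatures are hypothetical rather than the library's API, and the order in which `run` calls the hooks is an assumption.

```python
class SketchNode:
    """Hypothetical stand-in base class, not graphreduce.node.GraphReduceNode."""

    def __init__(self, rows, prefix):
        self.rows = rows
        self.prefix = prefix

    # Default hooks do nothing; subclasses override what they need.
    def do_annotate(self):
        pass

    def do_filters(self):
        pass

    def do_normalize(self):
        pass

    def do_reduce(self, reduce_key):
        raise NotImplementedError

    def run(self, reduce_key):
        # Assumed order: annotate, filter, normalize, then reduce.
        self.do_annotate()
        self.do_filters()
        self.do_normalize()
        return self.do_reduce(reduce_key)


class OrderNode(SketchNode):
    def do_filters(self):
        # Drop rows with a missing amount.
        self.rows = [r for r in self.rows if r["amount"] is not None]

    def do_normalize(self):
        # Clip exceedingly large values.
        for r in self.rows:
            r["amount"] = min(r["amount"], 1000.0)

    def do_reduce(self, reduce_key):
        # Group by the join key and sum, naming the output with the node prefix.
        totals = {}
        for r in self.rows:
            totals[r[reduce_key]] = totals.get(r[reduce_key], 0.0) + r["amount"]
        return {k: {f"{self.prefix}_amount_sum": v} for k, v in totals.items()}


node = OrderNode(
    rows=[
        {"customer_id": 1, "amount": 20.0},
        {"customer_id": 1, "amount": 5000.0},
        {"customer_id": 2, "amount": None},
    ],
    prefix="ord",
)
result = node.run("customer_id")
print(result)
```

Keeping each concern in its own hook means the reduction logic stays the same while filters or clipping rules change per table.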
# alternatively can use a dynamic node
from graphreduce.node import DynamicNode
from graphreduce.enum import ComputeLayerEnum
dyna = DynamicNode(
    fpath='s3://some.bucket/path.csv',
    compute_layer=ComputeLayerEnum.dask,
    fmt='csv',
    prefix='myprefix',
    date_key='ts',
    pk='id'
)
Node commonly used functions
- colabbr: abbreviate a column
- prep_for_features: filter the node's data by the cut date and the compute period for point in time correctness, also referred to as "time travel" in blogs
- prep_for_labels: filter the node's data by the cut date and the label period to prepare for labeling
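The prefixed column names in the end-to-end example output (`cu_id`, `ord_id_count`, `ni_ts_first_min`) are consistent with a simple naming scheme: node prefix, then column, then one suffix per aggregation applied on the way up the graph. The helpers below sketch that scheme; they are illustrative stand-ins that take the prefix as an argument, not the library's `colabbr` implementation, and `reduced_name` is a hypothetical name.

```python
def colabbr(prefix, col):
    """Prefix a column name with its node's abbreviation (sketch)."""
    return f"{prefix}_{col}"

def reduced_name(col, agg):
    """Name of a column after one aggregation step (sketch)."""
    return f"{col}_{agg}"

# notification_interactions.ts, reduced to notifications with `first`,
# then reduced again to customers with `min`.
name = colabbr("ni", "ts")
name = reduced_name(name, "first")
name = reduced_name(name, "min")
print(name)
```

Reading a wide feature column right to left therefore gives the chain of aggregations, and the leading prefix identifies the table the value came from.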
Roadmap
- integration with Ray
- more dynamic feature engineering abilities, possible integration with Deep Feature Synthesis