🧛 Row-level data lineage tracking
Project description
𝕭𝖑𝖔𝖔𝖉𝖑𝖎𝖓𝖊
Bloodline is a small library to track row-level provenance of data. It is not invasive and does not require modifying your code. Bloodline supports pandas, but could to be extended to other dataframe libraries.
We use this at Carbonfact to track data lineage across all the ETL pipelines we use to ingest our customers' data. This allows us to tell them where each data point they see comes from, and to keep track of data quality issues back to their source.
Installation
pip install bloodline
For local development:
git clone https://github.com/carbonfact/bloodline
cd bloodline && uv sync
uv run pytest
Getting started
Bloodline provides a Lineage object, which you use as a decorator. Here’s a minimal example:
>>> import bloodline as bl
>>> import pandas as pd
>>> lineage = bl.Lineage()
>>> @lineage
... def read_products():
... return pd.read_csv('tests/examples/products.csv')
>>> products = read_products()
The dataframe products now has a data_lineage column, which records the source of each column’s values:
>>> from pprint import pprint
>>> pprint(products["data_lineage"].iloc[0])
{'mass': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
'source_type': 'DATA_SOURCE'},
'price': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
'source_type': 'DATA_SOURCE'},
'sku': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
'source_type': 'DATA_SOURCE'}}
What's nice is that you don't have to change anything about how you use pandas. You just need to decorate your data-processing functions with @lineage. Bloodline takes care of the rest.
Bloodline's default behavior is to impute a default source for new columns:
>>> @lineage
... def calculate_mass_in_kg(products):
... products["mass_kg"] = products["mass"] / 1000
... return products
>>> products = calculate_mass_in_kg(products)
>>> pprint(products['data_lineage'].iloc[0])
{'mass': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
'source_type': 'DATA_SOURCE'},
'mass_kg': {'source_metadata': {}, 'source_type': 'UNKNOWN'},
'price': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
'source_type': 'DATA_SOURCE'},
'sku': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
'source_type': 'DATA_SOURCE'}}
Bloodline automatically merges data_lineage columns when you merge dataframes.
>>> @lineage
... def read_purchases():
... purchases = pd.read_csv("tests/examples/purchases.csv")
... purchases = pd.merge(purchases, products, on="sku", how="left")
... users = pd.read_csv("tests/examples/users.csv")
... purchases = pd.merge(purchases, users, left_on="user_id", right_on="id", how="left")
... return purchases
>>> purchases = read_purchases()
>>> for col in purchases.columns.difference(['data_lineage']):
... print(f"{col}: {purchases['data_lineage'].iloc[0][col]}")
date: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/purchases.csv'}}
id: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/users.csv'}}
mass: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/products.csv'}}
mass_kg: {'source_type': 'UNKNOWN', 'source_metadata': {}}
price: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/products.csv'}}
sku: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/products.csv'}}
user_id: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/purchases.csv'}}
How it works
Here's what happens under the hood at a high level:
- The
@lineagedecorator enters a context manager, which temporarily installs lineage-aware pandas hooks. - Reader functions like
pd.read_csvpopulate the dataframe with adata_lineagecolumn, tagging each column with its source (e.g., file path) - Join methods like
pd.mergeandDataFrame.joinfusedata_lineagetogether, preserving provenance across dataframes. - When the function returns, Bloodline's decorator imputes lineage for any new columns using the decorator's default source.
What's crucial is that data lineage is tracked at a row level. If you concatenate two dataframes, each row in the resulting dataframe retains its own provenance. This differs from column-level lineage tracking, which can lose granularity when rows originate from different sources.
The advantages of Bloodline's design are:
- Users write normal pandas code. They don't have to swap
pd.mergefor a custom accessor. - The
@lineagedecorator overrides pandas' methods temporarily, which avoids global side effects. - Returning from the decorator triggers a final
apply_data_lineagepass, ensuring each data point has lineage.
Here are the caveats:
- Not all pandas operations are necessarily covered, which can lead to missed lineage.
- Performance overhead can be significant for large dataframes due to the need to inspect data at a row level.
User guide
n-ary return outputs
The @lineage decorator assumes by default that the decorated function returns a single dataframe. This can be customized with the return_arg parameter:
>>> @lineage(return_arg=0)
... def read_data():
... return pd.DataFrame({'foo': [1, 2]}), 'bar'
>>> df, label = read_data()
>>> assert 'data_lineage' in df.columns
Using custom sources
The default source can be overriden when instantiating the Lineage object:
>>> lineage = bl.Lineage(
... default_source=bl.Source(
... source_type="CUSTOM_DEFAULT",
... source_metadata={"reason": "default for new columns"}
... )
... )
You can also specify custom sources when using the decorator:
>>> @lineage.with_source(
... source=bl.Source(
... source_type="SNOWFLAKE"
... )
... )
... def read_data_from_db():
... df = pd.DataFrame({'price': [10, 15]}) # simulate reading from a database
... return df
>>> df = read_data_from_db()
>>> pprint(df['data_lineage'].iloc[0])
{'price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'}}
Inheriting lineage for derived columns
When creating new columns derived from existing ones, you can instruct Bloodline to inherit lineage from parent columns:
>>> @lineage(
... inheritance={"discounted_price": "price"}
... )
... def add_discounted_price(df: pd.DataFrame) -> pd.DataFrame:
... df["discounted_price"] = df["price"] * 0.9
... df["foo"] = 42 # this column won't inherit lineage
... return df
>>> products = add_discounted_price(df)
>>> pprint(df['data_lineage'].iloc[0])
{'discounted_price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'},
'foo': {'source_metadata': {'reason': 'default for new columns'},
'source_type': 'CUSTOM_DEFAULT'},
'price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'}}
Manually updating lineage
The @lineage decorator should cover most use cases, but sometimes you may need to manually adjust lineage information. This is where bl.apply_data_lineage() comes in handy. For example, you maybe want to update the lineage for a specific table subset after a bulk operation:
>>> def clip_price(df: pd.DataFrame) -> pd.DataFrame:
... threshold = 10
... row_mask = df["price"] > threshold
... df.loc[row_mask, "price"] = threshold
... return bl.apply_data_lineage(
... table=df,
... default_source=bl.Source(source_type="HEURISTIC", source_metadata={"heuristic_name": "mass_filler"}),
... row_mask=row_mask,
... column_names=["price"],
... override=True,
... )
>>> df = clip_price(df)
>>> pprint(df['data_lineage'].iloc[1])
{'discounted_price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'},
'foo': {'source_metadata': {'reason': 'default for new columns'},
'source_type': 'CUSTOM_DEFAULT'},
'price': {'source_metadata': {'heuristic_name': 'mass_filler'},
'source_type': 'HEURISTIC'}}
Generate entity relationship diagrams
Bloodline keeps track of each join between tables. You can generate E/R diagrams from the detected relationships. The relationship type is inferred from the unicity of the join keys.
>>> import pandas as pd
>>> import bloodline as bl
>>> lineage = bl.Lineage()
>>> @lineage
... def load_products():
... return pd.read_csv("tests/examples/products.csv")
>>> @lineage
... def load_users():
... return pd.read_csv("tests/examples/users.csv")
>>> @lineage
... def load_purchases():
... purchases = pd.read_csv("tests/examples/purchases.csv")
... users = load_users()
... merged = pd.merge(left=purchases, right=users, left_on="user_id", right_on="id")
... products = load_products()
... merged = pd.merge(left=merged, right=products, on="sku")
... return merged
>>> purchases = load_purchases()
>>> with open("tests/examples/erd.mmd", "w") as f:
... _ = f.write(lineage.erd.to_mermaid())
erDiagram
"tests/examples/users.csv" ||--o{ "tests/examples/purchases.csv" : "id -> user_id"
"tests/examples/products.csv" ||--o{ "tests/examples/purchases.csv" : "sku -> sku"
Toggling tracking on/off
Bloodline exposes methods to control tracking at runtime:
bl.disable_data_lineage_tracking()~ disable tracking globally.bl.enable_data_lineage_tracking()~ enable tracking globally.bl.is_data_lineage_tracked()~ check if tracking is enabled.bl.temporarily_disable_tracking()~ context manager to disable tracking temporarily.
Roadmap
- Bring more pandas operations under the hook manager (e.g.,
DataFrame.fillna,DataFrame.where,DataFrame.assign,pd.melt). - Flesh out the developer guide with real-world recipes as new hooks land.
- Experiment with lightweight lineage visualizations once the API surface settles.
Contributing
Feel free to reach out to alexis@carbonfact.com and max@carbonfact.com if you want to know more and/or contribute 😊
License
Bloodline is free and open-source software licensed under the Apache License, Version 2.0.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file bloodline-0.2.0.tar.gz.
File metadata
- Download URL: bloodline-0.2.0.tar.gz
- Upload date:
- Size: 19.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.11 {"installer":{"name":"uv","version":"0.9.11"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
55182190ee3a16698c0670ac188120526a1d46a1ca6b2f22d197b54ff7b47d00
|
|
| MD5 |
a2fcf58932c1be850ff0db651fd8284d
|
|
| BLAKE2b-256 |
53e5eeb6b53446f7d3dcdf1fc40adbabe3e11a3614f6db1915b2c91185cec7f8
|
File details
Details for the file bloodline-0.2.0-py3-none-any.whl.
File metadata
- Download URL: bloodline-0.2.0-py3-none-any.whl
- Upload date:
- Size: 18.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.11 {"installer":{"name":"uv","version":"0.9.11"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f0d492891ed10b79b5b8a6556c457ba84cf4f0082ef1a058636adcdd64d691d
|
|
| MD5 |
aa542a6dff67b31fcd75f946cf5888c6
|
|
| BLAKE2b-256 |
74477b9ccde65edbb1cd13aff04256013200448fe51580479ad701a89c2c8494
|