
🧛 Row-level data lineage tracking


𝕭𝖑𝖔𝖔𝖉𝖑𝖎𝖓𝖊


Bloodline is a small library to track row-level provenance of data. It is not invasive and does not require modifying your code. Bloodline supports pandas, but could be extended to other dataframe libraries.

We use this at Carbonfact to track data lineage across all the ETL pipelines we use to ingest our customers' data. This allows us to tell them where each data point they see comes from, and to trace data quality issues back to their source.

Installation

pip install bloodline

For local development:

git clone https://github.com/carbonfact/bloodline
cd bloodline && uv sync
uv run pytest

Getting started

Bloodline provides a Lineage object, which you use as a decorator. Here’s a minimal example:

>>> import bloodline as bl
>>> import pandas as pd

>>> lineage = bl.Lineage()

>>> @lineage
... def read_products():
...     return pd.read_csv('tests/examples/products.csv')

>>> products = read_products()

The dataframe products now has a data_lineage column, which records the source of each column’s values:

>>> from pprint import pprint
>>> pprint(products["data_lineage"].iloc[0])
{'mass': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
          'source_type': 'DATA_SOURCE'},
 'price': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
           'source_type': 'DATA_SOURCE'},
 'sku': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
         'source_type': 'DATA_SOURCE'}}

What's nice is that you don't have to change anything about how you use pandas. You just need to decorate your data-processing functions with @lineage. Bloodline takes care of the rest.

By default, Bloodline assigns a default source to any new column, which is UNKNOWN unless configured otherwise:

>>> @lineage
... def calculate_mass_in_kg(products):
...     products["mass_kg"] = products["mass"] / 1000
...     return products

>>> products = calculate_mass_in_kg(products)
>>> pprint(products['data_lineage'].iloc[0])
{'mass': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
          'source_type': 'DATA_SOURCE'},
 'mass_kg': {'source_metadata': {}, 'source_type': 'UNKNOWN'},
 'price': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
           'source_type': 'DATA_SOURCE'},
 'sku': {'source_metadata': {'file_path': 'tests/examples/products.csv'},
         'source_type': 'DATA_SOURCE'}}

Bloodline automatically merges data_lineage columns when you merge dataframes.

>>> @lineage
... def read_purchases():
...     purchases = pd.read_csv("tests/examples/purchases.csv")
...     purchases = pd.merge(purchases, products, on="sku", how="left")
...     users = pd.read_csv("tests/examples/users.csv")
...     purchases = pd.merge(purchases, users, left_on="user_id", right_on="id", how="left")
...     return purchases

>>> purchases = read_purchases()
>>> for col in purchases.columns.difference(['data_lineage']):
...     print(f"{col}: {purchases['data_lineage'].iloc[0][col]}")
date: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/purchases.csv'}}
id: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/users.csv'}}
mass: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/products.csv'}}
mass_kg: {'source_type': 'UNKNOWN', 'source_metadata': {}}
price: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/products.csv'}}
sku: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/products.csv'}}
user_id: {'source_type': 'DATA_SOURCE', 'source_metadata': {'file_path': 'tests/examples/purchases.csv'}}

How it works

Here's what happens under the hood at a high level:

  1. The @lineage decorator enters a context manager, which temporarily installs lineage-aware pandas hooks.
  2. Reader functions like pd.read_csv populate the dataframe with a data_lineage column, tagging each column with its source (e.g., file path).
  3. Join methods like pd.merge and DataFrame.join fuse data_lineage together, preserving provenance across dataframes.
  4. When the function returns, Bloodline's decorator imputes lineage for any new columns using the decorator's default source.
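
The temporary-hook pattern from steps 1 and 2 can be sketched in a few lines. This is only an illustration of the general technique, not Bloodline's actual implementation: fake_pd is a stand-in namespace (not pandas), the table is a plain dict, and lineage_hooks and with_lineage are hypothetical names. For brevity, lineage is stored once per table here rather than once per row.

```python
import contextlib
import functools
import types

# Stand-in for a dataframe library; purely illustrative, not pandas.
fake_pd = types.SimpleNamespace(
    read_csv=lambda path: {"columns": ["sku", "price"], "rows": [["a", 1]]}
)


@contextlib.contextmanager
def lineage_hooks(module):
    """Temporarily wrap module.read_csv so its results carry lineage."""
    original = module.read_csv

    def read_csv_with_lineage(path):
        table = original(path)
        source = {
            "source_type": "DATA_SOURCE",
            "source_metadata": {"file_path": path},
        }
        # Tag every column with the file it was read from.
        table["data_lineage"] = {col: source for col in table["columns"]}
        return table

    module.read_csv = read_csv_with_lineage
    try:
        yield
    finally:
        # Always restore the original reader, even if the body raises.
        module.read_csv = original


def with_lineage(func):
    """Hypothetical decorator: run func with the hooks installed."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with lineage_hooks(fake_pd):
            return func(*args, **kwargs)

    return wrapper


@with_lineage
def read_products():
    return fake_pd.read_csv("products.csv")


products = read_products()
# Outside the decorated function, the reader is back to normal.
untracked = fake_pd.read_csv("products.csv")
```

Because the patch is undone in a finally clause, code that runs outside a decorated function sees the unmodified reader.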

What's crucial is that data lineage is tracked at a row level. If you concatenate two dataframes, each row in the resulting dataframe retains its own provenance. This differs from column-level lineage tracking, which can lose granularity when rows originate from different sources.
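
To make the difference concrete, here is a small sketch that uses plain Python dicts as rows instead of a dataframe. The file names january.csv and february.csv and the helper tag_rows are made up for the example.

```python
def tag_rows(rows, file_path):
    """Attach a per-row lineage record to each row (a plain dict)."""
    source = {
        "source_type": "DATA_SOURCE",
        "source_metadata": {"file_path": file_path},
    }
    return [
        {**row, "data_lineage": {col: source for col in row}}
        for row in rows
    ]


january = tag_rows([{"sku": "a", "price": 10}], "january.csv")
february = tag_rows([{"sku": "b", "price": 12}], "february.csv")

# Concatenation keeps each row's own lineage record.
combined = january + february

# Row-level view: one source per row.
price_sources = [
    row["data_lineage"]["price"]["source_metadata"]["file_path"]
    for row in combined
]

# Column-level view: the best we can say is "price comes from these files",
# without knowing which row came from which file.
column_level = {"price": sorted(set(price_sources))}
```

With row-level tracking, price_sources still says which file each price came from. The column-level summary only lists both files for the whole column.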

The advantages of Bloodline's design are:

  • Users write normal pandas code. They don't have to swap pd.merge for a custom accessor.
  • The @lineage decorator overrides pandas' methods temporarily, which avoids global side effects.
  • Returning from the decorated function triggers a final apply_data_lineage pass, ensuring each data point has lineage.

Here are the caveats:

  • Not all pandas operations are necessarily covered, which can lead to missed lineage.
  • Performance overhead can be significant for large dataframes due to the need to inspect data at a row level.

User guide

n-ary return outputs

The @lineage decorator assumes by default that the decorated function returns a single dataframe. If the function returns several values, use the return_arg parameter to indicate the position of the dataframe among them:

>>> @lineage(return_arg=0)
... def read_data():
...     return pd.DataFrame({'foo': [1, 2]}), 'bar'

>>> df, label = read_data()
>>> assert 'data_lineage' in df.columns

Using custom sources

The default source can be overridden when instantiating the Lineage object:

>>> lineage = bl.Lineage(
...     default_source=bl.Source(
...         source_type="CUSTOM_DEFAULT",
...         source_metadata={"reason": "default for new columns"}
...     )
... )

You can also specify custom sources when using the decorator:

>>> @lineage.with_source(
...     source=bl.Source(
...         source_type="SNOWFLAKE"
...     )
... )
... def read_data_from_db():
...     df = pd.DataFrame({'price': [10, 15]})  # simulate reading from a database
...     return df

>>> df = read_data_from_db()
>>> pprint(df['data_lineage'].iloc[0])
{'price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'}}

Inheriting lineage for derived columns

When creating new columns derived from existing ones, you can instruct Bloodline to inherit lineage from parent columns:

>>> @lineage(
...    inheritance={"discounted_price": "price"}
... )
... def add_discounted_price(df: pd.DataFrame) -> pd.DataFrame:
...     df["discounted_price"] = df["price"] * 0.9
...     df["foo"] = 42  # this column won't inherit lineage
...     return df

>>> df = add_discounted_price(df)
>>> pprint(df['data_lineage'].iloc[0])
{'discounted_price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'},
 'foo': {'source_metadata': {'reason': 'default for new columns'},
         'source_type': 'CUSTOM_DEFAULT'},
 'price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'}}
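
The imputation rule illustrated above (inherit from a parent column when one is declared, otherwise fall back to the default source) can be sketched for a single row's lineage dict. The function impute_lineage is a hypothetical helper written for this illustration and is not part of Bloodline's API.

```python
import copy


def impute_lineage(row_lineage, columns, inheritance, default_source):
    """Return a lineage dict that covers every column in `columns`.

    Columns that already have lineage are left untouched. New columns
    copy their parent's lineage if `inheritance` names one, and get
    `default_source` otherwise.
    """
    result = dict(row_lineage)
    for col in columns:
        if col in result:
            continue
        parent = inheritance.get(col)
        if parent is not None and parent in result:
            result[col] = copy.deepcopy(result[parent])
        else:
            result[col] = copy.deepcopy(default_source)
    return result


row_lineage = {"price": {"source_type": "SNOWFLAKE", "source_metadata": {}}}
default = {
    "source_type": "CUSTOM_DEFAULT",
    "source_metadata": {"reason": "default for new columns"},
}

completed = impute_lineage(
    row_lineage,
    columns=["price", "discounted_price", "foo"],
    inheritance={"discounted_price": "price"},
    default_source=default,
)
```

Here completed maps discounted_price to the SNOWFLAKE source inherited from price, and foo to the custom default, mirroring the output shown above.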

Manually updating lineage

The @lineage decorator should cover most use cases, but sometimes you may need to manually adjust lineage information. This is where bl.apply_data_lineage() comes in handy. For example, you may want to update the lineage for a specific table subset after a bulk operation:

>>> def clip_price(df: pd.DataFrame) -> pd.DataFrame:
...     threshold = 10
...     row_mask = df["price"] > threshold
...     df.loc[row_mask, "price"] = threshold
...     return bl.apply_data_lineage(
...         table=df,
...         default_source=bl.Source(source_type="HEURISTIC", source_metadata={"heuristic_name": "mass_filler"}),
...         row_mask=row_mask,
...         column_names=["price"],
...         override=True,
...     )

>>> df = clip_price(df)
>>> pprint(df['data_lineage'].iloc[1])
{'discounted_price': {'source_metadata': {}, 'source_type': 'SNOWFLAKE'},
 'foo': {'source_metadata': {'reason': 'default for new columns'},
         'source_type': 'CUSTOM_DEFAULT'},
 'price': {'source_metadata': {'heuristic_name': 'mass_filler'},
           'source_type': 'HEURISTIC'}}
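
The effect of row_mask, column_names and override can be pictured with a simplified model in which lineage is a list of per-row dicts. This is a sketch under assumptions, not the library's code: apply_lineage is a hypothetical function, and the reading of override (replace existing entries when true, only fill missing ones when false) is an assumption inferred from the parameter name and the output above.

```python
def apply_lineage(rows_lineage, source, row_mask, column_names, override):
    """Set `source` for the selected rows and columns.

    Assumption: with override=False, existing lineage entries are kept
    and only missing ones are filled in.
    """
    updated = []
    for lineage, selected in zip(rows_lineage, row_mask):
        lineage = dict(lineage)  # do not mutate the caller's dicts
        if selected:
            for col in column_names:
                if override or col not in lineage:
                    lineage[col] = source
        updated.append(lineage)
    return updated


snowflake = {"source_type": "SNOWFLAKE", "source_metadata": {}}
heuristic = {
    "source_type": "HEURISTIC",
    "source_metadata": {"heuristic_name": "mass_filler"},
}

prices = [10, 15]
rows_lineage = [{"price": snowflake}, {"price": snowflake}]
row_mask = [price > 10 for price in prices]  # only the second row is clipped

updated = apply_lineage(rows_lineage, heuristic, row_mask, ["price"], override=True)
kept = apply_lineage(rows_lineage, heuristic, row_mask, ["price"], override=False)
```

Only the second row, whose price exceeded the threshold, ends up with the HEURISTIC source in updated. Under the stated assumption, kept leaves both rows on SNOWFLAKE.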

Generate entity relationship diagrams

Bloodline keeps track of each join between tables. You can generate E/R diagrams from the detected relationships. The relationship type is inferred from the uniqueness of the join keys.

>>> import pandas as pd
>>> import bloodline as bl

>>> lineage = bl.Lineage()

>>> @lineage
... def load_products():
...     return pd.read_csv("tests/examples/products.csv")

>>> @lineage
... def load_users():
...     return pd.read_csv("tests/examples/users.csv")

>>> @lineage
... def load_purchases():
...     purchases = pd.read_csv("tests/examples/purchases.csv")
...     users = load_users()
...     merged = pd.merge(left=purchases, right=users, left_on="user_id", right_on="id")
...     products = load_products()
...     merged = pd.merge(left=merged, right=products, on="sku")
...     return merged

>>> purchases = load_purchases()

>>> with open("tests/examples/erd.mmd", "w") as f:
...     _ = f.write(lineage.erd.to_mermaid())

The generated file contains the following Mermaid diagram:

erDiagram
    "tests/examples/users.csv" ||--o{ "tests/examples/purchases.csv" : "id -> user_id"
    "tests/examples/products.csv" ||--o{ "tests/examples/purchases.csv" : "sku -> sku"
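
Inferring a relationship type from key uniqueness can be sketched as follows. The function infer_relationship and its return labels are hypothetical; Bloodline's own inference may differ in its details.

```python
def infer_relationship(left_keys, right_keys):
    """Classify a join by whether each side's keys are unique."""
    left_keys = list(left_keys)
    right_keys = list(right_keys)
    left_unique = len(set(left_keys)) == len(left_keys)
    right_unique = len(set(right_keys)) == len(right_keys)

    if left_unique and right_unique:
        return "one-to-one"
    if left_unique:
        return "one-to-many"
    if right_unique:
        return "many-to-one"
    return "many-to-many"


# Each user id appears once in users, but may repeat in purchases.
user_ids = [1, 2, 3]
purchase_user_ids = [1, 1, 2, 3, 3]

relationship = infer_relationship(user_ids, purchase_user_ids)
```

For these keys the result is "one-to-many", which is the kind of relationship the ||--o{ arrows in the diagram above denote.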

Toggling tracking on/off

Bloodline exposes methods to control tracking at runtime:

  • bl.disable_data_lineage_tracking(): disable tracking globally.
  • bl.enable_data_lineage_tracking(): enable tracking globally.
  • bl.is_data_lineage_tracked(): check if tracking is enabled.
  • bl.temporarily_disable_tracking(): context manager to disable tracking temporarily.
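
As a rough mental model, a global toggle with a temporary-disable context manager could be built like this. The names below are hypothetical stand-ins, and the sketch makes no claim about how Bloodline implements these functions internally.

```python
import contextlib

# Module-level state; a dict avoids the need for `global` statements.
_state = {"enabled": True}


def enable_tracking():
    _state["enabled"] = True


def disable_tracking():
    _state["enabled"] = False


def is_tracked():
    return _state["enabled"]


@contextlib.contextmanager
def temporarily_disabled():
    """Disable tracking inside the block, then restore the previous state."""
    previous = _state["enabled"]
    _state["enabled"] = False
    try:
        yield
    finally:
        _state["enabled"] = previous


states = [is_tracked()]
with temporarily_disabled():
    states.append(is_tracked())
states.append(is_tracked())
```

Restoring the previous value, rather than unconditionally re-enabling, means the context manager also behaves sensibly when tracking was already disabled before the block.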

Roadmap

  • Bring more pandas operations under the hook manager (e.g., DataFrame.fillna, DataFrame.where, DataFrame.assign, pd.melt).
  • Flesh out the developer guide with real-world recipes as new hooks land.
  • Experiment with lightweight lineage visualizations once the API surface settles.

Contributing

Feel free to reach out to alexis@carbonfact.com and max@carbonfact.com if you want to know more and/or contribute 😊

License

Bloodline is free and open-source software licensed under the Apache License, Version 2.0.
