Skip to main content

Create lineage graphs from SQL queries

Project description

sql2lineage

PyPI - Version PyPI - Python Version PyPI - Types Lint Code Base Test Code Base codecov

The sql2lineage package makes it easy to understand the data lineage of your SQL ETL files.

Features

  • Parse SQL strings to create data lineage
  • Build a graph to represent the data lineage
  • Search and print neighbourhoods

Example

Creating Lineage

With an example SQL file:

WITH orders_with_tax AS (
    SELECT
        order_id,
        customer_id,
        order_total * 1.2 AS total_with_tax
    FROM raw.orders
),
filtered_orders AS (
    SELECT
        order_id,
        customer_id,
        total_with_tax
    FROM orders_with_tax
)

CREATE TABLE big_orders AS
SELECT * FROM filtered_orders;

We can parse the content to create lineage.

from sql2lineage.graph import LineageGraph
from sql2lineage.parser import SQLLineageParser

with open("example.sql") as f:
    sql = f.read()

parser = SQLLineageParser()
r = parser.extract_lineage(sql)


graph = LineageGraph()
graph.from_parsed(r.expressions)
graph.pretty_print()

Output

filtered_orders --> big_orders [type: TABLE]
raw.orders --> orders_with_tax [type: TABLE]
orders_with_tax --> filtered_orders [type: TABLE]
filtered_orders.customer_id --> big_orders.customer_id [type: COLUMN, action: COPY]
filtered_orders.total_with_tax --> big_orders.total_with_tax [type: COLUMN, action: COPY]
filtered_orders.order_id --> big_orders.order_id [type: COLUMN, action: COPY]
orders_with_tax.customer_id --> filtered_orders.customer_id [type: COLUMN, action: COPY]
orders_with_tax.total_with_tax --> filtered_orders.total_with_tax [type: COLUMN, action: COPY]
orders_with_tax.order_id --> filtered_orders.order_id [type: COLUMN, action: COPY]
raw.orders.order_id --> orders_with_tax.order_id [type: COLUMN, action: COPY]
raw.orders.order_total --> orders_with_tax.order_total [type: COLUMN, action: TRANSFORM]
raw.orders.customer_id --> orders_with_tax.customer_id [type: COLUMN, action: COPY]

Searching Neighbours

Using the previously created graph, we can find all the neighbours of node orders_with_tax.order_id:

paths = graph.get_node_neighbours("orders_with_tax.order_id")
graph.print_neighbourhood(paths)

Output

Neighbourhood:
  ↳ {'source': 'raw.orders.order_id', 'target': 'orders_with_tax.order_id', 'type': 'COLUMN', 'action': 'COPY'}
  ↳ {'source': 'orders_with_tax.order_id', 'target': 'filtered_orders.order_id', 'type': 'COLUMN', 'action': 'COPY'}
  ↳ {'source': 'filtered_orders.order_id', 'target': 'big_orders.order_id', 'type': 'COLUMN', 'action': 'COPY'}

Pre Transform

To allow for SQL strings to have pre-processing applied you can pass a callable to the extract_lineage and extract_lineages methods of the SQLLineageParser.

This is intended to simplify applying transformations to the SQL strings. The callable should accept a str and return a transformed string.

An example use case is where a SQL statement is preceeded by a BigQuery write disposition, this must be replaced with the correct DDL.

A pre-transform function can be written:

import re


def preprocess(sql):
    pattern = re.compile(r"(?P<object>(?:\w+)?\.?\w+\.\w+)\:(?P<action>\w+)\:")
    matches = pattern.findall(sql)
    for m in matches:
        object_name, action = m
        match action:
            case "DELETE":
                # used for truncate table & delete, DML not needed
                sql = sql.replace(f"{object_name}:{action}:", "")
            case "WRITE_APPEND":
                # used for insert, DML needed
                sql = sql.replace(
                    f"{object_name}:{action}:", f"insert into {object_name}"
                )
            case "WRITE_TRUNCATE":
                # used for insert overwrite, DML needed
                sql = sql.replace(
                    f"{object_name}:{action}:", f"create or replace table {object_name}"
                )
            case _:
                # fallback
                sql = sql.replace(f"{object_name}:{action}:", "")

    return sql

When the extract_lineage method is called with our preprocess callable (parser.extract_lineage(sql, pre_transform=preprocess)) and the following SQL, the pre_transform callable will be called before parsing the SQL string.

Input

WITH orders_with_tax AS (
    SELECT
        order_id,
        customer_id,
        order_total * 1.2 AS total_with_tax
    FROM raw.orders
),
filtered_orders AS (
    SELECT
        order_id,
        customer_id,
        total_with_tax
    FROM orders_with_tax
)

orders_dataset.big_orders:WRITE_TRUNCATE:
SELECT * FROM filtered_orders;

Output

WITH orders_with_tax AS (
    SELECT
        order_id,
        customer_id,
        order_total * 1.2 AS total_with_tax
    FROM raw.orders
),
filtered_orders AS (
    SELECT
        order_id,
        customer_id,
        total_with_tax
    FROM orders_with_tax
)

create or replace table orders_dataset.big_orders
SELECT * FROM filtered_orders;

Structs

When a struct is created in a statement, for example using struct(), the lineage will be maintained. However, if the struct is selected from an existing table, to maintain full lineage a struct_override must be provided.

Given the following SQL:

with customers as (
select c.id,
       c.name,
       struct(
        a.address_line_1,
        a.address_line_2,
        a.city,
        a.zip
       ) address
  from customer_raw c
  join customer_address a
    on c.id = a.customer_id
)

select *
  from customers
;

Without Override

Without the override the lineage for the final statement would only include customers.address:

customers.address --> expr0001.address [type: COLUMN]

With Override

With the override the lineage for the final statement will include the columns which form the struct customers.address:

customers.address_line_1 --> expr0001.address_line_1 [type: COLUMN]
customers.address_line_2 --> expr0001.address_line_2 [type: COLUMN]
customers.city --> expr0001.city [type: COLUMN]
customers.zip --> expr0001.zip [type: COLUMN]

By providing the override we also ensure the unbroken lineage back to the source table customer_address.

Providing an Override

The override should be a dictionary where the key value is the fully qualified name of the source struct, the value is a list of the field names contained within the struct:

{
    "customers.address": [
        "address_line_1",
        "address_line_2",
        "city",
        "zip",
    ]
}

Bugs/Requests

Please use the GitHub Issue Tracker to submit bugs or requests.

License

Copyright Sean Conkie, 2025.

Distributed under the terms of the MIT license, sql2lineage is free and open source software.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sql2lineage-0.5.1.tar.gz (33.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sql2lineage-0.5.1-py3-none-any.whl (26.8 kB view details)

Uploaded Python 3

File details

Details for the file sql2lineage-0.5.1.tar.gz.

File metadata

  • Download URL: sql2lineage-0.5.1.tar.gz
  • Upload date:
  • Size: 33.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for sql2lineage-0.5.1.tar.gz
Algorithm Hash digest
SHA256 94868663cf9860a74d78d4aca9597d64e6b99913047edcc4af2ddc7b2323bdb0
MD5 8939fb5a3d5b85ed242efbc0a241c03d
BLAKE2b-256 722e91fc51926febdf3a0d3dee21ba120ef58aeb0e67986538b7a436c4c38b29

See more details on using hashes here.

Provenance

The following attestation bundles were made for sql2lineage-0.5.1.tar.gz:

Publisher: publish.yml on sean-conkie/sql2lineage

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sql2lineage-0.5.1-py3-none-any.whl.

File metadata

  • Download URL: sql2lineage-0.5.1-py3-none-any.whl
  • Upload date:
  • Size: 26.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for sql2lineage-0.5.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1e529abbcaddad19f7e2a4e42bdd15b340be1ee385cce7aaa0905a073673c646
MD5 d9e5bca96a6adf6af25912851d01c59c
BLAKE2b-256 a1e166acb2238e1c6ecf3e2d69d44acd7b3a7511287d6f987dc48698e6ae50ef

See more details on using hashes here.

Provenance

The following attestation bundles were made for sql2lineage-0.5.1-py3-none-any.whl:

Publisher: publish.yml on sean-conkie/sql2lineage

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page