Skip to main content

A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files.

Project description

pydiverse.pipedag

CI

A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files.

This is an early stage version 0.x, however, it is already used in real projects. We are happy to receive your feedback as issues on the GitHub repo. Feel free to also comment on existing issues to extend them to your needs or to add solution ideas.

Usage

pydiverse.pipedag can either be installed via pypi with pip install pydiverse-pipedag pydot or via conda-forge with conda install pydiverse-pipedag pydot -c conda-forge. Our recommendation would be to use pixi which is also based on conda-forge:

mkdir my_project
pixi init
pixi add pydiverse-pipedag pydot

With pixi, you run python like this:

pixi run python -c 'import pydiverse.pipedag'

or this:

pixi run python my_script.py

Example

A flow can look like this (i.e. put this in a file named run_pipeline.py):

import tempfile

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.core.config import create_basic_pipedag_config
from pydiverse.common.util.structlog import setup_logging


@materialize(lazy=True)
def lazy_task_1():
    return sa.select(
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    )


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.Alias, input2: sa.Alias):
    query = sa.select(
        (input1.c.x * 5).label("x5"),
        input2.c.a,
    ).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))

    return Table(query, name="task_2_out", primary_key=["a"])


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(nout=2, version="1.0.0")
def eager_inputs():
    dfA = pd.DataFrame(
        {
            "a": [0, 1, 2, 4],
            "b": [9, 8, 7, 6],
        }
    )
    dfB = pd.DataFrame(
        {
            "a": [2, 1, 0, 1],
            "x": [1, 1, 2, 2],
        }
    )
    return Table(dfA, "dfA"), Table(dfB, "dfB_%%")


@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    return tbl1.merge(tbl2, on="x")


def main():
    with tempfile.TemporaryDirectory() as temp_dir:
        cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: If uncommented, stage and task names might be sent to the following URL.
            #   You can self-host kroki if you like:
            #   https://docs.kroki.io/kroki/setup/install/
            #   You need to install optional dependency 'pydot' for any visualization
            #   URL to appear.
            # kroki_url="https://kroki.io",
        ).get("default")
        with cfg:
            with Flow() as f:
                with Stage("stage_1"):
                    lazy_1 = lazy_task_1()
                    a, b = eager_inputs()

                with Stage("stage_2"):
                    lazy_2 = lazy_task_2(lazy_1, b)
                    lazy_3 = lazy_task_3(lazy_2)
                    eager = eager_task(lazy_1, b)

                with Stage("stage_3"):
                    lazy_4 = lazy_task_4(lazy_2)
                _ = lazy_3, lazy_4, eager  # unused terminal output tables

            # Run flow
            result = f.run()
            assert result.successful

            # Run in a different way for testing
            with StageLockContext():
                result = f.run()
                assert result.successful
                assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1


if __name__ == "__main__":
    setup_logging()  # you can set up the logging and/or structlog libraries as you wish
    main()

The with tempfile.TemporaryDirectory() is only needed to have an OS independent temporary directory available. You can also get rid of it like this:

def main():
    cfg = create_basic_pipedag_config(
        "duckdb:////tmp/pipedag/{instance_id}/db.duckdb",
        disable_stage_locking=True,  # This is special for duckdb
    ).get("default")
    ...

For a more sophisticated setup with a pipedag.yaml configuration file and with a separate database (i.e. containerized Postgres), please look here.

Troubleshooting

Installing mssql odbc driver for linux

Installing with instructions here worked. But odbcinst -j revealed that it installed the configuration in /etc/unixODBC/*. But conda installed pyodbc brings its own odbcinst executable and that shows odbc config files are expected in /etc/*. Symlinks were enough to fix the problem. Try python -c 'import pyodbc;print(pyodbc.drivers())' and see whether you get more than an empty list. Furthermore, make sure you use 127.0.0.1 instead of localhost. It seems that /etc/hosts is ignored.

Incompatibility with specific pydiverse.transform Versions

pydiverse.pipedag currently doesn't support pydiverse.transform Versions (0.2.0, 0.2.1, 0.2.2), due to major differences to pdt 0.2.3 and pdt <0.2. However, it does still work with pdt <0.2.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pydiverse_pipedag-0.12.13.tar.gz (3.5 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pydiverse_pipedag-0.12.13-py3-none-any.whl (236.2 kB view details)

Uploaded Python 3

File details

Details for the file pydiverse_pipedag-0.12.13.tar.gz.

File metadata

  • Download URL: pydiverse_pipedag-0.12.13.tar.gz
  • Upload date:
  • Size: 3.5 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pydiverse_pipedag-0.12.13.tar.gz
Algorithm Hash digest
SHA256 6131b11e27600639ba4f2af3958d66a1fbe494af2e1bc9ca804e5cab10b9c95b
MD5 b3ac55e97dcbfc034cfd68f7cffd9bbf
BLAKE2b-256 0b35ae328a8f5f10ac231ca1d3585fae94ea6d1dad4ea128443d9f6cbd9e55be

See more details on using hashes here.

Provenance

The following attestation bundles were made for pydiverse_pipedag-0.12.13.tar.gz:

Publisher: release.yml on pydiverse/pydiverse.pipedag

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pydiverse_pipedag-0.12.13-py3-none-any.whl.

File metadata

File hashes

Hashes for pydiverse_pipedag-0.12.13-py3-none-any.whl
Algorithm Hash digest
SHA256 906a5dc7046f8c7151c626dd2e66a1c6cf9a2b9089c3bb33d2e0eb1d8ddc1db9
MD5 d622cfc155a3e8abad43ccc6894149b0
BLAKE2b-256 34559a3c449b211a3ba90e27612b36409020961035ed23e5c9ead531323d62ea

See more details on using hashes here.

Provenance

The following attestation bundles were made for pydiverse_pipedag-0.12.13-py3-none-any.whl:

Publisher: release.yml on pydiverse/pydiverse.pipedag

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page