pydiverse.pipedag

A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files.

This is an early-stage 0.x version that still lacks documentation. Please contact https://github.com/orgs/pydiverse/teams/code-owners if you would like to become an early adopter or contribute early-stage usage examples.

Usage

pydiverse.pipedag can be installed either from PyPI with pip install pydiverse-pipedag or from conda-forge with conda install pydiverse-pipedag -c conda-forge.

Example

A flow can look like this (i.e. put this in a file named run_pipeline.py):

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext


@materialize(lazy=True)
def lazy_task_1():
    return sa.select(
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    )


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.Table, input2: sa.Table):
    query = sa.select(
        (input1.c.x * 5).label("x5"),
        input2.c.a,
    ).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))

    return Table(query, name="task_2_out", primary_key=["a"])


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.Table):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.name}")


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.Table):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.name}")


@materialize(nout=2, version="1.0.0")
def eager_inputs():
    dfA = pd.DataFrame(
        {
            "a": [0, 1, 2, 4],
            "b": [9, 8, 7, 6],
        }
    )
    dfB = pd.DataFrame(
        {
            "a": [2, 1, 0, 1],
            "x": [1, 1, 2, 2],
        }
    )
    return Table(dfA, "dfA"), Table(dfB, "dfB_%%")


@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    return tbl1.merge(tbl2, on="x")


def main():
    with Flow() as f:
        with Stage("stage_1"):
            lazy_1 = lazy_task_1()
            a, b = eager_inputs()

        with Stage("stage_2"):
            lazy_2 = lazy_task_2(lazy_1, b)
            lazy_3 = lazy_task_3(lazy_2)
            eager = eager_task(lazy_1, b)

        with Stage("stage_3"):
            lazy_4 = lazy_task_4(lazy_2)
        _ = lazy_3, lazy_4, eager  # unused terminal output tables

    # Run flow
    result = f.run()
    assert result.successful

    # Run in a different way for testing
    with StageLockContext():
        result = f.run()
        assert result.successful
        assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1


if __name__ == "__main__":
    main()
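For lazy tasks, pipedag does not execute the returned expression in Python; it renders it to SQL and materializes the result in the table store, and (roughly speaking) the rendered query drives cache invalidation. As a standalone sketch using plain SQLAlchemy (no pipedag required), this is the SQL behind the expression that lazy_task_1 returns:

```python
import sqlalchemy as sa

# The same expression lazy_task_1 returns; rendering it shows the SELECT
# statement that pipedag would materialize into a table.
query = sa.select(
    sa.literal(1).label("x"),
    sa.literal(2).label("y"),
)
sql = str(query.compile(compile_kwargs={"literal_binds": True}))
print(sql)  # SELECT 1 AS x, 2 AS y
```

Because the cached artifact is the materialized table, a lazy task only re-runs when the generated query (or its inputs) change, while eager tasks like eager_inputs rely on the explicit version argument.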

Create a file called pipedag.yaml in the same directory:

name: pipedag_tests
table_store_connections:
  postgres:
    args:
      # Postgres: this can be used after running `docker-compose up`  
      url: "postgresql://{$POSTGRES_USERNAME}:{$POSTGRES_PASSWORD}@127.0.0.1:6543/{instance_id}"

instances:
  __any__:
    # listen interface for the pipedag context server, which synchronizes some task state during DAG execution
    network_interface: "127.0.0.1"
    # classes to be materialized to the table store even without a pipedag Table wrapper (pipedag and
    # pydiverse.transform are loosely coupled, so consider adding 'pydiverse.transform.Table' to your config)
    auto_table: ["pandas.DataFrame", "sqlalchemy.sql.elements.TextClause", "sqlalchemy.sql.selectable.Selectable"]
    fail_fast: true

    instance_id: pipedag_default
    table_store:
      class: "pydiverse.pipedag.backend.table.SQLTableStore"

      # Postgres: this can be used after running `docker-compose up`
      table_store_connection: postgres
      args:
        create_database_if_not_exists: True
        
        # print select statements before they are encapsulated in materialize expressions, and tables before they are
        # written to the database
        print_materialize: true
        # print final sql statements
        print_sql: true

      local_table_cache:
        store_input: true
        store_output: true
        use_stored_input_as_cache: true
        class: "pydiverse.pipedag.backend.table_cache.ParquetTableCache"
        args:
          base_path: "/tmp/pipedag/table_cache"

    blob_store:
      class: "pydiverse.pipedag.backend.blob.FileBlobStore"
      args:
        base_path: "/tmp/pipedag/blobs"

    lock_manager:
      class: "pydiverse.pipedag.backend.lock.ZooKeeperLockManager"
      args:
        hosts: "localhost:2181"

    orchestration:
      class: "pydiverse.pipedag.engine.SequentialEngine"

If you don't have a Postgres, Microsoft SQL Server, or IBM DB2 database at hand, you can start a Postgres instance with the following docker-compose.yaml file:

version: "3.9"
services:
  postgres:
    image: postgres
    environment:
      POSTGRES_USER: sa
      POSTGRES_PASSWORD: Pydiverse23
      POSTGRES_PORT: 6543
    ports:
      - 6543:5432
  zoo:
    image: zookeeper
    environment:
      ZOO_4LW_COMMANDS_WHITELIST: ruok
      ZOO_MAX_CLIENT_CNXNS: 100
    ports:
      - 2181:2181

Run docker-compose up in the directory of your docker-compose.yaml, then execute the flow script with a shell such as bash and a Python environment that includes pydiverse-pipedag, pandas, and sqlalchemy:

export POSTGRES_USERNAME=sa
export POSTGRES_PASSWORD=Pydiverse23
python run_pipeline.py

Finally, you can connect to your local Postgres database pipedag_default and look at the tables in the schemas stage_1 through stage_3.
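If you prefer inspecting from Python, a small SQLAlchemy sketch (assuming the credentials, port, and database name from the docker-compose setup above) lists the materialized tables per stage schema:

```python
import os

import sqlalchemy as sa

# Connection details assumed from the docker-compose.yaml above.
url = sa.engine.URL.create(
    "postgresql",
    username=os.environ.get("POSTGRES_USERNAME", "sa"),
    password=os.environ.get("POSTGRES_PASSWORD", "Pydiverse23"),
    host="127.0.0.1",
    port=6543,
    database="pipedag_default",
)

def list_stage_tables(url: sa.engine.URL) -> dict[str, list[str]]:
    """Return the table names in each pipedag stage schema."""
    engine = sa.create_engine(url)
    inspector = sa.inspect(engine)
    return {
        schema: inspector.get_table_names(schema=schema)
        for schema in ("stage_1", "stage_2", "stage_3")
    }
```

Calling list_stage_tables(url) after the flow has run should show the tables created above in each stage schema.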

If you don't have a SQL UI at hand, you can use the psql command-line tool inside the docker container. Check the NAMES column in the docker ps output. If the name of your postgres container is example_postgres_1, you can look at the output tables like this:

docker exec example_postgres_1 psql --username=sa --dbname=pipedag_default -c 'select * from stage_1.dfa;'

Or more interactively:

docker exec -t -i example_postgres_1 bash
psql --username=sa --dbname=pipedag_default
\dt stage_*.*
select * from stage_2.task_2_out;

Troubleshooting

Installing mssql odbc driver for linux

Installing with the instructions here worked. However, odbcinst -j revealed that the configuration was installed in /etc/unixODBC/*, while the conda-installed pyodbc brings its own odbcinst executable, which expects the odbc config files in /etc/*. Symlinks were enough to fix the problem. Run python -c 'import pyodbc;print(pyodbc.drivers())' and check whether you get more than an empty list. Furthermore, make sure to use 127.0.0.1 instead of localhost; /etc/hosts seems to be ignored.
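For the situation described above, the fix can look like this; the paths are from one specific machine, so verify your own locations with odbcinst -j before creating any symlinks:

```shell
# Show where the conda-installed unixODBC expects its config files
odbcinst -j

# Example fix: point the expected /etc locations at the files the driver
# installer actually wrote (adjust paths to your odbcinst -j output)
sudo ln -s /etc/unixODBC/odbcinst.ini /etc/odbcinst.ini
sudo ln -s /etc/unixODBC/odbc.ini /etc/odbc.ini

# Verify that the driver is now visible to pyodbc
python -c 'import pyodbc; print(pyodbc.drivers())'
```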

Download files

Source Distribution

pydiverse_pipedag-0.5.0.tar.gz (93.1 kB)

Uploaded Source

Built Distribution

pydiverse_pipedag-0.5.0-py3-none-any.whl (113.3 kB)

Uploaded Python 3

File details

Details for the file pydiverse_pipedag-0.5.0.tar.gz.

File metadata

  • Download URL: pydiverse_pipedag-0.5.0.tar.gz
  • Upload date:
  • Size: 93.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.6 Linux/5.15.0-1040-azure

File hashes

Hashes for pydiverse_pipedag-0.5.0.tar.gz:

  • SHA256: 44fb6805d8c8ed82df7c730698f64104780fdb898ca606afe9c26c9cbc18734a
  • MD5: 059a05abfd33440af6445733328b1bf8
  • BLAKE2b-256: cef79fc5282e3631d5ac1b3c6e9b11f6d751d7d2040f10ab1b015b6edee82131

File details

Details for the file pydiverse_pipedag-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: pydiverse_pipedag-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 113.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.6 Linux/5.15.0-1040-azure

File hashes

Hashes for pydiverse_pipedag-0.5.0-py3-none-any.whl:

  • SHA256: 350181e8350d0b14b40d8e60fb6b214d217242ccf59403a6dea3131f6bb8e001
  • MD5: 258aa23b16dcf43598ec1213a6330f03
  • BLAKE2b-256: 0bf414a39bf1fffc51df90fde6f927581a8f8bd0ba05625a2bd1b2f1b3a7c6a9
