pydiverse.pipedag

A pipeline orchestration library that executes tasks within a single Python session. It takes care of SQL table (de)materialization, caching, and cache invalidation. Blob storage is also supported, for example for storing model files.

This is an early-stage 0.x version that still lacks documentation. Please contact https://github.com/orgs/pydiverse/teams/code-owners if you would like to become an early adopter or contribute early-stage usage examples.

Usage

pydiverse.pipedag can be installed either via PyPI with pip install pydiverse-pipedag or via conda-forge with conda install pydiverse-pipedag -c conda-forge.
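
For the example below you will also need pandas and sqlalchemy in the same environment (see the run instructions further down), so a typical setup looks roughly like this; adjust to whichever package manager you use:

# via PyPI
pip install pydiverse-pipedag pandas sqlalchemy

# or via conda-forge
conda install -c conda-forge pydiverse-pipedag pandas sqlalchemy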

Example

A flow can look like this (put the following in a file named run_pipeline.py):

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext


# Lazy task: returns a SQL expression that pipedag materializes as a table;
# cache invalidation is based on the generated query rather than a task version.
@materialize(lazy=True)
def lazy_task_1():
    return sa.select(
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    )


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.Table, input2: sa.Table):
    query = sa.select(
        (input1.c.x * 5).label("x5"),
        input2.c.a,
    ).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))

    return Table(query, name="task_2_out", primary_key=["a"])


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.Table):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.name}")


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.Table):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.name}")


# Eager task: runs in Python and returns two tables (nout=2);
# bump version to invalidate its cache manually.
@materialize(nout=2, version="1.0.0")
def eager_inputs():
    dfA = pd.DataFrame(
        {
            "a": [0, 1, 2, 4],
            "b": [9, 8, 7, 6],
        }
    )
    dfB = pd.DataFrame(
        {
            "a": [2, 1, 0, 1],
            "x": [1, 1, 2, 2],
        }
    )
    return Table(dfA, "dfA"), Table(dfB, "dfB_%%")


# input_type=pd.DataFrame: table inputs are dematerialized as pandas DataFrames
@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    return tbl1.merge(tbl2, on="x")


def main():
    with Flow() as f:
        with Stage("stage_1"):
            lazy_1 = lazy_task_1()
            a, b = eager_inputs()

        with Stage("stage_2"):
            lazy_2 = lazy_task_2(lazy_1, b)
            lazy_3 = lazy_task_3(lazy_2)
            eager = eager_task(lazy_1, b)

        with Stage("stage_3"):
            lazy_4 = lazy_task_4(lazy_2)
        _ = lazy_3, lazy_4, eager  # unused terminal output tables

    # Run flow
    result = f.run()
    assert result.successful

    # Run the flow again while holding the stage locks so task outputs can be inspected
    with StageLockContext():
        result = f.run()
        assert result.successful
        assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1


if __name__ == "__main__":
    main()

Create a file called pipedag.yaml in the same directory:

name: pipedag_tests
table_store_connections:
  postgres:
    args:
      # Postgres: this can be used after running `docker-compose up`  
      url: "postgresql://{$POSTGRES_USERNAME}:{$POSTGRES_PASSWORD}@127.0.0.1:6543/{instance_id}"

instances:
  __any__:
    # listen-interface for pipedag context server which synchronizes some task state during DAG execution
    network_interface: "127.0.0.1"
    # classes to be materialized to table store even without pipedag Table wrapper (we have loose coupling between
    # pipedag and pydiverse.transform, so consider adding 'pydiverse.transform.Table' in your config)
    auto_table: ["pandas.DataFrame", "sqlalchemy.sql.elements.TextClause", "sqlalchemy.sql.selectable.Selectable"]
    fail_fast: true

    instance_id: pipedag_default
    table_store:
      class: "pydiverse.pipedag.backend.table.SQLTableStore"

      # Postgres: this can be used after running `docker-compose up`
      table_store_connection: postgres
      args:
        create_database_if_not_exists: True
        
        # print select statements before they are encapsulated in materialize expressions, and tables before they are
        # written to the database
        print_materialize: true
        # print final sql statements
        print_sql: true

      local_table_cache:
        store_input: true
        store_output: true
        use_stored_input_as_cache: true
        class: "pydiverse.pipedag.backend.table_cache.ParquetTableCache"
        args:
          base_path: "/tmp/pipedag/table_cache"

    blob_store:
      class: "pydiverse.pipedag.backend.blob.FileBlobStore"
      args:
        base_path: "/tmp/pipedag/blobs"

    lock_manager:
      class: "pydiverse.pipedag.backend.lock.ZooKeeperLockManager"
      args:
        hosts: "localhost:2181"

    orchestration:
      class: "pydiverse.pipedag.engine.SequentialEngine"

If you don't have a Postgres, Microsoft SQL Server, or IBM DB2 database at hand, you can start a Postgres database with the following docker-compose.yaml file:

version: "3.9"
services:
  postgres:
    image: postgres
    environment:
      POSTGRES_USER: sa
      POSTGRES_PASSWORD: Pydiverse23
      POSTGRES_PORT: 6543
    ports:
      - 6543:5432
  zoo:
    image: zookeeper
    environment:
      ZOO_4LW_COMMANDS_WHITELIST: ruok
      ZOO_MAX_CLIENT_CNXNS: 100
    ports:
      - 2181:2181
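
To check that both services are up before running the flow, you can, for example, list the containers and send ZooKeeper the ruok four-letter command, which the compose file whitelists above; it answers imok when healthy (the nc utility used here is just one way to send it):

docker-compose ps

# ZooKeeper replies "imok" if it is serving requests
echo ruok | nc 127.0.0.1 2181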

Run docker-compose up in the directory of your docker-compose.yaml and then execute the flow script as follows, using a shell like bash and a Python environment that includes pydiverse-pipedag, pandas, and sqlalchemy:

export POSTGRES_USERNAME=sa
export POSTGRES_PASSWORD=Pydiverse23
python run_pipeline.py

Finally, you can connect to the pipedag_default database on your localhost Postgres instance and look at the tables in the schemas stage_1 through stage_3.

If you don't have a SQL UI at hand, you can use the psql command line tool inside the docker container. Check the NAMES column in the docker ps output; if your Postgres container is named example_postgres_1, you can look at output tables like this:

docker exec example_postgres_1 psql --username=sa --dbname=pipedag_default -c 'select * from stage_1.dfa;'

Or more interactively:

docker exec -t -i example_postgres_1 bash
psql --username=sa --dbname=pipedag_default
\dt stage_*.*
select * from stage_2.task_2_out;

Troubleshooting

Installing the mssql ODBC driver for Linux

Installing with the instructions here worked. However, odbcinst -j revealed that the configuration was installed under /etc/unixODBC/*, while the conda-installed pyodbc brings its own odbcinst executable that expects the ODBC config files under /etc/*. Symlinks were enough to fix the problem. Try python -c 'import pyodbc; print(pyodbc.drivers())' and check whether you get more than an empty list. Furthermore, make sure you use 127.0.0.1 instead of localhost; /etc/hosts seems to be ignored.
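
As a sketch of that symlink workaround (the exact paths are an assumption; use whatever directories odbcinst -j reports on your machine):

# assumes odbcinst -j reported /etc/unixODBC as the config location
sudo ln -s /etc/unixODBC/odbc.ini /etc/odbc.ini
sudo ln -s /etc/unixODBC/odbcinst.ini /etc/odbcinst.ini

# should now print a non-empty list that includes the mssql driver
python -c 'import pyodbc; print(pyodbc.drivers())'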
