pydwt

The pydwt library provides a set of tools for orchestrating data processing tasks in a directed acyclic graph (DAG). The DAG is composed of tasks with dependencies between them; tasks can be executed in parallel or sequentially, depending on those dependencies.
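
To make the idea of dependency-driven ordering concrete, here is a minimal sketch that uses only the Python standard library (graphlib, Python 3.9+). It is not pydwt code: the task names and the execution_levels helper are hypothetical, and pydwt's own dag.py may organize things differently.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "load_raw": set(),
    "clean": {"load_raw"},
    "features": {"clean"},
    "report": {"clean"},
}


def execution_levels(deps):
    """Group tasks into levels; tasks in one level do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels


levels = execution_levels(dependencies)
print(levels)  # [['load_raw'], ['clean'], ['features', 'report']]
```

Tasks that share a level ("features" and "report" here) have no dependency on each other, so they are the ones that could run in parallel.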

Installation

pip install pydwt

Guide

In this document, we will provide a brief explanation of the main modules of the pydwt library, which are:

  • session.py: module for interacting with a database and creating DataFrame objects to manipulate data.
  • dataframe.py: module for defining a DataFrame class for working with data.
  • task.py: module for defining tasks in the DAG.
  • dag.py: module for creating and traversing the DAG.
  • workflow.py: module for running the DAG.

Session

The session.py module is responsible for interacting with a database and creating DataFrame objects to manipulate data. To use this module, you need to create an instance of the Session class, passing an SQLAlchemy engine object and the schema of the database (if it has one). Then, you can use the table method to create a DataFrame object from a table in the database.

Here is an example of how to create a Session object and use the table method to create a DataFrame object:

from sqlalchemy import create_engine
from pydwt.sql.session import Session

engine = create_engine("postgresql://user:password@localhost/dbname")
session = Session(engine, schema="my_schema")

df = session.table("my_table")

DataFrame

The dataframe.py module defines a DataFrame class for working with data. A DataFrame object is essentially a table with labeled columns and rows. You can use it to perform operations such as selecting, filtering, grouping, and aggregating data.

You can also materialize a DataFrame as a table or view in the database by calling the materialize method.
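
The practical difference between materializing as a table and as a view can be illustrated with plain SQL. The sketch below uses the standard library's sqlite3 module rather than pydwt, so the table names and statements are assumptions made for illustration; the SQL that pydwt actually issues may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (col1 INTEGER, col2 TEXT)")
conn.executemany("INSERT INTO my_table VALUES (?, ?)", [(5, "a"), (20, "b")])

query = "SELECT col1, col2 FROM my_table WHERE col1 > 10"

# Materialized as a table: the rows are copied once, at creation time.
conn.execute(f"CREATE TABLE big_rows_table AS {query}")
# Materialized as a view: the query is stored and re-evaluated on every read.
conn.execute(f"CREATE VIEW big_rows_view AS {query}")

# A row added afterwards shows up in the view but not in the table.
conn.execute("INSERT INTO my_table VALUES (30, 'c')")

table_count = conn.execute("SELECT COUNT(*) FROM big_rows_table").fetchone()[0]
view_count = conn.execute("SELECT COUNT(*) FROM big_rows_view").fetchone()[0]
print(table_count, view_count)  # 1 2
```

A table is a snapshot that costs storage but is cheap to read; a view always reflects the current source data but re-runs the query each time.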

Here is an example of how to create a DataFrame object and perform some operations on it:

from sqlalchemy import create_engine, func
from pydwt.sql.session import Session
from pydwt.sql.dataframe import DataFrame

engine = create_engine("postgresql://user:password@localhost/dbname")
session = Session(engine, schema="my_schema")

df = session.table("my_table")

# select some columns
df = df.select(df.col1, df.col2)

# filter rows based on a condition
df = df.where(df.col1 > 10)

# group by a column and aggregate another column
df = df.group_by(df.col2, agg={df.col1: (func.sum, "sum_col1")})

# materialize the resulting DataFrame as a table in the database
df.materialize("new_table", as_="table")
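
For readers who think in SQL, the select / where / group_by chain above describes roughly the query below. This is an assumption about the intent of the chain, not the statement pydwt generates; the sketch runs it against an in-memory SQLite database with made-up rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (col1 INTEGER, col2 TEXT)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [(5, "a"), (20, "a"), (30, "a"), (40, "b")],
)

rows = conn.execute(
    """
    SELECT col2, SUM(col1) AS sum_col1   -- group_by(df.col2, agg=...)
    FROM my_table
    WHERE col1 > 10                      -- where(df.col1 > 10)
    GROUP BY col2
    ORDER BY col2
    """
).fetchall()
print(rows)  # [('a', 50), ('b', 40)]
```

The row with col1 = 5 is filtered out before aggregation, which is why group "a" sums to 50 rather than 55.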

Task

The task.py module defines a Task class for representing a task in the DAG. A Task object has a run method that is responsible for executing the task. You can also define the task's dependencies, schedule, and other parameters when creating the object.

To create a Task object, apply the @Task decorator to the function that does the task's work, and use depends_on to list the tasks that must run before it. Here is an example of how to create Task objects:

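from sqlalchemy import case
from pydwt.core.task import Task

# `session` is assumed to be a Session created as shown in the Session section.

@Task()
def task_one():
    df = session.table("features")
    # add a column that is "W" where preds == "hw" (NULL otherwise)
    df = df.with_column("new_column", case((df.preds == "hw", "W")))
    df.materialize("new_table", as_="table")


@Task(depends_on=[task_one])
def task_two():
    # runs only after task_one, because it reads the table task_one creates
    df = session.table("new_table")
    df = df.where(df.new_column == "W")
    df = df.with_column("new_column", case((df.preds == "hw", "W")))
    df.show()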

Create a new pydwt project:

pydwt new <my_project>

This command will create a new project with the name "my_project" and the required file structure.

my_project/
    models/
        example.py
    dags/
settings.yml

  • my_project/models/: where you put your tasks
  • my_project/dags/: where the PNG files of the corresponding DAG are written
  • settings.yml: the configuration file holding your project's options, such as the path to your data directory.

Export the DAG

pydwt export-dag

This command exports the current state of your DAG to the project's dags/ directory as a timestamped PNG file.

Run your project

pydwt run <module.function_name>

If no argument is provided, this command runs the current state of your DAG. It processes the tasks level by level and parallelizes each level with the ThreadExecutor. If a task fails, its child tasks are not run.
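
The level-by-level behavior described here can be sketched as follows. This is an illustration rather than pydwt's implementation: it assumes the executor behaves like the standard library's concurrent.futures.ThreadPoolExecutor, and the run_by_level helper, the task names, and the status labels are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_by_level(levels, parents, actions):
    """Run each level in a thread pool; skip tasks with a failed or skipped parent."""
    status = {}
    with ThreadPoolExecutor() as pool:
        for level in levels:
            futures = {}
            for name in level:
                if all(status.get(p) == "success" for p in parents[name]):
                    futures[name] = pool.submit(actions[name])
                else:
                    status[name] = "skipped"
            # Wait for the whole level before moving on to the next one.
            for name, future in futures.items():
                status[name] = "failed" if future.exception() else "success"
    return status


def ok():
    return None


def boom():
    raise RuntimeError("task failed")


# Hypothetical DAG: extract -> (clean, audit), clean -> report
levels = [["extract"], ["clean", "audit"], ["report"]]
parents = {"extract": [], "clean": ["extract"], "audit": ["extract"], "report": ["clean"]}
actions = {"extract": ok, "clean": boom, "audit": ok, "report": ok}

status = run_by_level(levels, parents, actions)
print(status)
# {'extract': 'success', 'clean': 'failed', 'audit': 'success', 'report': 'skipped'}
```

Because "clean" raises, its child "report" is skipped, while the sibling "audit" is unaffected.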

If an argument is provided in the form module.function_name (for instance example.task_one), the command first runs all the tasks in the DAG that lead to this task. If the parent tasks succeed, it then runs the task itself.
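
Selecting "all tasks leading to this task" amounts to collecting the target's ancestors in the DAG. A minimal sketch, assuming the dependencies are available as a mapping from each task to its parents (the tasks_leading_to helper and the task names are hypothetical, not part of pydwt):

```python
def tasks_leading_to(target, parents):
    """Return the target plus every task it depends on, directly or indirectly."""
    selected = set()
    stack = [target]
    while stack:
        name = stack.pop()
        if name not in selected:
            selected.add(name)
            stack.extend(parents[name])
    return selected


# Hypothetical DAG using module.function_name identifiers.
parents = {
    "example.task_one": [],
    "example.task_two": ["example.task_one"],
    "example.task_three": ["example.task_two"],
    "example.unrelated": [],
}

print(sorted(tasks_leading_to("example.task_two", parents)))
# ['example.task_one', 'example.task_two']
```

Tasks downstream of the target (example.task_three) and tasks outside its lineage (example.unrelated) are left out of the run.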

Test your connection setup

pydwt test-connection

This command tests your database connection using the settings in your settings.yml file.

Configuration of your pydwt project

The settings.yml file is a configuration file for your pydwt project. It stores various settings such as the project name, database connection details, and DAG tasks.

connection

The connection section contains the configuration details for connecting to the database. The available options are:

  • url: the connection string to your db

You can add other keys, which are forwarded to the underlying create_engine function. For instance, adding echo: true results in a call to create_engine(url=url, echo=echo). See the SQLAlchemy create_engine documentation for the supported arguments.
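
The forwarding described above can be pictured as unpacking the connection section into keyword arguments. The sketch below is an assumption about the mechanism, not pydwt's code; engine_arguments is a hypothetical helper and the values are placeholders.

```python
def engine_arguments(connection):
    """Turn the `connection` section into keyword arguments for create_engine."""
    if "url" not in connection:
        raise ValueError("the connection section needs a 'url' key")
    return dict(connection)


# What a parsed `connection` section might look like (values are placeholders).
connection = {"url": "sqlite:///:memory:", "echo": True}

kwargs = engine_arguments(connection)
print(kwargs)  # {'url': 'sqlite:///:memory:', 'echo': True}

# With SQLAlchemy installed, the engine would then be built roughly as:
#     engine = sqlalchemy.create_engine(**kwargs)
```

Any key that create_engine does not recognize would be rejected by SQLAlchemy itself, so it is worth checking the key names against its documentation.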

project

The project section contains the project-related settings. The available options are:

  • name: the name of the project

tasks

This section contains the configuration for each task defined in the pydwt project.

Each task is identified by its name, and the configuration is stored as a dictionary.

The dictionary can contain any key-value pairs that the task implementation may need to use, but it must have a key named materialize.

  • The materialize key specifies how the task output is stored. The value can be either view (the output is stored as a SQL view) or table (the output is stored as a SQL table).
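
The rule above (any keys are allowed, but materialize is required and must be view or table) can be checked with a few lines of Python. This is a hypothetical validation helper written for illustration; pydwt may or may not perform such a check itself, and the task names and the threshold key are made up.

```python
ALLOWED_MATERIALIZATIONS = {"view", "table"}


def check_tasks_section(tasks):
    """Return a list of problems found in the `tasks` section (empty if none)."""
    problems = []
    for name, config in tasks.items():
        value = config.get("materialize")
        if value is None:
            problems.append(f"{name}: missing 'materialize' key")
        elif value not in ALLOWED_MATERIALIZATIONS:
            problems.append(
                f"{name}: materialize must be 'view' or 'table', got {value!r}"
            )
    return problems


# The `tasks` section as it might look once settings.yml is parsed into a dict.
tasks = {
    "task_one": {"materialize": "view"},
    "task_two": {"materialize": "table", "threshold": 0.5},
    "task_three": {"threshold": 0.1},
}
print(check_tasks_section(tasks))  # ["task_three: missing 'materialize' key"]
```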

Each task implementation can access its configuration by injecting the config argument and specifying Provide[Container.config.tasks.<task_name>]. The injected config argument is a dictionary containing the configuration for the specified task.

Example:

from pydwt.core.task import Task
from dependency_injector.wiring import inject, Provide
from pydwt.core.containers import Container

@Task()
@inject
def task_one(config: dict = Provide[Container.config.tasks.task_one]):
    # config is the dictionary stored under tasks.task_one in settings.yml
    print(config)

@Task(depends_on=[task_one])
def task_two():
    print("some processing")

sources

The sources section contains the database sources that can be used in the project. Each source must have a unique name and specify the schema and table to use for the source.

sources:
  name_alias:
    table: xxx
    schema: yyy

You can then require this data source in your tasks:

from pydwt.core.task import Task
from dependency_injector.wiring import inject, Provide
from pydwt.core.containers import Container

@Task()
@inject
def task_one(
    config: dict = Provide[Container.config.tasks.task_one],
    repo=Provide[Container.datasources],
):
    # look up the source declared under the alias "name_alias" in settings.yml
    df = repo.get_sources("name_alias")

License

This project is licensed under GPL.
