dagster-teradata

A dagster module that provides integration with Teradata Vantage.

Installation

The dagster_teradata module is available as a PyPI package; install it with your preferred Python environment manager:

python -m venv .venv
source .venv/bin/activate
pip install dagster-teradata

Example Usage

dagster-teradata offers seamless integration with Teradata Vantage, facilitating efficient workflows for data processing, management, and transformation. The module supports a range of scenarios, such as executing queries, managing tables, and integrating with cloud storage solutions like AWS S3 and Azure Data Lake Storage (ADLS). It also enables compute cluster management for Teradata VantageCloud Lake.

from dagster import job, op, EnvVar
from dagster_teradata import TeradataResource

td_resource = TeradataResource(
    host=EnvVar("TERADATA_HOST"),
    user=EnvVar("TERADATA_USER"),
    password=EnvVar("TERADATA_PASSWORD"),
    database=EnvVar("TERADATA_DATABASE"),
)

def test_execute_query():
    @op(required_resource_keys={"teradata"})
    def example_test_execute_query(context):
        # The second positional argument asks the resource to fetch and return
        # the query results.
        result = context.resources.teradata.execute_queries(
            ["select order_id from orders_24", "select order_id from orders_25"], True
        )
        context.log.info(result)

    @job(resource_defs={"teradata": td_resource})
    def example_job():
        example_test_execute_query()

    example_job.execute_in_process(resources={"teradata": td_resource})

The next example drops tables using the td_resource defined above:

def test_drop_table():
    @op(required_resource_keys={"teradata"})
    def example_test_drop_table(context):
        result = context.resources.teradata.drop_table(["process_tmp1", "process_tmp2"])
        context.log.info(result)

    @job(resource_defs={"teradata": td_resource})
    def example_job():
        example_test_drop_table()

    example_job.execute_in_process(resources={"teradata": td_resource})
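
Beyond queries and table management, the package also advertises transfers from cloud object storage (AWS S3 and ADLS). The sketch below is a minimal illustration only: the s3_to_teradata method, its positional arguments, and the use of dagster-aws's S3Resource are assumptions patterned on the examples above, so verify them against the package's API before relying on them.

from dagster import job, op
from dagster_aws.s3 import S3Resource

s3_resource = S3Resource()  # assumed: credentials resolved from the environment

@op(required_resource_keys={"teradata", "s3"})
def example_load_from_s3(context):
    # Hypothetical call: load the object at an S3 URI into a Teradata table.
    # The method name and argument order are assumptions, not confirmed API.
    context.resources.teradata.s3_to_teradata(
        context.resources.s3,
        "/s3/your-bucket.s3.amazonaws.com/people.csv",
        "people",
    )

@job(resource_defs={"teradata": td_resource, "s3": s3_resource})
def example_s3_job():
    example_load_from_s3()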

The following example manages compute clusters in Teradata VantageCloud Lake:

from dagster import job, op, EnvVar
from dagster_teradata import teradata_resource

def test_create_teradata_compute_cluster():
    @op(required_resource_keys={"teradata"})
    def example_create_teradata_compute_cluster(context):
        """Args for create_teradata_compute_cluster():
        compute_profile_name: Name of the Compute Profile to manage.
        compute_group_name: Name of compute group to which compute profile belongs.
        query_strategy: Query strategy to use. Refers to the approach or method used by the
                Teradata Optimizer to execute SQL queries efficiently within a Teradata computer cluster.
                Valid query_strategy value is either 'STANDARD' or 'ANALYTIC'. Default at database level is STANDARD
        compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster profile refers
                to the mapping of compute resources to a specific node or set of nodes within the cluster.
        compute_attribute: Optional attributes of compute profile. Example compute attribute
                MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE')
                   compute_attribute (str, optional): Additional attributes for compute profile. Defaults to None.
        """
        context.resources.teradata.create_teradata_compute_cluster(
            "ShippingCG01",       # compute_profile_name
            "Shipping",           # compute_group_name
            "STANDARD",           # query_strategy
            "TD_COMPUTE_MEDIUM",  # compute_map
            "MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('FALSE')",  # compute_attribute
        )

    @job(resource_defs={"teradata": teradata_resource})
    def example_job():
        example_create_teradata_compute_cluster()

    example_job.execute_in_process(
        run_config={
            "resources": {
                "teradata": {
                    "config": {
                        "host": EnvVar("TERADATA_HOST"),
                        "user": EnvVar("TERADATA_USER"),
                        "password": EnvVar("TERADATA_PASSWORD"),
                        "database": EnvVar("TERADATA_DATABASE"),
                    }
                }
            }
        }
    )
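
The cluster lifecycle presumably has companion operations. The sketch below suspends and drops the profile created above; the method names suspend_teradata_compute_cluster and drop_teradata_compute_cluster and the delete_compute_group flag are assumptions patterned on create_teradata_compute_cluster, so confirm them against the package before use.

@op(required_resource_keys={"teradata"})
def example_teardown_compute_cluster(context):
    # Hypothetical companion calls, mirroring create_teradata_compute_cluster.
    context.resources.teradata.suspend_teradata_compute_cluster(
        "ShippingCG01",  # compute_profile_name
        "Shipping",      # compute_group_name
    )
    context.resources.teradata.drop_teradata_compute_cluster(
        "ShippingCG01",
        "Shipping",
        delete_compute_group=True,  # assumed flag: also drop the compute group
    )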

BTEQ Operator

The bteq_operator method enables execution of SQL statements or BTEQ (Basic Teradata Query) scripts using the Teradata BTEQ utility. It supports running commands either on the local machine or on a remote machine over SSH — in both cases, the BTEQ utility must be installed on the target system.

Key Features

  • Executes SQL provided as a string or from a script file (only one can be used at a time).
  • Supports custom encoding for the script or session.
  • Configurable timeout and return code handling.
  • Remote execution supports authentication using a password or an SSH key.
  • Works in both local and remote setups, provided the BTEQ tool is installed on the system where execution takes place.

Ensure that the Teradata BTEQ utility is installed on the machine where the SQL statements or scripts will be executed.

This could be:

  • the local machine where Dagster runs the op, for local execution, or
  • the remote host accessed via SSH, for remote execution.

If executing remotely, also ensure that an SSH server (e.g., sshd) is running and accessible on the remote machine.

Parameters

  • sql: SQL statement(s) to be executed using BTEQ. (optional, mutually exclusive with file_path)
  • file_path: If provided, this file is used instead of the sql content. The path is interpreted as a remote path when executing remotely via SSH, or a local path when executing locally. (optional, mutually exclusive with sql)
  • remote_host: Hostname or IP address for remote execution. If not provided, execution is assumed to be local. (optional)
  • remote_user: Username used for SSH authentication on the remote host. Required if remote_host is specified.
  • remote_password: Password for SSH authentication. Optional, and used as an alternative to ssh_key_path.
  • ssh_key_path: Path to the SSH private key used for authentication. Optional, and used as an alternative to remote_password.
  • remote_port: SSH port number for the remote host. Defaults to 22 if not specified. (optional)
  • remote_working_dir: Temporary directory on the remote host where the BTEQ script is transferred and executed. Defaults to /tmp if not specified. Only applicable for remote execution, i.e., when remote_host is provided. (optional)
  • bteq_script_encoding: Character encoding for the BTEQ script file. Defaults to ASCII if not specified.
  • bteq_session_encoding: Character set encoding for the BTEQ session. Defaults to ASCII if not specified.
  • bteq_quit_rc: Accepts a single integer, list, or tuple of return codes. Specifies which BTEQ return codes should be treated as successful, allowing subsequent tasks to continue execution.
  • timeout: Timeout (in seconds) for executing the BTEQ command. Default is 600 seconds (10 minutes).
  • timeout_rc: Return code to use if the BTEQ execution fails due to a timeout. To allow the op to continue after a timeout, include this value in bteq_quit_rc. If not specified, a timeout raises an exception and fails the op.

Returns

  • Output of the BTEQ execution, or None if no output was produced.

Raises

  • ValueError: For invalid input or configuration
  • DagsterError: If BTEQ execution fails or times out

Notes

  • Either sql or file_path must be provided, but not both.
  • For remote execution, provide either remote_password or ssh_key_path (not both).
  • Encoding and timeout handling are customizable.
  • Validates remote port and authentication parameters.

Example Usage

# Inside an op, with the Teradata resource bound under the "teradata" key:

# Local execution with direct SQL
output = context.resources.teradata.bteq_operator(sql="SELECT * FROM table;")

# Remote execution with a script file over SSH
output = context.resources.teradata.bteq_operator(
    file_path="script.sql",
    remote_host="example.com",
    remote_user="user",
    ssh_key_path="/path/to/key.pem",
)
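
The return-code parameters compose with the timeout parameters. The snippet below is an illustrative sketch; the parameter names come from the list above, while the specific return-code values are made up for the example.

# Treat return codes 0 and 4 as success, and report a timeout as rc 4 so the
# op continues instead of raising DagsterError (per timeout_rc semantics above).
output = context.resources.teradata.bteq_operator(
    sql="SELECT * FROM table;",
    bteq_quit_rc=[0, 4],  # return codes considered successful
    timeout=300,          # give BTEQ 5 minutes instead of the default 600s
    timeout_rc=4,         # on timeout, use rc 4, which bteq_quit_rc accepts
)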

Development

The Makefile provides targets to test and lint your local installation:

make test
make ruff
make check
