
A utils package for Yes4All SOP


This is a utils package serving the SOP Data Analytics team at Yes4All. It contains various modules for working with PostgreSQL, MinIO, Google API, Airflow, Telegram, and more.


Contents Overview

Install package

$ pip install --upgrade sop-deutils

Modules usage

Airflow

Use case: when creating a new scheduled task file on Airflow.

Functional:

Auto-naming the DAG ID and alerting failed DAGs to Telegram:

  • Sample code for a base-config Airflow DAG file:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from sop_deutils.y4a_airflow import auto_dag_id, telegram_alert

default_args = {
    "retries": 20,                                  # number of times to retry when the task fails
    "retry_delay": timedelta(minutes=7),            # time delay between retries
    "start_date": datetime(2023, 7, 14, 0, 0, 0),   # date the DAG starts to run
    "owner": 'duikha',                              # account name of the DAG owner
    "on_failure_callback": telegram_alert,          # function to alert to Telegram when the DAG/task fails
    "execution_timeout": timedelta(hours=4),        # time limit for the DAG run
}

dag = DAG(
    dag_id=auto_dag_id(),           # function to name the DAG based on the file directory
    description='Sample DAG',       # description of the DAG
    default_args=default_args,      # dictionary of the predefined params above
    catchup=False,                  # if True, the DAG will backfill runs from start_date to the current date
)

with dag:
    @task
    def function_1():
        ...

    @task
    def function_2():
        ...

    function_1() >> function_2()
  • The list of account names can be found here.


GoogleSheet

Use case: when interacting with Google Sheets.

Functional:

2.1 initialize

First, import the GoogleSheet utils module class. To use personal credentials, provide the credentials dictionary via the user_creds parameter.

from sop_deutils.gg_api.y4a_sheet import GGSheetUtils

sheet_utils = GGSheetUtils(
    user_creds=None,
)
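
If personal credentials are used, the dictionary normally follows the Google service-account JSON key format. A minimal sketch, where all key values are illustrative placeholders:

my_creds = {
    'type': 'service_account',
    'project_id': 'my-project',
    'private_key_id': 'my-private-key-id',
    'private_key': '-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n',
    'client_email': 'my-service-account@my-project.iam.gserviceaccount.com',
    'token_uri': 'https://oauth2.googleapis.com/token',
}

sheet_utils = GGSheetUtils(
    user_creds=my_creds,
)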
2.2 create_spread_sheet

To create a new spreadsheet, use the create_spread_sheet method. It has the following parameters:

  • sheet_name (required): Name of the sheet to create. (str)

  • folder_id (optional): ID of the folder that contains the spreadsheet. The default value is None. (str)

  • share_to (optional): List of emails to share the spreadsheet with. The default value is []. (list)

The method will return the ID of the created spreadsheet.

spread_sheet_id = sheet_utils.create_spread_sheet(
    sheet_name='my-sheet-name',
    folder_id='my-folder-id',
    share_to=['longnc@yes4all.com'],
)

print(spread_sheet_id)

Output:

1vTjZOcRfd5eiF5Qo8DCha29Vdt0zvYP11XPbq54eCMg
2.3 list_all_work_sheets

To get all available worksheets of a spreadsheet, use the list_all_work_sheets method. It has the following parameter:

  • sheet_id (required): Spreadsheet ID. (str)

The method will return a list of all worksheets of the spreadsheet.

work_sheets = sheet_utils.list_all_work_sheets(
    sheet_id='my-sheet-id',
)

print(work_sheets)

Output:

['Sheet1']
2.4 delete_work_sheet

To delete a specific worksheet of a spreadsheet, use the delete_work_sheet method. It has the following parameters:

  • sheet_id (required): Spreadsheet ID. (str)

  • sheet_name (optional): Worksheet name. The default value is 'Sheet1'. (str)

sheet_utils.delete_work_sheet(
    sheet_id='my-sheet-id',
    sheet_name='my-sheet-name',
)
2.5 clear_work_sheet

To clear all data of a specific worksheet of a spreadsheet, use the clear_work_sheet method. It has the following parameters:

  • sheet_id (required): Spreadsheet ID. (str)

  • sheet_name (optional): Worksheet name. The default value is 'Sheet1'. (str)

sheet_utils.clear_work_sheet(
    sheet_id='my-sheet-id',
    sheet_name='my-sheet-name',
)
2.6 get_data

To get data from the given sheet, use the get_data method. It has the following parameters:

  • sheet_id (required): Spreadsheet ID. (str)

  • sheet_name (optional): Worksheet name. The default value is 'Sheet1'. (str)

  • range_from (optional): The beginning of the range of data to get from the sheet. The default value is 'A'. (str)

  • range_to (optional): The end of the range of data to get from the sheet. The default value is 'Z'. (str)

  • columns_first_row (optional): Whether to convert the first row to columns. The default value is False. (bool)

df = sheet_utils.get_data(
    sheet_id='my-sheet-id',
    columns_first_row=True,
)

print(df)

Output:

| Column1 Header | Column2 Header | Column3 Header |
| ---------------| ---------------| ---------------|
| Row1 Value1    | Row1 Value2    | Row1 Value3    |
| Row2 Value1    | Row2 Value2    | Row2 Value3    |
| Row3 Value1    | Row3 Value2    | Row3 Value3    |
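
To read only part of a worksheet, combine the range_from and range_to parameters documented above, for example:

df = sheet_utils.get_data(
    sheet_id='my-sheet-id',
    sheet_name='Sheet1',
    range_from='A1',
    range_to='D10',
    columns_first_row=True,
)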
2.7 insert_data

To insert data into the given sheet, use the insert_data method. It has the following parameters:

  • data (required): Dataframe containing data to insert. (pd.DataFrame)

  • sheet_id (required): Spreadsheet ID. (str)

  • sheet_name (optional): Worksheet name. The default value is 'Sheet1'. (str)

  • from_row_index (optional): The index of the row from which to begin inserting. The default value is 1. (int)

  • insert_column_names (optional): Whether to insert column names. The default value is False. (bool)

sheet_utils.insert_data(
    data=df,
    sheet_id='my-sheet-id',
    from_row_index=2,
    insert_column_names=False,
)
2.8 update_data

To update data of the given sheet, use the update_data method. It has the following parameters:

  • data (required): Dataframe containing data to update. (pd.DataFrame)

  • sheet_id (required): Spreadsheet ID. (str)

  • sheet_name (optional): Worksheet name. The default value is 'Sheet1'. (str)

  • range_from (optional): The beginning of the range of data to update. The default value is 'A'. (str)

sheet_utils.update_data(
    data=new_df,
    sheet_id='my-sheet-id',
    range_from='A4',
)
2.9 remove_data

To remove data from a specific range of the given sheet, use the remove_data method. It has the following parameters:

  • sheet_id (required): Spreadsheet ID. (str)

  • sheet_name (optional): Worksheet name. The default value is 'Sheet1'. (str)

  • list_range (optional): List of data ranges to remove. The default value is ['A1:Z1', 'A4:Z4']. (list)

sheet_utils.remove_data(
    sheet_id='my-sheet-id',
    list_range=[
        'A2:D5',
        'E5:G6',
    ],
)

MinIO

MinIO is an object storage service that is API-compatible with the Amazon S3 cloud storage service. MinIO can be used as a data lake to store unstructured data (photos, videos, log files, backups, and container images) as well as structured data.

Use case: when you need to store raw data in, or get raw data from, the data lake. Note that the stored data extension must be .parquet.

Notes on how to determine the file_path parameter in MinIO when using this module:

[image: minIO file path]

For example, if the directory of the data file in MinIO is as above, then the file_path is "/scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619" (after removing the bucket name, data storage mode, and data file extension).
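
As a rough illustration, the file_path can be derived from the full object key by stripping those three parts. A minimal sketch, assuming the key layout "<bucket>/<mode>/<file_path>.parquet":

object_key = 'sop-bucket/stag/scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619.parquet'

# Drop the bucket name and storage mode, then the .parquet extension.
bucket, mode, rest = object_key.split('/', 2)
file_path = '/' + rest.removesuffix('.parquet')

print(file_path)

Output:

/scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619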

Functional:

3.1 initialize

First, import the MinIO utils module class.

from sop_deutils.datalake.y4a_minio import MinioUtils

minio_utils = MinioUtils()
3.2 data_exist

To check whether data exists in a storage directory, use the data_exist method. It has the following parameters:

  • mode (required): The data storage mode. The value must be either 'prod' or 'stag'. (str)

  • file_path (required): The data directory to check. (str)

  • bucket_name (optional): The name of the bucket to check. The default value is 'sop-bucket'. (str)

The method will return True if data exists; otherwise, it returns False.

minio_utils.data_exist(
    mode='stag',
    file_path='your-data-path',
)

Output:

True
3.3 get_data_value_exist

To get the distinct values of a specified column of data in a data directory, use the get_data_value_exist method. It has the following parameters:

  • mode (required): The data storage mode. The value must be either 'prod' or 'stag'. (str)

  • file_path (required): The data directory to get distinct values from. (str)

  • column_key (required): The column name to get distinct values from. (str)

  • bucket_name (optional): The name of the bucket to get distinct values. The default value is 'sop-bucket'. (str)

The method will return a list of distinct values.

minio_utils.get_data_value_exist(
    mode='stag',
    file_path='your-data-path',
    column_key='your-chosen-column',
)

Output:

['value_1', 'value_2']
3.4 load_data

To load data from a dataframe to storage, use the load_data method. It has the following parameters:

  • data (required): Dataframe containing data to load. (pd.DataFrame)

  • mode (required): The data storage mode. The value must be either 'prod' or 'stag'. (str)

  • file_path (required): The directory to load the data. (str)

  • bucket_name (optional): The name of the bucket to load the data. The default value is 'sop-bucket'. (str)

minio_utils.load_data(
    data=df,
    mode='stag',
    file_path='your-data-path',
)
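
A common pattern is to guard the load with the data_exist method (3.2) so an existing path is not overwritten. A sketch, assuming df is a prepared dataframe:

if not minio_utils.data_exist(
    mode='stag',
    file_path='your-data-path',
):
    minio_utils.load_data(
        data=df,
        mode='stag',
        file_path='your-data-path',
    )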
3.5 get_data

To get data from a single file in a storage directory, use the get_data method. It has the following parameters:

  • mode (required): The data storage mode. The value must be either 'prod' or 'stag'. (str)

  • file_path (required): The data directory to get data from. (str)

  • bucket_name (optional): The name of the bucket to get data from. The default value is 'sop-bucket'. (str)

The method will return a dataframe containing the requested data.

df = minio_utils.get_data(
    mode='stag',
    file_path='your-data-path',
)

print(df)

Output:

| Column1 Header | Column2 Header | Column3 Header |
| ---------------| ---------------| ---------------|
| Row1 Value1    | Row1 Value2    | Row1 Value3    |
| Row2 Value1    | Row2 Value2    | Row2 Value3    |
| Row3 Value1    | Row3 Value2    | Row3 Value3    |
3.6 get_data_wildcard

To get data from multiple files in storage directories, use the get_data_wildcard method. It has the following parameters:

  • mode (required): The data storage mode. The value must be either 'prod' or 'stag'. (str)

  • file_path (required): The parent data directory to get the data from. (str)

  • bucket_name (optional): The name of the bucket to get data from. The default value is 'sop-bucket'. (str)

The method will return a dataframe containing the requested data.

df = minio_utils.get_data_wildcard(
    mode='stag',
    file_path='your-parent-data-path',
)

print(df)

Output:

| Column1 Header | Column2 Header | Column3 Header |
| ---------------| ---------------| ---------------|
| Row1 Value1    | Row1 Value2    | Row1 Value3    |
| Row2 Value1    | Row2 Value2    | Row2 Value3    |
| Row3 Value1    | Row3 Value2    | Row3 Value3    |

PostgreSQL

Use case: when interacting with a PostgreSQL database.

Functional:

4.1 initialize

First, import the PostgreSQL utils module class. This class has four parameters:

  • account_name: The shortcode of the client account name to connect to PostgreSQL. The DA member name can be used as the value. The default value is None. If not provided, the pg_account and pg_password parameters must be used instead. The list of account names can be found here. (str)

  • pg_name: The PostgreSQL database name to connect to. Accepted values are 'raw_master', 'raw_repl', 'serving_master', 'serving_repl'. (str)

  • pg_account: The client account to connect to PostgreSQL. The default value is None. (str)

  • pg_password: The client password to connect to PostgreSQL. The default value is None. (str)

from sop_deutils.sql.y4a_postgresql import PostgreSQLUtils

pg_utils = PostgreSQLUtils(
    pg_name='serving_master',
    account_name='user1',
)

# or

pg_utils = PostgreSQLUtils(
    pg_name='serving_master',
    pg_account='y4a_sop_user1',
    pg_password='password-of-user1',
)

# either way works
4.2 read_sql_file

To get the SQL query from an SQL file, use the read_sql_file method. It has the following parameter:

  • sql_file_path (required): The located path of the SQL file. (str)

The method will return the string representation of the SQL query.

sql = pg_utils.read_sql_file(
    sql_file_path='your-path/select_all.sql',
)

print(sql)

Output:

SELECT * FROM your_schema.your_table
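
A typical pattern is to pair read_sql_file with the get_data method (4.8) so that queries live in version-controlled .sql files, for example:

sql = pg_utils.read_sql_file(
    sql_file_path='your-path/select_all.sql',
)

df = pg_utils.get_data(
    sql=sql,
)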
4.3 insert_data

To insert data into a PostgreSQL table, use the insert_data method. It has the following parameters:

  • data (required): A dataframe containing the data to insert. (pd.DataFrame)

  • schema (required): The schema containing the table to insert. (str)

  • table (required): The name of the table to insert the data into. (str)

  • ignore_errors (optional): Whether to ignore errors when inserting data. The default value is False. (bool)

  • commit_every (optional): The number of rows of data to commit each time. The default value is 1000. (int)

  • db_pool_conn (optional): The connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.insert_data(
    data=your_df,
    schema='your-schema',
    table='your-table',
)
4.4 bulk_insert_data

To insert a large amount of data into a PostgreSQL table with high performance, use the bulk_insert_data method. It has the following parameters:

  • data (required): A dataframe containing the data to insert. (pd.DataFrame)

  • schema (required): The schema containing the table to insert. (str)

  • table (required): The name of the table to insert the data into. (str)

  • commit_every (optional): The number of rows of data to commit each time. The default value is 1000. (int)

  • db_pool_conn (optional): The connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.bulk_insert_data(
    data=your_df,
    schema='your-schema',
    table='your-table',
)
4.5 upsert_data

To upsert data in a PostgreSQL table, use the upsert_data method. It has the following parameters:

  • data (required): A dataframe containing the data to upsert. Note that if the dataframe contains duplicated rows, they will be dropped. (pd.DataFrame)

  • schema (required): The schema containing the table to upsert. (str)

  • table (required): The name of the table to upsert the data into. (str)

  • where_conditions (optional): A string of a query that uses conditions to update. The default value is None. (str)

  • commit_every (optional): The number of rows of data to commit each time. The default value is 1000. (int)

  • db_pool_conn (optional): The connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.upsert_data(
    data=your_df,
    schema='your-schema',
    table='your-table',
)
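
For conditional upserts, the where_conditions parameter accepts a string of update conditions. A sketch in which the SQL fragment is only an illustrative assumption:

pg_utils.upsert_data(
    data=your_df,
    schema='your-schema',
    table='your-table',
    where_conditions="updated_at > '2023-01-01'",
)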
4.6 bulk_upsert_data

To upsert a large amount of data in a PostgreSQL table with high performance, use the bulk_upsert_data method. It has the following parameters:

  • data (required): A DataFrame containing data to upsert. If the DataFrame contains duplicated rows, they will be dropped. (pd.DataFrame)

  • schema (required): The schema containing the table to upsert. (str)

  • table (required): The name of the table to upsert the data into. (str)

  • where_conditions (optional): A string of a query that uses conditions to update. The default value is None. (str)

  • commit_every (optional): The number of rows of data to commit each time. The default value is 1000. (int)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.bulk_upsert_data(
    data=your_df,
    schema='your-schema',
    table='your-table',
)
4.7 update_table

To update data of specific columns in a table based on primary keys, use the update_table method. It has the following parameters:

  • data (required): A DataFrame containing data to update, including primary keys and columns to update. (pd.DataFrame)

  • schema (required): The schema containing the table to update data. (str)

  • table (required): The table to update data. (str)

  • columns (required): A list of column names to update data. (list)

  • commit_every (optional): The number of rows of data to commit each time. The default value is 1000. (int)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.update_table(
    data=your_df,
    schema='your-schema',
    table='your-table',
    columns=['col1', 'col2'],
)
4.8 get_data

To get data from a PostgreSQL database using a SQL query, use the get_data method. This method has the following parameters:

  • sql (required): SQL query to get data. (str)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

The method will return a dataframe that contains data extracted by the given SQL query.

Here’s how to use the get_data method in Python:

df = pg_utils.get_data(
    sql='your-query',
)

print(df)

Output:

| Column1 Header | Column2 Header | Column3 Header |
| ---------------| ---------------| ---------------|
| Row1 Value1    | Row1 Value2    | Row1 Value3    |
| Row2 Value1    | Row2 Value2    | Row2 Value3    |
| Row3 Value1    | Row3 Value2    | Row3 Value3    |
4.9 select_distinct

To retrieve the distinct values of a specified column in a PostgreSQL table, use the select_distinct method. It has the following parameters:

  • col (required): The column name to get the distinct data from. (str)

  • schema (required): The schema containing the table to get data from. (str)

  • table (required): The table to get data from. (str)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

The method will return a list of distinct values from the specified column.

distinct_values = pg_utils.select_distinct(
    col='chosen-column',
    schema='your-schema',
    table='your-table',
)

print(distinct_values)

Output:

['val1', 'val2', 'val3']
4.10 show_columns

To retrieve a list of column names for a specific PostgreSQL table, use the show_columns method. It has the following parameters:

  • schema (required): The schema that contains the table from which to retrieve columns. (str)

  • table (required): The name of the table from which to retrieve columns. (str)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

The method will return a list of column names for the specified table.

col_names = pg_utils.show_columns(
    schema='your-schema',
    table='your-table',
)

print(col_names)

Output:

['col1', 'col2', 'col3']
4.11 execute

To execute a given SQL query, use the execute method. It has the following parameters:

  • sql (required): The SQL query to execute. (str)

  • fetch_output (optional): Whether to fetch the results of the query. The default value is False. (bool)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

The method will return a list of query output if fetch_output is True, otherwise None.

sql = """
    UPDATE
        sales_order_avc_di o,
        (
            SELECT
                DISTINCT po_name,
                asin,
                CASE
                    WHEN o.status LIKE '%cancel%' AND a.status IS NULL THEN ''
                    WHEN o.status LIKE '%cancel%' THEN CONCAT(a.status,' ',cancel_date)
                    ELSE o.status END po_asin_amazon_status
            FROM
                sales_order_avc_order_status o
                LEFT JOIN
                    sales_order_avc_order_asin_status a USING (updated_at, po_name)
            WHERE updated_at > DATE_SUB(NOW(), INTERVAL 1 DAY)
        ) s
    SET
        o.po_asin_amazon_status = s.po_asin_amazon_status
    WHERE
        o.po_name = s.po_name
        AND o.asin = s.asin
"""

pg_utils.execute(
    sql=sql,
)
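
To fetch the results of an ad-hoc statement, set the fetch_output parameter to True, for example:

output = pg_utils.execute(
    sql='SELECT COUNT(*) FROM your_schema.your_table',
    fetch_output=True,
)

print(output)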
4.12 add_column

To add a new column to a specific PostgreSQL table, use the add_column method. It has the following parameters:

  • schema (required): The schema containing the table to create the column. (str)

  • table (required): The name of the table to create the column. (str)

  • column_name (optional): The name of the column to create (available when creating a single column). The default value is None. (str)

  • dtype (optional): The data type of the column to create (available when creating a single column). The default value is None. (str)

  • multiple_columns (optional): A dictionary containing column names as keys and their corresponding data types as values (available when creating multiple columns). The default value is an empty dictionary. (dict)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.add_column(
    schema='my-schema',
    table='my-table',
    multiple_columns={
        'col1': 'int',
        'col2': 'varchar(50)',
    },
)
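
To create a single column instead, use the column_name and dtype parameters, for example:

pg_utils.add_column(
    schema='my-schema',
    table='my-table',
    column_name='col3',
    dtype='date',
)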
4.13 create_table

To create a new table in a PostgreSQL database, use the create_table method. It has the following parameters:

  • schema (required): The schema containing the table to create. (str)

  • table (required): The name of the table to create. (str)

  • columns_with_dtype (required): A dictionary containing column names as keys and their corresponding data types as values. (dict)

  • columns_primary_key (optional): A list of columns to set as primary keys. The default value is []. (list)

  • columns_not_null (optional): A list of columns to set as “not null” constraints. The default value is []. (list)

  • columns_with_default (optional): A dictionary containing column names as keys and their default values as values. The default value is an empty dictionary. (dict)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.create_table(
    schema='my-schema',
    table='my-new-table',
    columns_with_dtype={
        'col1': 'int',
        'col2': 'varchar(50)',
        'col3': 'varchar(10)',
    },
    columns_primary_key=[
        'col1',
    ],
    columns_not_null=[
        'col2',
    ],
    columns_with_default={
        'col3': 'USA',
    },
)
4.14 auto_grant

To grant table privileges to users in PostgreSQL, use the auto_grant method. It has the following parameters:

  • schema (required): The schema containing the table to grant. (str)

  • table (required): The table name to grant. (str)

  • list_users (optional): A list of users to grant access to. The default value is None. If None, the table will be granted according to the predefined rule. (list)

  • privileges (optional): A list of privileges to grant. The default value is ['SELECT']. Accepted values in the privileges list are: 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'REFERENCES', 'TRIGGER'. (list)

  • all_privileges (optional): Whether to grant all privileges. The default value is False. (bool)

pg_utils.auto_grant(
    schema='my-schema',
    table='my-new-table',
    list_users=[
        'linhvk',
        'trieuna',
    ],
    privileges=[
        'SELECT',
        'INSERT',
        'UPDATE',
    ],
)
4.15 truncate_table

To remove all the data from a PostgreSQL table, use the truncate_table method. It has the following parameters:

  • schema (required): The schema containing the table to truncate. (str)

  • table (required): The table name to truncate. (str)

  • reset_identity (optional): Whether to reset the identity of the table. The default value is False. (bool)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

pg_utils.truncate_table(
    schema='my-schema',
    table='my-table',
)
4.16 table_exists

To check whether a PostgreSQL table exists in the database, use the table_exists method. It has the following parameters:

  • schema (required): The schema containing the table to check. (str)

  • table (required): The table name to check. (str)

  • db_pool_conn (optional): A connection pool to connect to the database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

The method will return True if the table exists and False if it does not.

pg_utils.table_exists(
    schema='my-schema',
    table='my-exists-table',
)

Output:

True
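
A typical use is to guard table creation (4.13) with this check, for example:

if not pg_utils.table_exists(
    schema='my-schema',
    table='my-new-table',
):
    pg_utils.create_table(
        schema='my-schema',
        table='my-new-table',
        columns_with_dtype={
            'col1': 'int',
            'col2': 'varchar(50)',
        },
    )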

Telegram

Use case: when you need to send messages to Telegram using a bot.

Functional:

To send messages to Telegram, use the send_message method. It has the following parameters:

  • text (required): The message to send. (str)

  • bot_token (optional): Token of the bot that sends the message. The default value is None. If the value is None, the 'sleep at 9pm' bot will be used to send messages. (str)

  • chat_id (optional): ID of the group chat where the message is sent. The default value is None. If the value is None, the 'Airflow Status Alert' group chat will be used. (str)

from sop_deutils.y4a_telegram import send_message

send_message(
    text='Hello liuliukiki'
)
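
To send via a specific bot to a specific group chat, pass the bot_token and chat_id parameters; the values below are placeholders:

send_message(
    text='ETL pipeline finished',
    bot_token='your-bot-token',
    chat_id='your-group-chat-id',
)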

All in one (DAConfig)

Use case: many platforms need to be accessed frequently; to avoid importing lots of modules, users can access all of the above modules through a single class.

Functional:

First, import the DAConfig class. This class has the following parameter:

  • account_name: The client account name to access the platforms. The DA member name can be used as the value. The list of account names can be found here. (str)

from sop_deutils.base.y4a_da_cfg import DAConfig

da_cfg = DAConfig(
    account_name='your-account-name',
)

This class has attributes for all of the above modules (PostgreSQL, MinIO, Google API, Airflow, Telegram), so users don't need to import and configure each platform connection individually. Each platform attribute has its own methods as listed above. The list of attributes is:

  • minio_utils

  • pg_raw_r_utils (connected to PostgreSQL raw read - repl)

  • pg_raw_w_utils (connected to PostgreSQL raw write - master)

  • pg_serving_r_utils (connected to PostgreSQL serving read - repl)

  • pg_serving_w_utils (connected to PostgreSQL serving write - master)

  • sheet_utils

print(da_cfg.minio_utils)
print(da_cfg.pg_raw_r_utils)
print(da_cfg.pg_raw_w_utils)
print(da_cfg.pg_serving_r_utils)
print(da_cfg.pg_serving_w_utils)
print(da_cfg.sheet_utils)

Output:

<sop_deutils.datalake.y4a_minio.MinioUtils object at 0x7fe6e704d6f0>
<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704d9f0>
<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704dae0>
<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704e170>
<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704e0b0>
<sop_deutils.gg_api.y4a_sheet.GGSheetUtils object at 0x7fe72c65e1d0>
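
Each attribute exposes the same methods documented in the sections above, so they can be used directly, for example:

# Query through the serving read (repl) connection.
df = da_cfg.pg_serving_r_utils.get_data(
    sql='your-query',
)

# Store the result in the data lake.
da_cfg.minio_utils.load_data(
    data=df,
    mode='stag',
    file_path='your-data-path',
)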

Workflow example

from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
import pandas as pd
from sop_deutils.y4a_airflow import auto_dag_id, telegram_alert
from sop_deutils.base.y4a_da_cfg import DAConfig

owner = 'linhvu'

cfg = DAConfig(owner)

default_args = {
    "retries": 20,                                  # number of times to retry when the task fails
    "retry_delay": timedelta(minutes=7),            # time delay between retries
    "start_date": datetime(2023, 7, 14, 0, 0, 0),   # date the DAG starts to run
    "owner": owner,                                 # account name of the DAG owner
    "on_failure_callback": telegram_alert,          # function to alert to Telegram when the DAG/task fails
    "execution_timeout": timedelta(hours=4),        # time limit for the DAG run
}
dag = DAG(
    dag_id=auto_dag_id(),           # function to name the DAG based on the file directory
    description='Sample DAG',       # description of the DAG
    default_args=default_args,      # dictionary of the predefined params above
    catchup=False,                  # if True, the DAG will backfill runs from start_date to the current date
)

with dag:
    @task
    def create_spreadsheet():
        spread_sheet_id = cfg.sheet_utils.create_spread_sheet(
            sheet_name='test_sheet_231020',
            share_to=['longnc@yes4all.com'],
        )

        return spread_sheet_id

    @task
    def insert_data_spreadsheet(spread_sheet_id):
        df = pd.DataFrame(
            [[1, 2, 3, 4]]*20,
            columns=['col1', 'col2', 'col3', 'col4']
        )

        cfg.sheet_utils.insert_data(
            data=df,
            sheet_id=spread_sheet_id,
            from_row_index=1,
            insert_column_names=True,
        )

    @task
    def process_data_spreadsheet(spread_sheet_id):
        cfg.sheet_utils.remove_data(
            sheet_id=spread_sheet_id,
            list_range=[
                'A3:D3',
                'A15:D15',
            ],
        )

    @task
    def etl_from_sheet_to_db(spread_sheet_id):
        df_from_sheet = cfg.sheet_utils.get_data(
            sheet_id=spread_sheet_id,
            columns_first_row=True,
        )

        df_from_sheet['total'] = df_from_sheet['col1'] + df_from_sheet['col2']\
            + df_from_sheet['col3'] + df_from_sheet['col4']
        df_from_sheet.dropna(inplace=True)
        for col in df_from_sheet.columns:
            df_from_sheet[col] = df_from_sheet[col].astype('int')

        cfg.pg_serving_w_utils.create_table(
            schema='y4a_sop_analyst',
            table='test_231020',
            columns_with_dtype={
                'col1': 'int',
                'col2': 'int',
                'col3': 'int',
                'col4': 'int',
                'total': 'int',
            },
        )

        cfg.pg_serving_w_utils.insert_data(
            data=df_from_sheet,
            schema='y4a_sop_analyst',
            table='test_231020',
        )

    @task
    def execute_query():
        df_from_db = cfg.pg_serving_r_utils.get_data(
            sql='SELECT * FROM y4a_sop_analyst.test_231020',
        )
        print(df_from_db)

        cfg.pg_serving_w_utils.execute(
            sql='TRUNCATE TABLE y4a_sop_analyst.test_231020',
        )

    spread_sheet_id = create_spreadsheet()

    insert_data_spreadsheet(spread_sheet_id) \
        >> process_data_spreadsheet(spread_sheet_id) \
        >> etl_from_sheet_to_db(spread_sheet_id) \
        >> execute_query()

provided by liuliukiki

and special thanks to duiikha for contributing the API method to get and secure account credentials.
