A utils package for Yes4All SOP

Project description

Yes4All SOP Utils Packages

This is a utilities package for the SOP Data Analytics team at Yes4All. It contains modules for working with PostgreSQL, MySQL, MinIO, Google API, Airflow, Telegram, etc.

Author: liuliukiki aka clong kiki

User Guide Documentation

Install this package

$ pip install --upgrade sop-deutils

Modules usage

Airflow

Use case: when adding a new scheduled task file to Airflow.

Functional:

Automatically name the DAG ID and send an alert to Telegram when a DAG fails:

  • Sample code for a base Airflow DAG config file:

     from datetime import datetime, timedelta

     from airflow import DAG
     from airflow.decorators import task
     from sop_deutils.y4a_airflow import auto_dag_id, telegram_alert
    
     default_args = {
         "retries": 20,  # number of times to retry when a task fails
         "retry_delay": timedelta(minutes=7),  # delay between retries
         "start_date": datetime(2023, 7, 14, 0, 0, 0),  # date from which the DAG starts running
         "owner": 'liuliukiki',  # Telegram username of the DAG owner
         "on_failure_callback": telegram_alert,  # function that alerts Telegram when the DAG/task fails
         "execution_timeout": timedelta(hours=4),  # maximum run time allowed for each task attempt
     }
    
     dag = DAG(
         dag_id=auto_dag_id(),  # function that names the DAG based on the file directory
         description='Sample DAG',  # description of the DAG
         default_args=default_args,  # the dictionary of default arguments defined above
         catchup=False,  # if True, the DAG backfills runs from start_date to the current date
     )
    
     with dag:
         @task
         def function_1():
             ...
    
         @task
         def function_2():
             ...
    
         function_1() >> function_2()
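
With the sample settings above, a single task can occupy a worker for a long time before the failure alert finally fires. The sketch below estimates that upper bound. It assumes a constant `retry_delay` (no exponential backoff), ignores scheduler queueing time, and relies on Airflow applying `execution_timeout` to each task attempt. The helper name `worst_case_task_duration` is hypothetical and not part of sop-deutils.

```python
from datetime import timedelta


def worst_case_task_duration(retries, retry_delay, execution_timeout):
    """Upper bound on wall-clock time for one task, counting every attempt."""
    attempts = retries + 1  # the first try plus each retry
    # Every attempt may run up to the timeout; a delay sits between attempts.
    return attempts * execution_timeout + retries * retry_delay


budget = worst_case_task_duration(
    retries=20,
    retry_delay=timedelta(minutes=7),
    execution_timeout=timedelta(hours=4),
)
print(budget)  # 3 days, 14:20:00
```

If that bound is longer than the DAG's schedule interval, lowering `retries` or `execution_timeout` may be worth considering.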
    

GoogleSheet

(to be developed)

MinIO

MinIO is an object storage system that is API-compatible with the Amazon S3 cloud storage service. It can be used as a data lake to store unstructured data (photos, videos, log files, backups, and container images) as well as structured data.

Use case: when you need to store raw data in the data lake or get raw data from it. Note that the stored data must have the .parquet extension.

Notes on how to determine the file_path parameter in MinIO when using this module:

(image: MinIO file path)

For example, if the directory to the data file in MinIO is as shown above, then the file_path is "/scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619" (after removing the bucket name, the data storage mode, and the data file extension).
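
The rule above (drop the bucket name, the storage mode, and the file extension) can be sketched as a small helper. The layout `<bucket>/<mode>/<path>.parquet` is an assumption made for illustration, since the exact object layout is not spelled out in the text; `to_file_path` is a hypothetical name, not a function of this package.

```python
from pathlib import PurePosixPath

VALID_MODES = ("prod", "stag")


def to_file_path(object_path: str) -> str:
    """Strip bucket, mode, and the .parquet extension from a full object path."""
    parts = PurePosixPath(object_path.strip("/")).parts
    if len(parts) < 3:
        raise ValueError(f"expected <bucket>/<mode>/<path>, got {object_path!r}")
    _bucket, mode, *rest = parts  # assumed order: bucket first, then mode
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    relative = PurePosixPath(*rest)
    if relative.suffix == ".parquet":
        relative = relative.with_suffix("")
    return "/" + relative.as_posix()


full = (
    "sc-bucket/stag/scraping/amazon_vendor/avc_bulk_buy_request"
    "/2023/9/24/batch_1695525619.parquet"
)
print(to_file_path(full))
# /scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619
```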

Functional:

First, import the MinIO utils class. This class requires two parameters:

  • access_key: the client access key to MinIO storage. (str)

  • secret_key: the client secret key to MinIO storage. (str)

     from sop_deutils.datalake.y4a_minio import MinioUtils
    
     minio_utils = MinioUtils(
         access_key='your-access-key',
         secret_key='your-secret-key',
     )
    

To check whether data exists in a storage directory, use the data_exist method. It has three parameters:

  • mode (required): the data storage mode, the value must be either 'prod' or 'stag'. (str)

  • file_path (required): the data directory to check. (str)

  • bucket_name (optional): the name of the bucket to check. The default value is 'sc-bucket'. (str)

    The method returns True if the data exists, otherwise False.

     minio_utils.data_exist(
         mode='stag',
         file_path='your-data-path',
         bucket_name='sc-bucket',
     )
    

    Output:

     True
    

To get the distinct values of a specified column of the data in a data directory, use the get_data_value_exist method. It has four parameters:

  • mode (required): the data storage mode, the value must be either 'prod' or 'stag'. (str)

  • file_path (required): the data directory to get distinct values. (str)

  • column_key (required): the column name to get distinct values. (str)

  • bucket_name (optional): the name of the bucket to get distinct values. The default value is 'sc-bucket'. (str)

    The method returns a list of distinct values.

     minio_utils.get_data_value_exist(
         mode='stag',
         file_path='your-data-path',
         column_key='your-chosen-column',
         bucket_name='sc-bucket',
     )
    

    Output:

     ['value_1', 'value_2']
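
One way to use the returned list is to work out which values are still missing from storage before an incremental load. The sketch below only calls `get_data_value_exist` with the parameters documented above. `FakeMinioUtils` is a hypothetical stand-in so the example runs without a MinIO server, and `values_not_yet_stored` is an illustrative helper, not part of the package.

```python
def values_not_yet_stored(minio_utils, candidates, mode, file_path, column_key):
    """Return the candidate values that are absent from the stored column."""
    existing = set(
        minio_utils.get_data_value_exist(
            mode=mode,
            file_path=file_path,
            column_key=column_key,
        )
    )
    return [value for value in candidates if value not in existing]


class FakeMinioUtils:
    """Stand-in for MinioUtils, used only to make this sketch runnable."""

    def get_data_value_exist(self, mode, file_path, column_key,
                             bucket_name='sc-bucket'):
        return ['value_1', 'value_2']


new_values = values_not_yet_stored(
    FakeMinioUtils(),
    candidates=['value_2', 'value_3'],
    mode='stag',
    file_path='your-data-path',
    column_key='your-chosen-column',
)
print(new_values)  # ['value_3']
```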
    

To load data from a dataframe to storage, use the load_data method. It has four parameters:

  • data (required): the dataframe containing the data to load. (pd.DataFrame)

  • mode (required): the data storage mode, the value must be either 'prod' or 'stag'. (str)

  • file_path (required): the directory to load the data. (str)

  • bucket_name (optional): the name of the bucket to load the data. The default value is 'sc-bucket'. (str)

     minio_utils.load_data(
         data=df,
         mode='stag',
         file_path='your-data-path',
         bucket_name='sc-bucket',
     )
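
A common pattern is to combine data_exist and load_data so that a rerun does not write the same path twice. The following is a minimal sketch under that assumption. It uses only the two documented methods; `FakeMinioUtils` is a hypothetical in-memory stand-in, and whether load_data overwrites existing data on its own is not stated in this guide.

```python
def load_if_absent(minio_utils, data, mode, file_path):
    """Load data only when nothing is stored at file_path yet."""
    if minio_utils.data_exist(mode=mode, file_path=file_path):
        return False  # already stored, skip the upload
    minio_utils.load_data(data=data, mode=mode, file_path=file_path)
    return True


class FakeMinioUtils:
    """In-memory stand-in for MinioUtils, used only for this sketch."""

    def __init__(self):
        self.stored = {}

    def data_exist(self, mode, file_path, bucket_name='sc-bucket'):
        return (bucket_name, mode, file_path) in self.stored

    def load_data(self, data, mode, file_path, bucket_name='sc-bucket'):
        self.stored[(bucket_name, mode, file_path)] = data


fake = FakeMinioUtils()
first = load_if_absent(fake, data=[1, 2, 3], mode='stag', file_path='/demo/batch_1')
second = load_if_absent(fake, data=[4, 5, 6], mode='stag', file_path='/demo/batch_1')
print(first, second)  # True False
```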
    

To get data from a single file in a storage directory, use the get_data method. It has three parameters:

  • mode (required): the data storage mode, the value must be either 'prod' or 'stag'. (str)

  • file_path (required): the data directory to get data. (str)

  • bucket_name (optional): the name of the bucket to get data. The default value is 'sc-bucket'. (str)

    The method returns a dataframe containing the requested data.

     df = minio_utils.get_data(
         mode='stag',
         file_path='your-data-path',
         bucket_name='sc-bucket',
     )
    
     print(df)
    

    Output:

     | Column1 Header | Column2 Header | Column3 Header |
     | ---------------| ---------------| ---------------|
     | Row1 Value1    | Row1 Value2    | Row1 Value3    |
     | Row2 Value1    | Row2 Value2    | Row2 Value3    |
     | Row3 Value1    | Row3 Value2    | Row3 Value3    |
    

To get data from multiple files in storage directories, use the get_data_wildcard method. It has three parameters:

  • mode (required): the data storage mode, the value must be either 'prod' or 'stag'. (str)

  • file_path (required): the parent data directory to get the data. (str)

  • bucket_name (optional): the name of the bucket to get data. The default value is 'sc-bucket'. (str)

    The method returns a dataframe containing the requested data.

     df = minio_utils.get_data_wildcard(
         mode='stag',
         file_path='your-parent-data-path',
         bucket_name='sc-bucket',
     )
    
     print(df)
    

    Output:

     | Column1 Header | Column2 Header | Column3 Header |
     | ---------------| ---------------| ---------------|
     | Row1 Value1    | Row1 Value2    | Row1 Value3    |
     | Row2 Value1    | Row2 Value2    | Row2 Value3    |
     | Row3 Value1    | Row3 Value2    | Row3 Value3    |
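
When the parent directory is organised by date, as in the earlier example path ending in "/2023/9/24/batch_1695525619", the parent path for a given day could be built as sketched below. The `<prefix>/<year>/<month>/<day>` layout without zero padding is inferred from that single example and is an assumption; `daily_parent_path` is a hypothetical helper.

```python
from datetime import date


def daily_parent_path(prefix: str, day: date) -> str:
    """Build '/<prefix>/<year>/<month>/<day>' without zero padding."""
    return f"/{prefix.strip('/')}/{day.year}/{day.month}/{day.day}"


parent = daily_parent_path(
    "scraping/amazon_vendor/avc_bulk_buy_request",
    date(2023, 9, 24),
)
print(parent)  # /scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24
```

The resulting string would then be passed as file_path to get_data_wildcard.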
    

MySQL

(no docs available now)

PostgreSQL

Use case: when interacting with a PostgreSQL database.

Functional:

First, import the PostgreSQL utils class. This class requires four parameters:

  • db_user: username or account to connect to PostgreSQL. (str)

  • db_password: password to connect to PostgreSQL. (str)

  • db_host: host URL to connect to PostgreSQL. (str)

  • db: database to connect to. The default value is 'serving'. (str)

     from sop_deutils.sql.y4a_postgresql import PostgreSQLUtils
    
     pg_utils = PostgreSQLUtils(
         db_user='your-user-name',
         db_password='your-pass-word',
         db_host='host-to-connect',
         db='database-to-connect',
     )
    

To create a new PostgreSQL connection pool, use the create_pool_conn method. It has one parameter:

  • pool_size (optional): number of connections in the pool. The default value is 1, meaning the pool holds a single connection. (int)

    The method returns a connection pool containing connections to the database.

     pool = pg_utils.create_pool_conn(
         pool_size=1,
     )
    

To close and remove the PostgreSQL connection pool after use, use the close_pool_conn method. It has one parameter:

  • db_pool_conn (required): the connection pool created by the create_pool_conn method. (callable)

     pg_utils.close_pool_conn(
         db_pool_conn=pool,
     )
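
Since the pool has to be closed explicitly, wrapping the two calls in a context manager is one way to make sure close_pool_conn runs even when a query raises. This is a sketch built only on the two methods documented above. The `pooled` helper and the `FakePostgreSQLUtils` stand-in are hypothetical and exist only to keep the example runnable without a database.

```python
from contextlib import contextmanager


@contextmanager
def pooled(pg_utils, pool_size=1):
    """Create a connection pool and always close it on exit."""
    pool = pg_utils.create_pool_conn(pool_size=pool_size)
    try:
        yield pool
    finally:
        pg_utils.close_pool_conn(db_pool_conn=pool)


class FakePostgreSQLUtils:
    """Stand-in for PostgreSQLUtils that records calls instead of connecting."""

    def __init__(self):
        self.events = []

    def create_pool_conn(self, pool_size=1):
        self.events.append(f"open({pool_size})")
        return object()

    def close_pool_conn(self, db_pool_conn):
        self.events.append("close")


fake = FakePostgreSQLUtils()
try:
    with pooled(fake, pool_size=2) as pool:
        raise RuntimeError("query failed")  # simulate a failing query
except RuntimeError:
    pass
print(fake.events)  # ['open(2)', 'close']
```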
    

To read the SQL query stored in a SQL file, use the read_sql_file method. It has one parameter:

  • sql_file_path (required): the path to the SQL file. (str)

    The method returns the SQL query as a string.

     sql = pg_utils.read_sql_file(
         sql_file_path='your-path/select_all.sql',
     )
    
     print(sql)
    

    Output:

     "SELECT * FROM your_schema.your_table"
    

To insert data into a PostgreSQL table, use the insert_data method. It has five parameters:

  • data (required): a dataframe containing the data to insert. (pd.DataFrame)

  • schema (required): the schema containing the table to insert into. (str)

  • table (required): the name of the table to insert into. (str)

  • commit_every (optional): number of rows to commit at a time. The default value is 1000. (int)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.insert_data(
         data=your_df,
         schema='your-schema',
         table='your-table',
         commit_every=1000,
         db_pool_conn=pool,
     )
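
The commit_every parameter implies that rows are written in batches. How the package does this internally is not documented here, so the sketch below only illustrates the general idea of splitting rows into chunks of commit_every, with one commit per chunk. The `batches` helper is hypothetical.

```python
def batches(rows, commit_every=1000):
    """Yield consecutive slices of rows, each at most commit_every long."""
    if commit_every <= 0:
        raise ValueError("commit_every must be a positive integer")
    for start in range(0, len(rows), commit_every):
        yield rows[start:start + commit_every]


rows = list(range(2500))
sizes = [len(batch) for batch in batches(rows, commit_every=1000)]
print(sizes)  # [1000, 1000, 500]
```

In this picture, 2,500 rows with the default setting would result in three commits.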
    

To insert a large amount of data into a PostgreSQL table, use the bulk_insert_data method. It has five parameters:

  • data (required): a dataframe containing the data to insert. (pd.DataFrame)

  • schema (required): the schema containing the table to insert into. (str)

  • table (required): the name of the table to insert into. (str)

  • commit_every (optional): number of rows to commit at a time. The default value is 1000. (int)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.bulk_insert_data(
         data=your_df,
         schema='your-schema',
         table='your-table',
         commit_every=1000,
         db_pool_conn=pool,
     )
    

To upsert data into a PostgreSQL table, use the upsert_data method. It has six parameters:

  • data (required): a dataframe containing the data to upsert. (pd.DataFrame)

  • schema (required): the schema containing the table to upsert into. (str)

  • table (required): the name of the table to upsert into. (str)

  • primary_keys (required): list of primary keys of the table. (list)

  • commit_every (optional): number of rows to commit at a time. The default value is 1000. (int)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.upsert_data(
         data=your_df,
         schema='your-schema',
         table='your-table',
         primary_keys=['pk1', 'pk2', 'pk3'],
         commit_every=1000,
         db_pool_conn=pool,
     )
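
In PostgreSQL, an upsert keyed on primary keys is usually written as `INSERT ... ON CONFLICT (...) DO UPDATE`. The sketch below builds such a statement to show what the primary_keys argument corresponds to. It is not the package's actual implementation: `build_upsert_sql` is a hypothetical helper, the `%s` placeholders assume a psycopg2-style driver, and identifiers are not quoted, so it only suits plain lowercase names.

```python
def build_upsert_sql(schema, table, columns, primary_keys):
    """Build an INSERT ... ON CONFLICT statement for the given columns."""
    non_key = [col for col in columns if col not in primary_keys]
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    conflict = ", ".join(primary_keys)
    if non_key:
        # EXCLUDED refers to the row that was proposed for insertion.
        updates = ", ".join(f"{col} = EXCLUDED.{col}" for col in non_key)
        action = f"DO UPDATE SET {updates}"
    else:
        action = "DO NOTHING"
    return (
        f"INSERT INTO {schema}.{table} ({col_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict}) {action}"
    )


sql = build_upsert_sql(
    schema="your_schema",
    table="your_table",
    columns=["pk1", "col1", "col2"],
    primary_keys=["pk1"],
)
print(sql)
```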
    

To upsert a large amount of data into a PostgreSQL table, use the bulk_upsert_data method. It has six parameters:

  • data (required): a dataframe containing the data to upsert. (pd.DataFrame)

  • schema (required): the schema containing the table to upsert into. (str)

  • table (required): the name of the table to upsert into. (str)

  • primary_keys (required): list of primary keys of the table. (list)

  • commit_every (optional): number of rows to commit at a time. The default value is 1000. (int)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.bulk_upsert_data(
         data=your_df,
         schema='your-schema',
         table='your-table',
         primary_keys=['pk1', 'pk2', 'pk3'],
         commit_every=1000,
         db_pool_conn=pool,
     )
    

To update specific columns of a table with new data based on primary keys, use the update_table method. It has seven parameters:

  • data (required): a dataframe containing the data to update, including the primary keys and the columns to update. (pd.DataFrame)

  • schema (required): the schema containing the table to update. (str)

  • table (required): the table to update. (str)

  • columns (required): list of names of the columns to update. (list)

  • primary_keys (required): list of primary keys of the table to update. (list)

  • commit_every (optional): number of rows to commit at a time. The default value is 1000. (int)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.update_table(
         data=your_df,
         schema='your-schema',
         table='your-table',
         columns=['col1', 'col2'],
         primary_keys=['pk1', 'pk2', 'pk3'],
         commit_every=1000,
         db_pool_conn=pool,
     )
    

To get data from a PostgreSQL database with a SQL query, use the get_data method. It has two parameters:

  • sql (required): SQL query to get data. (str)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

    The method returns a dataframe containing the data extracted by the given SQL query.

     df = pg_utils.get_data(
         sql='your-query',
         db_pool_conn=pool,
     )
    
     print(df)
    

    Output:

     | Column1 Header | Column2 Header | Column3 Header |
     | ---------------| ---------------| ---------------|
     | Row1 Value1    | Row1 Value2    | Row1 Value3    |
     | Row2 Value1    | Row2 Value2    | Row2 Value3    |
     | Row3 Value1    | Row3 Value2    | Row3 Value3    |
    

To get the distinct values of a specified column in a PostgreSQL table, use the select_distinct method. It has four parameters:

  • col (required): name of the column to get distinct values from. (str)

  • schema (required): the schema containing the table to get data from. (str)

  • table (required): the table to get data from. (str)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

    The method returns a list of distinct values.

     distinct_values = pg_utils.select_distinct(
         col='chosen-column',
         schema='your-schema',
         table='your-table',
         db_pool_conn=pool,
     )
    
     print(distinct_values)
    

    Output:

     ['val1', 'val2', 'val3']
    

To get the list of column names of a specific PostgreSQL table, use the show_columns method. It has three parameters:

  • schema (required): the schema containing the table to get columns from. (str)

  • table (required): the table to get columns from. (str)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

    The method returns a list of the table's column names.

     col_names = pg_utils.show_columns(
         schema='your-schema',
         table='your-table',
         db_pool_conn=pool,
     )
    
     print(col_names)
    

    Output:

     ['col1', 'col2', 'col3']
    

To execute a given SQL query, use the execute method. It has three parameters:

  • sql (required): SQL query to execute. (str)

  • fetch_output (optional): whether to fetch the results of the query. The default value is False. (bool)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

    The method returns a list of the query output if fetch_output is True, otherwise None.

     # PostgreSQL syntax: UPDATE ... SET ... FROM (the multi-table UPDATE form
     # and DATE_SUB are MySQL-only and would fail on PostgreSQL)
     sql = """
         UPDATE
             sales_order_avc_di AS o
         SET
             po_asin_amazon_status = s.po_asin_amazon_status
         FROM
             (
                 SELECT
                     DISTINCT po_name,
                     asin,
                     CASE
                         WHEN o.status LIKE '%cancel%' AND a.status IS NULL THEN ''
                         WHEN o.status LIKE '%cancel%' THEN CONCAT(a.status, ' ', cancel_date)
                         ELSE o.status END AS po_asin_amazon_status
                 FROM
                     sales_order_avc_order_status o
                     LEFT JOIN
                         sales_order_avc_order_asin_status a USING (updated_at, po_name)
                 WHERE updated_at > NOW() - INTERVAL '1 day'
             ) s
         WHERE
             o.po_name = s.po_name
             AND o.asin = s.asin
     """
    
     pg_utils.execute(
         sql=sql,
         fetch_output=False,
         db_pool_conn=pool,
     )
    

To create new columns in a specific PostgreSQL table, use the add_column method. It has six parameters:

  • schema (required): the schema containing the table to add the column to. (str)

  • table (required): the table to add the column to. (str)

  • column_name (optional): name of the column to create; used when creating a single column. The default value is None. (str)

  • dtype (optional): data type of the column to create; used when creating a single column. The default value is None. (str)

  • muliple_columns (optional): a dictionary with column names as keys and the corresponding data types as values. The default value is {}. (dict)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.add_column(
         schema='my-schema',
         table='my-table',
         muliple_columns={
             'col1': 'int',
             'col2': 'varchar(50)',
         },
         db_pool_conn=pool,
     )
    

To create a new table in a PostgreSQL database, use the create_table method. It has seven parameters:

  • schema (required): the schema containing the table to create. (str)

  • table (required): name of the table to create. (str)

  • columns_with_dtype (required): a dictionary with column names as keys and the corresponding data types as values. (dict)

  • columns_primary_key (optional): list of columns to set as primary keys. The default value is []. (list)

  • columns_not_null (optional): list of columns to set a NOT NULL constraint on. The default value is []. (list)

  • columns_with_default (optional): a dictionary with column names as keys and the corresponding default values as values. The default value is {}. (dict)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.create_table(
         schema='my-schema',
         table='my-new-table',
         columns_with_dtype={
             'col1': 'int',
             'col2': 'varchar(50)',
             'col3': 'varchar(10)',
         },
         columns_primary_key=[
             'col1',
         ],
         columns_not_null=[
             'col2',
         ],
         columns_with_default={
             'col3': 'USA',
         },
         db_pool_conn=pool,
     )
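
To make the role of each argument concrete, the sketch below assembles the kind of `CREATE TABLE` statement that such arguments typically translate to. This is an assumption about the general shape of the DDL, not the package's actual output. `build_create_table_sql` is a hypothetical helper, and identifiers are left unquoted, so the example uses underscore names rather than the hyphenated placeholders above.

```python
def build_create_table_sql(schema, table, columns_with_dtype,
                           columns_primary_key=(), columns_not_null=(),
                           columns_with_default=None):
    """Assemble a CREATE TABLE statement from column specifications."""
    columns_with_default = columns_with_default or {}
    definitions = []
    for name, dtype in columns_with_dtype.items():
        parts = [name, dtype]
        if name in columns_not_null:
            parts.append("NOT NULL")
        if name in columns_with_default:
            default = columns_with_default[name]
            if isinstance(default, str):
                # Quote string defaults and double any embedded single quotes.
                default = "'" + default.replace("'", "''") + "'"
            parts.append(f"DEFAULT {default}")
        definitions.append(" ".join(parts))
    if columns_primary_key:
        definitions.append(f"PRIMARY KEY ({', '.join(columns_primary_key)})")
    return f"CREATE TABLE {schema}.{table} ({', '.join(definitions)})"


ddl = build_create_table_sql(
    schema="my_schema",
    table="my_new_table",
    columns_with_dtype={"col1": "int", "col2": "varchar(50)", "col3": "varchar(10)"},
    columns_primary_key=["col1"],
    columns_not_null=["col2"],
    columns_with_default={"col3": "USA"},
)
print(ddl)
```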
    

To remove all the data from a PostgreSQL table, use the truncate_table method. It has four parameters:

  • schema (required): the schema containing the table to truncate. (str)

  • table (required): name of the table to truncate. (str)

  • reset_identity (optional): whether to reset the identity of the table. The default value is False. (bool)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

     pg_utils.truncate_table(
         schema='my-schema',
         table='my-table',
         db_pool_conn=pool,
     )
    

To check whether a PostgreSQL table exists in the database, use the table_exists method. It has three parameters:

  • schema (required): the schema containing the table to check. (str)

  • table (required): name of the table to check. (str)

  • db_pool_conn (optional): connection pool to connect to database. The default value is None. If the value is None, a new connection will be created and automatically closed after being used. (callable)

    The method returns True if the table exists and False if not.

     pg_utils.table_exists(
         schema='my-schema',
         table='my-exists-table',
         db_pool_conn=pool,
     )
    

    Output:

     True
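
The boolean result lends itself to a "create only if missing" guard. The sketch below combines table_exists with create_table, using only parameters documented in this guide. `ensure_table` is a hypothetical helper, and `FakePostgreSQLUtils` is a stand-in that keeps table names in memory so the example runs without a database.

```python
def ensure_table(pg_utils, schema, table, columns_with_dtype):
    """Create the table only when it does not exist yet."""
    if pg_utils.table_exists(schema=schema, table=table):
        return False  # nothing to do
    pg_utils.create_table(
        schema=schema,
        table=table,
        columns_with_dtype=columns_with_dtype,
    )
    return True


class FakePostgreSQLUtils:
    """In-memory stand-in for PostgreSQLUtils, used only for this sketch."""

    def __init__(self):
        self.tables = set()

    def table_exists(self, schema, table, db_pool_conn=None):
        return (schema, table) in self.tables

    def create_table(self, schema, table, columns_with_dtype, db_pool_conn=None):
        self.tables.add((schema, table))


fake = FakePostgreSQLUtils()
created_first = ensure_table(fake, "my_schema", "my_table", {"col1": "int"})
created_again = ensure_table(fake, "my_schema", "my_table", {"col1": "int"})
print(created_first, created_again)  # True False
```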
    

Best practices:

Telegram

Use case: when you need to send messages to Telegram using a bot.

Functional:

To send messages to Telegram, use the send_message method. It has three parameters:

  • text (required): the message to send. (str)

  • bot_token (optional): the token of the bot that sends the message. The default value is None. If the value is None, the bot "sleep at 9pm" will be used to send messages. (str)

  • chat_id (optional): the ID of the group chat where the message is sent. The default value is None. If the value is None, the group chat "Airflow Status Alert" will be used. (str)

     from sop_deutils.y4a_telegram import send_message
    
     send_message(
         text='Hello liuliukiki'
     )
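
For context, the bot_token and chat_id parameters map naturally onto the Telegram Bot API's `sendMessage` method. The sketch below builds such a request with the standard library and does not send it. It is an illustration of the underlying API, not the package's implementation; `build_send_message_request` is a hypothetical helper, and the token and chat ID are dummy values.

```python
import json
import urllib.request


def build_send_message_request(bot_token, chat_id, text):
    """Prepare (but do not send) a Telegram Bot API sendMessage request."""
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    body = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_send_message_request(
    bot_token="123456:EXAMPLE-TOKEN",  # dummy token
    chat_id="-1001234567890",          # dummy group chat ID
    text="Hello liuliukiki",
)
print(request.full_url)
# Sending it would be: urllib.request.urlopen(request)
```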
    

