Skip to main content

Data Developer & Engineer IO Utility Objects

Project description

Data Utility Package: IO

test python support version size

Table of Contents:

This Utility IO Object was created for load the config data from any file format types like .yaml, .json, or .toml, and manage retention and version of this config file lifecycle.

Installation

pip install ddeutil-io

Features

Config

The Config Object is the file system handler object.

from pathlib import Path
from ddeutil.io.config import ConfFile

config: ConfFile = ConfFile(path=Path(), compress="gzip")

Register

The Register Object is the metadata generator object for the config data. If you passing name and configs to this object, it will find the config name in any stage storage and generate its metadata to you.

from ddeutil.io.register import Register
from ddeutil.io.models import Params

registry: Register = Register(
    name='examples:conn_data_local_file',
    config=Params.model_validate({
        "stages": {
          "raw": {"format": "{naming:%s}.{timestamp:%Y%m%d_%H%M%S}"},
        },
    }),
)

Link

connection_local_file_landing:
    type: "connection.LocalSystem"
    endpoint: "file:///${APP_PATH}/data/demo/landing"
from ddeutil.node.connection import Connection

with Connection('demo:connection_local_file_landing').connect() as conn:
    conn.glob('*_csv*')

Model

catalog_customer:
    type: "catalog.PandasCSVFile"
    connection: "demo:conn_local_data_landing"
    endpoint: "customer_csv.type01.csv"
    schemas:
        customer_id: {alias: "id::int", nullable: false}
        customer_name: {alias: "name::str", nullable: true}
        customer_age: {alias: "age::str", nullable: true}
        phone_number: {alias: "phone::str", nullable: true}
        register_date: {alias: "datetime64", nullable: false}
        active_flag: {alias: "active::bool", nullable: false}
    encoding: "utf-8"
    delimiter: "|"
    header: 0
    quoting: 3
from ddeutil.node.catalog import Catalog

Catalog('demo:catalog_customer').load()
>>>    customer_id   customer_name customer_age phone_number register_date  active_flag
>>> 0            1  John@email.com          NaN      01-1341    2022-01-01         True
>>> 1            2    Sara Toronto           37      01-2201    2022-01-01         True
>>> 2            3             NaN          NaN      04-1772    2022-01-01        False
>>> 3            4        Tome Vee           15      02-1821    2022-01-01        False
>>> 4            5           Vimmy           23      08-2215    2022-01-01         True
>>> 5            6        Queen J.           19      01-1003    2022-01-01         True

Node

node_seller_prepare:
    type: 'node.PandasNode'
    input:
        - alias: "seller"
          from: "demo:catalog_seller_csv"
    transform:
        - alias: "seller_prepare"
          input: "seller"
          actions:
              - type: "GroupBy"
                columns: ['customer_id', 'product_id']
                aggregate:
                    order: "('document_date', 'count')"
                    value_max: "('sales_value', 'max')"
                    value_margin: "('sales_value', 'lambda x: x.max() - x.min()')"
              - type: "RenameColumn"
                columns:
                    order: "order_sales"
              - type: "Filter"
                condition: 'order_sales >= 2'
        - alias: "seller_dq"
          input: "seller_prepare"
          actions:
              - type: "DataQuality"
                dq_function: "is_null"
                columns: ["customer_id"]
              - type: "DataQuality"
                dq_function: "outlier"
                columns: ["value_margin"]
                options:
                    std_value: 3
    output:
        - from: "seller_prepare"
          to: "demo:catalog_seller_csv_prepare"
          mode: "overwrite"
from src.core.loader import Node

node = Node('demo:node_seller_prepare')
node.deploy()
>>> This task: 'seller_prepare' will running in action mode ...
>>> Start action: GroupBy ...
>>>     customer_id product_id  order  value_max  value_margin
>>> 0             1        00A      2      300.0         280.0
>>> 1             1        00B      1      300.0           0.0
>>> 2             1        00C      1       75.0           0.0
>>> 3             2        00A      1      300.0           0.0
>>> 4             2        00B      2      250.0         150.0
>>> 5             2        00C      1      105.0           0.0
>>> 6             2        00D      1       15.0           0.0
>>> 7             3        00B      1      550.0           0.0
>>> 8             3        00C      1       60.0           0.0
>>> 9             3        00D      1      300.0           0.0
>>> 10            4        00A      2      300.0         270.0
>>> 11            4        00B      2      200.0         150.0
>>> 12            5        00C      1       30.0           0.0
>>> 13            6        00B      1       50.0           0.0
>>> Start action: RenameColumn ...
>>>     customer_id product_id  order_sales  value_max  value_margin
>>> 0             1        00A            2      300.0         280.0
>>> 1             1        00B            1      300.0           0.0
>>> 2             1        00C            1       75.0           0.0
>>> 3             2        00A            1      300.0           0.0
>>> 4             2        00B            2      250.0         150.0
>>> 5             2        00C            1      105.0           0.0
>>> 6             2        00D            1       15.0           0.0
>>> 7             3        00B            1      550.0           0.0
>>> 8             3        00C            1       60.0           0.0
>>> 9             3        00D            1      300.0           0.0
>>> 10            4        00A            2      300.0         270.0
>>> 11            4        00B            2      200.0         150.0
>>> 12            5        00C            1       30.0           0.0
>>> 13            6        00B            1       50.0           0.0
>>> Start action: Filter ...
>>>     customer_id product_id  order_sales  value_max  value_margin
>>> 0             1        00A            2      300.0         280.0
>>> 4             2        00B            2      250.0         150.0
>>> 10            4        00A            2      300.0         270.0
>>> 11            4        00B            2      200.0         150.0
>>> This task: 'seller_dq' will running in action mode ...
>>> Start action: DataQuality ...
>>>     customer_id product_id  ...  value_margin  customer_id_dq_isnull
>>> 0             1        00A  ...         280.0                  False
>>> 4             2        00B  ...         150.0                  False
>>> 10            4        00A  ...         270.0                  False
>>> 11            4        00B  ...         150.0                  False
>>> Start action: DataQuality ...
>>>     customer_id product_id  ...  customer_id_dq_isnull  value_margin_dq_outlier
>>> 0             1        00A  ...                  False                    False
>>> 4             2        00B  ...                  False                    False
>>> 10            4        00A  ...                  False                    False
>>> 11            4        00B  ...                  False                    False

Schedule

schd_for_node:
    type: 'schedule.BaseSchedule'
    cron: "*/5 * * * *"
from src.core.loader import Schedule

schedule = Schedule('schd_for_node')
schedule.cronjob
>>> '*/5 * * * *'
cron_iterate = schedule.generate('2022-01-01 00:00:00')
for _ in range(5):
   cron_iterate.next.strftime('%Y-%m-%d %H:%M:%S')
>>> 2022-01-01 00:05:00
>>> 2022-01-01 00:10:00
>>> 2022-01-01 00:15:00
>>> 2022-01-01 00:20:00
>>> 2022-01-01 00:25:00

License

This project was licensed under the terms of the MIT license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ddeutil_io-0.1.0.tar.gz (61.5 kB view hashes)

Uploaded Source

Built Distribution

ddeutil_io-0.1.0-py3-none-any.whl (69.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page