Data Developer & Engineer IO Utility Objects
Data Utility Package: IO
This IO utility package was created to load config data from file formats such as .yaml, .json, or .toml, and to manage retention and versioning across the config file lifecycle.
Installation
pip install ddeutil-io
Features
Config
The Config Object is the file system handler object.
from pathlib import Path
from ddeutil.io.config import ConfFile
config: ConfFile = ConfFile(path=Path(), compress="gzip")
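As a rough illustration of what a file system handler for config data involves, the sketch below reads a config file by dispatching on its extension and optionally decompressing gzip. It is not the `ConfFile` implementation: `load_config` and `LOADERS` are hypothetical names, only standard-library modules are used (so `.yaml` is left out, since it would need a third-party parser such as PyYAML), and `.toml` support assumes Python 3.11+ for `tomllib`.

```python
import gzip
import json
from pathlib import Path

try:  # tomllib is in the standard library from Python 3.11 onward
    import tomllib
except ModuleNotFoundError:
    tomllib = None

# Hypothetical mapping from file suffix to a text -> dict parser.
LOADERS = {".json": json.loads}
if tomllib is not None:
    LOADERS[".toml"] = tomllib.loads


def load_config(path: Path) -> dict:
    """Read a config file, transparently handling a trailing ``.gz`` suffix."""
    suffixes = path.suffixes
    compressed = bool(suffixes) and suffixes[-1] == ".gz"
    if compressed and len(suffixes) > 1:
        fmt = suffixes[-2]  # the format suffix sits just before ".gz"
    else:
        fmt = suffixes[-1] if suffixes else ""
    if fmt not in LOADERS:
        raise ValueError(f"unsupported config format: {path.name}")
    opener = gzip.open if compressed else open
    with opener(path, "rt", encoding="utf-8") as f:
        return LOADERS[fmt](f.read())
```

Under these assumptions, `load_config(Path("conn.json"))` and `load_config(Path("conn.json.gz"))` return the same dictionary for the same content.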
Register
The Register Object is the metadata generator object for the config data. If you pass a name and a config to this object, it will find the config name in any stage storage and generate its metadata for you.
from ddeutil.io.register import Register
from ddeutil.io.models import Params
registry: Register = Register(
    name='examples:conn_data_local_file',
    config=Params.model_validate({
        "stages": {
            "raw": {"format": "{naming:%s}.{timestamp:%Y%m%d_%H%M%S}"},
        },
    }),
)
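To make the stage `format` string concrete, the sketch below renders `{naming:%s}.{timestamp:%Y%m%d_%H%M%S}` into a file name. This is an assumption about how the placeholders are meant to be read (`naming` as the config name, `timestamp` as a `strftime` pattern), not the package's own formatter; `render_stage_name` is a hypothetical helper.

```python
import re
from datetime import datetime

# Matches placeholders of the form {key:spec}, e.g. {timestamp:%Y%m%d_%H%M%S}.
PLACEHOLDER = re.compile(r"\{(?P<key>\w+):(?P<spec>[^}]*)\}")


def render_stage_name(fmt: str, name: str, ts: datetime) -> str:
    """Fill the ``naming`` and ``timestamp`` placeholders of a stage format."""

    def _fill(match):
        key, spec = match["key"], match["spec"]
        if key == "naming":
            return spec % name  # "%s" substitutes the config name
        if key == "timestamp":
            return ts.strftime(spec)  # spec is treated as a strftime pattern
        raise KeyError(f"unknown placeholder: {key}")

    return PLACEHOLDER.sub(_fill, fmt)


print(render_stage_name(
    "{naming:%s}.{timestamp:%Y%m%d_%H%M%S}",
    "conn_data_local_file",
    datetime(2022, 1, 1, 0, 0, 0),
))
# conn_data_local_file.20220101_000000
```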
Link
connection_local_file_landing:
  type: "connection.LocalSystem"
  endpoint: "file:///${APP_PATH}/data/demo/landing"
from ddeutil.node.connection import Connection
with Connection('demo:connection_local_file_landing').connect() as conn:
    conn.glob('*_csv*')
Model
catalog_customer:
  type: "catalog.PandasCSVFile"
  connection: "demo:conn_local_data_landing"
  endpoint: "customer_csv.type01.csv"
  schemas:
    customer_id: {alias: "id::int", nullable: false}
    customer_name: {alias: "name::str", nullable: true}
    customer_age: {alias: "age::str", nullable: true}
    phone_number: {alias: "phone::str", nullable: true}
    register_date: {alias: "datetime64", nullable: false}
    active_flag: {alias: "active::bool", nullable: false}
  encoding: "utf-8"
  delimiter: "|"
  header: 0
  quoting: 3
from ddeutil.node.catalog import Catalog
Catalog('demo:catalog_customer').load()
>>>    customer_id   customer_name  customer_age  phone_number  register_date  active_flag
>>> 0            1  John@email.com           NaN       01-1341     2022-01-01         True
>>> 1            2    Sara Toronto            37       01-2201     2022-01-01         True
>>> 2            3             NaN           NaN       04-1772     2022-01-01        False
>>> 3            4        Tome Vee            15       02-1821     2022-01-01        False
>>> 4            5           Vimmy            23       08-2215     2022-01-01         True
>>> 5            6        Queen J.            19       01-1003     2022-01-01         True
Node
node_seller_prepare:
  type: 'node.PandasNode'
  input:
    - alias: "seller"
      from: "demo:catalog_seller_csv"
  transform:
    - alias: "seller_prepare"
      input: "seller"
      actions:
        - type: "GroupBy"
          columns: ['customer_id', 'product_id']
          aggregate:
            order: "('document_date', 'count')"
            value_max: "('sales_value', 'max')"
            value_margin: "('sales_value', 'lambda x: x.max() - x.min()')"
        - type: "RenameColumn"
          columns:
            order: "order_sales"
        - type: "Filter"
          condition: 'order_sales >= 2'
    - alias: "seller_dq"
      input: "seller_prepare"
      actions:
        - type: "DataQuality"
          dq_function: "is_null"
          columns: ["customer_id"]
        - type: "DataQuality"
          dq_function: "outlier"
          columns: ["value_margin"]
          options:
            std_value: 3
  output:
    - from: "seller_prepare"
      to: "demo:catalog_seller_csv_prepare"
      mode: "overwrite"
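Read as plain Python, the `seller_prepare` actions amount to a group-by with three named aggregates, a column rename, and a row filter. The sketch below mimics that on a few made-up rows without pandas; the rows and the helper name `prepare` are illustrative assumptions, and this is not how `node.PandasNode` is implemented.

```python
from collections import defaultdict

# Illustrative rows only (not taken from the demo catalog).
rows = [
    {"customer_id": 1, "product_id": "00A", "document_date": "2022-01-01", "sales_value": 20.0},
    {"customer_id": 1, "product_id": "00A", "document_date": "2022-01-02", "sales_value": 300.0},
    {"customer_id": 1, "product_id": "00B", "document_date": "2022-01-01", "sales_value": 300.0},
]


def prepare(rows):
    # GroupBy: collect sales values per (customer_id, product_id) key.
    groups = defaultdict(list)
    for r in rows:
        groups[(r["customer_id"], r["product_id"])].append(r["sales_value"])

    out = []
    for (customer_id, product_id), values in groups.items():
        out.append({
            "customer_id": customer_id,
            "product_id": product_id,
            # RenameColumn: the "order" row count is emitted as "order_sales".
            "order_sales": len(values),
            "value_max": max(values),
            "value_margin": max(values) - min(values),
        })
    # Filter: keep only groups matching the condition order_sales >= 2.
    return [r for r in out if r["order_sales"] >= 2]


print(prepare(rows))
```

With these sample rows only the `(1, "00A")` group survives the filter, with `order_sales` 2, `value_max` 300.0, and `value_margin` 280.0.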
from src.core.loader import Node
node = Node('demo:node_seller_prepare')
node.deploy()
>>> This task: 'seller_prepare' will running in action mode ...
>>> Start action: GroupBy ...
>>>     customer_id  product_id  order  value_max  value_margin
>>> 0             1         00A      2      300.0         280.0
>>> 1             1         00B      1      300.0           0.0
>>> 2             1         00C      1       75.0           0.0
>>> 3             2         00A      1      300.0           0.0
>>> 4             2         00B      2      250.0         150.0
>>> 5             2         00C      1      105.0           0.0
>>> 6             2         00D      1       15.0           0.0
>>> 7             3         00B      1      550.0           0.0
>>> 8             3         00C      1       60.0           0.0
>>> 9             3         00D      1      300.0           0.0
>>> 10            4         00A      2      300.0         270.0
>>> 11            4         00B      2      200.0         150.0
>>> 12            5         00C      1       30.0           0.0
>>> 13            6         00B      1       50.0           0.0
>>> Start action: RenameColumn ...
>>>     customer_id  product_id  order_sales  value_max  value_margin
>>> 0             1         00A            2      300.0         280.0
>>> 1             1         00B            1      300.0           0.0
>>> 2             1         00C            1       75.0           0.0
>>> 3             2         00A            1      300.0           0.0
>>> 4             2         00B            2      250.0         150.0
>>> 5             2         00C            1      105.0           0.0
>>> 6             2         00D            1       15.0           0.0
>>> 7             3         00B            1      550.0           0.0
>>> 8             3         00C            1       60.0           0.0
>>> 9             3         00D            1      300.0           0.0
>>> 10            4         00A            2      300.0         270.0
>>> 11            4         00B            2      200.0         150.0
>>> 12            5         00C            1       30.0           0.0
>>> 13            6         00B            1       50.0           0.0
>>> Start action: Filter ...
>>>     customer_id  product_id  order_sales  value_max  value_margin
>>> 0             1         00A            2      300.0         280.0
>>> 4             2         00B            2      250.0         150.0
>>> 10            4         00A            2      300.0         270.0
>>> 11            4         00B            2      200.0         150.0
>>> This task: 'seller_dq' will running in action mode ...
>>> Start action: DataQuality ...
>>>     customer_id  product_id  ...  value_margin  customer_id_dq_isnull
>>> 0             1         00A  ...         280.0                  False
>>> 4             2         00B  ...         150.0                  False
>>> 10            4         00A  ...         270.0                  False
>>> 11            4         00B  ...         150.0                  False
>>> Start action: DataQuality ...
>>>     customer_id  product_id  ...  customer_id_dq_isnull  value_margin_dq_outlier
>>> 0             1         00A  ...                  False                    False
>>> 4             2         00B  ...                  False                    False
>>> 10            4         00A  ...                  False                    False
>>> 11            4         00B  ...                  False                    False
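The `outlier` check with `std_value: 3` presumably flags values that lie more than three standard deviations from the column mean; the source does not spell out the rule, so the sketch below is an assumption. `flag_outliers` is a hypothetical helper that uses only the standard library and the sample standard deviation.

```python
from statistics import mean, stdev


def flag_outliers(values, std_value=3.0):
    """Return one bool per value: True when |x - mean| > std_value * stdev."""
    if len(values) < 2:
        return [False] * len(values)  # stdev needs at least two points
    mu, sigma = mean(values), stdev(values)
    if sigma == 0:
        return [False] * len(values)  # all values equal, nothing to flag
    return [abs(x - mu) > std_value * sigma for x in values]


# value_margin column as it appears after the Filter action in the log above
print(flag_outliers([280.0, 150.0, 270.0, 150.0]))
# [False, False, False, False]
```

Under this reading none of the four remaining rows is flagged, which agrees with the `value_margin_dq_outlier` column in the log.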
Schedule
schd_for_node:
  type: 'schedule.BaseSchedule'
  cron: "*/5 * * * *"
from src.core.loader import Schedule
schedule = Schedule('schd_for_node')
schedule.cronjob
>>> '*/5 * * * *'
cron_iterate = schedule.generate('2022-01-01 00:00:00')
for _ in range(5):
    print(cron_iterate.next.strftime('%Y-%m-%d %H:%M:%S'))
>>> 2022-01-01 00:05:00
>>> 2022-01-01 00:10:00
>>> 2022-01-01 00:15:00
>>> 2022-01-01 00:20:00
>>> 2022-01-01 00:25:00
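For the `*/5 * * * *` expression, the next run is simply the next minute that is a multiple of five. The generator below sketches that single case with `datetime`; it is not a general cron parser and not the `Schedule` implementation, and `every_n_minutes` is a hypothetical name. It assumes `n` divides 60, as with `*/5`.

```python
from datetime import datetime, timedelta
from itertools import islice


def every_n_minutes(start: datetime, n: int = 5):
    """Yield run times strictly after ``start`` whose minute is a multiple of ``n``.

    Only valid when ``n`` divides 60, which is the case for ``*/5``.
    """
    # Drop seconds, then step forward to the next multiple of n minutes.
    current = start.replace(second=0, microsecond=0)
    current += timedelta(minutes=n - current.minute % n)
    while True:
        yield current
        current += timedelta(minutes=n)


start = datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
for run in islice(every_n_minutes(start), 5):
    print(run.strftime("%Y-%m-%d %H:%M:%S"))
# 2022-01-01 00:05:00
# 2022-01-01 00:10:00
# 2022-01-01 00:15:00
# 2022-01-01 00:20:00
# 2022-01-01 00:25:00
```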
License
This project is licensed under the terms of the MIT license.