Data Developer & Engineer Workflow Utility Objects
Data Utility: Workflow
These Utility Workflow objects make it easy to build simple metadata-driven pipelines that can run ETL, T, EL, or ELT from a .yaml file.
I think we should not create a separate pipeline for every use-case when we can write one dynamic pipeline and only change its input parameters per use-case. This way, we can manage many pipelines across an organization with metadata alone. This approach is called metadata-driven.
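To illustrate the metadata-driven idea in plain Python: the pipeline logic is written once and each use-case is just a different set of parameters. This is a minimal sketch, not how ddeutil-workflow is implemented; run_el_pipeline and the table/path values are hypothetical.

```python
from typing import Any


def run_el_pipeline(source_table: str, target_path: str, run_date: str) -> str:
    """Hypothetical generic EL pipeline, written once for all use-cases.

    A real pipeline would extract and load data; this one only describes
    what it would do so the example stays self-contained.
    """
    return f"copy {source_table} ({run_date}) -> {target_path}"


# Each use-case is metadata only: adding a use-case means adding a dict,
# not writing a new pipeline.
USE_CASES: list[dict[str, Any]] = [
    {"source_table": "customer", "target_path": "/lake/customer", "run_date": "2024-01-01"},
    {"source_table": "sales", "target_path": "/lake/sales", "run_date": "2024-01-01"},
    {"source_table": "product", "target_path": "/lake/product", "run_date": "2024-01-01"},
]

results = [run_el_pipeline(**params) for params in USE_CASES]
for line in results:
    print(line)
```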
Next, we need monitoring tools to manage the logs returned by pipeline runs, because the logs alone do not show which use-case a data pipeline run belongs to.
[!NOTE] Disclaimer: The dynamic statements are inspired by GitHub Actions .yml files and by the config files of several data orchestration frameworks that I have used in my work as a Data Engineer.
Installation
pip install ddeutil-workflow
This project requires the ddeutil-io and ddeutil-model extension namespace packages.
Getting Started
As a first step, create the connections and datasets for the input and output data that you want to use in your workflow pipelines. Some of these components are similar to Airflow components because I like its concepts.
The main feature of this project is the Pipeline object, which can call any registered function.
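The README does not show how functions get registered. One common pattern is a decorator that stores each function in a name-to-callable mapping, so a pipeline config can refer to functions by name only. This is a minimal sketch of that pattern; REGISTRY, register, and call_registered are hypothetical names, not the library's API.

```python
from typing import Any, Callable

# Hypothetical registry: maps a name to a callable.
REGISTRY: dict[str, Callable[..., Any]] = {}


def register(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that stores the decorated function under ``name``."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if name in REGISTRY:
            raise ValueError(f"function {name!r} is already registered")
        REGISTRY[name] = func
        return func
    return decorator


@register("add-one")
def add_one(x: int) -> int:
    return x + 1


def call_registered(name: str, **kwargs: Any) -> Any:
    """Look up a registered function by name and call it with keyword args."""
    try:
        func = REGISTRY[name]
    except KeyError:
        raise KeyError(f"no function registered as {name!r}") from None
    return func(**kwargs)


print(call_registered("add-one", x=41))  # prints 42
```

Raising on duplicate names keeps two modules from silently overwriting each other's functions.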
Connection
A connection gives the worker access to an external system, such as a database, so that it can read from or write to it.
conn_postgres_data:
  type: conn.Postgres
  url: 'postgres://username:${ENV_PASS}@hostname:port/database?echo=True&time_out=10'
from ddeutil.workflow.conn import Conn
conn = Conn.from_loader(name='conn_postgres_data', externals={})
assert conn.ping()
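The URL above contains a ${ENV_PASS} placeholder. Assuming the loader fills such placeholders from environment variables (this README does not say how Conn resolves them), the substitution and URL parsing can be sketched with the standard library. The port 5432 and the fallback password are made-up demo values.

```python
import os
from string import Template
from urllib.parse import parse_qsl, urlsplit

# Illustrative URL: "5432" stands in for the "port" placeholder above.
raw_url = "postgres://username:${ENV_PASS}@hostname:5432/database?echo=True&time_out=10"

# Demo only: provide a password if ENV_PASS is not already set.
os.environ.setdefault("ENV_PASS", "s3cret")

# string.Template uses the same ${NAME} syntax; substitute() raises
# KeyError when a referenced variable is missing.
url = Template(raw_url).substitute(os.environ)

parts = urlsplit(url)
options = dict(parse_qsl(parts.query))  # {'echo': 'True', 'time_out': '10'}

print(parts.scheme, parts.username, parts.hostname, parts.port, parts.path)
print(options)
```

Using substitute() rather than safe_substitute() makes a missing secret fail loudly instead of leaving a literal ${ENV_PASS} in the URL.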
Dataset
A dataset defines an object, such as a table, on a connection.
ds_postgres_customer_tbl:
  type: dataset.PostgresTbl
  conn: 'conn_postgres_data'
  features:
    id: serial primary key
    name: varchar( 100 ) not null
from ddeutil.workflow.dataset import PostgresTbl
dataset = PostgresTbl.from_loader(name='ds_postgres_customer_tbl', externals={})
assert dataset.exists()
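The features mapping reads like column definitions. As a sketch, assuming each value is a column type plus constraints, it could be rendered into DDL as below. create_table_ddl and the table name customer are hypothetical; the README does not say how PostgresTbl uses features.

```python
def create_table_ddl(table: str, features: dict[str, str]) -> str:
    """Hypothetical helper: render a features mapping as CREATE TABLE DDL.

    Column definitions are used verbatim, so they must be trusted input.
    """
    columns = ",\n".join(
        f"    {name} {definition}" for name, definition in features.items()
    )
    return f"CREATE TABLE IF NOT EXISTS {table} (\n{columns}\n);"


features = {"id": "serial primary key", "name": "varchar(100) not null"}
print(create_table_ddl("customer", features))
```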
Schedule
schd_for_node:
  type: schedule.Scdl
  cron: "*/5 * * * *"
from ddeutil.workflow.schedule import Scdl
scdl = Scdl.from_loader(name='schd_for_node', externals={})
assert '*/5 * * * *' == str(scdl.cronjob)
cron_iterate = scdl.generate('2022-01-01 00:00:00')
# Each access to .next moves the iterator to the following run time.
assert '2022-01-01 00:05:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:10:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:15:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:20:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:25:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
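To make the expected run times concrete, here is a stdlib-only sketch that mimics a "*/N * * * *" expression. It is not the Scdl implementation: it handles only the minute field, and every_n_minutes is a hypothetical name.

```python
from datetime import datetime, timedelta
from typing import Iterator


def every_n_minutes(start: datetime, step: int) -> Iterator[datetime]:
    """Yield the times matching ``*/step * * * *`` strictly after ``start``.

    Only the minute field is handled; all other fields are ``*``.
    """
    if not 1 <= step <= 59:
        raise ValueError("step must be between 1 and 59")
    current = start.replace(second=0, microsecond=0)
    while True:
        current += timedelta(minutes=1)
        # */step matches minutes 0, step, 2*step, ... within each hour.
        if current.minute % step == 0:
            yield current


runs = every_n_minutes(datetime(2022, 1, 1), 5)
for _ in range(5):
    print(f"{next(runs):%Y-%m-%d %H:%M:%S}")
```

Stepping minute by minute is slow for sparse schedules but keeps the matching rule obvious; a real cron library computes the next match directly.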
Examples
These examples use workflow files to run common Data Engineering use-cases.
Python
A job is a list of things that the worker should do; it is a collection of stages.
run_py_local:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    author-run:
      type: str
    run-date:
      type: datetime
  jobs:
    first-job:
      stages:
        - name: Printing Information
          id: define-func
          run: |
            x = '${{ params.author-run }}'
            print(f'Hello {x}')
            def echo(name: str):
              print(f'Hello {name}')
        - name: Run Sequence and use var from Above
          vars:
            x: ${{ params.author-run }}
          run: |
            print(f'Receive x from above with {x}')
            # Change x value
            x: int = 1
        - name: Call Function
          vars:
            echo: ${{ stages.define-func.outputs.echo }}
          run: |
            echo('Caller')
from ddeutil.workflow.pipeline import Pipeline
pipe = Pipeline.from_loader(name='run_py_local', externals={})
pipe.execute(params={'author-run': 'Local Workflow', 'run-date': '2024-01-01'})
> Hello Local Workflow
> Receive x from above with Local Workflow
> Hello Caller
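The third stage calls echo, which the first stage defined and exposed as stages.define-func.outputs.echo. A minimal sketch of how that hand-off could work, assuming each stage's run code is executed with exec() and every name it defines becomes an output. run_stage is a hypothetical helper, not the library's API, and because exec() runs arbitrary code this pattern only suits trusted config files.

```python
import textwrap
from typing import Any


def run_stage(code: str, variables: dict[str, Any]) -> dict[str, Any]:
    """Execute a stage's ``run`` code and return the names it defines.

    ``variables`` plays the role of the stage's ``vars`` block.
    """
    namespace: dict[str, Any] = dict(variables)
    exec(code, namespace)
    # exec() inserts __builtins__ into the namespace; drop dunder names.
    return {k: v for k, v in namespace.items() if not k.startswith("__")}


stage_1 = run_stage(
    textwrap.dedent(
        """\
        x = 'Local Workflow'
        print(f'Hello {x}')

        def echo(name: str):
            print(f'Hello {name}')
        """
    ),
    {},
)

# A later stage receives a function from an earlier stage's outputs.
stage_3 = run_stage("echo('Caller')", {"echo": stage_1["echo"]})
```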
Tasks (Extract & Load)
pipe_el_pg_to_lake:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    run-date:
      type: datetime
    author-email:
      type: str
  jobs:
    extract-load:
      stages:
        - name: "Extract Load from Postgres to Lake"
          id: extract-load
          task: tasks/postgres-to-delta@polars
          with:
            source:
              conn: conn_postgres_url
              query: |
                select * from ${{ params.name }}
                where update_date = '${{ params.datetime }}'
            sink:
              conn: conn_az_lake
              endpoint: "/${{ params.name }}"
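The query and endpoint above use ${{ params.<name> }} placeholders. As a sketch, assuming placeholders are replaced by the string form of the matching parameter, a strict regex-based renderer could look like this (render and PARAM_PATTERN are hypothetical names). Raising on unknown names would surface mismatches such as params.name and params.datetime, which this example references but does not declare under params.

```python
import re
from typing import Any

# Matches ${{ params.<name> }} with optional spaces; names may contain - and _.
PARAM_PATTERN = re.compile(r"\$\{\{\s*params\.([A-Za-z0-9_-]+)\s*\}\}")


def render(template: str, params: dict[str, Any]) -> str:
    """Replace every ${{ params.<name> }} with str(params[<name>])."""
    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f"undefined parameter: {name!r}")
        return str(params[name])

    return PARAM_PATTERN.sub(replace, template)


query = "select * from ${{ params.name }}\nwhere update_date = '${{ params.run-date }}'"
print(render(query, {"name": "customer", "run-date": "2024-01-01"}))
```

Plain string substitution into SQL is shown only to illustrate templating; values from untrusted sources should go through the driver's query parameters instead.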
Tasks (Transform)
pipe_hook_mssql_proc:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    run_date:
      type: datetime
    sp_name:
      type: str
    source_name:
      type: str
    target_name:
      type: str
  jobs:
    transform:
      stages:
        - name: "Transform Data in MS SQL Server"
          id: transform
          task: tasks/mssql-proc@odbc
          with:
            exec: ${{ params.sp_name }}
            params:
              run_mode: "T"
              run_date: ${{ params.run_date }}
              source: ${{ params.source_name }}
              target: ${{ params.target_name }}
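Both task examples use references shaped like tasks/mssql-proc@odbc and tasks/postgres-to-delta@polars. Assuming these read as <path>/<name>@<tag>, where the tag picks an implementation (odbc, polars), a parser could look like this. TaskRef and parse_task are hypothetical; the README does not define the reference format.

```python
import re
from typing import NamedTuple


class TaskRef(NamedTuple):
    path: str  # e.g. "tasks"
    name: str  # e.g. "mssql-proc"
    tag: str   # e.g. "odbc"


# The greedy path group backtracks to the last "/" before the name.
TASK_PATTERN = re.compile(r"^(?P<path>[^@]+)/(?P<name>[^/@]+)@(?P<tag>[^/@]+)$")


def parse_task(ref: str) -> TaskRef:
    """Split a hypothetical <path>/<name>@<tag> task reference."""
    match = TASK_PATTERN.match(ref)
    if match is None:
        raise ValueError(f"invalid task reference: {ref!r}")
    return TaskRef(**match.groupdict())


print(parse_task("tasks/mssql-proc@odbc"))
print(parse_task("tasks/postgres-to-delta@polars"))
```

The parsed name and tag could then serve as a lookup key into a function registry.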
License
This project is licensed under the terms of the MIT license.
Hashes for ddeutil_workflow-0.0.2-py3-none-any.whl (built distribution)

| Algorithm | Hash digest |
|---|---|
| SHA256 | cf7b68a883a783eb148f9b7dba1367987f56dac9fc176dcc115b0d3956edc762 |
| MD5 | 4a08e80238aa962e6e8f00d4e00f5cc4 |
| BLAKE2b-256 | fcb87c53062943f4fdb3003f61261a01dd2c922693cb195a3aaf520e50551fa6 |