
Data Developer & Engineer Workflow Utility Objects


Data Utility: Workflow


This Workflow utility was created to make it easy to build a simple metadata-driven pipeline that can run ETL, T, EL, or ELT from a .yaml file.

I think we should not create multiple pipelines, one per use-case, if we can write a dynamic pipeline that only changes its input parameters for each use-case instead. This way we can handle many pipelines across our organization with metadata alone. This approach is called Metadata Driven.

Next, we should add a monitoring tool to manage the logs returned from pipeline runs, because the logs by themselves do not show which use-case a running data pipeline belongs to.

[!NOTE] Disclaimer: The dynamic statement style is inspired by GitHub Actions .yml files, and the config file layout by several data orchestration framework tools I have used in my experience as a Data Engineer.

Installation

pip install ddeutil-workflow

This project needs the ddeutil-io and ddeutil-model extension namespace packages.
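
If these extension packages are not pulled in automatically, they can likely be installed the same way (assuming they are published under the names given above):

pip install ddeutil-io ddeutil-model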

Getting Started

As a first step, create the connections and datasets for the inputs and outputs of the data you want to use in a workflow pipeline. Some of these components are similar to Airflow components, because I like its concepts.

Connection

A connection gives a worker everything it needs to reach an external system and work with it.

conn_postgres_data:
  type: conn.Postgres
  url: 'postgres://username:${ENV_PASS}@hostname:port/database?echo=True&time_out=10'

from ddeutil.workflow.conn import Conn

conn = Conn.from_loader(name='conn_postgres_data', externals={})
assert conn.ping()
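
The ${ENV_PASS} placeholder in the url above is resolved from the environment, so the secret has to be set before the loader runs. A minimal sketch, assuming the template is filled from the process environment:

import os

# Assumption: ${ENV_PASS} in the YAML is substituted from the process environment at load time.
os.environ['ENV_PASS'] = 'password-from-your-secret-store'

With that in place, the Conn.from_loader call above can ping the database.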

Dataset

A dataset defines an object that lives on a connection, for example a table on the Postgres connection above.

ds_postgres_customer_tbl:
  type: dataset.PostgresTbl
  conn: 'conn_postgres_data'
  features:
    id: serial primary key
    name: varchar(100) not null

from ddeutil.workflow.dataset import PostgresTbl

dataset = PostgresTbl.from_loader(name='ds_postgres_customer_tbl', externals={})
assert dataset.exists()
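
The dataset's conn key points back at conn_postgres_data, so both objects resolve from the same configuration store. A short combined check, using only the loaders shown above:

from ddeutil.workflow.conn import Conn
from ddeutil.workflow.dataset import PostgresTbl

# Both objects load by the names defined in the YAML configs above.
conn = Conn.from_loader(name='conn_postgres_data', externals={})
dataset = PostgresTbl.from_loader(name='ds_postgres_customer_tbl', externals={})

assert conn.ping()
assert dataset.exists()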

Schedule

A schedule defines when a pipeline should run, using a standard cron expression.

schd_for_node:
  type: schedule.Scdl
  cron: "*/5 * * * *"
from ddeutil.workflow.schedule import Scdl

scdl = Scdl.from_loader(name='schd_for_node', externals={})
assert '*/5 * * * *' == str(scdl.cronjob)

cron_iterate = scdl.generate('2022-01-01 00:00:00')
assert '2022-01-01 00:05:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:10:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:15:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:20:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:25:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
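
Each access of cron_iterate.next advances the iterator, as the asserts above show, so the upcoming run times can also be collected in a loop:

from ddeutil.workflow.schedule import Scdl

scdl = Scdl.from_loader(name='schd_for_node', externals={})
cron_iterate = scdl.generate('2022-01-01 00:00:00')

# Collect the next five run times after the start datetime above.
next_runs = [f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}" for _ in range(5)]
assert next_runs[0] == '2022-01-01 00:05:00'
assert next_runs[-1] == '2022-01-01 00:25:00'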

Examples

These are examples that use a workflow file to run common Data Engineering use-cases.

Python

A pipeline lays out the list of work that a worker should do, as a collection of stages grouped into jobs.

run_py_local:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    author-run: utils.receive.string
    run-date: utils.receive.datetime
  jobs:
    first-job:
      stages:
        - name: Printing Information
          id: define-func
          run: |
            x = '${{ params.author-run }}'
            print(f'Hello {x}')

            def echo(name: str):
              print(f'Hello {name}')

        - name: Run Sequence and use var from Above
          vars:
            x: ${{ params.author-run }}
          run: |
            print(f'Receive x from above with {x}')
            # Change x value
            x: int = 1

        - name: Call Function
          vars:
            echo: ${{ stages.define-func.outputs.echo }}
          run: |
            echo('Caller')

from ddeutil.workflow.pipeline import Pipeline

pipe = Pipeline.from_loader(name='run_py_local', externals={})
pipe.execute(params={'author-run': 'Local Workflow', 'run-date': '2024-01-01'})
> Hello Local Workflow
> Receive x from above with Local Workflow
> Hello Caller
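
Because the stages read their inputs only from params, the same run_py_local definition can serve another use-case by changing nothing but the parameters, which is the metadata-driven point made above:

from ddeutil.workflow.pipeline import Pipeline

# Same pipeline definition, different metadata: no new pipeline code is written.
pipe = Pipeline.from_loader(name='run_py_local', externals={})
pipe.execute(params={'author-run': 'Another Use-Case', 'run-date': '2024-02-01'})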

Tasks (Extract & Load)

pipe_el_pg_to_lake:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    run-date: utils.receive.datetime
    name: utils.receive.string
    author-email: utils.receive.string
  jobs:
    extract-load:
      stages:
        - name: "Extract Load from Postgres to Lake"
          id: extract
          task: tasks/postgres-to-delta@polars
          with:
            source:
              conn: conn_postgres_url
              query: |
                select * from ${{ params.name }}
                where update_date = '${{ params.run-date }}'
            sink:
              conn: conn_az_lake
              endpoint: "/${{ params.name }}"

Hooks (Transform)

pipe_hook_mssql_proc:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    run_date: utils.receive.datetime
    sp_name: utils.receive.string
    source_name: utils.receive.string
    target_name: utils.receive.string
  jobs:
    transform:
      stages:
        - name: "Transform Data in MS SQL Server"
          hook: hooks/mssql-proc@odbc
          with:
            exec: ${{ params.sp_name }}
            params:
              run_mode: "T"
              run_date: ${{ params.run_date }}
              source: ${{ params.source_name }}
              target: ${{ params.target_name }}
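
Because the stored-procedure name and table names all arrive as parameters, this single pipeline can drive every procedure that follows the same calling convention. A usage sketch with made-up names, assuming the same loader and execute API as above:

from ddeutil.workflow.pipeline import Pipeline

pipe = Pipeline.from_loader(name='pipe_hook_mssql_proc', externals={})
pipe.execute(params={
    'run_date': '2024-01-01',
    'sp_name': 'usp_transform_customer',   # made-up stored-procedure name
    'source_name': 'stg.customer',         # made-up source table
    'target_name': 'dwh.customer',         # made-up target table
})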

License

This project is licensed under the terms of the MIT license.

