
Data Developer & Engineer Workflow Utility Objects


Workflow



The Workflow objects were created to make it easy to build a simple metadata-driven pipeline that can run ETL, T, EL, or ELT from a .yaml file.

I think we should not create a separate pipeline for every use-case when we can write one dynamic pipeline and only change its input parameters per use-case instead. This way we can manage a large number of pipelines in our organization with metadata alone. This is called Metadata Driven.

Next, we should add some monitoring tooling to manage the logs returned from pipeline runs, because the logs alone do not show which use-case a running data pipeline belongs to.

[!NOTE] Disclaimer: the dynamic statement syntax is inspired by GitHub Actions .yml files, and the config file layout by several data orchestration framework tools I have used in my experience as a Data Engineer.

Installation

pip install ddeutil-workflow

This project needs the ddeutil-io extension namespace package. If you want to install this package with the application add-ons, you should add the app extra to the installation:

pip install ddeutil-workflow[app]

Getting Started

As a first step, you should create the connections and datasets for the In and Out sides of the data that you want to use in a workflow pipeline. Some of these components are similar to Airflow components, because I like its orchestration concepts.

The main feature of this project is the Pipeline object, which can call any registered function. The pipeline can handle everything that you want to do: it passes parameters in and catches the output for re-use in the next step.

[!IMPORTANT] In the future, I will move the connection and dataset components out of the main features and into dynamic registries, because they require maintaining a lot of vendor code and dependencies. (I do not have time to maintain these features.)

On

The On object is the schedule object.

on_every_5_min:
  type: on.On
  cron: "*/5 * * * *"

from ddeutil.workflow.on import On

schedule = On.from_loader(name='on_every_5_min', externals={})
assert '*/5 * * * *' == str(schedule.cronjob)

cron_iter = schedule.generate('2022-01-01 00:00:00')
assert '2022-01-01 00:05:00' == f"{cron_iter.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:10:00' == f"{cron_iter.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:15:00' == f"{cron_iter.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:20:00' == f"{cron_iter.next:%Y-%m-%d %H:%M:%S}"
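
Because each access of cron_iter.next advances the schedule by one interval (as the asserts above show), you can also collect upcoming fire times in a loop. A minimal sketch reusing the schedule object from above:

runner = schedule.generate('2022-01-01 00:00:00')
# Each access of .next moves the runner forward by one cron step.
fire_times = [f"{runner.next:%Y-%m-%d %H:%M:%S}" for _ in range(3)]
print(fire_times)
# ['2022-01-01 00:05:00', '2022-01-01 00:10:00', '2022-01-01 00:15:00']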

Pipeline

The Pipeline object is the core feature of this project.

run_py_local:
  type: ddeutil.workflow.pipeline.Pipeline
  on: 'on_every_5_min'
  params:
    author-run:
      type: str
    run-date:
      type: datetime

from ddeutil.workflow.pipeline import Pipeline

pipe = Pipeline.from_loader(name='run_py_local', externals={})
pipe.execute(params={'author-run': 'Local Workflow', 'run-date': '2024-01-01'})

[!NOTE] The above parameters can also be written with a short declarative statement: you can pass the parameter type directly as the value of the parameter name key.

params:
  author-run: str
  run-date: datetime

And for the type, you can drop the ddeutil.workflow prefix, because the loader finds the class by searching through the modules listed in the WORKFLOW_CORE_REGISTRY value.
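
For example, these two type values resolve to the same class; the shorter form relies on that registry search and is the one used in the examples below:

# full import path
type: ddeutil.workflow.pipeline.Pipeline
# equivalent short form, resolved via the WORKFLOW_CORE_REGISTRY search
type: pipeline.Pipeline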

Usage

These are examples that use workflow files to run common Data Engineering use-cases.

[!IMPORTANT] I recommend you use the task stage for all actions that you want to run with the pipeline object.

Python & Bash

run_py_local:
  type: pipeline.Pipeline
  params:
    author-run: str
    run-date: datetime
  jobs:
    first-job:
      stages:
        - name: "Printing Information"
          id: define-func
          run: |
            x = '${{ params.author-run }}'
            print(f'Hello {x}')

            def echo(name: str):
              print(f'Hello {name}')

        - name: "Run Sequence and use var from Above"
          vars:
            x: ${{ params.author-run }}
          run: |
            print(f'Receive x from above with {x}')
            # Change x value
            x: int = 1

        - name: "Call Function"
          vars:
            echo: ${{ stages.define-func.outputs.echo }}
          run: |
            echo('Caller')
    second-job:
      stages:
        - name: "Echo Bash Script"
          id: shell-echo
          bash: |
            echo "Hello World from Shell"

from ddeutil.workflow.pipeline import Pipeline

pipe = Pipeline.from_loader(name='run_py_local', externals={})
pipe.execute(params={'author-run': 'Local Workflow', 'run-date': '2024-01-01'})
> Hello Local Workflow
> Receive x from above with Local Workflow
> Hello Caller
> Hello World from Shell

Hook (Extract & Load)

pipe_el_pg_to_lake:
  type: pipeline.Pipeline
  params:
    run-date: datetime
    author-email: str
  jobs:
    extract-load:
      stages:
        - name: "Extract Load from Postgres to Lake"
          id: extract-load
          uses: tasks/postgres-to-delta@polars
          with:
            source:
              conn: conn_postgres_url
              query: |
                select * from ${{ params.name }}
                where update_date = '${{ params.datetime }}'
            sink:
              conn: conn_az_lake
              endpoint: "/${{ params.name }}"
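
Running this pipeline follows the same pattern as the earlier examples. A minimal sketch, assuming the conn_postgres_url and conn_az_lake connections are already configured and using illustrative parameter values:

from ddeutil.workflow.pipeline import Pipeline

pipe = Pipeline.from_loader(name='pipe_el_pg_to_lake', externals={})
pipe.execute(params={
    'run-date': '2024-01-01',
    'author-email': 'data.dev@example.com',  # illustrative value
})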

Hook (Transform)

pipeline_hook_mssql_proc:
  type: pipeline.Pipeline
  params:
    run_date: datetime
    sp_name: str
    source_name: str
    target_name: str
  jobs:
    transform:
      stages:
        - name: "Transform Data in MS SQL Server"
          id: transform
          uses: tasks/mssql-proc@odbc
          with:
            exec: ${{ params.sp_name }}
            params:
              run_mode: "T"
              run_date: ${{ params.run_date }}
              source: ${{ params.source_name }}
              target: ${{ params.target_name }}
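
Execution looks the same; a minimal sketch with hypothetical stored-procedure and table names passed as parameters:

from ddeutil.workflow.pipeline import Pipeline

pipe = Pipeline.from_loader(name='pipeline_hook_mssql_proc', externals={})
pipe.execute(params={
    'run_date': '2024-01-01',
    'sp_name': 'usp_transform_sales',   # hypothetical stored procedure
    'source_name': 'stg.sales',         # hypothetical source table
    'target_name': 'dwh.sales',         # hypothetical target table
})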

Configuration

export WORKFLOW_ROOT_PATH=.
export WORKFLOW_CORE_REGISTRY=ddeutil.workflow,tests.utils
export WORKFLOW_CORE_PATH_CONF=conf

Application config:

export WORKFLOW_APP_DB_URL=postgresql+asyncpg://user:pass@localhost:5432/schedule
export WORKFLOW_APP_INTERVAL=10
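
If you prefer to set these values from Python (for example in a test fixture) instead of exporting them in the shell, a minimal sketch assuming the same variable names are read when the package loads its configuration:

import os

# Core settings (mirror the exports above).
os.environ['WORKFLOW_ROOT_PATH'] = '.'
os.environ['WORKFLOW_CORE_REGISTRY'] = 'ddeutil.workflow,tests.utils'
os.environ['WORKFLOW_CORE_PATH_CONF'] = 'conf'

# Application settings for the [app] add-on (mirror the exports above).
os.environ['WORKFLOW_APP_DB_URL'] = 'postgresql+asyncpg://user:pass@localhost:5432/schedule'
os.environ['WORKFLOW_APP_INTERVAL'] = '10'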

Deployment

This package is able to run as an application service that receives manual triggers from the master node via a REST API.

[!WARNING] This feature has not started yet, because I am still researching the best tools to provision an app service, such as starlette, fastapi, and apscheduler.

(venv) $ workflow start -p 7070
