
Lightweight workflow orchestration with fewer dependencies


Workflow


This lightweight workflow orchestration package with fewer dependencies was created to make simple, metadata-driven data workflow orchestration easy. You can define data operations with a .yaml template.

[!WARNING] This package provides orchestration workloads only. That means you should not use a workflow stage to process large volumes of data that require heavy compute resources. :cold_sweat:

In my opinion, there is no need to duplicate workflow code when you can write one workflow template with dynamic input parameters and just change those parameters per use case. This way you can handle many logical workflows in your organization with metadata configuration alone. This is called a Metadata Driven Data Workflow.

:pushpin: Rules of This Workflow engine:

  1. The minimum scheduling frequency unit is 1 minute :warning:
  2. It cannot re-run only a failed stage and its pending downstream stages :rotating_light:
  3. All parallel tasks inside the workflow engine use multi-threading (Python 3.13 unlocks the GIL :unlock:)
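For illustration, multi-threaded parallel execution of jobs (rule 3) can be sketched with Python's standard thread pool; this is a simplified stand-in, not the package's internal executor, and the job names are made up:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a workflow "job"; a real job would run its stages.
def run_job(job_id):
    return {"job": job_id, "status": "success"}

# Mirrors a max-parallel setting of 2: at most two jobs execute at once,
# each on its own thread.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(run_job, ["getting-api-data", "loading-to-dwh"]))

print(results)
```

Because the work here is orchestration (waiting on I/O such as API calls), threads remain a reasonable choice even under the GIL.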

[!NOTE] Disclaimer: The dynamic statement style is inspired by GitHub Actions .yml files, and the config-file approach by several data orchestration framework tools I have used in my experience as a Data Engineer. :grimacing:

This package also borrows features I find interesting from other workflow tools.

:round_pushpin: Installation

This project needs the ddeutil and ddeutil-io extension namespace packages. If you want to install this package with application add-ons, include the optional extra in the installation:

| Use case | Install Optional | Support |
|---|---|---|
| Python | `pip install ddeutil-workflow` | :heavy_check_mark: |
| FastAPI Server | `pip install ddeutil-workflow[api]` | :heavy_check_mark: |

:beers: Usage

These examples use a workflow file for running a common Data Engineering use case.

[!IMPORTANT] I recommend using the hook stage for all actions you want to orchestrate as workflow activity, because it can pass dynamic input arguments to the same hook function, which reduces the time you spend maintaining your data workflows.
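The hook idea can be sketched as a simple registry that maps a `name@tag` string to a callable accepting dynamic keyword arguments. This is a hypothetical illustration of the pattern, not this package's actual registry API:

```python
# Hypothetical sketch of a hook registry (not the package's actual API):
# it maps a "name@tag" string to a Python callable with dynamic kwargs.
REGISTRY = {}

def tag(name, alias=None):
    """Register a function under 'alias@name' so YAML stages can reference it."""
    def decorator(func):
        REGISTRY[f"{alias or func.__name__}@{name}"] = func
        return func
    return decorator

@tag("requests", alias="get-api-with-oauth-to-s3")
def get_api_data(url, method="get", **kwargs):
    # A real hook would call the API and land the data; this one echoes inputs.
    return {"url": url, "method": method}

# A stage's `uses` key resolves to the registered callable; its `with` mapping
# becomes the keyword arguments.
hook = REGISTRY["get-api-with-oauth-to-s3@requests"]
print(hook(url="https://finances/open-data/currency-pairs/", method="post"))
```

One registered function can then serve many workflows that differ only in their `with` arguments.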

```yaml
run-py-local:

   # Validate that the model used for parsing this template file exists.
   type: ddeutil.workflow.workflow.Workflow
   on:
      # If this workflow is deployed to a schedule, it will run every 5 minutes
      # in the Asia/Bangkok timezone.
      - cronjob: '*/5 * * * *'
        timezone: "Asia/Bangkok"
   params:
      # Incoming execution parameters are validated against these types. They
      # also allow default values or templating.
      source-extract: str
      run-date: datetime
   jobs:
      getting-api-data:
         stages:
            - name: "Retrieve API Data"
              id: retrieve-api
              uses: tasks/get-api-with-oauth-to-s3@requests
              with:
                 # Arguments for the source data to retrieve.
                 method: post
                 url: https://finances/open-data/currency-pairs/
                 body:
                    resource: ${{ params.source-extract }}

                    # Filtering looks like a Jinja template, but this
                    # package does not use Jinja.
                    filter: ${{ params.run-date | fmt(fmt='%Y%m%d') }}
                 auth:
                    type: bearer
                    keys: ${API_ACCESS_REFRESH_TOKEN}

                 # Arguments for the target data landing zone.
                 writing_mode: flatten
                 aws_s3_path: my-data/open-data/${{ params.source-extract }}

                 # This authentication code should be implemented in your custom
                 # hook function. The template allows environment variables.
                 aws_access_client_id: ${AWS_ACCESS_CLIENT_ID}
                 aws_access_client_secret: ${AWS_ACCESS_CLIENT_SECRET}
```
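For intuition, the `${{ ... }}` parameter templating with a `fmt` filter could resolve roughly like the sketch below. The `resolve` helper is hypothetical, written only to illustrate the idea; it is not the package's real template engine:

```python
import re
from datetime import datetime

# Hypothetical resolver: look a parameter up, then apply an optional
# strftime-style fmt filter, e.g. ${{ params.run-date | fmt(fmt='%Y%m%d') }}.
def resolve(template, params):
    pattern = r"\$\{\{\s*params\.([\w-]+)(?:\s*\|\s*fmt\(fmt='([^']+)'\))?\s*\}\}"

    def repl(match):
        key, fmt = match.group(1), match.group(2)
        value = params[key]
        # With a fmt filter, format the datetime; otherwise stringify as-is.
        return value.strftime(fmt) if fmt else str(value)

    return re.sub(pattern, repl, template)

params = {"source-extract": "USD-THB", "run-date": datetime(2024, 1, 15)}
print(resolve("${{ params.run-date | fmt(fmt='%Y%m%d') }}", params))     # 20240115
print(resolve("my-data/open-data/${{ params.source-extract }}", params))  # my-data/open-data/USD-THB
```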

The workflow template above is the main executor pipeline. If you want to schedule this workflow, you will want its parameters to change dynamically based on execution time; for example, run-date should change based on the date the workflow runs.

So this package provides a Schedule template for this purpose.

```yaml
schedule-run-local-wf:

   # Validate that the model used for parsing this template file exists.
   type: ddeutil.workflow.scheduler.Schedule
   workflows:

      # Map an existing workflow to deploy with the scheduler application.
      # You can pass release parameters that change dynamically depending on
      # the scheduler application's release context at that time.
      - name: run-py-local
        params:
          source-extract: "USD-THB"
          asat-dt: "${{ release.logical_date }}"
```
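For intuition, `release.logical_date` for the `'*/5 * * * *'` cron above can be thought of as the next 5-minute boundary in the workflow's timezone. A stdlib sketch (my own simplification, not the scheduler's actual implementation):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# Hypothetical helper: round a timestamp up to the next 5-minute boundary,
# which is when a '*/5 * * * *' cron would next fire.
def next_five_minute_release(now):
    base = now.replace(second=0, microsecond=0)
    minutes_ahead = (5 - base.minute % 5) % 5 or 5
    return base + timedelta(minutes=minutes_ahead)

now = datetime(2024, 1, 15, 9, 3, 27, tzinfo=ZoneInfo("Asia/Bangkok"))
print(next_five_minute_release(now))  # 2024-01-15 09:05:00+07:00
```

The scheduler then substitutes that timestamp into `${{ release.logical_date }}` when it releases the workflow.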

:cookie: Configuration

These are the main configuration values you can change to suit your purposes. If a configuration value is not set, its default value is used and no error is raised.

| Environment | Component | Default | Description |
|---|---|---|---|
| WORKFLOW_ROOT_PATH | Core | `.` | The root path of the workflow application. |
| WORKFLOW_CORE_REGISTRY | Core | `src,src.ddeutil.workflow,tests,tests.utils` | A list of importable strings for the hook stage. |
| WORKFLOW_CORE_REGISTRY_FILTER | Core | `src.ddeutil.workflow.utils,ddeutil.workflow.utils` | A list of importable strings for template filters. |
| WORKFLOW_CORE_PATH_CONF | Core | `conf` | The config path that keeps all template `.yaml` files. |
| WORKFLOW_CORE_TIMEZONE | Core | `Asia/Bangkok` | A timezone string value that is passed to the ZoneInfo object. |
| WORKFLOW_CORE_STAGE_DEFAULT_ID | Core | `true` | A flag that enables a default stage ID, used to catch an execution output. |
| WORKFLOW_CORE_STAGE_RAISE_ERROR | Core | `false` | A flag that makes all stages raise StageException from stage execution. |
| WORKFLOW_CORE_JOB_DEFAULT_ID | Core | `false` | A flag that enables a default job ID, used to catch an execution output. The ID used is a sequence number. |
| WORKFLOW_CORE_JOB_RAISE_ERROR | Core | `true` | A flag that makes all jobs raise JobException from job strategy execution. |
| WORKFLOW_CORE_MAX_NUM_POKING | Core | `4` | |
| WORKFLOW_CORE_MAX_JOB_PARALLEL | Core | `2` | The maximum number of jobs that can run in parallel in the workflow executor. |
| WORKFLOW_CORE_MAX_JOB_EXEC_TIMEOUT | Core | `600` | |
| WORKFLOW_CORE_MAX_CRON_PER_WORKFLOW | Core | `5` | |
| WORKFLOW_CORE_MAX_QUEUE_COMPLETE_HIST | Core | `16` | |
| WORKFLOW_CORE_GENERATE_ID_SIMPLE_MODE | Core | `true` | A flag that enables generating IDs with the md5 algorithm. |
| WORKFLOW_LOG_DEBUG_MODE | Log | `true` | A flag that enables logging at debug level. |
| WORKFLOW_LOG_ENABLE_WRITE | Log | `true` | A flag that enables the logging object to save logs to its destination. |
| WORKFLOW_APP_MAX_PROCESS | Schedule | `2` | The maximum number of process workers that run in the scheduler app module. |
| WORKFLOW_APP_MAX_SCHEDULE_PER_PROCESS | Schedule | `100` | The number of schedules per process that run in parallel. |
| WORKFLOW_APP_STOP_BOUNDARY_DELTA | Schedule | `'{"minutes": 5, "seconds": 20}'` | A time delta value, in JSON string format, used to stop the scheduler app. |
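A minimal sketch of how such environment variables can be read with their documented defaults; this is illustrative only, not the package's actual config loader:

```python
import json
import os
from datetime import timedelta

# Fall back to the documented default whenever a variable is unset.
def env_bool(name, default):
    return os.getenv(name, str(default)).strip().lower() in ("true", "1", "yes")

max_job_parallel = int(os.getenv("WORKFLOW_CORE_MAX_JOB_PARALLEL", "2"))
stage_raise_error = env_bool("WORKFLOW_CORE_STAGE_RAISE_ERROR", False)

# The stop-boundary delta is documented as timedelta kwargs in a JSON string.
stop_boundary = timedelta(**json.loads(
    os.getenv("WORKFLOW_APP_STOP_BOUNDARY_DELTA", '{"minutes": 5, "seconds": 20}')
))
print(max_job_parallel, stage_raise_error, stop_boundary)
```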

API Application:

| Environment | Component | Default | Description |
|---|---|---|---|
| WORKFLOW_API_ENABLE_ROUTE_WORKFLOW | API | `true` | A flag that enables the workflow route for manual execution and workflow logging. |
| WORKFLOW_API_ENABLE_ROUTE_SCHEDULE | API | `true` | A flag that enables the scheduler route. |

:rocket: Deployment

This package can run as an application service that receives manual triggers from the master node via a REST API, or as a background scheduler service, like a crontab job but driven via the Python API.

API Server

```shell
(venv) $ uvicorn src.ddeutil.workflow.api:app --host 127.0.0.1 --port 80
```

[!NOTE] If this package is already deployed, it can use multiple worker processes: `uvicorn ddeutil.workflow.api:app --host 127.0.0.1 --port 80 --workers 4`

Docker Container

Create the Docker image:

```shell
$ docker build -t ddeutil-workflow:latest -f .container/Dockerfile .
```

Run the Docker image:

```shell
$ docker run -i ddeutil-workflow:latest
```
