Lightweight workflow orchestration with YAML templates
Workflow Orchestration
This lightweight workflow orchestration package, built with few dependencies, was created to make simple metadata-driven data workflows easy to define. Data operations can be declared in a .yaml template.
[!WARNING] This package provides orchestration workloads only. That means you should not use a workflow stage to process large volumes of data that require heavy compute resources :cold_sweat:.
:pushpin: Rules of This Workflow:
- The minimum frequency unit of the built-in scheduler is 1 minute 🕘
- You cannot re-run only a failed stage and its pending downstream stages ↩️
- All parallel tasks inside the workflow core engine use a multi-threading pool (Python 3.13 unlocks the GIL 🐍🔓)
- Pass secret values via environment variables in the YAML template 🔐
- Any datetime value is converted to the UTC timezone 🌐
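The UTC rule above can be illustrated with plain standard-library Python; this is a sketch of the behavior, not the engine's internal code:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# A timezone-aware local datetime (Asia/Bangkok is UTC+7).
local = datetime(2024, 1, 1, 9, 0, tzinfo=ZoneInfo("Asia/Bangkok"))

# Normalize to UTC, as the engine does for any datetime value.
utc = local.astimezone(timezone.utc)
print(utc.isoformat())  # 2024-01-01T02:00:00+00:00
```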
:memo: Workflow Diagrams:
This diagram shows where this application runs in a production infrastructure. Note that this application only runs code, without heavy load, which means you should keep the data layer separate from this core program before running the application.
```mermaid
flowchart LR
    A((fa:fa-user User))

    subgraph Docker Container
        direction TB
        G@{ shape: rounded, label: "📡Observe<br>Application" }
    end

    subgraph Docker Container
        direction TB
        B@{ shape: rounded, label: "🏃Workflow<br>Application" }
    end

    A <-->|action &<br>response| B
    B -...-> |response| G
    G -...-> |request| B

    subgraph Data Context
        D@{ shape: processes, label: "Logs" }
        E@{ shape: lin-cyl, label: "Audit<br>Logs" }
    end

    subgraph Config Context
        F@{ shape: tag-rect, label: "YAML<br>files" }
    end

    A ---> |push| H(Repo)
    H -.-> |pull| F
    B <-->|disable &<br>read| F
    B <-->|read &<br>write| E
    B -->|write| D
    D -.->|read| G
    E -.->|read| G
```
[!WARNING] Disclaimer: The dynamic YAML statement is inspired by GitHub Actions and by my experience with data-framework config patterns. :grimacing:
Other workflow orchestration services that I find interesting and picked as inspiration for this project:
📦 Installation
This project needs the ddeutil and ddeutil-io extension namespace packages as base dependencies.
If you want to install this package with the application add-ons, include the optional extra in the installation:
| Use-case | Install Optional | Support |
|---|---|---|
| Python | `ddeutil-workflow` | ✅ |
| FastAPI Server | `ddeutil-workflow[all]` | ✅ |
Check the version of the current workflow package:

```shell
$ pip install ddeutil-workflow
$ workflow-cli version
```

Initialize a workflow project:

```shell
$ workflow-cli init
```
📖 Documentation
For comprehensive API documentation, examples, and best practices:
- Full Documentation - Complete user guide and API reference
- Getting Started - Quick start guide
- API Reference - Detailed API documentation
🎯 Usage
These are examples of workflow files for running common Data Engineering use-cases.
[!IMPORTANT] I recommend using the call stage for any action you want the workflow to orchestrate, because it can receive dynamic input arguments for the same call function, which reduces the time needed to maintain your data workflows.
```yaml
run-py-local:
  # Validate model that is used for parsing this template file.
  type: Workflow
  on:
    # If this workflow is deployed to the scheduler, it runs every 5 minutes
    # in the Asia/Bangkok timezone.
    - cronjob: '*/5 * * * *'
      timezone: "Asia/Bangkok"
  params:
    # Incoming execution parameters are validated against these types. It
    # allows setting default values or templating.
    source-extract: str
    run-date: datetime
  jobs:
    getting-api-data:
      runs-on:
        type: local
      stages:
        - name: "Retrieve API Data"
          id: retrieve-api
          uses: tasks/get-api-with-oauth-to-s3@requests
          with:
            # Arguments for the source data to retrieve.
            method: post
            url: https://finances/open-data/currency-pairs/
            body:
              resource: ${{ params.source-extract }}
              # You can use filtering like a Jinja template, but this
              # package does not use Jinja itself.
              filter: ${{ params.run-date | fmt(fmt='%Y%m%d') }}
            auth:
              type: bearer
              keys: ${API_ACCESS_REFRESH_TOKEN}
            # Arguments for the target location where the data lands.
            writing_mode: flatten
            aws:
              path: my-data/open-data/${{ params.source-extract }}
              # This authentication should be implemented in your custom call
              # function. The template lets you use environment variables.
              access_client_id: ${AWS_ACCESS_CLIENT_ID}
              access_client_secret: ${AWS_ACCESS_CLIENT_SECRET}
```
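The `fmt` filter in the template above behaves like `datetime.strftime`; a minimal illustration of the substitution it performs (this mirrors the filter's observable behavior, not the engine's template code):

```python
from datetime import datetime

# A run date such as the workflow's run-date parameter.
run_date = datetime(2024, 1, 1)

# What `${{ params.run-date | fmt(fmt='%Y%m%d') }}` would resolve to.
filtered = run_date.strftime("%Y%m%d")
print(filtered)  # 20240101
```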
Before executing this workflow, you should implement the caller function first.

registry-caller/
╰─ tasks.py

This function is stored as a module that is imported from the WORKFLOW_CORE_REGISTRY_CALLER value (this config can be overridden by extra parameters with the registry_caller key).
[!NOTE] You can use a Pydantic model as an argument of your caller function. The core workflow engine automatically calls the model_validate method before running your caller function.
```python
from ddeutil.workflow import Result, CallerSecret, tag
from ddeutil.workflow.errors import StageError
from pydantic import BaseModel


class AwsCredential(BaseModel):
    path: str
    access_client_id: str
    access_client_secret: CallerSecret


class RestAuth(BaseModel):
    type: str
    keys: CallerSecret


@tag("requests", alias="get-api-with-oauth-to-s3")
def get_api_with_oauth_to_s3(
    method: str,
    url: str,
    body: dict[str, str],
    auth: RestAuth,
    writing_mode: str,
    aws: AwsCredential,
    result: Result,
) -> dict[str, int]:
    """Get the data from a REST API via OAuth authentication and then store
    it in the AWS S3 service.
    """
    result.trace.info("[CALLER]: Start get data via RestAPI to S3.")
    result.trace.info(f"... {method}: {url}")
    if method != "post":
        raise StageError(f"RestAPI does not support the {method} action.")
    # NOTE: To read a secret, use `auth.keys.get_secret_value()`.
    return {"records": 1000}
```
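To see how a decorator-based caller registry like `@tag` can work in general, here is a simplified, generic sketch; the `REGISTRY` dict, `wrapper`, and `get_api` names are illustrative inventions, NOT this package's implementation:

```python
# A generic decorator-based registry, analogous in spirit to @tag above.
REGISTRY: dict[str, object] = {}


def tag(name: str, alias: str):
    """Register the decorated function under an '<alias>@<name>' key."""
    def wrapper(func):
        REGISTRY[f"{alias}@{name}"] = func
        return func
    return wrapper


@tag("requests", alias="get-api-with-oauth-to-s3")
def get_api(method: str) -> dict:
    return {"records": 1000}


# An engine could now resolve the stage's `uses:` reference by registry key.
func = REGISTRY["get-api-with-oauth-to-s3@requests"]
print(func("post"))  # {'records': 1000}
```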
The workflow template above is the main executor pipeline. If you want to schedule this workflow, you will want its parameters to change dynamically based on execution time; for example, run-date should change based on the date the workflow runs.
```python
from ddeutil.workflow import Workflow, Result

workflow: Workflow = Workflow.from_conf('run-py-local')
result: Result = workflow.execute(
    params={"source-extract": "USD-THB", "run-date": "2024-01-01"}
)
```
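Since run-date should track the actual execution time when scheduled, the parameter can be computed at call time instead of hard-coded; a minimal sketch (`run_date` is an illustrative variable name):

```python
from datetime import datetime, timezone

# Compute the run date at execution time; the engine converts any datetime
# value to UTC anyway, so pass an aware UTC datetime directly.
run_date = datetime.now(timezone.utc)
params = {"source-extract": "USD-THB", "run-date": run_date}
print(params["source-extract"])  # USD-THB
```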
:cookie: Configuration
The main configuration used to adjust this workflow engine for your environment is set through environment variables only. If a configuration value is not set, the engine uses its default value and does not raise an error.
[!IMPORTANT] The config value that you set in the environment should combine the prefix, component, and name as WORKFLOW_{component}_{name} (upper case).
| Name | Component | Default | Description |
|---|---|---|---|
| REGISTRY_CALLER | CORE | . | List of importable strings for the call stage. |
| REGISTRY_FILTER | CORE | ddeutil.workflow.reusables | List of importable strings for the filter template. |
| CONF_PATH | CORE | ./conf | The config path that keeps all template .yaml files. |
| STAGE_DEFAULT_ID | CORE | false | A flag that enables a default stage ID used to catch an execution output. |
| GENERATE_ID_SIMPLE_MODE | CORE | true | A flag that enables generating IDs with the md5 algorithm. |
| DEBUG_MODE | LOG | true | A flag that enables logging at debug level. |
| TIMEZONE | LOG | Asia/Bangkok | A timezone string value that is passed to the ZoneInfo object. |
| TRACE_HANDLERS | LOG | [{"type": "console"}] | A JSON string of a list of trace handler configs used to emit log messages. |
| AUDIT_CONF | LOG | {"type": "file", "path": "./audits"} | A JSON string of audit config data used to write audit metrics. |
| AUDIT_ENABLE_WRITE | LOG | true | A flag that enables writing the audit log after execution ends in the workflow release step. |
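Following the WORKFLOW_{component}_{name} convention above, settings can be exported and the JSON-valued ones parsed with plain standard-library code; this is an illustration of the naming scheme, not the engine's config loader:

```python
import json
import os

# Names combine the prefix, component, and name in upper case.
os.environ["WORKFLOW_CORE_CONF_PATH"] = "./conf"
os.environ["WORKFLOW_LOG_TRACE_HANDLERS"] = '[{"type": "console"}]'

# JSON-valued settings such as TRACE_HANDLERS are plain JSON strings.
handlers = json.loads(os.environ["WORKFLOW_LOG_TRACE_HANDLERS"])
print(handlers[0]["type"])  # console
```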
:rocket: Deployment
This package is able to run as an application service that receives manual triggers from any node via a REST API built with the FastAPI package.
API Server
This server uses the FastAPI package as the base application.

```shell
(.venv) $ workflow-cli api --host 127.0.0.1 --port 80
```

[!NOTE] If this package is already deployed, it can use multiple worker processes:

```shell
$ workflow-cli api --host 127.0.0.1 --port 80 --workers 4
```
Docker Container
Pull and run the Docker container for this package:

```shell
$ docker pull ghcr.io/ddeutils/ddeutil-workflow:latest
$ docker run --rm ghcr.io/ddeutils/ddeutil-workflow:latest ddeutil-worker
```
:speech_balloon: Contribute
I do not think this project will spread widely because it has a specific purpose, and for a long-term solution you can write your own code without depending on this project. For now, you can open a GitHub issue on this project :raised_hands: to report a bug or request a new feature.
File details
Details for the file ddeutil_workflow-0.0.86.tar.gz.
File metadata
- Download URL: ddeutil_workflow-0.0.86.tar.gz
- Upload date:
- Size: 169.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 12abea2d267c8b406c4d32b74b5fa329ff1a6bf0041edb7eb2a0fd89acbc9c71 |
| MD5 | e7c138318904cc95ed2ef9af3c9f1eae |
| BLAKE2b-256 | 9b68b82ff483dd41a3e75505674dd55804ab2a4b605caeb9b881c3ba5167afce |
Provenance
The following attestation bundles were made for ddeutil_workflow-0.0.86.tar.gz:
Publisher: publish.yml on ddeutils/ddeutil-workflow
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: ddeutil_workflow-0.0.86.tar.gz
- Subject digest: 12abea2d267c8b406c4d32b74b5fa329ff1a6bf0041edb7eb2a0fd89acbc9c71
- Sigstore transparency entry: 362928277
- Sigstore integration time:
- Permalink: ddeutils/ddeutil-workflow@74f0a6ec968de07cf62ec3db4de010a2892f1418
- Branch / Tag: refs/tags/v0.0.86
- Owner: https://github.com/ddeutils
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@74f0a6ec968de07cf62ec3db4de010a2892f1418
- Trigger Event: release
File details
Details for the file ddeutil_workflow-0.0.86-py3-none-any.whl.
File metadata
- Download URL: ddeutil_workflow-0.0.86-py3-none-any.whl
- Upload date:
- Size: 153.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 60b3a44ded23f8a5f99867327e3be7d90a204b6cd2662b032663e7ec880a6ee0 |
| MD5 | ebcd9fc4582a4b07843b0fceb5d205b0 |
| BLAKE2b-256 | 5411ce4ec45a817f1e9264182268fc042f04d0d496ecb5b86dd3965858a5cc4a |
Provenance
The following attestation bundles were made for ddeutil_workflow-0.0.86-py3-none-any.whl:
Publisher: publish.yml on ddeutils/ddeutil-workflow
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: ddeutil_workflow-0.0.86-py3-none-any.whl
- Subject digest: 60b3a44ded23f8a5f99867327e3be7d90a204b6cd2662b032663e7ec880a6ee0
- Sigstore transparency entry: 362928290
- Sigstore integration time:
- Permalink: ddeutils/ddeutil-workflow@74f0a6ec968de07cf62ec3db4de010a2892f1418
- Branch / Tag: refs/tags/v0.0.86
- Owner: https://github.com/ddeutils
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@74f0a6ec968de07cf62ec3db4de010a2892f1418
- Trigger Event: release