Abstraction layer that is used by the DKIST Science Data Processing pipelines to process DKIST data using Apache Airflow.
Overview
The dkist-processing-core package provides an abstraction layer between the DKIST data processing code, the workflow engine that supports it (Airflow), and the logging infrastructure. For Airflow specifically, the abstraction layer also implements a versioning system.
There are three main entities which implement the abstraction:
Task : The Task defines the DKIST data processing task interface. It also implements methods that should be common to all DKIST processing tasks and provides an API to the application performance monitoring infrastructure.
Workflow : The Workflow defines an API that is independent of the workflow engine it abstracts. It also implements the translation to engine-specific workflow definitions; in the case of Airflow, this is a DAG.
Node : The Node is used by the Workflow to translate a Task into the engine-specific implementation of that Task, which runs inside a Python virtual environment. The virtual environment ensures that only that task's dependencies are loaded.
Additional support functions are provided in build_utils.
Usage
The Workflow and Task are the primary objects used by client libraries. The Task is used as a base class and the subclass must at a minimum implement run. A Workflow is used to give the tasks an order of execution and a name for the flow.
from dkist_processing_core import TaskBase
from dkist_processing_core import Workflow


# Task definitions
class MyTask1(TaskBase):
    def run(self):
        print("Running MyTask1")


class MyTask2(TaskBase):
    def run(self):
        print("Running MyTask2")


# Workflow definition
# MyTask1 -> MyTask2
w = Workflow(process_category="My", process_name="Workflow", workflow_package=__package__, workflow_version="dev")
w.add_node(MyTask1, upstreams=None)
w.add_node(MyTask2, upstreams=MyTask1)
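To illustrate how upstream declarations like those above determine an order of execution, here is a minimal sketch using the standard-library graphlib module (Python 3.9+). It does not use dkist-processing-core and is not a description of how the package or Airflow resolves ordering internally; the task names, including MyTask3, are hypothetical strings chosen for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical task names (plain strings, not dkist-processing-core classes).
# Each key maps a task to the set of tasks that must finish before it starts.
upstreams = {
    "MyTask1": set(),
    "MyTask2": {"MyTask1"},
    "MyTask3": {"MyTask2"},
}


def execution_order(upstream_map):
    """Return task names ordered so that every task follows its upstreams."""
    return list(TopologicalSorter(upstream_map).static_order())


order = execution_order(upstreams)
print(order)
```

A workflow engine performs a comparable resolution step when it schedules a DAG, which is why a cycle in the upstream declarations cannot be scheduled.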
Using dkist-processing-core for data processing with Airflow involves a project structure and build process that result in code artifacts deployed to PyPI and a zip of workflow artifacts deployed to Artifactory.
The client DKIST data processing libraries should implement a structure and build pipeline using dkist-processing-test as an example. The build pipelines for a client repo can leverage build_utils for test and export.
Specifically for Airflow, the resulting deployment makes all versioned workflow artifacts available to the scheduler and the versioned code artifacts available to workers for task execution.
Build
dkist-processing-core is built using bitbucket-pipelines.
Deployment
dkist-processing-core is deployed to PyPI.
Environment Variables
Variable | Description | Type | Default |
---|---|---|---|
BUILD_VERSION | Build/Export pipelines only. This is the value that will be appended to all artifacts and represents their unique version | STR | dev |
MESH_CONFIG | Provides the dkistdc cloud mesh configuration. Specifically the location of the message broker | JSON | |
ISB_USERNAME | Message broker user name | STR | |
ISB_PASSWORD | Message broker password | STR | |
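As a rough illustration of how the variables above could be consumed, the following sketch reads them from a mapping with the documented default for BUILD_VERSION and JSON decoding for MESH_CONFIG. The function load_settings and the dictionary keys it returns are hypothetical and are not part of dkist-processing-core; the JSON value used in the example is a placeholder, since the real mesh configuration schema is not described here.

```python
import json
import os


def load_settings(environ=None):
    """Read the documented variables from a mapping (os.environ by default).

    Hypothetical helper for illustration only.
    """
    env = os.environ if environ is None else environ
    mesh_raw = env.get("MESH_CONFIG")
    return {
        # Documented default is "dev" when BUILD_VERSION is unset.
        "build_version": env.get("BUILD_VERSION", "dev"),
        # MESH_CONFIG is documented as JSON, so decode it when present.
        "mesh_config": json.loads(mesh_raw) if mesh_raw else None,
        "isb_username": env.get("ISB_USERNAME"),
        "isb_password": env.get("ISB_PASSWORD"),
    }


# Placeholder JSON; the real schema is not specified in this document.
example = load_settings({"MESH_CONFIG": '{"example-key": "example-value"}'})
print(example["build_version"])
```

Passing the mapping as an argument keeps the sketch easy to test without modifying the process environment.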
Development
git clone git@bitbucket.org:dkistdc/dkist-processing-core.git
cd dkist-processing-core
pre-commit install
pip install -e .[test]
pytest -v --cov dkist_processing_core
Changelog
When you make any change to this repository it MUST be accompanied by a changelog file. The changelog for this repository uses the towncrier package. Entries in the changelog for the next release are added as individual files (one per change) to the changelog/ directory.
Writing a Changelog Entry
A changelog entry accompanying a change should be added to the changelog/ directory. The name of a file in this directory follows a specific template:
<PULL REQUEST NUMBER>.<TYPE>[.<COUNTER>].rst
The fields have the following meanings:
<PULL REQUEST NUMBER>: This is the number of the pull request, so people can jump from the changelog entry to the diff on Bitbucket.
<TYPE>: This is the type of the change and must be one of the values described below.
<COUNTER>: This is an optional field. If you make more than one change of the same type, you can append a counter to the subsequent changes, e.g. 100.bugfix.rst and 100.bugfix.1.rst for two bugfix changes in the same PR.
The list of possible types is defined in the towncrier section of pyproject.toml. The types are:
feature: This change is a new code feature.
bugfix: This is a change which fixes a bug.
doc: A documentation change.
removal: A deprecation or removal of public API.
misc: Any small change which doesn’t fit anywhere else, such as a change to the package infrastructure.
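The naming template and type list above can be checked mechanically. The sketch below is a hypothetical helper, not part of towncrier or of this repository, that parses a fragment file name against the template using a regular expression built from the five types listed.

```python
import re

# The change types listed in this section.
FRAGMENT_TYPES = ("feature", "bugfix", "doc", "removal", "misc")

# <PULL REQUEST NUMBER>.<TYPE>[.<COUNTER>].rst
FRAGMENT_PATTERN = re.compile(
    r"(?P<pr>\d+)\.(?P<type>" + "|".join(FRAGMENT_TYPES) + r")(?:\.(?P<counter>\d+))?\.rst"
)


def parse_fragment_name(name):
    """Return the parsed fields of a fragment file name, or None if it is invalid."""
    match = FRAGMENT_PATTERN.fullmatch(name)
    if match is None:
        return None
    counter = match.group("counter")
    return {
        "pr": int(match.group("pr")),
        "type": match.group("type"),
        "counter": int(counter) if counter is not None else None,
    }


print(parse_fragment_name("100.bugfix.rst"))
print(parse_fragment_name("100.bugfix.1.rst"))
print(parse_fragment_name("100.typo.rst"))
```

A check like this could run in a pre-commit hook to catch misnamed fragments before a pull request is opened, assuming the type list is kept in sync with pyproject.toml.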
Rendering the Changelog at Release Time
Before you tag a release, you must first run towncrier to render the changelog. The steps for this are as follows:
Run towncrier build --version vx.y.z using the version number you want to tag.
Agree to have towncrier remove the fragments.
Add and commit your changes.
Tag the release.
NOTE: If you forget to add a changelog entry to a tagged release (either manually or automatically with towncrier), the Bitbucket pipeline will fail. To reuse the same tag, you must delete it both locally and on the remote:
# First, actually update the CHANGELOG and commit the update
git commit
# Delete tags
git tag -d vWHATEVER.THE.VERSION
git push --delete origin vWHATEVER.THE.VERSION
# Re-tag with the same version
git tag vWHATEVER.THE.VERSION
git push --tags origin main