
Abstraction layer that is used by the DKIST Science Data Processing pipelines to process DKIST data using Apache Airflow.

Project description

Overview

The dkist-processing-core package provides an abstraction layer between the DKIST data processing code, the workflow engine that supports it (Apache Airflow), and the logging infrastructure. By abstracting Airflow specifically, a versioning system for workflows is implemented on top of the engine.

Core, Common, and Instrument Brick Diagram

There are 3 main entities which implement the abstraction:

Task: The Task defines the DKIST data processing task interface. It additionally implements methods that should be global for all DKIST processing tasks, and it provides an API to the application performance monitoring infrastructure.

Workflow: The Workflow defines an API that is independent of the workflow engine it abstracts. It also implements the translation to engine-specific workflow definitions; in the case of Airflow, this is a DAG.

Node: The Node is used by the Workflow to translate a Task into the engine-specific implementation of that Task, which runs inside a Python virtual environment. The virtual environment enables loading only that task's dependencies.
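
As a rough illustration of the Node idea (not the actual dkist-processing-core implementation; the function name, paths, and invocation line below are assumptions), a node can be thought of as rendering a task into a shell command that builds an isolated virtual environment and runs the task inside it:

def render_venv_command(package: str, version: str, task_class: str) -> str:
    # Hypothetical sketch only: the real Node produces the engine-specific
    # (Airflow) operator, and the real task invocation differs.
    steps = [
        "python -m venv /tmp/task_env",                    # isolated environment per task
        ". /tmp/task_env/bin/activate",
        f"pip install {package}=={version}",               # install only this task's dependencies
        f'python -c "from {package} import {task_class}; {task_class}().run()"',
    ]
    return " && ".join(steps)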

Additional support functions are provided in build_utils.

Usage

The Workflow and Task are the primary objects used by client libraries. The Task is used as a base class and the subclass must at a minimum implement run. A Workflow is used to give the tasks an order of execution and a name for the flow.

from dkist_processing_core import TaskBase
from dkist_processing_core import Workflow

# Task definitions
class MyTask1(TaskBase):
    def run(self):
        print("Running MyTask1")


class MyTask2(TaskBase):
    def run(self):
        print("Running MyTask2")

# Workflow definition
# MyTask1 -> MyTask2
w = Workflow(process_category="My", process_name="Workflow", workflow_package=__package__, workflow_version="dev")
w.add_node(MyTask1, upstreams=None)
w.add_node(MyTask2, upstreams=MyTask1)
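
Task ordering beyond a simple chain is expressed through the upstreams argument. A hedged extension of the example above, assuming upstreams also accepts a list of tasks (verify against the installed version):

# Fan-in: MyTask3 waits on both MyTask1 and MyTask2.
class MyTask3(TaskBase):
    def run(self):
        print("Running MyTask3")

w.add_node(MyTask3, upstreams=[MyTask1, MyTask2])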

Using dkist-processing-core for data processing with Airflow involves a project structure and build process that results in code artifacts deployed to PyPI and a zip of workflow artifacts deployed to Artifactory.

Build Artifacts Diagram

The client DKIST data processing libraries should implement a structure and build pipeline using dkist-processing-test as an example. The build pipelines for a client repo can leverage build_utils for test and export.
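
A hedged sketch of what such a build pipeline step might look like; the helper names and signatures below (validate_workflows, export_dags) are assumptions, so consult the dkist-processing-test build pipeline for the real build_utils entry points:

from dkist_processing_core import build_utils

# Illustrative build step for a client repo; names and signatures are assumptions.
build_utils.validate_workflows("dkist_processing_test")           # test: confirm workflows compile to DAGs
build_utils.export_dags("dkist_processing_test", path="export/")  # export: produce the workflow artifact zip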

Specifically for Airflow, the resulting deployment makes the versioned workflow artifacts available to the scheduler and the versioned code artifacts available to workers for task execution.

Airflow Deployment Diagram

Build

dkist-processing-core is built using bitbucket-pipelines.

Deployment

dkist-processing-core is deployed to PyPI.

Environment Variables

Variable | Description | Type | Default
-------- | ----------- | ---- | -------
BUILD_VERSION | Build/Export pipelines only. This value is appended to all artifacts and represents their unique version. | STR | dev
MESH_CONFIG | Provides the dkistdc cloud mesh configuration, specifically the location of the message broker. | JSON |
ISB_USERNAME | Message broker user name. | STR |
ISB_PASSWORD | Message broker password. | STR |
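
For illustration only (this snippet is not part of the package), these settings could be read in Python as:

import json
import os

build_version = os.environ.get("BUILD_VERSION", "dev")  # STR, defaults to "dev"
mesh_config = json.loads(os.environ["MESH_CONFIG"])     # JSON document locating the message broker
isb_username = os.environ["ISB_USERNAME"]               # STR
isb_password = os.environ["ISB_PASSWORD"]               # STR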

Development

git clone git@bitbucket.org:dkistdc/dkist-processing-core.git
cd dkist-processing-core
pre-commit install
pip install -e .[test]
pytest -v --cov dkist_processing_core
