Supervisor operators and configuration for long-running tasks

airflow-supervisor

Apache Airflow utilities for running long-running or always-on jobs with supervisord

Overview

This library provides a configuration-driven way to generate supervisor configurations and Airflow operators/sensors for long-running or always-on jobs. Configuration is managed by Pydantic, Hydra, and OmegaConf via the supervisor-pydantic library.

How To: Use in Airflow

airflow-supervisor can be installed in your Airflow server environment and imported in your DAG files. It provides two convenient top-level DAG subclasses:

  • Supervisor: creates a DAG representing a local supervisor instance running on the airflow worker node (underlying tasks will use PythonOperator and BashOperator to communicate between airflow and supervisor)
  • SupervisorSSH: creates a DAG representing a remote supervisor instance running on another machine (underlying tasks will use SSHOperator to communicate between airflow and supervisor)

The resulting DAGs are composed of a variety of tasks and sensors, arranged as a discrete pipeline of steps:

  1. Set up supervisord configuration
  2. Start the supervisord daemon
  3. Start the supervised programs with supervisorctl
  4. Start sensors to query the programs' state via supervisor's XML-RPC API
  5. Evaluate and take action according to the program's state changes
  6. Restart programs if necessary
  7. Tear down the sensors from (4)
  8. Stop the supervised programs from (3)
  9. Stop the supervisord daemon from (2)
  10. Remove configuration from (1)
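
Steps 4 to 6 rest on supervisor's XML-RPC interface. The following is a minimal sketch of that polling logic using only Python's standard library, not the code this library actually runs. The URL, the `connect`, `decide`, and `check_program` helper names, and the mapping from states to actions are assumptions made for illustration; `supervisor.getProcessInfo` and the state names come from supervisor's XML-RPC API.

```python
from xmlrpc.client import ServerProxy

# Process states reported by supervisord in the "statename" field.
HEALTHY = {"RUNNING"}
TRANSITIONAL = {"STARTING", "BACKOFF", "STOPPING"}
DOWN = {"STOPPED", "EXITED", "FATAL", "UNKNOWN"}


def connect(url: str = "http://localhost:9001/RPC2") -> ServerProxy:
    """Build an XML-RPC proxy; no request is sent until a method is called.

    The URL is an assumption: use the host and port of your own supervisord.
    """
    return ServerProxy(url)


def decide(statename: str) -> str:
    """Map a supervisor state to a hypothetical action name (step 5)."""
    if statename in HEALTHY or statename in TRANSITIONAL:
        return "poll-again"
    # Everything else is treated as down, including unrecognised states.
    return "restart"


def check_program(proxy, name: str) -> str:
    """Query one program's state (step 4) and choose an action (steps 5-6)."""
    info = proxy.supervisor.getProcessInfo(name)
    return decide(info["statename"])
```

Against a live daemon this would be called as `check_program(connect(), "test")`. A real sensor would also need to handle connection errors and distinguish an intentional stop from a crash, which this sketch does not attempt.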

This setup provides maximal configurability with minimal requirements on the machine (for example, it does not require an existing supervisord daemon managed via e.g. systemd). It also lets you hook your own tasks into any step of the process. For example, if a process is detected to have died in step (5), you could configure your own task to take some custom action before, or instead of, the default restart in step (6).

[Figure: overview of the DAG, with annotations for code paths and the actions taken by Supervisor]

More docs and code examples coming soon!

Example DAG

from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import SupervisorAirflowConfiguration, Supervisor, ProgramConfiguration


# Create supervisor configuration
cfg = SupervisorAirflowConfiguration(
    working_dir="/data/airflow/supervisor",
    config_path="/data/airflow/supervisor/supervisor.conf",
    program={
        "test": ProgramConfiguration(
            command="bash -c 'sleep 14400; exit 1'",
        )
    },
)

# Create DAG as normal
with DAG(
    dag_id="test-supervisor",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Link supervisor config to dag
    supervisor = Supervisor(dag=dag, cfg=cfg)
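
For orientation, step 1 of the pipeline writes a supervisord configuration file to `config_path`. The sketch below renders a minimal file of that kind with Python's standard `configparser`. It is an illustration, not the output of supervisor-pydantic: the `render_supervisor_conf` helper, the port `9001`, the log and pid file names, and `autostart = false` are all assumptions, while the section names are standard supervisord sections.

```python
import configparser
import io


def render_supervisor_conf(working_dir: str, port: int, programs: dict[str, str]) -> str:
    """Render a minimal supervisord INI file (hypothetical helper)."""
    # Disable interpolation so "%" in commands is written through untouched.
    parser = configparser.ConfigParser(interpolation=None)
    parser["supervisord"] = {
        "directory": working_dir,
        "logfile": f"{working_dir}/supervisord.log",  # assumed file name
        "pidfile": f"{working_dir}/supervisord.pid",  # assumed file name
    }
    # The HTTP server and RPC interface are what the XML-RPC sensors talk to.
    parser["inet_http_server"] = {"port": f"127.0.0.1:{port}"}
    parser["rpcinterface:supervisor"] = {
        "supervisor.rpcinterface_factory": "supervisor.rpcinterface:make_main_rpcinterface",
    }
    parser["supervisorctl"] = {"serverurl": f"http://127.0.0.1:{port}"}
    for name, command in programs.items():
        # Assumption: programs are started explicitly in a later step,
        # so they do not start automatically with the daemon.
        parser[f"program:{name}"] = {"command": command, "autostart": "false"}
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


conf_text = render_supervisor_conf(
    working_dir="/data/airflow/supervisor",
    port=9001,
    programs={"test": "bash -c 'sleep 14400; exit 1'"},
)
print(conf_text)
```

The file actually generated by the library is likely to contain more options than this; the sketch only shows how the `program` mapping in the Python configuration relates to `[program:x]` sections.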

Example DAG: airflow-config

# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultArgs
  retries: 0
  depends_on_past: false
all_dags:
  _target_: airflow_config.DagArgs
  start_date: "2024-01-01"
  catchup: false
extensions:
  supervisor:
    _target_: airflow_supervisor.SupervisorAirflowConfiguration
    port: 9091
    working_dir: "/data/airflow/supervisor"
    config_path: "/data/airflow/supervisor/supervisor.conf"
    program:
      test:
        _target_: airflow_supervisor.ProgramConfiguration
        command: "bash -c 'sleep 14400; exit 1'"

The DAG file then loads this configuration by name:

from datetime import timedelta
from airflow_config import load_config, DAG
from airflow_supervisor import Supervisor

config = load_config(config_name="airflow")

with DAG(
    dag_id="test-supervisor",
    schedule=timedelta(days=1),
    config=config,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=config.extensions["supervisor"])
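
The YAML above relies on Hydra's `_target_` convention: each mapping names the class to build, and the remaining keys become keyword arguments. The following is a simplified stand-in for that mechanism, written with the standard library only so it runs without Hydra installed. The `instantiate` function here is a hypothetical re-implementation for illustration; Hydra's own `hydra.utils.instantiate` supports many more features.

```python
from importlib import import_module
from typing import Any


def instantiate(node: Any) -> Any:
    """Recursively build objects from mappings that carry a `_target_` key."""
    if isinstance(node, list):
        return [instantiate(item) for item in node]
    if not isinstance(node, dict):
        return node  # plain scalars are passed through unchanged
    # Build children first so nested targets become real objects.
    kwargs = {key: instantiate(value) for key, value in node.items() if key != "_target_"}
    if "_target_" not in node:
        return kwargs
    # Split "package.module.ClassName" into module path and attribute name.
    module_path, _, attr = node["_target_"].rpartition(".")
    target = getattr(import_module(module_path), attr)
    return target(**kwargs)


# Stand-in example using a standard-library class instead of
# airflow_supervisor.ProgramConfiguration.
config = {
    "schedule": {"_target_": "datetime.timedelta", "days": 1},
    "retries": 0,
}
built = instantiate(config)
print(built["schedule"], built["retries"])
```

In the same way, the `extensions.supervisor` node resolves to a `SupervisorAirflowConfiguration` whose `program.test` entry is a `ProgramConfiguration`, which is why it can be passed directly as `cfg`.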

Configuration

See supervisor-pydantic for reference.

  • SupervisorAirflowConfiguration: Wrapper around supervisor_pydantic.SupervisorConvenienceConfiguration, with added airflow-specific configuration
  • SupervisorSSHAirflowConfiguration: Wrapper around SupervisorAirflowConfiguration, with added parameters for airflow's SSHOperator
classDiagram
    SupervisorConvenienceConfiguration <|-- SupervisorAirflowConfiguration
    SupervisorAirflowConfiguration <|-- SupervisorSSHAirflowConfiguration

    class SupervisorConvenienceConfiguration {
      supervisor_pydantic.SupervisorConvenienceConfiguration
    }

    class SupervisorAirflowConfiguration{
        # PythonSensor arguments
        check_interval: timedelta
        check_timeout: timedelta

        # HighAvailabilityOperator arguments
        runtime: timedelta
        endtime: time
        maxretrigger: int
        reference_date: str

        # Airflow arguments
        stop_on_exit: bool
        cleanup: bool
        restart_on_initial: bool
        restart_on_retrigger: bool
    }
    class SupervisorSSHAirflowConfiguration {
      command_prefix: str

      # Airflow SSHOperator Arguments
      ssh_operator_args: SSHOperatorArgs
    }

Note: This library is built on supervisor-pydantic, which provides configuration elements for all supervisor structures, as well as self-contained tools for interacting with supervisor instances.

Note: This library was generated using copier from the Base Python Project Template repository.
