
Monte Carlo's Apache Airflow Provider


airflow-mcd

Monte Carlo's alpha Airflow provider.

Installation

Requires Python 3.7 or greater and is compatible with Airflow 1.10.14 or greater.

You can install and update using pip. For instance:

pip install -U airflow-mcd

This package can be added to Airflow like any other Python dependency (e.g. via requirements.txt).

Basic usage

Hooks:

  • SessionHook

    Creates a pycarlo-compatible session. This is useful for building your own operator on top of our Python SDK.

    This hook expects an Airflow HTTP connection with the MCD API ID and token in the "extra" field, using the following format:

    {
        "mcd_id": "<ID>",
        "mcd_token": "<SECRET>"
    }
    

    If you prefer, you can specify the mcd_token in the "password" field instead. See here for details on how to generate a token.
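
    As a minimal sketch, the hook can be used to build a pycarlo client wherever you need one. The import path, the mcd_session_conn_id parameter name, and the get_conn() method below are assumptions about the hook's interface; verify them against the package source before relying on this:

    from pycarlo.core import Client, Query

    from airflow_mcd.hooks import SessionHook


    def print_mcd_user(mcd_session_conn_id: str = 'mcd_default_session') -> None:
        # Build a pycarlo-compatible session from the Airflow connection (assumed hook interface).
        session = SessionHook(mcd_session_conn_id=mcd_session_conn_id).get_conn()
        client = Client(session=session)

        # Simple SDK call: fetch the email associated with the API credentials.
        query = Query()
        query.get_user.__fields__('email')
        print(client(query).get_user.email)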

Operators:

  • BaseMcdOperator

    This operator can be extended to build your own operator using our SDK or any other dependencies. This is useful if you want to implement your own custom logic (e.g. creating custom lineage after a task completes).
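
    For instance, a rough sketch of a subclass (it assumes BaseMcdOperator accepts the same mcd_session_conn_id parameter documented for SimpleCircuitBreakerOperator below, and builds its session with SessionHook; check the operator source before relying on either assumption):

    from airflow_mcd.hooks import SessionHook
    from airflow_mcd.operators import BaseMcdOperator
    from pycarlo.core import Client


    class CustomLineageOperator(BaseMcdOperator):
        """Hypothetical operator that runs custom SDK logic after an ELT task completes."""

        def __init__(self, mcd_session_conn_id: str = 'mcd_default_session', **kwargs):
            # Assumption: the base constructor accepts mcd_session_conn_id, as SimpleCircuitBreakerOperator does.
            super().__init__(mcd_session_conn_id=mcd_session_conn_id, **kwargs)
            self.mcd_session_conn_id = mcd_session_conn_id

        def execute(self, context):
            # Build an SDK client; replace the log line with your custom logic (e.g. lineage creation).
            client = Client(session=SessionHook(mcd_session_conn_id=self.mcd_session_conn_id).get_conn())
            self.log.info('Created Monte Carlo client: %s', client)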

  • SimpleCircuitBreakerOperator

    This operator can be used to execute a circuit breaker compatible rule (custom SQL monitor) to run integrity tests before allowing any downstream tasks to execute. If the rule condition is in breach, the operator raises an AirflowFailException on Airflow versions newer than 1.10.11, as that exception is preferred for tasks that should fail without retrying; older Airflow versions raise an AirflowException. For instance:

    from datetime import datetime, timedelta
    
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    from airflow_mcd.operators import SimpleCircuitBreakerOperator
    
    mcd_connection_id = 'mcd_default_session'
    
    with DAG('sample-dag', start_date=datetime(2022, 2, 8), catchup=False, schedule_interval=timedelta(1)) as dag:
        task1 = BashOperator(
            task_id='example_elt_job_1',
            bash_command='echo I am transforming a very important table!',
        )
        breaker = SimpleCircuitBreakerOperator(
            task_id='example_circuit_breaker',
            mcd_session_conn_id=mcd_connection_id,
            rule_uuid='<RULE_UUID>'
        )
        task2 = BashOperator(
            task_id='example_elt_job_2',
            bash_command='echo I am building a very important dashboard from the table created in task1!',
            trigger_rule='none_failed'
        )
    
        task1 >> breaker >> task2
    

    This operator expects the following parameters:

    • mcd_session_conn_id: A SessionHook compatible connection.
    • rule_uuid: UUID of the rule (custom SQL monitor) to execute.

    The following parameters can also be passed:

    • timeout_in_minutes [default=5]: Polling timeout in minutes. Note that the Data Collector Lambda has a max timeout of 15 minutes when executing a query. Queries that take longer to execute are not supported, so we recommend filtering down the query output to improve performance (e.g. with a LIMIT or WHERE clause). If you expect a query to take the full 15 minutes, we recommend padding the timeout to 20 minutes.
    • fail_open [default=True]: Prevents errors or timeouts encountered while executing a rule from stopping your pipeline. If set to True and any issues are encountered, an AirflowSkipException is raised. In this case we recommend setting the trigger_rule param to none_failed for any downstream tasks (see the example below).
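
    For example, a breaker configured with both optional parameters, paired with a downstream task whose trigger_rule accounts for fail_open (imports and placeholders as in the DAG example above):

    breaker = SimpleCircuitBreakerOperator(
        task_id='example_circuit_breaker',
        mcd_session_conn_id='mcd_default_session',
        rule_uuid='<RULE_UUID>',
        timeout_in_minutes=20,
        fail_open=True,
    )
    report = BashOperator(
        task_id='build_report',
        bash_command='echo Building the report!',
        trigger_rule='none_failed',  # run even when the breaker is skipped
    )
    breaker >> report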

Tests and releases

Locally, make test will run all tests. See README-dev.md for additional details on development.

CircleCI manages all testing for deployment.

When ready to release, make a PR for main. CircleCI will test and deploy to PyPI.

Don't forget to increment the version number in setup.py first! An existing version will not be deployed.

License

Apache 2.0 - See the LICENSE for more information.

Download files

Source Distribution: airflow_mcd-0.0.1.tar.gz (9.2 kB)

Built Distribution: airflow_mcd-0.0.1-py3-none-any.whl (10.6 kB)
