airflow-clickhouse-plugin - Airflow plugin to execute ClickHouse commands and queries

Airflow ClickHouse Plugin

Provides ClickHouseOperator, ClickHouseHook and ClickHouseSqlSensor for Apache Airflow based on mymarilyn/clickhouse-driver.

Features

  1. SQL queries are templated.
  2. Can run multiple SQL queries per single ClickHouseOperator.
  3. Result of the last query of ClickHouseOperator instance is pushed to XCom.
  4. Executed queries are logged in a pretty form.
  5. Uses the efficient native ClickHouse TCP protocol via clickhouse-driver. The HTTP protocol is not supported.
  6. Supports extra ClickHouse connection parameters such as various timeouts, compression, secure, etc., through the Airflow Connection.extra property.

Installation and dependencies

pip install -U airflow-clickhouse-plugin

Requires apache-airflow and clickhouse-driver (installed automatically by pip). Primarily supports Airflow 2.0–2.3. Later versions are expected to work but may not be fully tested. Use plugin versions below 0.6.0 (e.g. 0.5.7.post1) to preserve compatibility with Airflow 1.10.6 (this version has long-term support on Google Cloud Composer).

Note on pandas dependency

Starting from Airflow 2.2, pandas is an extra requirement. To install airflow-clickhouse-plugin with pandas support, use pip install airflow-clickhouse-plugin[pandas].

Important: this works only with pip 21+, so to handle the pandas dependency properly you may first need to upgrade pip using pip install -U pip.

If you are not able to upgrade pip to 21+, install the dependency directly using pip install apache-airflow[pandas]== (specifying your current Airflow version). Simple one-liner: pip install "apache-airflow[pandas]==$(pip freeze | grep apache-airflow== | cut -d'=' -f3)".
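If you prefer Python to the shell one-liner, the same pinned requirement can be built from the installed distribution metadata. This is a minimal sketch assuming Python 3.8+ (for importlib.metadata). The function names are hypothetical helpers for illustration and are not part of the plugin.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def pandas_extra_requirement(airflow_version: str) -> str:
    """Build the pip requirement that pins Airflow and adds its pandas extra."""
    return f"apache-airflow[pandas]=={airflow_version}"


def installed_pandas_requirement() -> Optional[str]:
    """Return the requirement for the installed Airflow, or None if it is missing."""
    try:
        # reads the version from the installed apache-airflow distribution metadata
        return pandas_extra_requirement(version("apache-airflow"))
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    requirement = installed_pandas_requirement()
    if requirement is None:
        print("apache-airflow is not installed")
    else:
        # quote this string when passing it to pip install
        print(requirement)
```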

Usage

To see examples scroll down.

ClickHouseOperator Reference

To import ClickHouseOperator use: from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator

Supported kwargs:

  • sql: templated query (if the argument is a single str) or queries (if it is an iterable of strs).
  • clickhouse_conn_id: connection id. Connection schema is described below.
  • parameters: passed to clickhouse-driver execute method.
    • If multiple queries are provided via sql then the parameters are passed to all of them.
    • Parameters are not templated.
  • database: if present, overrides database defined by connection.
  • Other kwargs (including the required task_id) are inherited from Airflow BaseOperator.

The result of the last query is pushed to XCom.

See example below.

ClickHouseHook Reference

To import ClickHouseHook use: from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook

Supported kwargs of constructor (__init__ method):

  • clickhouse_conn_id: connection id. Connection schema is described below.
  • database: if present, overrides database defined by connection.

Supports all the methods of the Airflow BaseHook including:

  • get_records(sql: str, parameters: dict=None): returns result of the query as a list of tuples. Materializes all the records in memory.
  • get_first(sql: str, parameters: dict=None): returns the first row of the result. Does not load the whole dataset into memory because it uses execute_iter. If the dataset is empty, returns None, following fetchone semantics.
  • run(sql, parameters): runs a single query (if sql is a str) or multiple queries (if it is an iterable of strs). parameters can have any form supported by the execute method of clickhouse-driver.
    • If a single query is run, its result is returned. If multiple queries are run, the result of the last one is returned.
    • If multiple queries are given, parameters are passed to all of them.
    • Materializes all the records in memory (uses plain execute rather than execute_iter).
      • To stream results with execute_iter, call it directly via hook.get_conn().execute_iter(…) (see execute_iter reference).
    • Every run call uses a new connection, which is closed when finished.
  • get_conn(): returns the underlying clickhouse_driver.Client instance.

See example below.
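The run() and get_first() semantics described in the hook reference can be summarized as a small pure-Python model. This is a sketch for illustration only, not the plugin's source: `execute` is a hypothetical stand-in for clickhouse_driver.Client.execute, and run_queries and first_row are illustrative names.

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Union

# Hypothetical stand-in for clickhouse_driver.Client.execute(query, params)
Execute = Callable[[str, Any], Any]


def run_queries(execute: Execute, sql: Union[str, Iterable[str]], parameters: Any = None) -> Any:
    """Model of the documented run() behaviour (not the plugin's code)."""
    # a single str is one query; any other iterable is a sequence of queries
    queries = [sql] if isinstance(sql, str) else list(sql)
    result = None
    for query in queries:
        # the same parameters are passed to every query
        result = execute(query, parameters)
    # only the result of the last query is returned
    return result


def first_row(rows: Iterator[tuple]) -> Optional[tuple]:
    """Model of get_first(): take one row from a stream, None if it is empty."""
    return next(rows, None)


if __name__ == "__main__":
    executed = []

    def fake_execute(query, params):
        # records each call and returns a fake one-row result
        executed.append((query, params))
        return [(len(executed),)]

    print(run_queries(fake_execute, ["SELECT 1", "SELECT 2"], {"x": 1}))  # [(2,)]
    print(first_row(iter([])))  # None
```

Taking a single row with next() on an iterator mirrors why get_first() can avoid materializing the whole result, whereas run() collects everything via execute.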

ClickHouseSqlSensor Reference

The sensor inherits from the Airflow SQLSensor and fully implements its interface, using ClickHouseHook to fetch the SQL execution result. It also supports templating of the sql argument.

See example below.
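The success callable used with this sensor receives only the first cell of the first row returned by the query. The following is a simplified model of that decision, assuming Airflow 2.x SqlSensor behaviour. The function name is hypothetical, and the real sensor's failure and fail_on_empty options are left out.

```python
from typing import Any, Callable, Optional, Sequence


def sql_sensor_poke(
    records: Sequence[Sequence[Any]],
    success: Optional[Callable[[Any], Any]] = None,
) -> bool:
    """Simplified model of SqlSensor's decision, given the rows the hook fetched."""
    if not records:
        # no rows yet: keep poking (the real sensor can raise if fail_on_empty=True)
        return False
    # only the first cell of the first row is inspected
    first_cell = records[0][0]
    if success is not None:
        return bool(success(first_cell))
    # without a success callable, the cell's truthiness decides
    return bool(first_cell)


if __name__ == "__main__":
    # mirrors a count() query checked with success=lambda cnt: cnt > 10000
    print(sql_sensor_poke([(12000,)], success=lambda cnt: cnt > 10000))  # True
    print(sql_sensor_poke([(500,)], success=lambda cnt: cnt > 10000))  # False
```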

ClickHouse Connection schema

clickhouse_driver.Client is initialized with the attributes stored in the Airflow Connection. The mapping of the attributes is listed below:

Airflow Connection attribute | Client.__init__ argument
host                         | host
port                         | port
schema                       | database
login                        | user
password                     | password

If you pass database argument to ClickHouseOperator or ClickHouseHook explicitly then it is passed to the Client instead of the schema attribute of the Airflow connection.

Extra arguments

You may also pass additional arguments, such as timeouts, compression, secure, etc., through the Connection.extra attribute. The attribute should contain a JSON object, which is deserialized, and all of its properties are passed as-is to the Client.

For example, if Airflow connection contains extra={"secure":true} then the Client.__init__ will receive secure=True keyword argument in addition to other non-empty connection attributes.

Default values

If the Airflow connection attribute is not set then it is not passed to the Client at all. In that case the default value of the corresponding clickhouse_driver.Connection argument is used (e.g. user defaults to 'default').

This means that Airflow ClickHouse Plugin does not itself define any default values for the ClickHouse connection. You may fully rely on default values of the clickhouse-driver version you use. The only exception is host: if the attribute of Airflow connection is not set then 'localhost' is used.
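Taken together, the attribute mapping, the database override, the extra JSON, and the host default can be modeled as below. This is a sketch, not the plugin's code: the connection is a plain dict standing in for airflow.models.Connection, client_kwargs is a hypothetical name, and letting extra properties override mapped attributes is an assumption this page does not specify.

```python
import json
from typing import Any, Dict, Optional

# Airflow Connection attribute -> clickhouse_driver.Client.__init__ argument
ATTRIBUTE_TO_CLIENT_ARG = {
    "host": "host",
    "port": "port",
    "schema": "database",
    "login": "user",
    "password": "password",
}


def client_kwargs(
    connection: Dict[str, Any],
    extra: Optional[str] = None,
    database: Optional[str] = None,
) -> Dict[str, Any]:
    """Model of how Client keyword arguments are derived from a connection."""
    kwargs: Dict[str, Any] = {}
    for attribute, argument in ATTRIBUTE_TO_CLIENT_ARG.items():
        value = connection.get(attribute)
        if value:  # unset or empty attributes are not passed at all
            kwargs[argument] = value
    # an explicit database argument replaces the connection schema
    if database is not None:
        kwargs["database"] = database
    # the only plugin-level default: host falls back to 'localhost'
    kwargs.setdefault("host", "localhost")
    # ASSUMPTION: extra properties are applied last and win on conflicts
    if extra:
        kwargs.update(json.loads(extra))
    return kwargs


if __name__ == "__main__":
    # extra={"secure":true} adds secure=True; unset attributes are skipped
    print(client_kwargs({"login": "analyst"}, extra='{"secure": true}'))
    # {'user': 'analyst', 'host': 'localhost', 'secure': True}
```

Any argument absent from the returned dict (e.g. user when login is unset) falls back to the clickhouse-driver default.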

Default connection

By default, the plugin uses the connection with id clickhouse_default (i.e. clickhouse_conn_id='clickhouse_default').

Examples

ClickHouseOperator Example

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator

with DAG(
        dag_id='update_income_aggregate',
        start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            '''
                INSERT INTO aggregate
                SELECT eventDt, sum(price * qty) AS income FROM sales
                WHERE eventDt = '{{ ds }}' GROUP BY eventDt
            ''', '''
                OPTIMIZE TABLE aggregate ON CLUSTER {{ var.value.cluster_name }}
                PARTITION toDate('{{ execution_date.strftime('%Y-%m-01') }}')
            ''', '''
                SELECT sum(income) FROM aggregate
                WHERE eventDt BETWEEN
                    '{{ execution_date.start_of('month').to_date_string() }}'
                    AND '{{ execution_date.end_of('month').to_date_string() }}'
            ''',
            # result of the last query is pushed to XCom
        ),
        clickhouse_conn_id='clickhouse_test',
    ) >> PythonOperator(
        task_id='print_month_income',
        # Airflow 2 passes context values (e.g. task_instance) matching the signature
        python_callable=lambda task_instance, **_:
            # pulling XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )

ClickHouseHook Example

from airflow import DAG
from airflow.operators.python import PythonOperator
# requires the apache-airflow-providers-mysql package
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils.dates import days_ago
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook


def mysql_to_clickhouse():
    mysql_hook = MySqlHook()
    ch_hook = ClickHouseHook()
    # list of tuples, one per row
    records = mysql_hook.get_records('SELECT * FROM some_mysql_table')
    # values are passed as parameters, not inlined into the query text
    ch_hook.run('INSERT INTO some_ch_table VALUES', records)


with DAG(
        dag_id='mysql_to_clickhouse',
        start_date=days_ago(2),
) as dag:
    # tasks created inside the DAG context are assigned to it automatically
    PythonOperator(
        task_id='mysql_to_clickhouse',
        python_callable=mysql_to_clickhouse,
    )

Important note: don't try to insert values using the literal form ch_hook.run('INSERT INTO some_ch_table VALUES (1)'). clickhouse-driver requires values for an INSERT query to be provided via parameters due to the specifics of the native ClickHouse protocol.

ClickHouseSqlSensor Example

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
from airflow_clickhouse_plugin.sensors.clickhouse_sql_sensor import ClickHouseSqlSensor


with DAG(
        dag_id='listen_warnings',
        start_date=days_ago(2),
) as dag:
    ClickHouseSqlSensor(
        task_id='poke_events_count',
        database='monitor',
        sql="SELECT count() FROM warnings WHERE eventDate = '{{ ds }}'",
        # receives the first cell of the first row returned by the query
        success=lambda cnt: cnt > 10000,
    ) >> ClickHouseOperator(
        task_id='create_alert',
        database='alerts',
        sql='''
            INSERT INTO events SELECT eventDate, count()
            FROM monitor.warnings WHERE eventDate = '{{ ds }}'
        ''',
    )

How to run tests

Unit tests

From the root project directory: python -m unittest discover -s tests/unit

Integration tests

Integration tests require access to a ClickHouse server. Tests use the connection URI defined by the environment variable AIRFLOW_CONN_CLICKHOUSE_DEFAULT, with clickhouse://localhost as the default.

Run from the project root: python -m unittest discover -s tests/integration
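Because the connection is defined in Airflow's URI format, pointing the tests at another server means exporting a different URI, e.g. AIRFLOW_CONN_CLICKHOUSE_DEFAULT=clickhouse://user:password@host:9000/database (illustrative values). The sketch below roughly models how such a URI splits into the Connection attributes from the mapping table, assuming the standard conn_type://login:password@host:port/schema?extra layout. parse_connection_uri is a hypothetical helper, not Airflow's own parser.

```python
from typing import Any, Dict
from urllib.parse import parse_qsl, unquote, urlsplit


def parse_connection_uri(uri: str) -> Dict[str, Any]:
    """Rough model of how a connection URI maps to Connection attributes."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        # the path (without the leading slash) becomes the schema, i.e. the database
        "schema": parts.path.lstrip("/") or None,
        # credentials may be percent-encoded in the URI
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        # query-string parameters end up in Connection.extra, as strings
        "extra": dict(parse_qsl(parts.query)),
    }


if __name__ == "__main__":
    # the default used by the integration tests
    print(parse_connection_uri("clickhouse://localhost"))
```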

All tests

From the root project directory: python -m unittest discover -s tests

GitHub Actions

GitHub Actions CI is set up for this project.

Run tests using Docker

Run ClickHouse server inside Docker:

docker exec -it $(docker run --rm -d yandex/clickhouse-server) bash

The above command will open bash inside the container.

Install dependencies into container and run tests (execute inside container):

apt-get update \
  && apt-get install -y python3.8 python3-pip git \
  && git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git \
  && cd airflow-clickhouse-plugin \
  && python3.8 -m pip install -r requirements.txt \
  && python3.8 -m unittest discover -s tests

PR

  1. Merge PR.
  2. Add a commit with contributors list update.
  3. Add a commit with version increment.
  4. Publish to PyPI.

How to upload to PyPI

info: https://packaging.python.org/tutorials/packaging-projects/#uploading-your-project-to-pypi

Test PyPI:

rm -rf __pycache__/ airflow_clickhouse_plugin.egg-info/ dist/ build/
python3 setup.py sdist bdist_wheel
twine upload --repository testpypi dist/*
# username: __token__
# token: <your TestPyPI API token>

Run tests for test PyPI version:

docker exec -it $(docker run --rm -d yandex/clickhouse-server) bash

VERSION=0.8.1
apt-get update \
  && apt-get install -y python3.8 python3-pip git \
  && git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git \
  && cd airflow-clickhouse-plugin \
  && rm -rf airflow_clickhouse_plugin \
  && python3.8 -m pip install -r requirements.txt \
  && python3.8 -m pip install \
    --index-url https://test.pypi.org/simple \
    --extra-index-url https://pypi.org/simple \
    airflow-clickhouse-plugin[pandas]==${VERSION} \
  && python3.8 -m unittest discover -s tests

The pandas test may fail.

Public PyPI:

twine upload dist/*

Test the public PyPI release with pandas (run inside the ClickHouse container):

VERSION=0.8.1
apt-get update \
  && apt-get install -y python3.8 python3-pip git \
  && git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git \
  && cd airflow-clickhouse-plugin \
  && rm -rf airflow_clickhouse_plugin \
  && python3.8 -m pip install -r requirements.txt \
  && python3.8 -m pip install airflow-clickhouse-plugin[pandas]==${VERSION} \
  && python3.8 -m unittest discover -s tests

Contributors

Download files

Source Distribution

airflow-clickhouse-plugin-0.8.2.tar.gz (15.5 kB)

Uploaded Source

Built Distribution

airflow_clickhouse_plugin-0.8.2-py3-none-any.whl (11.5 kB)

Uploaded Python 3

File details

Details for the file airflow-clickhouse-plugin-0.8.2.tar.gz.

File metadata

  • Download URL: airflow-clickhouse-plugin-0.8.2.tar.gz
  • Size: 15.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/54.1.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.8.5

File hashes

Hashes for airflow-clickhouse-plugin-0.8.2.tar.gz
Algorithm   | Hash digest
SHA256      | 6579986ccb7bea72fdfa835c35d45a0a7d1643c004d23e81ecc6d94d9b7a5b01
MD5         | 32d8fcb5396128e4c87f352dbb0e0252
BLAKE2b-256 | a337a978a130afe23a5fa189911d72b324ef465a13c7b9d549175e5a64c2c254

File details

Details for the file airflow_clickhouse_plugin-0.8.2-py3-none-any.whl.

File metadata

  • Download URL: airflow_clickhouse_plugin-0.8.2-py3-none-any.whl
  • Size: 11.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/54.1.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.8.5

File hashes

Hashes for airflow_clickhouse_plugin-0.8.2-py3-none-any.whl
Algorithm   | Hash digest
SHA256      | e13395ac7756720d0e83ae3e976998f05cee4cb647ba0e98bb42b0acd40b29cb
MD5         | 37c1b46784f212c6ae4d70e953757773
BLAKE2b-256 | f739553dea0d7a1793944a11b509a6255d7bc7b8ebda086706155e0ae92b7cf7
