
Airflow ClickHouse Plugin

Provides ClickHouseHook and ClickHouseOperator for Apache Airflow based on mymarilyn/clickhouse-driver.

Features

  1. SQL queries are templated.
  2. Can run multiple SQL queries within a single ClickHouseOperator.
  3. The result of the last query of a ClickHouseOperator instance is pushed to XCom.
  4. Executed queries are logged in a readable form.
  5. Uses the efficient native ClickHouse TCP protocol, thanks to clickhouse-driver. Does not support the HTTP protocol.

Installation

pip install -U airflow-clickhouse-plugin

Usage

See examples below.

ClickHouseOperator Reference

To import ClickHouseOperator, use: from airflow.operators.clickhouse_operator import ClickHouseOperator

Supported kwargs:

  • sql: a templated query (if the argument is a single str) or queries (if an iterable of str).
  • clickhouse_conn_id: connection id. Connection schema (all properties are optional; defaults correspond to the default ClickHouse configuration):
    • host, default: localhost;
    • port, default: 9000 (default native ClickHouse protocol port);
    • database, default: default;
    • user, default: default;
    • password, default: '' (empty).
  • parameters: passed to the execute method of clickhouse-driver.
    • If multiple queries are provided via sql, the parameters are passed to each of them.
    • Parameters are not templated.
  • database: if present, overrides the database defined by the connection.
  • Other kwargs (including the required task_id) are inherited from Airflow BaseOperator.

The result of the last query is pushed to XCom.
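
For instance, a minimal single-query task might look like the sketch below (the sales table and the clickhouse_test connection are assumptions; note that parameters, unlike sql, are not templated):

from airflow import DAG
from airflow.operators.clickhouse_operator import ClickHouseOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='count_sales', start_date=days_ago(2)) as dag:
    ClickHouseOperator(
        task_id='count_sales',
        # sql is Jinja-templated; parameters are passed as-is to the
        # execute method of clickhouse-driver (they are not templated)
        sql="SELECT count() FROM sales WHERE eventDt = %(day)s",
        parameters={'day': '2020-01-01'},
        clickhouse_conn_id='clickhouse_test',
        # the result of this single query is pushed to XCom
    )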

ClickHouseHook Reference

To import ClickHouseHook, use: from airflow.hooks.clickhouse_hook import ClickHouseHook

Supported kwargs of the constructor (__init__ method):

  • clickhouse_conn_id: connection id. See connection schema above.
  • database: if present, overrides database defined by connection.

Supports all the methods of the Airflow BaseHook, including:

  • get_records(sql: str, parameters: dict=None): returns the result of the query as a list of tuples. Materializes all the records in memory.
  • get_first(sql: str, parameters: dict=None): returns the first row of the result. Does not load the whole dataset into memory, because it uses execute_iter.
  • run(sql, parameters): runs a single query (if sql is a str) or multiple queries (if an iterable of str). parameters can take any form supported by the execute method of clickhouse-driver.
    • If a single query is run, its result is returned; if multiple queries are run, the result of the last one is returned.
    • If multiple queries are given, the parameters are passed to each of them.
    • Materializes all the records in memory (uses plain execute, not execute_iter).
      • To stream results with execute_iter, use it directly via hook.get_conn().execute_iter(…) (see the execute_iter reference and the sketch below).
    • Every run call uses a new connection, which is closed when finished.
  • get_conn(): returns the underlying clickhouse_driver.Client instance.
  • get_pandas_df is not implemented.
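
To illustrate the methods above, here is a sketch of using the hook directly (the sales and sales_copy tables are assumptions):

from airflow.hooks.clickhouse_hook import ClickHouseHook

hook = ClickHouseHook(clickhouse_conn_id='clickhouse_default')

# materializes the whole result in memory as a list of tuples
records = hook.get_records('SELECT eventDt, count() FROM sales GROUP BY eventDt')

# fetches only the first row; streams under the hood via execute_iter
first = hook.get_first('SELECT * FROM sales ORDER BY eventDt LIMIT 1')

# runs two queries over one connection; parameters are passed to both,
# and the result of the last query is returned
count = hook.run(
    ("INSERT INTO sales_copy SELECT * FROM sales WHERE eventDt = %(day)s",
     "SELECT count() FROM sales_copy WHERE eventDt = %(day)s"),
    parameters={'day': '2020-01-01'},
)

# to stream a large result set, use execute_iter on the underlying client
for row in hook.get_conn().execute_iter('SELECT * FROM sales'):
    pass  # process each row without loading the whole dataset into memory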

Examples

ClickHouseOperator

from airflow import DAG
from airflow.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
        dag_id='update_income_aggregate',
        start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            "INSERT INTO aggregate "
                "SELECT eventDt, sum(price * qty) AS income FROM sales "
                "WHERE eventDt = '{{ ds }}' GROUP BY eventDt",
            "OPTIMIZE TABLE aggregate ON CLUSTER {{ var.value.cluster_name }} "
                "PARTITION toDate('{{ execution_date.format('%Y-%m-01') }}')",
            "SELECT sum(income) FROM aggregate "
                "WHERE eventDt BETWEEN "
                "'{{ execution_date.start_of('month').to_date_string() }}' "
                "AND '{{ execution_date.end_of('month').to_date_string() }}'",
            # result of the last query is pushed to XCom
        ),
        clickhouse_conn_id='clickhouse_test',
    ) >> PythonOperator(
        task_id='print_month_income',
        provide_context=True,
        python_callable=lambda task_instance, **_:
            # pulling XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )

ClickHouseHook

from airflow import DAG
from airflow.hooks.clickhouse_hook import ClickHouseHook
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def mysql_to_clickhouse():
    mysql_hook = MySqlHook()
    ch_hook = ClickHouseHook()
    records = mysql_hook.get_records('SELECT * FROM some_mysql_table')
    ch_hook.run('INSERT INTO some_ch_table VALUES', records)


with DAG(
        dag_id='mysql_to_clickhouse',
        start_date=days_ago(2),
) as dag:
    PythonOperator(
        task_id='mysql_to_clickhouse',
        python_callable=mysql_to_clickhouse,
    )

Important note: do not try to insert values using the literal form ch_hook.run('INSERT INTO some_ch_table VALUES (1)'). clickhouse-driver requires values for an INSERT query to be provided via parameters due to specifics of the native ClickHouse protocol.
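
For example, a minimal sketch of the working parameterized form next to the broken literal one (reusing some_ch_table from the example above):

# broken: literal values inside the INSERT query body
# ch_hook.run('INSERT INTO some_ch_table VALUES (1)')

# works: the query ends at VALUES; the rows are supplied as parameters
ch_hook.run('INSERT INTO some_ch_table VALUES', [(1,)])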

Default connection

By default, the hook and operator use the connection with clickhouse_conn_id='clickhouse_default'.
