airflow-clickhouse-plugin - Airflow plugin to execute ClickHouse commands and queries
Airflow ClickHouse Plugin
Provides ClickHouseHook and ClickHouseOperator for Apache Airflow based on mymarilyn/clickhouse-driver.
Features
- SQL queries are templated.
- Can run multiple SQL queries per single ClickHouseOperator.
- The result of the last query of a ClickHouseOperator instance is pushed to XCom.
- Executed queries are logged in a pretty form.
- Uses the efficient native ClickHouse TCP protocol thanks to clickhouse-driver. Does not support the HTTP protocol.
Installation
pip install -U airflow-clickhouse-plugin
Usage
See examples below.
ClickHouseOperator Reference
To import ClickHouseOperator use:
from airflow.operators.clickhouse_operator import ClickHouseOperator
Supported kwargs:
- sql: templated query (if the argument is a single str) or queries (if an iterable of str's).
- clickhouse_conn_id: connection id. Connection schema (all properties are optional; defaults correspond to the default ClickHouse configuration):
  - host, default: localhost;
  - port, default: 9000 (default native ClickHouse protocol port);
  - database, default: default;
  - user, default: default;
  - password, default: '' (empty).
- parameters: passed to the clickhouse-driver execute method.
  - If multiple queries are provided via sql, then the parameters are passed to all of them.
  - Parameters are not templated.
- database: if present, overrides the database defined by the connection.
- Other kwargs (including the required task_id) are inherited from Airflow BaseOperator.
The result of the last query is pushed to XCom.
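The way sql and parameters combine can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the plugin's code: run_queries and execute_fn are hypothetical names, execute_fn stands in for clickhouse-driver's Client.execute, and the Jinja templating that Airflow applies before execution is omitted.

```python
from typing import Any, Callable, Iterable, List, Optional, Union


def run_queries(
    sql: Union[str, Iterable[str]],
    execute_fn: Callable[[str, Optional[Any]], Any],
    parameters: Optional[Any] = None,
) -> Any:
    """Run one or many queries and return the result of the last one.

    Hypothetical sketch: execute_fn stands in for
    clickhouse_driver.Client.execute.
    """
    # A single str is treated as one query; any other iterable as many.
    queries: List[str] = [sql] if isinstance(sql, str) else list(sql)
    result = None
    for query in queries:
        # The same parameters are passed to every query.
        result = execute_fn(query, parameters)
    # Only the last result is kept; this is the value that would go to XCom.
    return result
```

Treating str specially matters because a str is itself iterable; without the isinstance check, a single query would be split into characters.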
ClickHouseHook Reference
To import ClickHouseHook use:
from airflow.hooks.clickhouse_hook import ClickHouseHook
Supported kwargs of the constructor (__init__ method):
- clickhouse_conn_id: connection id. See the connection schema above.
- database: if present, overrides the database defined by the connection.
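How the connection schema defaults and the database override might combine is sketched below. Conn and client_kwargs are hypothetical names; the assumption that the database lives in the Airflow Connection's schema field and the user in its login field reflects how Airflow connections are commonly read, not the plugin's source, so the real lookup may differ. The resulting dict is shaped like keyword arguments for clickhouse_driver.Client.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Conn:
    """Hypothetical stand-in for an Airflow Connection record.

    Assumption: the database is stored in `schema`, the user in `login`.
    """
    host: Optional[str] = None
    port: Optional[int] = None
    schema: Optional[str] = None
    login: Optional[str] = None
    password: Optional[str] = None


def client_kwargs(conn: Conn, database: Optional[str] = None) -> Dict[str, Any]:
    """Resolve connection properties, falling back to ClickHouse defaults.

    The `database` argument, if given, overrides the connection's database.
    """
    return {
        "host": conn.host or "localhost",
        "port": conn.port or 9000,  # default native protocol port
        "database": database or conn.schema or "default",
        "user": conn.login or "default",
        "password": conn.password or "",
    }
```

With an empty connection every value falls back to the defaults listed in the connection schema; passing database to the hook or operator wins over whatever the connection defines.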
Supports all of the methods of the Airflow BaseHook, including:
- get_records(sql: str, parameters: dict=None): returns the result of the query as a list of tuples. Materializes all the records in memory.
- get_first(sql: str, parameters: dict=None): returns the first row of the result. Does not load the whole dataset into memory because it uses execute_iter.
- run(sql, parameters): runs a single query (if the argument is a single str) or multiple queries (if an iterable of str). parameters can have any form supported by the execute method of clickhouse-driver.
  - If a single query is run, returns its result. If multiple queries are run, returns the result of the last of them.
  - If multiple queries are given, parameters are passed to all of them.
  - Materializes all the records in memory (uses plain execute, not execute_iter).
    - To stream results with execute_iter, call it directly via hook.get_conn().execute_iter(…) (see execute_iter reference).
  - Every run call uses a new connection, which is closed when finished.
- get_conn(): returns the underlying clickhouse_driver.Client instance.
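The difference between get_records and get_first comes down to materializing a list versus pulling one row from an iterator. The sketch below uses a hypothetical FakeClient in place of clickhouse_driver.Client; only the method names execute and execute_iter mirror the real client, and their bodies here are illustrative. The rows_produced counter exists only to make the laziness observable.

```python
from typing import Any, Iterator, List, Optional, Tuple

Row = Tuple[Any, ...]


class FakeClient:
    """Hypothetical stand-in for clickhouse_driver.Client."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows
        self.rows_produced = 0  # how many rows have been yielded so far

    def execute_iter(self, query: str, params: Optional[Any] = None) -> Iterator[Row]:
        # Yield rows lazily, one at a time.
        for row in self._rows:
            self.rows_produced += 1
            yield row

    def execute(self, query: str, params: Optional[Any] = None) -> List[Row]:
        # Materialize every row in memory at once.
        return list(self.execute_iter(query, params))


def get_records(client: FakeClient, sql: str, parameters: Optional[Any] = None) -> List[Row]:
    # Whole result set ends up in a list.
    return client.execute(sql, parameters)


def get_first(client: FakeClient, sql: str, parameters: Optional[Any] = None) -> Optional[Row]:
    # next() pulls only the first row from the iterator; None if there are none.
    return next(client.execute_iter(sql, parameters), None)
```

On a three-row result, get_first leaves rows_produced at 1, while get_records drives it to 3.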
Examples
ClickHouseOperator
```python
from airflow import DAG
from airflow.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
        dag_id='update_income_aggregate',
        start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            "INSERT INTO aggregate "
            "SELECT eventDt, sum(price * qty) AS income FROM sales "
            "WHERE eventDt = '{{ ds }}' GROUP BY eventDt",
            "OPTIMIZE TABLE aggregate ON CLUSTER {{ var.value.cluster_name }} "
            "PARTITION toDate('{{ execution_date.format('%Y-%m-01') }}')",
            "SELECT sum(income) FROM aggregate "
            "WHERE eventDt BETWEEN "
            "'{{ execution_date.start_of('month').to_date_string() }}' "
            "AND '{{ execution_date.end_of('month').to_date_string() }}'",
            # result of the last query is pushed to XCom
        ),
        clickhouse_conn_id='clickhouse_test',
    ) >> PythonOperator(
        task_id='print_month_income',
        provide_context=True,
        python_callable=lambda task_instance, **_:
            # pulling XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )
```
ClickHouseHook
```python
from airflow import DAG
from airflow.hooks.clickhouse_hook import ClickHouseHook
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def mysql_to_clickhouse():
    mysql_hook = MySqlHook()
    ch_hook = ClickHouseHook()
    records = mysql_hook.get_records('SELECT * FROM some_mysql_table')
    ch_hook.run('INSERT INTO some_ch_table VALUES', records)


with DAG(
        dag_id='mysql_to_clickhouse',
        start_date=days_ago(2),
) as dag:
    dag >> PythonOperator(
        task_id='mysql_to_clickhouse',
        python_callable=mysql_to_clickhouse,
    )
```
Important note: don't try to insert values using the ch_hook.run('INSERT INTO some_ch_table VALUES (1)') literal form. clickhouse-driver requires values for an INSERT query to be provided via parameters due to specifics of the native ClickHouse protocol.
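The required call shape can be illustrated with a recording stub. RecordingClient and insert_rows are hypothetical names, not part of the plugin or clickhouse-driver; the point is only that the query text stops at VALUES and the rows travel separately as the parameters argument.

```python
from typing import Any, List, Optional, Sequence, Tuple


class RecordingClient:
    """Hypothetical stand-in for clickhouse_driver.Client that records calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[Any]]] = []

    def execute(self, query: str, params: Optional[Any] = None) -> None:
        self.calls.append((query, params))


def insert_rows(client: RecordingClient, table: str, rows: Sequence[Tuple[Any, ...]]) -> None:
    # The query text ends at VALUES; the rows are passed separately as params.
    # `table` is interpolated directly, so it must be a trusted identifier.
    client.execute(f"INSERT INTO {table} VALUES", list(rows))
```

With the real hook the equivalent call is ch_hook.run('INSERT INTO some_ch_table VALUES', records), as in the ClickHouseHook example.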
Default connection
By default the hook and operator use clickhouse_conn_id='clickhouse_default'.
How to run tests
From the root project directory: python -m unittest discover -s tests
How to upload to PyPI
Info: https://packaging.python.org/tutorials/packaging-projects/#uploading-your-project-to-pypi

python3 setup.py sdist bdist_wheel
twine upload dist/*
Contributors
- Anton Bryzgalov, @bryzgaloff
- Viktor Taranenko, @viktortnk
- Danila Ganchar, @d-ganchar
- Mikhail, @glader
Source Distribution
File details
Details for the file airflow-clickhouse-plugin-0.5.5.tar.gz.
File metadata
- Download URL: airflow-clickhouse-plugin-0.5.5.tar.gz
- Upload date:
- Size: 6.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.44.0 CPython/3.7.7
File hashes
Algorithm | Hash digest
---|---
SHA256 | 2e64a97b4eaeca098c452777b57096d09a099260dda7df3076a7366faece9401
MD5 | 267f6cce63d50161e97f3b6817ebf929
BLAKE2b-256 | 4098e002d1d6ef7bff66489d599b1a649c99ed08a257e9af7c162cb57b3ec8ff