# Airflow ClickHouse Plugin

Airflow plugin to execute ClickHouse commands and queries. Provides `ClickHouseHook` and `ClickHouseOperator` for Apache Airflow, based on mymarilyn/clickhouse-driver.
## Features

- SQL queries are templated.
- Can run multiple SQL queries per single `ClickHouseOperator`.
- The result of the last query of a `ClickHouseOperator` instance is pushed to XCom.
- Executed queries are logged in a pretty form.
- Uses the efficient native ClickHouse TCP protocol, thanks to clickhouse-driver. Does not support the HTTP protocol.
## Installation

```shell
pip install -U airflow-clickhouse-plugin
```
## Usage

See the examples below.
## ClickHouseOperator Reference

To import `ClickHouseOperator`, use:

```python
from airflow.operators.clickhouse_operator import ClickHouseOperator
```

Supported kwargs:

- `sql`: templated query (if the argument is a single `str`) or queries (if an iterable of `str`s).
- `clickhouse_conn_id`: connection id. Connection schema (all properties are optional; defaults correspond to the default ClickHouse configuration):
  - `host`, default: `localhost`;
  - `port`, default: `9000` (default native ClickHouse protocol port);
  - `database`, default: `default`;
  - `user`, default: `default`;
  - `password`, default: `''` (empty).
- `parameters`: passed to the clickhouse-driver `execute` method.
  - If multiple queries are provided via `sql`, then the parameters are passed to all of them.
  - Parameters are not templated.
- `database`: if present, overrides the database defined by the connection.
- Other kwargs (including the required `task_id`) are inherited from the Airflow `BaseOperator`.

The result of the last query is pushed to XCom.
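Airflow can also pick connections up from environment variables named `AIRFLOW_CONN_<CONN_ID>` containing a connection URI. As a minimal sketch of how such a URI maps onto the connection schema above (the `clickhouse_test` id and the credentials are hypothetical; the parsing below is only illustrative, Airflow does it internally):

```python
from urllib.parse import urlsplit

# Hypothetical URI, e.g. exported as AIRFLOW_CONN_CLICKHOUSE_TEST
# before starting Airflow:
uri = 'clickhouse://default:@localhost:9000/default'

parts = urlsplit(uri)
print(parts.hostname)          # -> localhost (host)
print(parts.port)              # -> 9000 (native protocol port)
print(parts.username)          # -> default (user)
print(parts.path.lstrip('/'))  # -> default (database)
```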
## ClickHouseHook Reference

To import `ClickHouseHook`, use:

```python
from airflow.hooks.clickhouse_hook import ClickHouseHook
```

Supported kwargs of the constructor (`__init__` method):

- `clickhouse_conn_id`: connection id. See the connection schema above.
- `database`: if present, overrides the database defined by the connection.

Supports all of the methods of the Airflow `BaseHook`, including:

- `get_records(sql: str, parameters: dict = None)`: returns the result of the query as a list of tuples. Materializes all the records in memory.
- `get_first(sql: str, parameters: dict = None)`: returns the first row of the result. Does not load the whole dataset into memory, because it uses `execute_iter`.
- `run(sql, parameters)`: runs a single query (specified as an argument of type `str`) or multiple queries (if an iterable of `str`). `parameters` can have any form supported by the `execute` method of clickhouse-driver.
  - If a single query is run, returns its result. If multiple queries are run, returns the result of the last one.
  - If multiple queries are given, `parameters` are passed to all of them.
  - Materializes all the records in memory (uses plain `execute`, not `execute_iter`).
    - To stream results with `execute_iter`, use it directly via `hook.get_conn().execute_iter(…)` (see the `execute_iter` reference).
  - Every `run` call uses a new connection, which is closed when finished.
- `get_conn()`: returns the underlying `clickhouse_driver.Client` instance.
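The `run` semantics above (shared `parameters`, only the last query's result returned) can be modeled with a toy sketch; `fake_execute` below stands in for `clickhouse_driver.Client.execute` and is not plugin code:

```python
def fake_execute(query, parameters=None):
    # Stand-in for clickhouse_driver.Client.execute: echoes its inputs.
    return query, parameters

def run(sql, parameters=None):
    # Mirrors ClickHouseHook.run: accepts a str or an iterable of str,
    # passes the same parameters to every query, returns the last result.
    queries = (sql,) if isinstance(sql, str) else tuple(sql)
    result = None
    for query in queries:
        result = fake_execute(query, parameters)
    return result

# Single query: its own result is returned.
print(run('SELECT 1'))  # -> ('SELECT 1', None)

# Multiple queries: only the last result comes back; parameters are shared.
print(run(['SELECT 1', 'SELECT 2'], {'x': 42}))  # -> ('SELECT 2', {'x': 42})
```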
## Examples

### ClickHouseOperator

```python
from airflow import DAG
from airflow.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
        dag_id='update_income_aggregate',
        start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            "INSERT INTO aggregate "
            "SELECT eventDt, sum(price * qty) AS income FROM sales "
            "WHERE eventDt = '{{ ds }}' GROUP BY eventDt",
            "OPTIMIZE TABLE aggregate ON CLUSTER {{ var.value.cluster_name }} "
            "PARTITION toDate('{{ execution_date.format('%Y-%m-01') }}')",
            "SELECT sum(income) FROM aggregate "
            "WHERE eventDt BETWEEN "
            "'{{ execution_date.start_of('month').to_date_string() }}' "
            "AND '{{ execution_date.end_of('month').to_date_string() }}'",
            # result of the last query is pushed to XCom
        ),
        clickhouse_conn_id='clickhouse_test',
    ) >> PythonOperator(
        task_id='print_month_income',
        provide_context=True,
        python_callable=lambda task_instance, **_:
            # pulling the XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )
```
### ClickHouseHook

```python
from airflow import DAG
from airflow.hooks.clickhouse_hook import ClickHouseHook
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

def mysql_to_clickhouse():
    mysql_hook = MySqlHook()
    ch_hook = ClickHouseHook()
    records = mysql_hook.get_records('SELECT * FROM some_mysql_table')
    ch_hook.run('INSERT INTO some_ch_table VALUES', records)

with DAG(
        dag_id='mysql_to_clickhouse',
        start_date=days_ago(2),
) as dag:
    PythonOperator(
        task_id='mysql_to_clickhouse',
        python_callable=mysql_to_clickhouse,
    )
```
Important note: do not try to insert values using the literal form `ch_hook.run('INSERT INTO some_ch_table VALUES (1)')`. clickhouse-driver requires values for an `INSERT` query to be provided via `parameters`, due to specifics of the native ClickHouse protocol.
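When copying large tables, as in the MySQL example above, it can also help to insert in batches rather than materializing everything at once. A minimal chunking helper, hypothetical and not part of the plugin:

```python
from itertools import islice

def batches(rows, size):
    # Yield lists of at most `size` rows from any iterable.
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Intended usage with the hook (requires a ClickHouse connection):
#   for batch in batches(mysql_hook.get_records('SELECT * FROM some_mysql_table'), 10000):
#       ch_hook.run('INSERT INTO some_ch_table VALUES', batch)

print(list(batches(range(5), 2)))  # -> [[0, 1], [2, 3], [4]]
```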
## Default connection

By default, the hook and operator use `connection_id='clickhouse_default'`.
## How to run tests

From the root project directory:

```shell
python -m unittest discover -s tests
```
## How to upload to PyPI

See https://packaging.python.org/tutorials/packaging-projects/#uploading-your-project-to-pypi:

```shell
python3 setup.py sdist bdist_wheel
twine upload dist/*
```
## Contributors
- Anton Bryzgalov, @bryzgaloff
- Viktor Taranenko, @viktortnk
- Danila Ganchar, @d-ganchar
- Mikhail, @glader
Hashes for `airflow-clickhouse-plugin-0.5.5.tar.gz`:

| Algorithm | Hash digest |
|---|---|
| SHA256 | 2e64a97b4eaeca098c452777b57096d09a099260dda7df3076a7366faece9401 |
| MD5 | 267f6cce63d50161e97f3b6817ebf929 |
| BLAKE2b-256 | 4098e002d1d6ef7bff66489d599b1a649c99ed08a257e9af7c162cb57b3ec8ff |