Python обертка для запросов в БД Clickhouse
Project description
Python wrapper for database queries Clickhouse
The wrapper is done around clickhouse-driver
Written in python version 3.5
Installation
pip install clickhousepy
or
pip install clickhousepy[pandas] (for installation pandas)
Getting Data from Clickhouse in Pandas Dataframe Format
from clickhousepy import Client import datetime as dt TEST_DB = "__chpytest12345" TEST_TABLE = "__chpytest12345" client.create_db(TEST_DB) client.create_table_mergetree( TEST_DB, TEST_TABLE, columns=[("i", "UInt32")], # or ["i UInt32"] orders=["i"], ) client.insert( TEST_DB, TEST_TABLE, [{"i": 1}, {"i": 2}], ) query = "SELECT i FROM {}.{}".format(TEST_DB, TEST_TABLE) r = client.get_df(query, columns_names=["Col Integer"]) print(r)
Brief documentation of some methods
from clickhousepy import Client import datetime as dt TEST_DB = "__chpytest12345" TEST_TABLE = "__chpytest12345" client = Client(host="", user="", password="") r = client.show_databases() print("list of databases:", r) client.create_db(TEST_DB) client.create_table_mergetree( TEST_DB, TEST_TABLE, columns=[("s", "String")], orders=["s"], ) # Inserting data. # Read more about it here # https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#inserting-data client.insert( TEST_DB, TEST_TABLE, [{"s": "1"}], ) r = client.exists(TEST_DB, TEST_TABLE) print("does the table exist?", r) r = client.get_count_rows(TEST_DB, TEST_TABLE) print("number of lines:", r) # Any request. r = client.execute("SELECT * FROM {}.{}".format(TEST_DB, TEST_TABLE)) print(r)
Class DB
db = client.DB(TEST_DB) r = db.show_tables() print("list of database tables {}:".format(TEST_DB), r) db.drop_db()
Class Table
db = client.create_db(TEST_DB) table = db.create_table_mergetree( TEST_TABLE, columns=[("s", "String"), ("t", "String"), ("d", "Date")], orders=["d"], partition=["s", "d"], ) # Initialization of an existing table. # table = client.Table(TEST_DB, TEST_TABLE) r = table.show_create_table() print("table creation description", r) r = table.describe() print("table columns", r) table.insert( [ {"s": "1", "t": "1", "d": dt.datetime(2000, 1, 1)}, {"s": "2", "t": "2", "d": dt.datetime(2000, 1, 2)}, {"s": "3", "t": "3", "d": dt.datetime(2000, 1, 3)}, {"s": "4", "t": "4", "d": dt.datetime(2000, 1, 4)}, ], columns=["s", "t", "d"], ) data = table.select() print("First 10 rows of the table", data) data = table.select(limit=1, columns=["s"], where="s = 2") print("Filtered sampling", data) r = table.get_count_rows() print("number of lines:", r) r = table.get_min_date(date_column_name="d") print("minimum date:", r) r = table.get_max_date(date_column_name="d") print("maximum date:", r) print("deleting partitions") table.drop_partitions([["3", "2000-01-03"], ["4", "2000-01-04"]]) r = table.get_count_rows() print("number of lines after deleting partitions:", r) print("row update mutation") table.update(update="t = '20' ", where="t = '2' ") print("row deletion mutation") table.delete(where="t = '20'") time.sleep(1) r = table.get_count_rows() print("number of lines after mutation of line deletion:", r) print("clear table") table.truncate() r = table.get_count_rows() print("number of rows after clearing the table:", r) new_table_name = TEST_TABLE + "_new" print("rename table {} в {}".format(TEST_TABLE, new_table_name)) table.rename(TEST_DB, new_table_name) r = client.exists(TEST_DB, TEST_TABLE) print("does table {} exist?".format(TEST_TABLE), r) print("drop tables") table.drop_table() print("deleting a database") db.drop_db()
Method of copying data from one table to another with checking the number of rows after copying
client.drop_db(TEST_DB) db = client.create_db(TEST_DB) table = db.create_table_mergetree( TEST_TABLE, columns=[("string", "String"), ("integer", "UInt32"), ("dt", "DateTime")], orders=["string"], partition=["string"], ) table.insert( [ {"string": "a", "integer": 1, "dt": dt.datetime(2000, 1, 1)}, {"string": "b", "integer": 2, "dt": dt.datetime(2000, 1, 2)}, {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)}, {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)}, ], ) table_name_2 = TEST_TABLE + "_copy" table2 = table.copy_table(TEST_DB, table_name_2, return_new_table=True) is_identic = table2.copy_data_from( TEST_DB, TEST_TABLE, where="string != 'c' ", columns=["string"] ) # The function will return a bool value, whether the number of lines matches or not, after copying. assert is_identic
A method of copying data from one table to another while removing duplicate rows.
client.drop_db(TEST_DB) db = client.create_db(TEST_DB) table = db.create_table_mergetree( TEST_TABLE, columns=[("string", "String"), ("integer", "UInt32"), ("dt", "DateTime")], orders=["string"], partition=["string"], ) table.insert( [ {"string": "a", "integer": 1, "dt": dt.datetime(2000, 1, 1)}, {"string": "b", "integer": 2, "dt": dt.datetime(2000, 1, 2)}, {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)}, {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)}, ], ) table_name_2 = TEST_TABLE + "_copy" table2 = table.copy_table(TEST_DB, table_name_2, return_new_table=True) # When removing duplicate rows (distinct = True), # there will be no check for the number of rows after copying. table2.copy_data_from( TEST_DB, TEST_TABLE, columns=["string"], distinct=True ) assert 3 == table2.get_count_rows()
Dependencies
- clickhouse-driver
- pandas (Optional)
Author
Pavel Maksimov
You can contact me at Telegram, Facebook
Удачи тебе, друг! Поставь звездочку ;)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Filename, size | File type | Python version | Upload date | Hashes |
---|---|---|---|---|
Filename, size clickhousepy-2021.1.23.tar.gz (12.5 kB) | File type Source | Python version None | Upload date | Hashes View |