Python wrapper for Clickhouse database queries

The wrapper is built around clickhouse-driver

Written in Python 3.5

Installation

pip install clickhousepy
or
pip install clickhousepy[pandas]  (to install pandas as well)
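
After installation, a client is created with connection credentials. Because the wrapper is built on clickhouse-driver, extra connection parameters (port, database, etc.) are presumably forwarded to the underlying driver client; this is an assumption, not something stated in this README, so a minimal sketch would look like:

from clickhousepy import Client

# host/user/password are shown in this README; forwarding of port/database
# to clickhouse_driver.Client is an assumption and should be verified
# against the clickhousepy source.
client = Client(host="localhost", user="default", password="", port=9000)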

Getting data from Clickhouse as a pandas DataFrame

from clickhousepy import Client
import datetime as dt

TEST_DB = "__chpytest12345"
TEST_TABLE = "__chpytest12345"


client = Client(host="", user="", password="")

client.create_db(TEST_DB)
client.create_table_mergetree(
    TEST_DB, TEST_TABLE,
    columns=[("i", "UInt32")], # or ["i UInt32"]
    orders=["i"],
)
client.insert(
    TEST_DB, TEST_TABLE,
    [{"i": 1}, {"i": 2}],
) 
query = "SELECT i FROM {}.{}".format(TEST_DB, TEST_TABLE)
r = client.get_df(query, columns_names=["Col Integer"])
print(r)
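
To clean up after this example, the test database can be dropped with drop_db (shown in more detail later in this document):

client.drop_db(TEST_DB)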

Brief documentation of some methods

from clickhousepy import Client
import datetime as dt
import time


TEST_DB = "__chpytest12345"
TEST_TABLE = "__chpytest12345"

client = Client(host="", user="", password="")

r = client.show_databases()
print("list of databases:", r)

client.create_db(TEST_DB)

client.create_table_mergetree(
    TEST_DB, TEST_TABLE,
    columns=[("s", "String")],
    orders=["s"],
)
# Inserting data.
# Read more about it here
# https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#inserting-data
client.insert(
    TEST_DB, TEST_TABLE,
    [{"s": "1"}],
) 

r = client.exists(TEST_DB, TEST_TABLE)
print("does the table exist?", r)

r = client.get_count_rows(TEST_DB, TEST_TABLE)
print("number of lines:", r)

# Any raw query.
r = client.execute("SELECT * FROM {}.{}".format(TEST_DB, TEST_TABLE))
print(r)
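
Since execute accepts any raw SQL string, aggregate queries can be run the same way, for example:

r = client.execute("SELECT count() FROM {}.{}".format(TEST_DB, TEST_TABLE))
print("row count via raw SQL:", r)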

Class DB

db = client.DB(TEST_DB)
r = db.show_tables()
print("list of database tables {}:".format(TEST_DB), r)

db.drop_db()

Class Table

db = client.create_db(TEST_DB)

table = db.create_table_mergetree(
    TEST_TABLE,
    columns=[("s", "String"), ("t", "String"), ("d", "Date")],
    orders=["d"],
    partition=["s", "d"],
)
# Initialization of an existing table.
# table = client.Table(TEST_DB, TEST_TABLE)

r = table.show_create_table()
print("table creation description", r)

r = table.describe()
print("table columns", r)

table.insert(
    [
        {"s": "1", "t": "1", "d": dt.datetime(2000, 1, 1)},
        {"s": "2", "t": "2", "d": dt.datetime(2000, 1, 2)},
        {"s": "3", "t": "3", "d": dt.datetime(2000, 1, 3)},
        {"s": "4", "t": "4", "d": dt.datetime(2000, 1, 4)},
    ],
    columns=["s", "t", "d"],
)

data = table.select()
print("First 10 rows of the table", data)

data = table.select(limit=1, columns=["s"], where="s = '2'")
print("Filtered selection:", data)

r = table.get_count_rows()
print("number of lines:", r)

r = table.get_min_date(date_column_name="d")
print("minimum date:", r)

r = table.get_max_date(date_column_name="d")
print("maximum date:", r)

print("deleting partitions")
table.drop_partitions([["3", "2000-01-03"], ["4", "2000-01-04"]])

r = table.get_count_rows()
print("number of lines after deleting partitions:", r)

print("row update mutation")
table.update(update="t = '20' ", where="t = '2' ")

print("row deletion mutation")
table.delete(where="t = '20'")
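# ALTER DELETE/UPDATE mutations run asynchronously in ClickHouse,
# so wait briefly before counting rows again.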
time.sleep(1)
r = table.get_count_rows()
print("number of lines after mutation of line deletion:", r)

print("clear table")
table.truncate()
r = table.get_count_rows()
print("number of rows after clearing the table:", r)

new_table_name = TEST_TABLE + "_new"
print("rename table {} в {}".format(TEST_TABLE, new_table_name))
table.rename(TEST_DB, new_table_name)

r = client.exists(TEST_DB, TEST_TABLE)
print("does table {} exist?".format(TEST_TABLE), r)

print("drop tables")
table.drop_table()

print("deleting a database")
db.drop_db()

Copying data from one table to another, with a row-count check after copying

client.drop_db(TEST_DB)
db = client.create_db(TEST_DB)
table = db.create_table_mergetree(
    TEST_TABLE,
    columns=[("string", "String"), ("integer", "UInt32"), ("dt", "DateTime")],
    orders=["string"],
    partition=["string"],
)
table.insert(
    [
        {"string": "a", "integer": 1, "dt": dt.datetime(2000, 1, 1)},
        {"string": "b", "integer": 2, "dt": dt.datetime(2000, 1, 2)},
        {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)},
        {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)},
    ],
)

table_name_2 = TEST_TABLE + "_copy"
table2 = table.copy_table(TEST_DB, table_name_2, return_new_table=True)
is_identic = table2.copy_data_from(
    TEST_DB, TEST_TABLE,
    where="string != 'c' ",
    columns=["string"]
)
# The method returns a bool indicating whether the row counts match after copying.
assert is_identic

Copying data from one table to another while removing duplicate rows

client.drop_db(TEST_DB)
db = client.create_db(TEST_DB)
table = db.create_table_mergetree(
    TEST_TABLE,
    columns=[("string", "String"), ("integer", "UInt32"), ("dt", "DateTime")],
    orders=["string"],
    partition=["string"],
)
table.insert(
    [
        {"string": "a", "integer": 1, "dt": dt.datetime(2000, 1, 1)},
        {"string": "b", "integer": 2, "dt": dt.datetime(2000, 1, 2)},
        {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)},
        {"string": "c", "integer": 3, "dt": dt.datetime(2000, 1, 3)},
    ],
)

table_name_2 = TEST_TABLE + "_copy"
table2 = table.copy_table(TEST_DB, table_name_2, return_new_table=True)
# When duplicate rows are removed (distinct=True),
# the row-count check after copying is skipped.
table2.copy_data_from(
    TEST_DB, TEST_TABLE,
    columns=["string"],
    distinct=True
)
assert 3 == table2.get_count_rows()

Dependencies

clickhouse-driver
pandas (optional, installed via clickhousepy[pandas])

Author

Pavel Maksimov

You can contact me on Telegram or Facebook

Good luck to you, friend! Give the project a star ;)
