Skip to main content

An asyncio driver for ClickHouse with native TCP support

Project description

asynch

pypi license workflows workflows

Introduction

asynch is an asynchronous ClickHouse Python driver with native TCP interface support, which reuses most of clickhouse-driver features and complies with PEP249.

Installation

> pip install asynch

If you want to install clickhouse-cityhash to enable transport compression

> pip install asynch[compression]

Release v0.3.0 announcement

The version 0.2.5 should have been the v0.3.0 due to compatibility-breaking changes. Before upgrading to the v0.3.0, please pay attention to the incompatible changes like:

  • The asynch/connection.py::connect function is removed - you can use the async with a Connection instance.
  • The asynch/connection.py::Connection.connected property is renamed to opened.
  • The asynch/pool.py::create_pool function is removed - you can use the async with a Pool instance.
  • The deprecated methods from Connection, Cursor and Pool classes are removed.

For more details, please refer to the project CHANGELOG.md file.

Usage

Basically, a connection to a ClickHouse server can be established in two ways:

  1. with a DSN string, e.g., clickhouse://[user:password]@host:port/database;

    from asynch import Connection
    
    # connecting with a DSN string
    async def connect_database():
        async with Connection(
            dsn = "clickhouse://ch_user:P@55w0rD:@127.0.0.1:9000/chdb",
        ) as conn:
            pass
    
  2. with separately given connection/DSN parameters: user (optional), password (optional), host, port, database.

    from asynch import Connection
    
    # connecting with DSN parameters
    async def connect_database():
        async with Connection(
            user = "ch_user",
            password = "P@55w0rD",
            host = "127.0.0.1",
            port = 9000,
            database = "chdb",
        ) as conn:
            pass
    

If a DSN string is given, it takes priority over any specified connection parameter.

Create a database and a table by executing SQL statements via an instance of the Cursor class (here its child DictCursor class) acquired from an instance of the Connection class.

async def create_table(conn: Connection):
    async with conn.cursor(cursor=DictCursor) as cursor:
        await cursor.execute("CREATE DATABASE IF NOT EXISTS test")
        await cursor.execute("""
            CREATE TABLE if not exists test.asynch
            (
                `id`       Int32,
                `decimal`  Decimal(10, 2),
                `date`     Date,
                `datetime` DateTime,
                `float`    Float32,
                `uuid`     UUID,
                `string`   String,
                `ipv4`     IPv4,
                `ipv6`     IPv6
            )
            ENGINE = MergeTree
            ORDER BY id
            """
        )

Fetching one row from an executed SQL statement:

async def fetchone(conn: Connection):
    # by default, an instance of the `Cursor` class
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchone()
        assert ret == (1,)

Fetching all the rows from an executed SQL statement:

async def fetchall():
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchall()
        assert ret == [(1,)]

Using an instance of the DictCursor class to get results as a sequence of dictionaries representing the rows of an executed SQL query:

async def dict_cursor():
    async with conn.cursor(cursor=DictCursor) as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchall()
        assert ret == [{"1": 1}]

Inserting data with dicts via a DictCursor instance:

from asynch.cursors import DictCursor

async def insert_dict():
    async with conn.cursor(cursor=DictCursor) as cursor:
        ret = await cursor.execute(
            """INSERT INTO test.asynch(id,decimal,date,datetime,float,uuid,string,ipv4,ipv6) VALUES""",
            [
                {
                    "id": 1,
                    "decimal": 1,
                    "date": "2020-08-08",
                    "datetime": "2020-08-08 00:00:00",
                    "float": 1,
                    "uuid": "59e182c4-545d-4f30-8b32-cefea2d0d5ba",
                    "string": "1",
                    "ipv4": "0.0.0.0",
                    "ipv6": "::",
                }
            ],
        )
        assert ret == 1

Inserting data with tuples:

async def insert_tuple():
    async with conn.cursor(cursor=DictCursor) as cursor:
        ret = await cursor.execute(
            """INSERT INTO test.asynch(id,decimal,date,datetime,float,uuid,string,ipv4,ipv6) VALUES""",
            [
                (
                    1,
                    1,
                    "2020-08-08",
                    "2020-08-08 00:00:00",
                    1,
                    "59e182c4-545d-4f30-8b32-cefea2d0d5ba",
                    "1",
                    "0.0.0.0",
                    "::",
                )
            ],
        )
        assert ret == 1

Connection Pool

Before the v0.2.4:

async def use_pool():
    pool = await asynch.create_pool()
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT 1")
            ret = await cursor.fetchone()
            assert ret == (1,)
    pool.close()
    await pool.wait_closed()

Since the v0.2.5 -> v0.3.0:

async def use_pool():
    # init a Pool and fill it with the `minsize` opened connections
    async with Pool(minsize=1, maxsize=2) as pool:
        # acquire a connection from the pool
        async with pool.connection() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT 1")
                ret = await cursor.fetchone()
                assert ret == (1,)

Or, you may opne/close the pool manually:

async def use_pool():
    pool = Pool(minsize=1, maxsize=2)
    await pool.startup()

    # some logic

    await pool.shutdown()

ThanksTo

License

This project is licensed under the Apache-2.0 License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asynch-0.3.0.tar.gz (59.7 kB view details)

Uploaded Source

Built Distribution

asynch-0.3.0-py3-none-any.whl (76.4 kB view details)

Uploaded Python 3

File details

Details for the file asynch-0.3.0.tar.gz.

File metadata

  • Download URL: asynch-0.3.0.tar.gz
  • Upload date:
  • Size: 59.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.22

File hashes

Hashes for asynch-0.3.0.tar.gz
Algorithm Hash digest
SHA256 5806b3df6b7ed998a427c718c7d3be73f498f234d11e3aa5b5dd50577db94516
MD5 3ad6fc81c9afc42f568215153bb6e482
BLAKE2b-256 2872bc6f4e4a4fb66227d5a6c10e68c7140dd825730d40bc4f224a5555d625a7

See more details on using hashes here.

File details

Details for the file asynch-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: asynch-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 76.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.22

File hashes

Hashes for asynch-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b95e58150c6d2d0eac0044da99357699cf36bb7ec6d78aa92a8e2a3b5f27621e
MD5 a62327413054b35465c6be859ddbc7b4
BLAKE2b-256 70c01d1190c9e49bf947451a3dc817f25fec5e54ac44102ded96e3e4e9dc55cd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page