Skip to main content

TDengine connector for Python

Project description

TDengine Connector for Python

TDengine connector for Python enables Python programs to access TDengine through an API that is compliant with the Python DB API 2.0 (PEP 249). It uses the TDengine C client library for client-server communication.

Install

You can use pip to install the connector from GitHub or PyPI:

pip install git+https://github.com/taosdata/taos-connector-python.git
pip install taospy

Source Code

The source code of the TDengine connector for Python is hosted on GitHub.

Examples

Query with PEP-249 API

import taos

# Connect without arguments and obtain a cursor from the connection.
conn = taos.connect()
cursor = conn.cursor()

# Run the statement, then fetch the complete result set and print it row by row.
cursor.execute("show databases")
for record in cursor.fetchall():
    print(record)

# Close the cursor first, then the connection it was created from.
cursor.close()
conn.close()

Query with object-oriented API

import taos

# Connect without arguments and make sure the scratch database exists.
conn = taos.connect()
conn.exec("create database if not exists pytest")

# query() returns a result object that exposes field metadata and is iterable.
result = conn.query("show databases")
num_of_fields = result.field_count
for column in result.fields:
    print(column)
for record in result:
    print(record)

# Close the result before dropping the scratch database and disconnecting.
result.close()
conn.exec("drop database pytest")
conn.close()

Query with async API

from taos import *
from ctypes import *
import time

def fetch_callback(p_param, p_result, num_of_rows):
    """Consume one batch of an asynchronous query and request the next one.

    Args:
        p_param: Pointer to the shared ``Counter``; it is cast back with
            ``POINTER(Counter)`` and updated in place.
        p_result: Raw result handle, wrapped here in a ``TaosResult``.
        num_of_rows: Number of rows in this batch. ``0`` is treated as the
            end of the data and a negative value as an error code.
    """
    print("fetched ", num_of_rows, "rows")
    p = cast(p_param, POINTER(Counter))
    result = TaosResult(p_result)

    if num_of_rows == 0:
        print("fetching completed")
        p.contents.done = True
        result.close()
        return
    if num_of_rows < 0:
        # Flag completion first so a caller polling ``done`` stops waiting
        # even when the error check fails.
        p.contents.done = True
        try:
            # NOTE(review): check_error presumably raises for a negative
            # code -- confirm; the finally clause closes the result either way.
            result.check_error(num_of_rows)
        finally:
            result.close()
        return

    for row in result.rows_iter(num_of_rows):
        # Rows are only iterated in this example; print(row) here to inspect them.
        pass
    p.contents.count += result.row_count
    # Ask for the next batch; this same callback handles it.
    result.fetch_rows_a(fetch_callback, p_param)
    


def query_callback(p_param, p_result, code):
    # type: (c_void_p, c_void_p, c_int) -> None
    """Start fetching rows once an asynchronous query has been accepted.

    Args:
        p_param: Opaque pointer forwarded unchanged to ``fetch_callback``.
        p_result: Raw result handle; nothing is done when it is ``None``.
        code: Status code of the query; ``0`` starts the row fetch.
    """
    # Identity test: None is a singleton, so ``is`` is the idiomatic check.
    if p_result is None:
        return
    result = TaosResult(p_result)
    if code == 0:
        result.fetch_rows_a(fetch_callback, p_param)
    # Called for every code, including 0, exactly as before.
    result.check_error(code)


class Counter(Structure):
    """ctypes structure shared with the query callbacks.

    ``count`` accumulates the number of rows seen and ``done`` is set once
    fetching has finished or failed.
    """

    _fields_ = [
        ("count", c_int),
        ("done", c_bool),
    ]

    def __str__(self):
        values = (self.count, self.done)
        return "{ count: %d, done: %s }" % values


def test_query(conn):
    # type: (TaosConnection) -> None
    """Run an asynchronous query and poll until the callbacks report completion.

    A ``Counter`` is passed by reference to ``query_a``; the loop sleeps one
    second at a time until its ``done`` flag is set, then the counter is
    printed and the connection closed.
    """
    progress = Counter(count=0)
    conn.query_a("select * from log.log", query_callback, byref(progress))

    while True:
        if progress.done:
            break
        print("wait query callback")
        time.sleep(1)

    print(progress)
    conn.close()


if __name__ == "__main__":
    # Connect without arguments and hand the connection to the example.
    connection = connect()
    test_query(connection)

Statement API - Bind row after row

from taos import *

# Connect without arguments; every statement below runs on this connection.
conn = connect()

# Recreate the scratch database so the example starts from an empty state.
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)

# The table has 16 columns, one for each value bound below.
conn.exec(
    "create table if not exists log(ts timestamp, bo bool, nil tinyint, \
        ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
        su smallint unsigned, iu int unsigned, bu bigint unsigned, \
        ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)

# One "?" placeholder per table column.
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")

# First row: params[i] carries the value for the i-th placeholder, and the
# method name selects the type it is bound as.
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)

# Second row: the same params object is reused; only the first timestamp and
# the last column are changed before binding again.
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()


# Two rows were bound, so two affected rows are expected.
result = stmt.use_result()
assert result.affected_rows == 2
result.close()

# Read the rows back with a plain query.
result = conn.query("select * from log")

for row in result:
    print(row)
result.close()
stmt.close()
conn.close()

Statement API - Bind multi rows

from taos import *

# Connect without arguments; every statement below runs on this connection.
conn = connect()

# Recreate the scratch database so the example starts from an empty state.
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)

# The table has 16 columns, one for each column bound below.
conn.exec(
    "create table if not exists log(ts timestamp, bo bool, nil tinyint, \
        ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
        su smallint unsigned, iu int unsigned, bu bigint unsigned, \
        ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)

# One "?" placeholder per table column.
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")

# Column-wise binding: params[i] receives the values of column i, three per
# column here.
# NOTE(review): None entries are presumably bound as SQL NULL -- confirm.
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()


# Three values were bound per column, so three affected rows are expected.
result = stmt.use_result()
assert result.affected_rows == 3
result.close()

# Read the rows back with a plain query.
result = conn.query("select * from log")
for row in result:
    print(row)
result.close()
stmt.close()
conn.close()

Subscription API - Subscribe

# random() is called in the last loop below; without this import the script
# stops there with a NameError.
from random import random

import taos

# Connect without arguments and build a scratch database holding ten rows.
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
    conn.exec("insert into log values(now, %d)" % i)

# NOTE(review): the first argument (True) presumably restarts the subscription
# from the beginning and 1000 is presumably the polling interval -- confirm
# against the connector reference.
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
    print(ts, n)

print("# consume new data")
for i in range(5):
    # Two rows per statement: one at "now" and one a second later.
    conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
    result = sub.consume()
    for ts, n in result:
        print(ts, n)

print("# consume with a stop condition")
for i in range(10):
    # Insert a random integer in 0..9 and stop once a value above 5 is consumed.
    conn.exec("insert into log values(now, %d)" % int(random() * 10))
    result = sub.consume()
    try:
        ts, n = next(result)
        print(ts, n)
        if n > 5:
            result.stop_query()
            print("## stopped")
            break
    except StopIteration:
        # Nothing was returned by this consume() call; try the next insert.
        continue

sub.close()

conn.exec("drop database if exists %s" % dbname)
conn.close()

Subscription API - Subscribe asynchronously with callback

from taos import *
from ctypes import *

import time


def subscribe_callback(p_sub, p_result, p_param, errno):
    # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
    """Print every ``(ts, n)`` row delivered to the subscription callback.

    ``p_result`` is wrapped in a ``TaosResult`` and ``errno`` is passed to its
    ``check_error`` before any row is read; ``p_sub`` and ``p_param`` are unused.
    """
    print("# fetch in callback")
    delivered = TaosResult(p_result)
    delivered.check_error(errno)
    for record in delivered.rows_iter():
        # Calling the row object yields its column values.
        ts, n = record()
        print(ts, n)


def test_subscribe_callback(conn):
    # type: (TaosConnection) -> None
    """Subscribe with a callback, insert ten rows, then clean up.

    The scratch database is dropped and the connection closed whether or not
    the body succeeds. Cleanup lives in a single ``finally`` clause, so it
    runs exactly once and any exception propagates unchanged.

    Args:
        conn: Open connection; it is closed before this function returns.
    """
    dbname = "pytest_taos_subscribe_callback"
    sub = None  # stays None if the failure happens before subscribing
    try:
        conn.exec("drop database if exists %s" % dbname)
        conn.exec("create database if not exists %s" % dbname)
        conn.select_db(dbname)
        conn.exec("create table if not exists log(ts timestamp, n int)")

        print("# subscribe with callback")
        sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)

        for i in range(10):
            conn.exec("insert into log values(now, %d)" % i)
            # Pause between inserts so the callback can run in between.
            time.sleep(0.7)
    finally:
        try:
            # Close the subscription on the error path as well.
            if sub is not None:
                sub.close()
        finally:
            # Runs even if closing the subscription fails.
            conn.exec("drop database if exists %s" % dbname)
            conn.close()


if __name__ == "__main__":
    # Connect without arguments and hand the connection to the example.
    connection = connect()
    test_subscribe_callback(connection)

Stream API - Stream

from taos import *
from ctypes import *
import time

def stream_callback(p_param, p_result, p_row):
    # type: (c_void_p, c_void_p, c_void_p) -> None
    """Add one streamed ``(ts, count)`` row to the shared counter and print it.

    Args:
        p_param: Pointer to the shared ``Counter``; cast back with
            ``POINTER(Counter)`` and updated in place.
        p_result: Raw result handle; the call is a no-op when it is ``None``.
        p_row: Raw row handle; the call is a no-op when it is ``None``.
    """
    # Identity tests: None is a singleton, so ``is`` is the idiomatic check.
    if p_result is None or p_row is None:
        return
    result = TaosResult(p_result)
    row = TaosRow(result, p_row)
    try:
        ts, count = row()
        p = cast(p_param, POINTER(Counter))
        p.contents.count += count
        print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))

    except Exception as err:
        print(err)
        # Bare raise re-raises the active exception with its traceback intact.
        raise


class Counter(Structure):
    """ctypes structure holding the running row total shared with the stream callback.

    The base class is spelled ``Structure`` because the example only does
    ``from ctypes import *``; the module name ``ctypes`` itself is never bound,
    so ``ctypes.Structure`` would raise NameError.
    """

    _fields_ = [
        ("count", c_int),
    ]

    def __str__(self):
        return "%d" % self.count


def test_stream(conn):
    # type: (TaosConnection) -> None
    """Open a 5-second count stream over a scratch table and feed it rows.

    The scratch database is dropped and the connection closed whether or not
    the body succeeds. Cleanup lives in a single ``finally`` clause, so it
    runs exactly once and any exception propagates unchanged. ``time`` comes
    from the example's module-level imports.

    Args:
        conn: Open connection; it is closed before this function returns.
    """
    dbname = "pytest_taos_stream"
    stream = None  # stays None if the failure happens before the stream opens
    try:
        conn.exec("drop database if exists %s" % dbname)
        conn.exec("create database if not exists %s" % dbname)
        conn.select_db(dbname)
        conn.exec("create table if not exists log(ts timestamp, n int)")

        # Run the stream's query once as a plain query to check its shape;
        # close that result instead of leaking it.
        result = conn.query("select count(*) from log interval(5s)")
        try:
            assert result.field_count == 2
        finally:
            result.close()

        counter = Counter()
        counter.count = 0
        stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))

        for _ in range(0, 20):
            conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
            time.sleep(2)
    finally:
        try:
            # Close the stream on the error path as well.
            if stream is not None:
                stream.close()
        finally:
            # Runs even if closing the stream fails.
            conn.exec("drop database if exists %s" % dbname)
            conn.close()


if __name__ == "__main__":
    # Connect without arguments and hand the connection to the example.
    connection = connect()
    test_stream(connection)

Insert with line protocol

import taos

# Connect without arguments and recreate the scratch database with
# microsecond precision.
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)

# Each entry is one line-protocol record: a table name with comma-separated
# tags, then the fields, then a timestamp.
# NOTE(review): only the first record carries an explicit "ns" suffix on its
# timestamp -- confirm whether the mix of suffixed and bare values is intended.
lines = [
    'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
    'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
    'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
# NOTE(review): the second argument (0) presumably selects the protocol and
# "ns" the timestamp precision -- confirm against the connector reference.
conn.schemaless_insert(lines, 0, "ns")
print("inserted")

# A second batch for the "stf" table, with a different t1 tag value.
lines = [
    'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")

# List the tables that exist after the two inserts.
result = conn.query("show tables")
for row in result:
    print(row)
result.close()


conn.exec("drop database if exists %s" % dbname)
conn.close()

License - AGPL-3.0

The license is kept the same as TDengine's.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taospy-2.1.1.tar.gz (23.5 kB view details)

Uploaded Source

Built Distribution

taospy-2.1.1-py2.py3-none-any.whl (27.3 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file taospy-2.1.1.tar.gz.

File metadata

  • Download URL: taospy-2.1.1.tar.gz
  • Upload date:
  • Size: 23.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.0a2 CPython/3.8.10 Linux/5.4.0-89-generic

File hashes

Hashes for taospy-2.1.1.tar.gz
Algorithm Hash digest
SHA256 60b4ef487b6b9a21fc7b6b958b696956d2e48080dbc3376cd77041fa29620726
MD5 fc8cbf0c9ec035ac761d5542db42921a
BLAKE2b-256 fe95bf15a3629bdd2a552d14bc28fab1439694f9471739e2f7f540a31ed3f746

See more details on using hashes here.

File details

Details for the file taospy-2.1.1-py2.py3-none-any.whl.

File metadata

  • Download URL: taospy-2.1.1-py2.py3-none-any.whl
  • Upload date:
  • Size: 27.3 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.0a2 CPython/3.8.10 Linux/5.4.0-89-generic

File hashes

Hashes for taospy-2.1.1-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 1e2ec4b4dd1a3296244523c59220c5dae96dc1089287512e79d8d05340ee2a35
MD5 e75c786165c7d13a44f4bcc77ef63661
BLAKE2b-256 0c2c545ffb86950eb9bad4f3f939f3a939e071a3a95b6124722680d0d84d287e

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page