Skip to main content

TDengine connector for Python

Project description

TDengine Connector for Python

The TDengine connector for Python enables Python programs to access TDengine through an API that is compliant with the Python DB API 2.0 (PEP-249). It uses the TDengine C client library for client-server communication.

Install

You can use pip to install from GitHub or PyPI:

pip install git+https://github.com/taosdata/taos-connector-python.git
pip install taospy

Source Code

TDengine connector for Python source code is hosted on GitHub.

Examples

Query with PEP-249 API

import taos

# Connect with no arguments and obtain a DB-API (PEP-249) cursor.
connection = taos.connect()
cur = connection.cursor()

# Run the statement and print every row it returned.
cur.execute("show databases")
for record in cur.fetchall():
    print(record)

# Close the cursor first, then the connection it was created from.
cur.close()
connection.close()

Query with object-oriented API

import taos

# Connect and create a scratch database for the duration of the example.
connection = taos.connect()
connection.exec("create database if not exists pytest")

# Query through the result object instead of a cursor.
databases = connection.query("show databases")
column_total = databases.field_count  # number of columns in the result set

# Print the column descriptions first, then the rows themselves.
for column in databases.fields:
    print(column)
for record in databases:
    print(record)

# Close the result, drop the scratch database and close the connection.
databases.close()
connection.exec("drop database pytest")
connection.close()

Query with async API

from taos import *
from ctypes import *
import time

def fetch_callback(p_param, p_result, num_of_rows):
    # type: (c_void_p, c_void_p, c_int) -> None
    """Handle one asynchronously fetched batch of rows.

    :param p_param: pointer to the ``Counter`` shared with the waiting caller.
    :param p_result: raw result handle, wrapped here in ``TaosResult``.
    :param num_of_rows: rows in this batch; ``0`` means the fetch is finished
        and a negative value is treated as an error code.

    On a non-empty batch the rows are drained, ``result.row_count`` is added
    to the counter and the next batch is requested with this same callback.
    """
    print("fetched ", num_of_rows, "rows")
    p = cast(p_param, POINTER(Counter))
    result = TaosResult(p_result)

    if num_of_rows == 0:
        print("fetching completed")
        p.contents.done = True
        result.close()
        return
    if num_of_rows < 0:
        # Release the waiter first, then report the error. The result is
        # closed in ``finally`` so it is released even if check_error raises.
        p.contents.done = True
        try:
            result.check_error(num_of_rows)
        finally:
            result.close()
        return

    for row in result.rows_iter(num_of_rows):
        # Rows are only drained here; add print(row) to inspect them.
        pass
    p.contents.count += result.row_count
    # Ask for the next batch; this callback runs again when it arrives.
    result.fetch_rows_a(fetch_callback, p_param)
    


def query_callback(p_param, p_result, code):
    # type: (c_void_p, c_void_p, c_int) -> None
    """Start fetching rows once an asynchronous query has completed.

    :param p_param: pointer to the ``Counter`` shared with the waiting caller.
    :param p_result: raw result handle, or ``None`` when there is no result.
    :param code: status code of the query; ``0`` means success.

    When the query did not succeed, ``fetch_callback`` never runs, so the
    counter's ``done`` flag is set here. Otherwise the caller polling that
    flag would wait forever.
    """
    if p_result is None:
        cast(p_param, POINTER(Counter)).contents.done = True
        return
    result = TaosResult(p_result)
    if code == 0:
        result.fetch_rows_a(fetch_callback, p_param)
    else:
        cast(p_param, POINTER(Counter)).contents.done = True
    # NOTE(review): check_error is expected to raise for a non-zero code and
    # do nothing for 0 -- confirm against the connector's TaosResult.
    result.check_error(code)


class Counter(Structure):
    """ctypes structure shared between the caller and the async callbacks."""

    # count: integer tally updated by the callbacks.
    # done:  flag the caller polls to know when fetching has finished.
    _fields_ = [
        ("count", c_int),
        ("done", c_bool),
    ]

    def __str__(self):
        """Return a brace-wrapped summary of both fields."""
        values = (self.count, self.done)
        return "{ count: %d, done: %s }" % values


def test_query(conn):
    # type: (TaosConnection) -> None
    """Run an asynchronous query and wait until the callbacks report completion.

    :param conn: open connection; it is always closed before returning, even
        when issuing the query or waiting for it raises.
    """
    counter = Counter(count=0)
    try:
        conn.query_a("select * from log.log", query_callback, byref(counter))

        # The callbacks set counter.done; poll it once per second.
        while not counter.done:
            print("wait query callback")
            time.sleep(1)
        print(counter)
    finally:
        conn.close()


if __name__ == "__main__":
    # Script entry point: open a connection and hand it to the example.
    connection = connect()
    test_query(connection)

Statement API - Bind row after row

from taos import *

connection = connect()

# Start from an empty database so the affected-row check below is exact.
db_name = "pytest_taos_stmt"
connection.exec("drop database if exists %s" % db_name)
connection.exec("create database if not exists %s" % db_name)
connection.select_db(db_name)

# One column per bindable type, sixteen columns in total.
connection.exec(
    "create table if not exists log(ts timestamp, bo bool, nil tinyint, \
        ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
        su smallint unsigned, iu int unsigned, bu bigint unsigned, \
        ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)

insert_stmt = connection.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")

# First row: fill each of the sixteen placeholders with its typed setter.
row_binds = new_bind_params(16)
row_binds[0].timestamp(1626861392589)
row_binds[1].bool(True)
row_binds[2].null()
row_binds[3].tinyint(2)
row_binds[4].smallint(3)
row_binds[5].int(4)
row_binds[6].bigint(5)
row_binds[7].tinyint_unsigned(6)
row_binds[8].smallint_unsigned(7)
row_binds[9].int_unsigned(8)
row_binds[10].bigint_unsigned(9)
row_binds[11].float(10.1)
row_binds[12].double(10.11)
row_binds[13].binary("hello")
row_binds[14].nchar("stmt")
row_binds[15].timestamp(1626861392589)
insert_stmt.bind_param(row_binds)

# Second row: reuse the same binds, changing only the first and last columns.
row_binds[0].timestamp(1626861392590)
row_binds[15].null()
insert_stmt.bind_param(row_binds)
insert_stmt.execute()


# Both bound rows should have been inserted.
insert_outcome = insert_stmt.use_result()
assert insert_outcome.affected_rows == 2
insert_outcome.close()

# Read the table back and print what was stored.
stored = connection.query("select * from log")

for record in stored:
    print(record)
stored.close()
insert_stmt.close()
connection.close()

Statement API - Bind multi rows

from taos import *

connection = connect()

# Start from an empty database so the affected-row check below is exact.
db_name = "pytest_taos_stmt"
connection.exec("drop database if exists %s" % db_name)
connection.exec("create database if not exists %s" % db_name)
connection.select_db(db_name)

# One column per bindable type, sixteen columns in total.
connection.exec(
    "create table if not exists log(ts timestamp, bo bool, nil tinyint, \
        ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
        su smallint unsigned, iu int unsigned, bu bigint unsigned, \
        ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)

insert_stmt = connection.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")

# Column-wise binding: each bind receives the values of one column for all
# three rows; None marks a NULL cell.
column_binds = new_multi_binds(16)
column_binds[0].timestamp((1626861392589, 1626861392590, 1626861392591))
column_binds[1].bool((True, None, False))
column_binds[2].tinyint([-128, -128, None]) # -128 is tinyint null
column_binds[3].tinyint([0, 127, None])
column_binds[4].smallint([3, None, 2])
column_binds[5].int([3, 4, None])
column_binds[6].bigint([3, 4, None])
column_binds[7].tinyint_unsigned([3, 4, None])
column_binds[8].smallint_unsigned([3, 4, None])
column_binds[9].int_unsigned([3, 4, None])
column_binds[10].bigint_unsigned([3, 4, None])
column_binds[11].float([3, None, 1])
column_binds[12].double([3, None, 1.2])
column_binds[13].binary(["abc", "dddafadfadfadfadfa", None])
column_binds[14].nchar(["涛思数据", None, "a long string with 中文字符"])
column_binds[15].timestamp([None, None, 1626861392591])
insert_stmt.bind_param_batch(column_binds)
insert_stmt.execute()


# All three rows of the batch should have been inserted.
insert_outcome = insert_stmt.use_result()
assert insert_outcome.affected_rows == 3
insert_outcome.close()

# Read the table back and print what was stored.
stored = connection.query("select * from log")
for record in stored:
    print(record)
stored.close()
insert_stmt.close()
connection.close()

Subscribe API - Subscribe

from random import random

import taos

conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
# Seed the table so the first consume() has rows to return.
for i in range(10):
    conn.exec("insert into log values(now, %d)" % i)

sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
    print(ts, n)

print("# consume new data")
for i in range(5):
    # Insert two rows per round, then consume again and print what it yields.
    conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
    result = sub.consume()
    for ts, n in result:
        print(ts, n)

print("# consume with a stop condition")
for i in range(10):
    # random() comes from the standard library; it yields a float in [0, 1),
    # so the inserted value is an integer from 0 to 9.
    conn.exec("insert into log values(now, %d)" % int(random() * 10))
    result = sub.consume()
    try:
        ts, n = next(result)
        print(ts, n)
        if n > 5:
            result.stop_query()
            print("## stopped")
            break
    except StopIteration:
        # Nothing new was delivered this round; try again.
        continue

sub.close()

conn.exec("drop database if exists %s" % dbname)
conn.close()

Subscribe API - Subscribe asynchronously with callback

from taos import *
from ctypes import *

import time


def subscribe_callback(p_sub, p_result, p_param, errno):
    # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
    """Print every (ts, n) pair delivered to the subscription callback.

    ``p_result`` is wrapped in a ``TaosResult`` and ``errno`` is passed to its
    ``check_error`` before any row is read. ``p_sub`` and ``p_param`` are
    accepted but not used.
    """
    print("# fetch in callback")
    delivered = TaosResult(p_result)
    delivered.check_error(errno)
    for entry in delivered.rows_iter():
        # Each yielded item is called to obtain the row's two values.
        timestamp, value = entry()
        print(timestamp, value)


def test_subscribe_callback(conn):
    # type: (TaosConnection) -> None
    """Subscribe with a callback while rows are inserted into a scratch table.

    :param conn: open connection; it is always closed before returning.

    Cleanup (close the subscription if one was created, drop the scratch
    database, close the connection) runs on success and on failure. Any
    exception from the body propagates to the caller.
    """
    dbname = "pytest_taos_subscribe_callback"
    sub = None
    try:
        conn.exec("drop database if exists %s" % dbname)
        conn.exec("create database if not exists %s" % dbname)
        conn.select_db(dbname)
        conn.exec("create table if not exists log(ts timestamp, n int)")

        print("# subscribe with callback")
        sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)

        for i in range(10):
            conn.exec("insert into log values(now, %d)" % i)
            time.sleep(0.7)
    finally:
        try:
            # Close the subscription before dropping the database it reads from.
            if sub is not None:
                sub.close()
            conn.exec("drop database if exists %s" % dbname)
        finally:
            conn.close()


if __name__ == "__main__":
    # Script entry point: open a connection and hand it to the example.
    connection = connect()
    test_subscribe_callback(connection)

Stream API - Stream

from taos import *
from ctypes import *

def stream_callback(p_param, p_result, p_row):
    # type: (c_void_p, c_void_p, c_void_p) -> None
    """Add one streamed (ts, count) row to the shared counter and print it.

    :param p_param: pointer to the ``Counter`` passed when the stream was opened.
    :param p_result: raw result handle; the call is ignored when it is ``None``.
    :param p_row: raw row handle; the call is ignored when it is ``None``.
    :raises Exception: any error raised while handling the row is printed and
        then re-raised unchanged.
    """
    if p_result is None or p_row is None:
        return
    result = TaosResult(p_result)
    row = TaosRow(result, p_row)
    try:
        ts, count = row()
        p = cast(p_param, POINTER(Counter))
        p.contents.count += count
        print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))

    except Exception as err:
        print(err)
        # Bare raise keeps the original traceback intact.
        raise


class Counter(Structure):
    """ctypes structure holding the running total shared with stream_callback.

    The base class is referenced as ``Structure`` because this example only
    does ``from ctypes import *``, which does not bind the name ``ctypes``.
    """

    # count: running total that stream_callback adds to.
    _fields_ = [
        ("count", c_int),
    ]

    def __str__(self):
        """Return the current total as a decimal string."""
        return "%d" % self.count


def test_stream(conn):
    # type: (TaosConnection) -> None
    """Open a stream over a 5-second interval count while rows are inserted.

    :param conn: open connection; it is always closed before returning.

    Cleanup (close the stream if one was created, drop the scratch database,
    close the connection) runs on success and on failure. Any exception from
    the body propagates to the caller.
    """
    # Imported here because this example's own imports do not provide `time`.
    import time

    dbname = "pytest_taos_stream"
    stream = None
    try:
        conn.exec("drop database if exists %s" % dbname)
        conn.exec("create database if not exists %s" % dbname)
        conn.select_db(dbname)
        conn.exec("create table if not exists log(ts timestamp, n int)")

        # Check that the query returns two columns, then release that result.
        result = conn.query("select count(*) from log interval(5s)")
        try:
            assert result.field_count == 2
        finally:
            result.close()

        counter = Counter()
        counter.count = 0
        stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))

        for _ in range(0, 20):
            conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
            time.sleep(2)
    finally:
        try:
            # Close the stream before dropping the database it reads from.
            if stream is not None:
                stream.close()
            conn.exec("drop database if exists %s" % dbname)
        finally:
            conn.close()


if __name__ == "__main__":
    # Script entry point: open a connection and hand it to the example.
    connection = connect()
    test_stream(connection)

Insert with line protocol

import taos

connection = taos.connect()

# Recreate the scratch database with the precision option set to 'us'.
db_name = "pytest_line"
connection.exec("drop database if exists %s" % db_name)
connection.exec("create database if not exists %s precision 'us'" % db_name)
connection.select_db(db_name)

# First batch: three records in line-protocol form.
first_batch = [
    'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
    'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
    'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
connection.insert_lines(first_batch)
print("inserted")

# Second batch: one more "stf" record, this time with t1=5i64.
second_batch = [
    'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
connection.insert_lines(second_batch)

# List the tables that now exist in the database.
tables = connection.query("show tables")
for record in tables:
    print(record)
tables.close()


# Drop the scratch database and close the connection.
connection.exec("drop database if exists %s" % db_name)
connection.close()

License - AGPL-3.0

Same license as TDengine.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taospy-0.0.0rc2.tar.gz (22.7 kB view details)

Uploaded Source

Built Distribution

taospy-0.0.0rc2-py2.py3-none-any.whl (26.2 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file taospy-0.0.0rc2.tar.gz.

File metadata

  • Download URL: taospy-0.0.0rc2.tar.gz
  • Upload date:
  • Size: 22.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.7 CPython/3.8.10 Linux/5.4.0-80-generic

File hashes

Hashes for taospy-0.0.0rc2.tar.gz
Algorithm Hash digest
SHA256 6cf0c3226f7dbe5f84e63b4fa8e46b524a0ed376b160e9b891c4a55c10145ee8
MD5 1e35d0e46405ea7752af40df01ae1323
BLAKE2b-256 deca575c2f34943e70f4f6430b4b0ce9753abf07a46e459f2c0af38425874e5f

See more details on using hashes here.

File details

Details for the file taospy-0.0.0rc2-py2.py3-none-any.whl.

File metadata

  • Download URL: taospy-0.0.0rc2-py2.py3-none-any.whl
  • Upload date:
  • Size: 26.2 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.7 CPython/3.8.10 Linux/5.4.0-80-generic

File hashes

Hashes for taospy-0.0.0rc2-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 d5a1da75ccdda4d0fef74073ede0a9241f800ab29c8d4373b5901f6596e4c9eb
MD5 3b552b33aef9fd2967744c22595b21cb
BLAKE2b-256 44b0077f48a2f260c3236be1591d223d62569690a25d47421f6b5dce137e51b0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page