Timeplus python SDK
Project description
Timeplus Python Client
Timeplus is a real-time streaming data analytics platform. The Timeplus Python client provides basic functionality to interact with Timeplus Cloud and to manage streaming analytics workloads.
Installation
The Timeplus Python client currently supports the following versions of Python: 3.8, 3.9, and 3.10.
Installing with pip
We recommend creating a virtual environment when installing Python dependencies. For more information on setting up a virtual environment, see the Python documentation.
pip install timeplus
3 API
The timeplus Python library supports three APIs:
- DB API as defined in Python Database API Specification v2.0.
- SQLAlchemy
- REST API
DB API
The Timeplus Python client supports the DB API defined in Python Database API Specification v2.0.
from timeplus.dbapi import connect
api_key = "your_timeplus_apikey"
api_address = "us.timeplus.cloud"
workspace = "your_timeplus_workspace_id"
# create a connection using host/password/workspace
conn = connect(host=api_address, password=api_key, path=workspace)
# run a streaming query
cursor = conn.execute("select * from car_live_data")
# get the first result from the query
next_result = cursor.next()
# get the next result from the query
row1 = cursor.fetchone()
# get the next three results
rows = cursor.fetchmany(3)
Note: because a streaming query result is unbounded, the cursor never ends; a fetch call will block if there is no new query result.
SQLAlchemy
The Timeplus Python client implements a SQLAlchemy dialect for running queries, so users can use it with the SQLAlchemy API.
import os
from sqlalchemy import create_engine, text, select, MetaData, Table
from sqlalchemy.dialects import registry
# register timeplus dialect
registry.register("timeplus", "timeplus.sqlalchemy", "TimeplusDialect")
api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = "dev.timeplus.cloud"
port = 443
workspace = os.environ.get("TIMEPLUS_WORKSPACE") or "tp-demo"
# create a new engine
engine = create_engine(
f"timeplus://:{api_key}@{api_address}:{port}/{workspace}")
# execute streaming sql
with engine.connect() as connection:
result = connection.execute(text("select * from car_live_data"))
count = 0
max = 10
for row in result:
print(f"got one row : {row}")
count += 1
if count >= max:
break
# execute statement using table from metadata
metadata_obj = MetaData()
car_table = Table(table_name, metadata_obj, autoload_with=engine)
print(f"reflected table is {car_table}")
print(f"cols is {[ (c.name, c.type) for c in car_table.columns]}")
stmt = select(car_table).where(car_table.c.cid == "c00001")
print(stmt)
with engine.connect() as conn:
count = 0
max = 3
for row in conn.execute(stmt):
print(f"got one row from query {row}")
count += 1
if count >= max:
break
REST API
The Timeplus Python client also provides resource wrappers that can be used to call the Timeplus REST API through Python objects.
Here is a list of all supported resources:
Resource | Supported Methods |
---|---|
Stream | create,list,get,delete,ingest,exist |
Query | create,list,get,delete,cancel,analyze |
Source | create,list,get,delete,start,stop |
Sink | create,list,get,delete,start,stop |
View | create,list,get,delete,exist |
UDF | list |
Alert | list |
Dashboard | list |
query
Run a streaming query and fetch the query results along with query metrics.
import os
import traceback
import json
from pprint import pprint
from timeplus import Query, Environment
api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
workspace = os.environ.get("TIMEPLUS_WORKSPACE")
# Configure API key and address
env = Environment().address(api_address).workspace(workspace).apikey(api_key)
try:
# list all queries
query_list = Query(env=env).list()
pprint(f"there are {len(query_list)} queries ")
# create a new query
query = (
Query(env=env).sql(query="SELECT * FROM car_live_data")
# .batching_policy(1000, 1000)
.create()
)
pprint(f"query with metadata {json.dumps(query.metadata())}")
# the query header holds the column definitions of the query result table
# it is a list of name/type pairs
# for example : [{'name': 'in_use', 'type': 'bool'}, {'name': 'speed', 'type': 'float32'}]
query_header = query.header()
pprint(f"query with header {query.header()}")
# iterate query result
limit = 3
count = 0
# query.result() is an iterator which pulls all the query results in small batches
# the iterator will continuously pull query results
# for a streaming query, the iterator will not end until the user cancels the query
for event in query.result():
# a metrics event returns real-time query metrics
# a sample metrics event:
# {'count': 117, 'eps': 75, 'processing_time': 1560,
# 'last_event_time': 1686237113265, 'response_time': 861,
# 'scanned_rows': 117, 'scanned_bytes': 7605}
if event.event == "metrics":
pprint(json.loads(event.data))
# message event contains query result which is an array of arrays
# representing multiple query result rows
# a sample message event:
# [[True,-73.857],[False, 84.1]]
if event.event == "message":
pprint(json.loads(event.data))
count += 1
if count >= limit:
break
query.cancel()
query.delete()
except Exception as e:
pprint(e)
traceback.print_exc()
stream
Create, list, get, and delete streams.
import os
import traceback
import json
from pprint import pprint
from timeplus import Stream, Environment
api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
worksapce = os.environ.get("TIMEPLUS_WORKSAPCE")
# Configure API key and address
env = Environment().address(api_address).apikey(api_key).workspace(worksapce)
try:
# list all streams
stream_list = Stream(env=env).list()
pprint(f"there are {len(stream_list)} streams ")
# create a new stream
stream = (
Stream(env=env)
.name("test")
.column("time", "datetime64(3)")
.column("data", "string")
.create()
)
stream_list = Stream(env=env).list()
pprint(f"there are {len(stream_list)} streams after create")
pprint(f"created stream is {stream.metadata()}; type is {type(stream.metadata())}")
a_stream = Stream(env=env).name("test").get()
pprint(f"get stream is {a_stream.metadata()} ; type is {type(a_stream.metadata())}")
stream.delete()
stream_list = Stream(env=env).list()
pprint(f"there are {len(stream_list)} streams after delete")
except Exception as e:
pprint(e)
traceback.print_exc()
ingest
Ingest data into streams
default ingest
stream = (
Stream(env=env)
.name("test_ingest")
.column("time", "datetime64(3)")
.column("data", "string")
.create()
)
stream.ingest(["time", "data"], [[datetime.datetime.now(), "abcd"]])
ingest json streams
stream = (
Stream(env=env)
.name("test_ingest")
.column("a", "integer")
.column("b", "string")
.create()
)
payload = """
{"a":2,"b":"hello"}
{"a":1,"b":"world"}
"""
stream.ingest(payload=payload, format="streaming")
ingest one raw event with multiple lines
stream = Stream(env=env).name("test_ingest_raw").column("raw", "string").create()
payload = """
first line
second line
"""
stream.ingest(payload=payload, format="raw")
ingest multiple lines json
stream = Stream(env=env).name("test_ingest_lines").column("raw", "string").create()
payload = '{"a":1,"b":"world"}\n{"a":2,"b":"hello"}'
stream.ingest(payload=payload, format="lines")
Examples
More sample code can be found here
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file timeplus-1.4.1.tar.gz
.
File metadata
- Download URL: timeplus-1.4.1.tar.gz
- Upload date:
- Size: 104.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 36db4c76e9898dd83dc64fe8520d772bd51e98286292aa0c62d7f4cde49662c7 |
|
MD5 | ef51276314486b59966dc9ec57701bee |
|
BLAKE2b-256 | 37d868a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b |
File details
Details for the file timeplus-1.4.1-py3-none-any.whl
.
File metadata
- Download URL: timeplus-1.4.1-py3-none-any.whl
- Upload date:
- Size: 290.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 34961e1d9789d49e9a60caa42205b2b9d4d2acede4724037316c73c39c8d0268 |
|
MD5 | 87fa73c1491d060c10b96f9118bc190a |
|
BLAKE2b-256 | 764136cde8204e431f98de520ffb96836af0edd3915c8b66472b454b45c9c66f |