SDK for volcengine
Project description
Python SDK for the Volcengine data and predict APIs.
import uuid
from datetime import datetime, timedelta
from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Option, NetException, BizException, metrics, MetricsOption
# Required: tenant ID.
TENANT_ID = "xxx"
# Required: application ID.
APPLICATION_ID = "xxx"
# Required: access key (AK).
AK = "xxx"
# Required: secret key (SK).
SK = "xxx"
# Required: host domain name.
HOST = "xxx"

# Shared client used by every example below, assembled from the settings
# above.
client: Client = (
    ClientBuilder()
    .tenant_id(TENANT_ID)
    .application_id(APPLICATION_ID)
    .ak(AK)
    .sk(SK)
    .hosts([HOST])
    .build()
)

# Initialize metrics reporting. Enabling it is recommended because it makes
# troubleshooting easier on the Volcengine side.
metrics.init(
    (MetricsOption.with_metrics_domain(HOST),)
)
def write():
    """Upload a small batch of sample item records via ``client.write_data``.

    The outcome is printed to stdout and the function returns ``None``.
    ``NetException`` and ``BizException`` raised by the call are caught and
    reported instead of being propagated.
    """
    # Test data only; in real calls pay attention to field types and formats.
    data_list = [
        {
            "id": "item_id1",
            "title": "test_title1",
            "status": 0,
            "brand": "volcengine",
            "pub_time": 1583641807,
            "current_price": 1.1,
        },
        {
            "id": "item_id2",
            "title": "test_title2",
            "status": 1,
            "brand": "volcengine",
            "pub_time": 1583641503,
            "current_price": 2.2,
        },
    ]
    # The topic is an enumerated value; see the API documentation.
    topic = "item"
    # Transfer daily data.
    opts = (
        # Stage is one of: "pre_sync" (pre-sync), "history_sync" (historical
        # data sync), "incremental_sync_daily" (incremental daily sync),
        # "incremental_sync_streaming" (incremental real-time sync).
        Option.with_stage("pre_sync"),
        # Required: the date the data was produced. Replace with the real
        # date when transferring actual data.
        Option.with_data_date(datetime(year=2022, month=1, day=1)),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        write_response = client.write_data(data_list, topic, *opts)
    except (NetException, BizException) as e:
        # Catch NetException as well as BizException, as predict() and
        # callback() do, so that such a failure is reported here instead of
        # escaping as an unhandled traceback.
        print("[write] occur err, msg: %s" % e)
        return
    if not write_response.status.success:
        print("[write] failure")
        return
    print("[write] success")
def done():
    """Call ``client.done`` for the sample date and the "item" topic.

    The outcome is printed to stdout and the function returns ``None``.
    ``NetException`` and ``BizException`` raised by the call are caught and
    reported instead of being propagated.
    """
    date_list = [datetime(year=2022, month=1, day=1)]
    # The topic is an enumerated value; see the API documentation.
    topic = "item"
    opts = (
        # Stage is one of: "pre_sync" (pre-sync), "history_sync" (historical
        # data sync), "incremental_sync_daily" (incremental daily sync),
        # "incremental_sync_streaming" (incremental real-time sync).
        Option.with_stage("pre_sync"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        done_response = client.done(date_list, topic, *opts)
    except (NetException, BizException) as e:
        # Catch NetException as well as BizException, as predict() and
        # callback() do, so that such a failure is reported here instead of
        # escaping as an unhandled traceback.
        print("[done] occur err, msg: %s" % e)
        return
    if not done_response.status.success:
        print("[done] failure")
        return
    print("[done] success")
def predict():
    """Build a sample ``PredictRequest`` and send it with ``client.predict``.

    The outcome is printed to stdout and the function returns ``None``.
    ``NetException`` and ``BizException`` raised by the call are caught and
    reported instead of being propagated.
    """
    # Build the predict request body.
    request = PredictRequest()
    request.user.uid = 'uid1'

    ctx = request.context
    ctx.spm = "1$##$2$##$3$##$4"
    ctx.extra["extra_key"] = "extra_value"

    features = ctx.feature
    # NOTE(review): the same map key is written twice, so only "value2"
    # remains in the request - confirm whether two distinct keys were meant.
    features.stringFeature["key"] = "value1"
    features.stringFeature["key"] = "value2"
    feature_values = features.stringArrayFeature["array_key"].values
    for value in ("array_value1", "array_value2"):
        feature_values.append(value)

    filters = ctx.filter
    filters.stringFilter["key"] = "value"
    filter_values = filters.stringArrayFilter["array_key"].values
    for value in ("array_value1", "array_value2"):
        filter_values.append(value)

    # Candidate items, added in order.
    for item_id in ("item_id1", "item_id2"):
        candidate = request.candidateItems.add()
        candidate.id = item_id

    request_id = str(uuid.uuid1())
    opts = (
        Option.with_request_id(request_id),
        Option.with_scene("default"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_headers({"Enable-Spm-Route": "true"}),
    )
    try:
        response = client.predict(request, *opts)
    except (NetException, BizException) as exc:
        print("[predict] occur error, msg: %s" % exc)
        return
    if response.success:
        print("[predict] success")
    else:
        print("[predict] failure")
def callback():
    """Build a sample ``CallbackRequest`` and send it with ``client.callback``.

    The outcome is printed to stdout and the function returns ``None``.
    ``NetException`` and ``BizException`` raised by the call are caught and
    reported instead of being propagated.
    """
    # Build the callback request body.
    callback_request = CallbackRequest()
    # Request ID of the corresponding predict request.
    callback_request.predict_request_id = "xxx"
    # uid of the corresponding predict request.
    callback_request.uid = "uid1"
    # Scene of the corresponding predict request.
    callback_request.scene = "default"
    # Item list of the corresponding predict request.
    callback_item1 = callback_request.items.add()
    callback_item1.id = "item_id1"
    callback_item1.pos = "position1"
    callback_item1.extra = "{\"reason\":\"exposure\"}"
    callback_item2 = callback_request.items.add()
    callback_item2.id = "item_id2"
    callback_item2.pos = "position2"
    callback_item2.extra = "{\"reason\":\"filter\"}"
    # Callback request context. Protobuf's Python API does not allow
    # assigning a message object to a singular message field
    # (``callback_request.context = ...`` raises AttributeError), so the
    # embedded context is populated in place, as predict() does.
    callback_context = callback_request.context
    callback_context.spm = "1$##$2$##$3$##$4"
    opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_timeout(timedelta(milliseconds=1000)),
    )
    try:
        rsp = client.callback(callback_request, *opts)
    except (NetException, BizException) as e:
        print("[callback] occur error, msg: %s" % e)
        return
    if not rsp.success:
        print("[callback] failure")
        return
    print("[callback] success")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files are available for this release. See the tutorial on generating distribution archives.
Built Distribution
File details
Details for the file volcengine_sdk_rec-1.3.3-py3-none-any.whl.
File metadata
- Download URL: volcengine_sdk_rec-1.3.3-py3-none-any.whl
- Upload date:
- Size: 44.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.4
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 9cadd10f45c66f5711ae546d562265c67d66a153a688d06b207e353376861c54 |
MD5 | 280db498c6a5df069f36e082e72ced11 |
BLAKE2b-256 | 93423762ede2a5d428db3c3a697f615869271372ece5b4bf32e630b04b880aa5 |