SDK for volcengine
Project description
volcengine data/predict api sdk, python version
import uuid
from datetime import datetime, timedelta
from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Region, Option, NetException, BizException, metrics
# 必传,租户id.
TENANT_ID = "xxx"
# 必传,应用id.
APPLICATION_ID = "xxx"
# 必传,密钥AK,获取方式:【火山引擎控制台】->【个人信息】->【密钥管理】中获取.
AK = "xxx"
# 必传,密钥SK,获取方式:【火山引擎控制台】->【个人信息】->【密钥管理】中获取.
SK = "xxx"
client: Client = ClientBuilder() \
.tenant_id(TENANT_ID) \
.application_id(APPLICATION_ID) \
.ak(AK) \
.sk(SK) \
.region(Region.AIR_CN) \
.build()
# metrics上报初始化.建议开启,方便火山侧排查问题.
metrics.init(())
def write():
# 此处为测试数据,实际调用时需注意字段类型和格式
data_list = [
{
"id": "item_id1",
"title": "test_title1",
"status": 0,
"brand": "volcengine",
"pub_time": 1583641807,
"current_price": 1.1,
},
{
"id": "item_id2",
"title": "test_title2",
"status": 1,
"brand": "volcengine",
"pub_time": 1583641503,
"current_price": 2.2,
}
]
# topic为枚举值,请参考API文档
topic = "item"
# 传输天级数据
opts = (
# 预同步("pre_sync"), 历史数据同步("history_sync"), 增量天级同步("incremental_sync_daily"),
# 增量实时同步("incremental_sync_streaming")
Option.with_stage("pre_sync"),
# 必传,数据产生日期,实际传输时需修改为实际日期
Option.with_data_date(datetime(year=2022, month=1, day=1)),
Option.with_timeout(timedelta(milliseconds=1000)),
Option.with_request_id(str(uuid.uuid1())),
)
try:
write_response = client.write_data(data_list, topic, *opts)
except BizException as e:
print("[write] occur err, msg: %s" % e)
return
if not write_response.status.success:
print("[write] failure")
return
print("[write] success")
return
def done():
date_list = [datetime(year=2022, month=1, day=1)]
# topic为枚举值,请参考API文档
topic = "item"
opts = (
# 预同步("pre_sync"), 历史数据同步("history_sync"), 增量天级同步("incremental_sync_daily"),
# 增量实时同步("incremental_sync_streaming")
Option.with_stage("pre_sync"),
Option.with_timeout(timedelta(milliseconds=1000)),
Option.with_request_id(str(uuid.uuid1())),
)
try:
done_response = client.done(date_list, topic, *opts)
except BizException as e:
print("[done] occur err, msg: %s" % e)
return
if not done_response.status.success:
print("[done] failure")
return
print("[done] success")
return
def predict():
# 构造predict请求体
predict_request = PredictRequest()
user = predict_request.user
user.uid = 'uid1'
context = predict_request.context
context.spm = "1$##$2$##$3$##$4"
context.extra["extra_key"] = "extra_value"
feature = context.feature
feature.stringFeature["key"] = "value1"
feature.stringFeature["key"] = "value2"
feature.stringArrayFeature["array_key"].values.append("array_value1")
feature.stringArrayFeature["array_key"].values.append("array_value2")
filter = context.filter
filter.stringFilter["key"] = "value"
filter.stringArrayFilter["array_key"].values.append("array_value1")
filter.stringArrayFilter["array_key"].values.append("array_value2")
candidate_item1 = predict_request.candidateItems.add()
candidate_item1.id = "item_id1"
candidate_item2 = predict_request.candidateItems.add()
candidate_item2.id = "item_id2"
opts = (
Option.with_request_id(str(uuid.uuid1())),
Option.with_scene("default"),
Option.with_timeout(timedelta(milliseconds=1000)),
Option.with_headers({"Enable-Spm-Route": "true"})
)
try:
rsp = client.predict(predict_request, *opts)
except (NetException, BizException) as e:
print("[predict] occur error, msg: %s" % e)
return
if not rsp.success:
print("[predict] failure")
return
print("[predict] success")
def callback():
# 构造callback请求体
callback_request = CallbackRequest()
# 对应的predict请求的request id
callback_request.predict_request_id = "xxx"
# 对应的predict请求的uid
callback_request.uid = "uid1"
# 对应的predict请求的scene.
callback_request.scene = "default"
# 对应的predict请求的items列表
callback_item1 = callback_request.items.add()
callback_item1.id = "item_id1"
callback_item1.pos = "position1"
callback_item1.extra = "{\"reason\":\"exposure\"}"
callback_item2 = callback_request.items.add()
callback_item2.id = "item_id2"
callback_item2.pos = "position2"
callback_item2.extra = "{\"reason\":\"filter\"}"
# callback请求上下文
callback_context = CallbackContext()
callback_context.spm = "1$##$2$##$3$##$4"
callback_request.context = callback_context
opts = (
Option.with_request_id(str(uuid.uuid1())),
Option.with_timeout(timedelta(milliseconds=1000)),
)
try:
rsp = client.callback(callback_request, *opts)
except (NetException, BizException) as e:
print("[callback] occur error, msg: %s" % e)
return
if not rsp.success:
print("[callback] failure")
return
print("[callback] success")
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.
See tutorial on generating distribution archives.
Built Distribution
Close
Hashes for volcengine_sdk_rec-1.3.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | cde2ae7a7b6cf2cc1e59040d29e81313859705a6703f943a8dde52bb0335e2f3 |
|
MD5 | fa2aef108287b29035e767454e2f04aa |
|
BLAKE2b-256 | a2fd1e5f5858976f5cb64529dad278febf6aba1844150ecff9e7cf8223ccefd1 |