Skip to main content

SDK for volcengine

Project description

Python SDK for the Volcengine data and predict APIs.

import uuid
from datetime import datetime, timedelta

from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Option, NetException, BizException, metrics, MetricsOption

# Required: tenant ID.
TENANT_ID = "xxx"
# Required: application ID.
APPLICATION_ID = "xxx"
# Required: AK credential.
AK = "xxx"
# Required: SK credential.
SK = "xxx"
# Required: host (domain name) of the service.
HOST = "xxx"

# Build the shared client used by every example function in this file.
client: Client = (
    ClientBuilder()
    .tenant_id(TENANT_ID)
    .application_id(APPLICATION_ID)
    .ak(AK)
    .sk(SK)
    .hosts([HOST])
    .build()
)
# Initialise metrics reporting. Enabling it is recommended because it
# makes troubleshooting easier on the Volcengine side.
metrics.init((MetricsOption.with_metrics_domain(HOST),))


def write():
    """Upload two sample "item" records and print the outcome.

    Prints "[write] success" when the response status reports success and
    "[write] failure" when it does not. If the client raises NetException
    or BizException, the error is printed and the function returns.
    """
    # Test data only; real calls must respect the field types and formats.
    data_list = [
        {
            "id": "item_id1",
            "title": "test_title1",
            "status": 0,
            "brand": "volcengine",
            "pub_time": 1583641807,
            "current_price": 1.1,
        },
        {
            "id": "item_id2",
            "title": "test_title2",
            "status": 1,
            "brand": "volcengine",
            "pub_time": 1583641503,
            "current_price": 2.2,
        },
    ]
    # topic is an enumerated value; see the API documentation.
    topic = "item"
    # Options for transferring day-level data.
    opts = (
        # Stage is one of: "pre_sync" (pre-sync), "history_sync" (historical
        # data sync), "incremental_sync_daily" (daily incremental sync),
        # "incremental_sync_streaming" (real-time incremental sync).
        Option.with_stage("pre_sync"),
        # Required: the date the data was produced; replace with the real
        # date when actually transferring data.
        Option.with_data_date(datetime(year=2022, month=1, day=1)),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        write_response = client.write_data(data_list, topic, *opts)
    # Catch transport errors as well as business errors, matching the
    # handling used by predict() and callback() in this file.
    except (NetException, BizException) as e:
        print("[write] occur err, msg: %s" % e)
        return
    if not write_response.status.success:
        print("[write] failure")
        return
    print("[write] success")


def done():
    """Send a "done" request for the sample date and print the outcome.

    Prints "[done] success" when the response status reports success and
    "[done] failure" when it does not. If the client raises NetException
    or BizException, the error is printed and the function returns.
    """
    date_list = [datetime(year=2022, month=1, day=1)]
    # topic is an enumerated value; see the API documentation.
    topic = "item"
    opts = (
        # Stage is one of: "pre_sync" (pre-sync), "history_sync" (historical
        # data sync), "incremental_sync_daily" (daily incremental sync),
        # "incremental_sync_streaming" (real-time incremental sync).
        Option.with_stage("pre_sync"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        done_response = client.done(date_list, topic, *opts)
    # Catch transport errors as well as business errors, matching the
    # handling used by predict() and callback() in this file.
    except (NetException, BizException) as e:
        print("[done] occur err, msg: %s" % e)
        return
    if not done_response.status.success:
        print("[done] failure")
        return
    print("[done] success")


def predict():
    """Build a sample PredictRequest, send it, and print the outcome.

    Prints "[predict] success" when the response reports success and
    "[predict] failure" when it does not. If the client raises
    NetException or BizException, the error is printed and the function
    returns.
    """
    # Build the predict request body.
    predict_request = PredictRequest()
    user = predict_request.user
    user.uid = 'uid1'
    context = predict_request.context
    context.spm = "1$##$2$##$3$##$4"
    context.extra["extra_key"] = "extra_value"

    feature = context.feature
    # Assign each key once: a second assignment to the same key would
    # simply overwrite the first value.
    feature.stringFeature["key"] = "value2"
    feature.stringArrayFeature["array_key"].values.append("array_value1")
    feature.stringArrayFeature["array_key"].values.append("array_value2")
    # Named item_filter so the builtin filter() is not shadowed.
    item_filter = context.filter
    item_filter.stringFilter["key"] = "value"
    item_filter.stringArrayFilter["array_key"].values.append("array_value1")
    item_filter.stringArrayFilter["array_key"].values.append("array_value2")

    candidate_item1 = predict_request.candidateItems.add()
    candidate_item1.id = "item_id1"
    candidate_item2 = predict_request.candidateItems.add()
    candidate_item2.id = "item_id2"
    opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_scene("default"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_headers({"Enable-Spm-Route": "true"}),
    )
    try:
        rsp = client.predict(predict_request, *opts)
    except (NetException, BizException) as e:
        print("[predict] occur error, msg: %s" % e)
        return
    if not rsp.success:
        print("[predict] failure")
        return
    print("[predict] success")


def callback():
    """Build a sample CallbackRequest, send it, and print the outcome.

    Prints "[callback] success" when the response reports success and
    "[callback] failure" when it does not. If the client raises
    NetException or BizException, the error is printed and the function
    returns.
    """
    # Build the callback request body.
    callback_request = CallbackRequest()
    # Request id of the corresponding predict request.
    callback_request.predict_request_id = "xxx"
    # uid of the corresponding predict request.
    callback_request.uid = "uid1"
    # Scene of the corresponding predict request.
    callback_request.scene = "default"
    # Item list of the corresponding predict request.
    callback_item1 = callback_request.items.add()
    callback_item1.id = "item_id1"
    callback_item1.pos = "position1"
    callback_item1.extra = "{\"reason\":\"exposure\"}"
    callback_item2 = callback_request.items.add()
    callback_item2.id = "item_id2"
    callback_item2.pos = "position2"
    callback_item2.extra = "{\"reason\":\"filter\"}"
    # Callback request context.
    callback_context = CallbackContext()
    callback_context.spm = "1$##$2$##$3$##$4"
    # Protobuf message-typed fields cannot be set with plain assignment
    # (it raises AttributeError), so copy the context into the field.
    callback_request.context.CopyFrom(callback_context)
    opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_timeout(timedelta(milliseconds=1000)),
    )
    try:
        rsp = client.callback(callback_request, *opts)
    except (NetException, BizException) as e:
        print("[callback] occur error, msg: %s" % e)
        return
    if not rsp.success:
        print("[callback] failure")
        return
    print("[callback] success")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release. See tutorial on generating distribution archives.

Built Distribution

volcengine_sdk_rec-1.3.3-py3-none-any.whl (44.7 kB view details)

Uploaded Python 3

File details

Details for the file volcengine_sdk_rec-1.3.3-py3-none-any.whl.

File metadata

File hashes

Hashes for volcengine_sdk_rec-1.3.3-py3-none-any.whl
Algorithm Hash digest
SHA256 9cadd10f45c66f5711ae546d562265c67d66a153a688d06b207e353376861c54
MD5 280db498c6a5df069f36e082e72ced11
BLAKE2b-256 93423762ede2a5d428db3c3a697f615869271372ece5b4bf32e630b04b880aa5

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page