Skip to main content

SDK for volcengine

Project description

Python SDK for the Volcengine data and predict APIs.

import uuid
from datetime import datetime, timedelta

from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Region, Option, NetException, BizException, metrics

# Required: tenant ID.
TENANT_ID = "xxx"
# Required: application ID.
APPLICATION_ID = "xxx"
# Required: access key (AK). Obtain it in the Volcengine console under
# [Personal Information] -> [Key Management].
AK = "xxx"
# Required: secret key (SK). Obtain it in the Volcengine console under
# [Personal Information] -> [Key Management].
SK = "xxx"

# Shared client used by every example function in this file.
client: Client = (
    ClientBuilder()
    .tenant_id(TENANT_ID)
    .application_id(APPLICATION_ID)
    .ak(AK)
    .sk(SK)
    .region(Region.AIR_CN)
    .build()
)
# Initialise metrics reporting. Enabling it is recommended because it makes
# troubleshooting easier on the Volcengine side.
metrics.init(())


def write():
    """Upload two sample item records with ``client.write_data``.

    The outcome is printed to stdout. SDK errors (``NetException`` and
    ``BizException``) are reported and swallowed rather than raised.

    Returns:
        None.
    """
    # Test data only; real calls must pay attention to field types and formats.
    data_list = [
        {
            "id": "item_id1",
            "title": "test_title1",
            "status": 0,
            "brand": "volcengine",
            "pub_time": 1583641807,
            "current_price": 1.1,
        },
        {
            "id": "item_id2",
            "title": "test_title2",
            "status": 1,
            "brand": "volcengine",
            "pub_time": 1583641503,
            "current_price": 2.2,
        },
    ]
    # topic is an enumerated value; see the API documentation.
    topic = "item"
    # Options for transferring day-level data.
    opts = (
        # Stage is one of: pre-sync ("pre_sync"), historical data sync
        # ("history_sync"), incremental daily sync ("incremental_sync_daily"),
        # incremental real-time sync ("incremental_sync_streaming").
        Option.with_stage("pre_sync"),
        # Required: the date the data was produced. Replace with the real date
        # when transferring actual data.
        Option.with_data_date(datetime(year=2022, month=1, day=1)),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        write_response = client.write_data(data_list, topic, *opts)
    except (NetException, BizException) as e:
        # Network failures are reported the same way as business errors,
        # matching the handling in predict() and callback().
        print("[write] occur err, msg: %s" % e)
        return
    if not write_response.status.success:
        print("[write] failure")
        return
    print("[write] success")


def done():
    """Call ``client.done`` for the sample date and the "item" topic.

    The outcome is printed to stdout. SDK errors (``NetException`` and
    ``BizException``) are reported and swallowed rather than raised.

    Returns:
        None.
    """
    date_list = [datetime(year=2022, month=1, day=1)]
    # topic is an enumerated value; see the API documentation.
    topic = "item"
    opts = (
        # Stage is one of: pre-sync ("pre_sync"), historical data sync
        # ("history_sync"), incremental daily sync ("incremental_sync_daily"),
        # incremental real-time sync ("incremental_sync_streaming").
        Option.with_stage("pre_sync"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        done_response = client.done(date_list, topic, *opts)
    except (NetException, BizException) as e:
        # Network failures are reported the same way as business errors,
        # matching the handling in predict() and callback().
        print("[done] occur err, msg: %s" % e)
        return
    if not done_response.status.success:
        print("[done] failure")
        return
    print("[done] success")


def predict():
    """Build a sample ``PredictRequest``, send it and print the outcome.

    ``NetException`` and ``BizException`` raised by ``client.predict`` are
    reported on stdout and swallowed.

    Returns:
        None.
    """
    # Assemble the predict request body.
    request = PredictRequest()
    request.user.uid = 'uid1'

    ctx = request.context
    ctx.spm = "1$##$2$##$3$##$4"
    ctx.extra["extra_key"] = "extra_value"

    features = ctx.feature
    features.stringFeature["key"] = "value1"
    # NOTE(review): same key as the line above, so presumably "value2"
    # replaces "value1" -- confirm whether a second key was intended.
    features.stringFeature["key"] = "value2"
    features.stringArrayFeature["array_key"].values.extend(
        ["array_value1", "array_value2"]
    )

    # Named ``filters`` so the ``filter`` builtin is not shadowed.
    filters = ctx.filter
    filters.stringFilter["key"] = "value"
    filters.stringArrayFilter["array_key"].values.extend(
        ["array_value1", "array_value2"]
    )

    # Candidate items for this request.
    for item_id in ("item_id1", "item_id2"):
        request.candidateItems.add().id = item_id

    request_opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_scene("default"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_headers({"Enable-Spm-Route": "true"}),
    )
    try:
        rsp = client.predict(request, *request_opts)
    except (NetException, BizException) as e:
        print("[predict] occur error, msg: %s" % e)
        return
    if rsp.success:
        print("[predict] success")
        return
    print("[predict] failure")


def callback():
    """Build a sample ``CallbackRequest``, send it and print the outcome.

    ``NetException`` and ``BizException`` raised by ``client.callback`` are
    reported on stdout and swallowed.

    Returns:
        None.
    """
    # Assemble the callback request body.
    callback_request = CallbackRequest()
    # Request id of the corresponding predict request.
    callback_request.predict_request_id = "xxx"
    # uid of the corresponding predict request.
    callback_request.uid = "uid1"
    # Scene of the corresponding predict request.
    callback_request.scene = "default"
    # Item list of the corresponding predict request.
    callback_item1 = callback_request.items.add()
    callback_item1.id = "item_id1"
    callback_item1.pos = "position1"
    callback_item1.extra = "{\"reason\":\"exposure\"}"
    callback_item2 = callback_request.items.add()
    callback_item2.id = "item_id2"
    callback_item2.pos = "position2"
    callback_item2.extra = "{\"reason\":\"filter\"}"
    # Callback request context. Protobuf Python does not allow assigning a
    # message object to a singular message field, so the embedded message is
    # populated in place (the same way predict() fills predict_request.context).
    callback_request.context.spm = "1$##$2$##$3$##$4"
    opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_timeout(timedelta(milliseconds=1000)),
    )
    try:
        rsp = client.callback(callback_request, *opts)
    except (NetException, BizException) as e:
        print("[callback] occur error, msg: %s" % e)
        return
    if not rsp.success:
        print("[callback] failure")
        return
    print("[callback] success")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release. See tutorial on generating distribution archives.

Built Distribution

volcengine_sdk_rec-1.3.0-py3-none-any.whl (44.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page