pyraisdk
Dynamic Batching
Description
When a model is deployed on an AML GPU instance to provide an inference service, occupying the GPU separately for each request is quite inefficient. This shared module collects data items from different requests and runs inference on them in batches in a background thread, which considerably improves GPU utilization.
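The collect-then-batch mechanism described above can be sketched in a few lines of plain Python. This is an illustrative sketch only, not how pyraisdk implements it: callers enqueue single items, and one background thread drains the queue and runs the model once per collected batch instead of once per item.

```python
import queue
import threading

def run_batched(items, batch_size=4):
    # Enqueue each item with its position so results can be placed in order.
    q = queue.Queue()
    for i, item in enumerate(items):
        q.put((i, item))
    results = [None] * len(items)

    def worker():
        while True:
            batch = []
            try:
                # Block briefly for the first item of a batch...
                batch.append(q.get(timeout=0.1))
                # ...then grab whatever else is already waiting.
                while len(batch) < batch_size:
                    batch.append(q.get_nowait())
            except queue.Empty:
                if not batch:
                    return  # queue drained: stop the worker
            # One "model invocation" for the whole batch (here: just len()).
            for i, item in batch:
                results[i] = len(item)

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return results
```

The fixed cost of a model invocation is paid once per batch rather than once per item, which is where the efficiency gain comes from.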
Usage Examples
Build `YourModel` class, inherited from `pyraisdk.dynbatch.BaseModel`:
```python
from typing import List
from pyraisdk.dynbatch import BaseModel

class SimpleModel(BaseModel):
    def predict(self, items: List[str]) -> List[int]:
        rs = []
        for item in items:
            rs.append(len(item))
        return rs

    def preprocess(self, items: List[str]) -> List[str]:
        rs = []
        for item in items:
            rs.append(f'[{item}]')
        return rs
```
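As the assertions in the examples below imply, items pass through `preprocess` before `predict`, so each prediction is the original item's length plus 2 (the two added brackets). A standalone re-statement of the two methods (no pyraisdk required) makes the composition explicit:

```python
from typing import List

# Standalone equivalents of SimpleModel's two methods, for illustration only.
def preprocess(items: List[str]) -> List[str]:
    return [f'[{item}]' for item in items]

def predict(items: List[str]) -> List[int]:
    return [len(item) for item in items]

processed = preprocess(['abc', 'de'])
assert processed == ['[abc]', '[de]']   # brackets added by preprocess
assert predict(processed) == [5, 4]     # original length + 2
```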
Initialize a `pyraisdk.dynbatch.DynamicBatchModel` with a `YourModel` instance, and call `predict` / `predict_one` for inference:
```python
from pyraisdk.dynbatch import DynamicBatchModel

# prepare model
simple_model = SimpleModel()
batch_model = DynamicBatchModel(simple_model)

# predict
items = ['abc', '123456', 'xyzcccffaffaaa']
predictions = batch_model.predict(items)
assert predictions == [5, 8, 16]

# predict_one
item = 'abc'
prediction = batch_model.predict_one(item)
assert prediction == 5
```
Concurrent requests to `predict` / `predict_one` from different threads:
```python
from threading import Thread
from pyraisdk.dynbatch import DynamicBatchModel

# prepare model
simple_model = SimpleModel()
batch_model = DynamicBatchModel(simple_model)

# thread run function
def run(name, num):
    for step in range(num):
        item = f'{name}-{step}'
        prediction = batch_model.predict_one(item)
        assert prediction == len(item) + 2

# start concurrent inference
threads = [Thread(target=run, args=(f'{tid}', 100)) for tid in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
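The efficiency claim in the description can be illustrated with a toy timing model. The overhead and per-item costs below are invented for illustration and not measured from pyraisdk; the point is only that a fixed per-invocation cost is amortized across a batch.

```python
import time

OVERHEAD = 0.010   # assumed fixed cost per model invocation, seconds
PER_ITEM = 0.001   # assumed marginal cost per item, seconds

def infer(batch):
    # Simulate a model call whose cost is dominated by fixed overhead
    # (e.g. kernel launch / data transfer) plus a small per-item cost.
    time.sleep(OVERHEAD + PER_ITEM * len(batch))
    return [len(x) for x in batch]

items = ['abc'] * 32

start = time.perf_counter()
for item in items:
    infer([item])                  # 32 separate invocations
per_request = time.perf_counter() - start

start = time.perf_counter()
infer(items)                       # 1 batched invocation
batched = time.perf_counter() - start

assert batched < per_request       # batching amortizes the fixed overhead
```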
Logging & Events
Description
This module is for logging and event tracing.
Interface
```python
def initialize(
    eh_hostname: Optional[str] = None,
    client_id: Optional[str] = None,
    eh_conn_str: Optional[str] = None,
    eh_structured: Optional[str] = None,
    eh_unstructured: Optional[str] = None,
    role: Optional[str] = None,
    instance: Optional[str] = None,
)
```

Parameter description for `initialize`:
- `eh_hostname`: Fully qualified namespace, aka the Event Hubs endpoint URL (`*.servicebus.windows.net`). Default: read from `$EVENTHUB_NAMESPACE`.
- `client_id`: client_id of the service principal. Default: read from `$UAI_CLIENT_ID`.
- `eh_conn_str`: connection string of the Event Hubs namespace. Default: read from `$EVENTHUB_CONN_STRING`.
- `eh_structured`: structured event hub name. Default: read from `$EVENTHUB_AUX_STRUCTURED`.
- `eh_unstructured`: unstructured event hub name. Default: read from `$EVENTHUB_AUX_UNSTRUCTURED`.
- `role`: role. Default: `RemoteModel_${ENDPOINT_NAME}`.
- `instance`: instance. Default: `${ENDPOINT_VERSION}|{os.uname()[1]}` or `${ENDPOINT_VERSION}|{_probably_unique_id()}`.
```python
def event(self, key: str, code: str, numeric: float, detail: str='', corr_id: str='', elem: int=-1)
def infof(self, format: str, *args: Any)
def infocf(self, corr_id: str, elem: int, format: str, *args: Any)
def warnf(self, format: str, *args: Any)
def warncf(self, corr_id: str, elem: int, format: str, *args: Any)
def errorf(self, format: str, *args: Any)
def errorcf(self, corr_id: str, elem: int, ex: Optional[Exception], format: str, *args: Any)
def fatalf(self, format: str, *args: Any)
def fatalcf(self, corr_id: str, elem: int, ex: Optional[Exception], format: str, *args: Any)
```
Examples
```python
# export EVENTHUB_AUX_UNSTRUCTURED='ehunstruct'
# export EVENTHUB_AUX_STRUCTURED='ehstruct'
# export UAI_CLIENT_ID='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
# export EVENTHUB_NAMESPACE='xxx.servicebus.windows.net'

from pyraisdk import rlog
rlog.initialize()

rlog.infof('this is an info message %s', 123)
rlog.event('LifetimeEvent', 'STOP_GRACEFUL_SIGNAL', 0, 'detail info')
```

```python
# export EVENTHUB_AUX_UNSTRUCTURED='ehunstruct'
# export EVENTHUB_AUX_STRUCTURED='ehstruct'
# export EVENTHUB_CONN_STRING='<connection string>'

from pyraisdk import rlog
rlog.initialize()

rlog.infocf('corrid', -1, 'this is an info message: %s', 123)
rlog.event('RequestDuration', '200', 0.01, 'this is duration in seconds')
```

```python
from pyraisdk import rlog
rlog.initialize(eh_structured='ehstruct', eh_unstructured='ehunstruct', eh_conn_str='<eventhub-conn-str>')

rlog.errorcf('corrid', -1, Exception('error msg'), 'error message: %s %s', 1, 2)
rlog.event('CpuUsage', '', 0.314, detail='cpu usage', corr_id='corrid', elem=-1)
```