# Fabrique local environment

## Project description

A library for developing fabrique ML pipelines.

You can copy the pipeline project examples from here:
```python
import fabrique_atelier

print(fabrique_atelier.__path__[0] + '/examples')
```
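For instance, to copy the `fake_lightgbm` example into a working directory (a minimal sketch using only the standard library; the destination path is arbitrary):

```python
import shutil

import fabrique_atelier

examples_dir = fabrique_atelier.__path__[0] + '/examples'
# copy the bundled fake_lightgbm example into a new project directory
shutil.copytree(f'{examples_dir}/fake_lightgbm', './my_pipeline')
```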
## Model decomposition steps

You can find this decomposed example in `/examples/fake_lightgbm`.

### 1. We have samples and model inference code
```python
import os

# load samples
samp_dir = './samples'
filenames = [f for f in os.listdir(samp_dir) if os.path.isfile(f'{samp_dir}/{f}')]

samples = []
for filename in filenames:
    with open(f'{samp_dir}/{filename}') as fp:
        samples.append(fp.read())
```
Each sample is a JSON string:
```json
{
  "ts": 1622637967.6218433,
  "uid": 147757848,
  "number": "00000125",
  "type": "out",
  "traffic": {
    "about": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "services": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "contacts": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "roaming": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "tariffs": [14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "simcards": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "balance": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "internet": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "messaging": [0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "support": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  }
}
```
```python
# load model
import lightgbm as lgb

bst = lgb.Booster(model_file='../model.txt')  # init model

classes = ["about", "services", "contacts", "roaming", "tariffs",
           "simcards", "balance", "internet", "messaging", "support"]
```
```python
# simple inference code
import json

import numpy as np

reference_results = []  # we will use these results for tests

for sample in samples:
    ## 1. extract features from json message
    # 1.1. parse message and get features
    mes = json.loads(sample)
    features = np.array([mes['traffic'][cls] for cls in classes]).flatten()

    ## 2. make prediction result
    # 2.1 get prediction
    pred_vals = bst.predict([features, ])[0]
    # 2.2 normalize scores, get class
    scores = pred_vals / pred_vals.sum()
    reason = classes[scores.argmax()]
    # 2.3 make and serialize message
    scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(classes)}
    res = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], classes=scores_dict, reason=reason)
    reference_results.append(json.dumps(res, indent=2))
```
### 2. Split inference into feature extraction and LightGBM prediction
```python
# split inference code
splitted_results = []
feature_msgs = []

for sample in samples:
    ## 1. extract features from json message
    # 1.1. parse message and get features
    mes = json.loads(sample)
    features = np.array([mes['traffic'][cls] for cls in classes]).flatten()
    # 1.2. create intermediate message
    data = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], features=features.tolist())
    feature_msgs.append(data)

## 2. make prediction result
features_batch = np.array([data['features'] for data in feature_msgs])
# 2.1 get prediction
pred_batch = bst.predict(features_batch)  # microbatch prediction

for i, mes in enumerate(feature_msgs):
    scores = pred_batch[i]
    # 2.2 normalize scores, get class
    try:
        scores = scores / scores.sum()  # try to normalize
    except Exception:
        pass
    reason_num = scores.argmax()
    reason = classes[reason_num]
    # 2.3 make and serialize message
    scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(classes)}
    res = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], classes=scores_dict, reason=reason)
    splitted_results.append(json.dumps(res, indent=2))
```
Check that the split results match the reference results:

```python
assert set(splitted_results) == set(reference_results)
```
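If the assertion ever fails, the symmetric difference of the two sets pinpoints the mismatching messages (a small debugging sketch, not part of the original example):

```python
# print up to three messages that appear in only one of the two result sets
for msg in list(set(reference_results) ^ set(splitted_results))[:3]:
    print(msg)
```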
### 3. Make `pipeline.py`
```python
from fabrique_atelier.actors import Pipeline, Processor

import json

import lightgbm as lgb
import numpy as np


class ExtractFeatures(Processor):
    def __init__(self):
        self.classes = ["about", "services", "contacts", "roaming", "tariffs",
                        "simcards", "balance", "internet", "messaging", "support"]

    def get_result(self, body):
        ## 1. extract features from json message
        # 1.1. parse message and get features
        mes = json.loads(body['data'])
        features = np.array([mes['traffic'][cls] for cls in self.classes]).flatten()
        # 1.2. create intermediate message
        data = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], features=features.tolist())
        return {'data': data}


class ScoringModel(Processor):
    def __init__(self):
        self.classes = ["about", "services", "contacts", "roaming", "tariffs",
                        "simcards", "balance", "internet", "messaging", "support"]
        self.bst = lgb.Booster(model_file='./model.txt')  # init model

    def get_batch_result(self, batch):
        ## 2. make prediction result
        features_batch = np.array([body['data']['features'] for body in batch])
        # 2.1 get prediction
        pred_batch = self.bst.predict(features_batch)  # microbatch prediction

        out_batch = []
        for i, body in enumerate(batch):
            in_data = body['data']
            scores = pred_batch[i]
            # 2.2 normalize scores, get class
            try:
                scores = scores / scores.sum()  # try to normalize
            except Exception:
                pass
            reason_num = scores.argmax()
            reason = self.classes[reason_num]
            # 2.3 make and serialize message
            scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(self.classes)}
            out_data = dict(ts=in_data['ts'], uid=in_data['uid'], number=in_data['number'],
                            classes=scores_dict, reason=reason)
            out_body = dict(data=json.dumps(out_data).encode(), metrics={"reason_num": int(reason_num)})
            out_batch.append(out_body)
        return out_batch


# topology
pipeline = Pipeline(['extractor', 'model'])

ids = pipeline.ids
nodes = pipeline.nodes

nodes.extractor = ExtractFeatures.to(ids.model)
nodes.model = ScoringModel.to(ids.end)
```
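Note that `ExtractFeatures` handles messages one at a time via `get_result`, while `ScoringModel` overrides `get_batch_result` to score a whole micro-batch with a single `predict` call. The topology itself is declared by naming the nodes up front and then wiring each node class to the id of its successor. Assuming the same API, a hypothetical three-node variant with an extra stage between the extractor and the model might look like this (`Validator` and the `validator` node name are made up for illustration):

```python
# hypothetical topology sketch: Validator is an illustrative Processor
# subclass, not part of the example
pipeline = Pipeline(['extractor', 'validator', 'model'])

ids = pipeline.ids
nodes = pipeline.nodes

nodes.extractor = ExtractFeatures.to(ids.validator)
nodes.validator = Validator.to(ids.model)
nodes.model = ScoringModel.to(ids.end)
```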
### 4. Run emulation and check results

Run `./dev/start_pipeline.py` with this code:
```python
import os

from pipeline import pipeline

# load samples
## code from the first decomposition step above ##

# start emulation
pipeline.start_emulation(samples)

# simple inference code (builds reference_results)
## code from the first decomposition step above ##

# load results of emulation
result_dir = './out/data'
filenames = [f for f in os.listdir(result_dir) if os.path.isfile(f'{result_dir}/{f}')]

results = []
for filename in filenames:
    with open(f'{result_dir}/{filename}') as fp:
        results.append(fp.read())

# check if results are equal
assert set(results) == set(reference_results)
```