Skip to main content

Fabrique local environment

Project description

A library for developing Fabrique ML pipelines.

You can copy example pipeline projects from the directory printed by the following snippet:

import fabrique_atelier

# Print the location of the example projects inside the installed package.
print(f"{fabrique_atelier.__path__[0]}/examples")

Model decomposition steps

You can find this decomposed example in /examples/fake_lightgbm

1. We have samples and model inference code

# load samples
import os

samp_dir = './samples'
# os.listdir returns entries in arbitrary order; sort them so runs are reproducible.
filenames = sorted(f for f in os.listdir(samp_dir) if os.path.isfile(os.path.join(samp_dir, f)))
samples = []
for filename in filenames:
    # Each file holds one JSON message; read it as text with an explicit encoding
    # instead of relying on the platform default.
    with open(os.path.join(samp_dir, filename), encoding='utf-8') as fp:
        samples.append(fp.read())

Each sample is a JSON string:

{
"ts": 1622637967.6218433,
"uid": 147757848,
"number": "00000125",
"type": "out",
"traffic": {
  "about": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "services": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "contacts": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "roaming": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "tariffs": [14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "simcards": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "balance": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "internet": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "messaging": [0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  "support": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}
}
# load model
import json

import lightgbm as lgb
import numpy as np

bst = lgb.Booster(model_file='../model.txt')  # init model

# The per-class traffic arrays are concatenated in this order to build the
# feature vector, and the predicted scores are labelled in the same order.
classes = ["about", "services", "contacts", "roaming", "tariffs",
           "simcards", "balance", "internet", "messaging", "support"]

# simple inference code
reference_results = []  # these results are reused later as the expected output in tests

for sample in samples:
    ## 1. extract features from json message

    # 1.1. parse message and get features
    mes = json.loads(sample)
    features = np.array([mes['traffic'][cls] for cls in classes]).flatten()

    ## 2. make prediction result

    # 2.1 get prediction (a one-row batch is passed, so take the first row of the output)
    pred_vals = bst.predict([features, ])[0]

    # 2.2 normalize scores, get class
    scores = pred_vals/pred_vals.sum()
    reason = classes[scores.argmax()]

    # 2.3 make and serialize message
    scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(classes)}
    res = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], classes=scores_dict, reason=reason)
    reference_results.append(json.dumps(res, indent=2))

2. Split inference into feature extraction and lightgbm prediction

# split inference code
import json

splitted_results = []

feature_msgs = []

for sample in samples:
    ## 1. extract features from json message

    # 1.1. parse message and get features
    mes = json.loads(sample)
    features = np.array([mes['traffic'][cls] for cls in classes]).flatten()

    # 1.2. create intermediate message (plain lists, so it stays serializable)
    data = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], features=features.tolist())

    feature_msgs.append(data)



## 2. make prediction result

features_batch = np.array([data['features'] for data in feature_msgs])

# 2.1 get prediction
pred_batch = bst.predict(features_batch) # microbatch prediction

for row, mes in enumerate(feature_msgs):
    scores = pred_batch[row]

    # 2.2 normalize scores, get class
    try:
        scores = scores / scores.sum() # try to normalize
    except (ZeroDivisionError, FloatingPointError):
        # Best effort: keep the raw scores when they cannot be normalized.
        # Only arithmetic failures are tolerated; anything else (including
        # KeyboardInterrupt) now propagates instead of being swallowed.
        pass

    reason_num = scores.argmax()
    reason = classes[reason_num]


    # 2.3 make and serialize message
    scores_dict = {cls: round(scores[k], 2) for k, cls in enumerate(classes)}
    res = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], classes=scores_dict, reason=reason)
    splitted_results.append(json.dumps(res, indent=2))

Check that the results are equal:

# Compare as sets: both runs must yield the same distinct serialized results, in any order.
expected_outputs = set(reference_results)
actual_outputs = set(splitted_results)
assert actual_outputs == expected_outputs

3. Make pipeline.py

from fabrique_atelier.actors import Pipeline, Processor

import json
import numpy as np
import lightgbm as lgb


class ExtractFeatures(Processor):
    """Pipeline stage that converts a raw JSON message into a flat feature list."""

    def __init__(self):
        # Traffic categories; their arrays are concatenated in this order.
        self.classes = [
            "about", "services", "contacts", "roaming", "tariffs",
            "simcards", "balance", "internet", "messaging", "support",
        ]

    def get_result(self, body):
        """Build the intermediate feature message for one input body.

        ``body['data']`` is parsed as JSON. The returned dict has a single
        ``'data'`` key holding ``ts``, ``uid`` and ``number`` copied from the
        message plus ``features``: the flattened per-class traffic values as
        a plain list.
        """
        message = json.loads(body['data'])
        traffic = message['traffic']

        per_class_values = [traffic[name] for name in self.classes]
        flat_features = np.array(per_class_values).flatten()

        payload = {
            'ts': message['ts'],
            'uid': message['uid'],
            'number': message['number'],
            'features': flat_features.tolist(),
        }
        return {'data': payload}


class ScoringModel(Processor):
    """Pipeline stage that scores a micro-batch of feature messages with LightGBM."""

    def __init__(self):
        # Labels for the model's output scores, in output order.
        self.classes = ["about", "services", "contacts", "roaming", "tariffs",
                        "simcards", "balance", "internet", "messaging", "support"]

        self.bst = lgb.Booster(model_file='./model.txt')  # init model

    def get_batch_result(self, batch):
        """Score every message in ``batch`` with a single model call.

        Each element of ``batch`` is a body whose ``body['data']`` holds
        ``ts``, ``uid``, ``number`` and ``features``.

        Returns a list with one dict per input body, in input order:
        ``data`` is the UTF-8 encoded JSON result (``ts``, ``uid``,
        ``number``, per-class ``classes`` scores rounded to 2 digits, and the
        winning ``reason``); ``metrics`` carries the winning class index as
        ``reason_num``.
        """
        # Nothing to score: avoid calling the model with an empty array.
        if not batch:
            return []

        ## 2. make prediction result
        features_batch = np.array([body['data']['features'] for body in batch])
        # 2.1 get prediction
        pred_batch = self.bst.predict(features_batch)  # microbatch prediction

        out_batch = []
        for row, body in enumerate(batch):
            in_data = body['data']
            scores = pred_batch[row]
            # 2.2 normalize scores, get class
            try:
                scores = scores / scores.sum()  # try to normalize
            except (ZeroDivisionError, FloatingPointError):
                # Best effort: keep the raw scores when they cannot be
                # normalized. Only arithmetic failures are tolerated, so
                # unrelated errors are no longer silently swallowed.
                pass

            reason_num = scores.argmax()
            reason = self.classes[reason_num]

            # 2.3 make and serialize message
            scores_dict = {cls: round(scores[k], 2) for k, cls in enumerate(self.classes)}
            out_data = dict(ts=in_data['ts'], uid=in_data['uid'], number=in_data['number'],
                            classes=scores_dict, reason=reason)
            # int() turns the numpy index into a plain Python integer for the metrics dict.
            out_body = dict(data=json.dumps(out_data).encode(), metrics={"reason_num": int(reason_num)})
            out_batch.append(out_body)

        return out_batch


# topology
# Declare a pipeline with two named nodes: 'extractor' and 'model'.
pipeline = Pipeline(['extractor', 'model'])

ids = pipeline.ids
nodes = pipeline.nodes

# Attach a processor class to each node together with its destination id:
# extractor -> model -> ids.end.
# NOTE(review): ids.end is presumably the pipeline's terminal output; confirm
# against the fabrique_atelier documentation.
nodes.extractor = ExtractFeatures.to(ids.model)
nodes.model = ScoringModel.to(ids.end)

4. Run emulation and check results

Run ./dev/start_pipeline.py with the following code:

import json
import os

from pipeline import pipeline

# load samples
## code from first decomposition step above ##

# start emulation
pipeline.start_emulation(samples)

# simple inference code
## code from first decomposition step above ##

# load results of emulation
result_dir = './out/data'
filenames = [f for f in os.listdir(result_dir) if os.path.isfile(os.path.join(result_dir, f))]
results = []
for filename in filenames:
    with open(os.path.join(result_dir, filename), encoding='utf-8') as fp:
        results.append(fp.read())


def _canonical(text):
    """Re-serialize a JSON document so formatting differences do not affect comparison."""
    return json.dumps(json.loads(text), sort_keys=True)


# check if results are equal
# The pipeline serializes results compactly while the reference code uses
# indent=2, so compare the parsed content rather than the raw strings.
assert {_canonical(r) for r in results} == {_canonical(r) for r in reference_results}

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release. See the tutorial on generating distribution archives.

Built Distribution

fabrique_atelier-5.0.9-py3-none-any.whl (3.5 MB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page