Generic computation implementation on COINSTAC.
COINSTAC computations development made easy.
An intuitive wrapper for writing COINSTAC-based computations:
- Break down your computation into simple phases, with automatic transitions between phases.
- Add as many phases as you want.
- Phases alternate between local and remote automatically by default, starting from the first phase of the local node. See the advanced use case examples below for extras such as:
  - Phases that need multiple local-remote trips: specify multi_iterations=True while adding the phase.
  - Phases that need to run only in local or only in remote without making a trip (such as preprocessing or gathering final results): specify local_only=True while adding the phase.
- Automatic logging that saves what enters and leaves each phase. Just set debug=True.
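The default alternation described above can be pictured with a small standalone sketch. It is a toy model and not the library's scheduler: the function alternation_order is hypothetical, it ignores multi_iterations and local_only, and it only assumes that the i-th local phase is followed by the i-th remote phase. The phase names are borrowed from the basic example further down.

```python
from itertools import zip_longest


def alternation_order(local_phases, remote_phases):
    """Toy model: interleave phases as local[0], remote[0], local[1], remote[1], ..."""
    order = []
    for local_phase, remote_phase in zip_longest(local_phases, remote_phases):
        if local_phase is not None:
            order.append(("local", local_phase))
        if remote_phase is not None:
            order.append(("remote", remote_phase))
    return order


# Phase names taken from the basic example in this README.
order = alternation_order(
    ["PhaseLoadData", "PhaseSaveResult"],
    ["PhaseCollectMaxEvenData", "PhaseEndWithSuccess"],
)
for node, phase in order:
    print(f"{node:>6}: {phase}")
```

Under this simplified view, the basic example makes two local-remote trips: load and collect, then save and end.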
Installation:
- Run pip install coinstac-computation
- Add the entry coinstac-computation to requirements.txt.
ComputationPhase signature:
from coinstac_computation import ComputationPhase

class PhaseLoadData(ComputationPhase):
    def _initialize(self):
        """Put anything that needs to be initialized only once here"""
        pass

    def compute(self):
        out = {}
        ...
        # To end a multi-iterative phase and go to the next phase, in local or remote set:
        out['jump_to_next'] = True
        # To stop the computation, in remote set:
        out['success'] = True
        return out
Example: Gather max even numbers from each site
A full working use case is in the examples/basic directory, where:
- Local sites keep only the even numbers and send them to the remote.
- Remote finds the max even number of each site and returns the final result to each of the sites.
- Sites save the final result.
inputspec.json data:
[
  {
    "data": {
      "value": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441]
    }
  },
  {
    "data": {
      "value": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41]
    }
  }
]
Local node pipeline:
import os

from coinstac_computation import COINSTACPyNode, ComputationPhase

class PhaseLoadData(ComputationPhase):
    def compute(self):
        # Keep only the even numbers from this site's input
        data = []
        for d in self.input['data']:
            if d % 2 == 0:
                data.append(d)
        return {'filtered_data': data}

class PhaseSaveResult(ComputationPhase):
    def compute(self):
        # Write the result received from the remote to the output directory
        with open(self.out_dir + os.sep + 'results.txt', 'w') as out:
            out.write(f"{self.input['aggregated_data']}")

local = COINSTACPyNode(mode='local', debug=True)
local.add_phase(PhaseLoadData)
local.add_phase(PhaseSaveResult)
Remote node pipeline:
from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess

class PhaseCollectMaxEvenData(ComputationPhase):
    def compute(self):
        # Collect the max of the filtered data sent by each site
        data = []
        for site, site_vars in self.input.items():
            site_max = max(site_vars['filtered_data'])
            data.append(site_max)
        return {'aggregated_data': data}

remote = COINSTACPyNode(mode='remote', debug=True)
remote.add_phase(PhaseCollectMaxEvenData)
remote.add_phase(PhaseEndWithSuccess)
Entry point:
import coinstac
from local_pipeline import local
from remote_pipeline import remote
coinstac.start(local, remote)
Run:
cd examples/basic/
~/coinstac-computation/examples/basic/$ docker build -t base . && coinstac-simulator
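As a sanity check on what the run should produce, the filtering and max steps can be replayed in plain Python on the two inputspec.json value lists, without COINSTAC. This is only a sketch: the helper names filter_even and collect_max and the site names local0 and local1 are assumptions made for illustration.

```python
# Values copied from the inputspec.json shown above; site names are assumed.
site_inputs = {
    "local0": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441],
    "local1": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41],
}


def filter_even(values):
    """Mirror of the local filtering step: keep only even numbers."""
    return [v for v in values if v % 2 == 0]


def collect_max(site_outputs):
    """Mirror of the remote step: max of each site's filtered data."""
    return [max(out["filtered_data"]) for out in site_outputs.values()]


# What the remote would see as input: one entry per site.
remote_input = {
    site: {"filtered_data": filter_even(values)}
    for site, values in site_inputs.items()
}
aggregated_data = collect_max(remote_input)
print(aggregated_data)  # [776, 3386]
```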
Advanced use cases:
1. Multiple local <---> remote iterations example:
- Each site casts a vote (positive if the number is even) multiple times (default=51).
- Remote gathers the votes and returns the final voting result to all sites at the end.
- Sites save the final result.
Overview:
- Specify when to end the iterative phase with a phase jump signal, jump_to_next=True:
import os

from coinstac_computation import ComputationPhase

class PhaseSubmitVote(ComputationPhase):
    def _initialize(self):
        """This method runs only once"""
        self.cache['data_index'] = 0
        self.cache['data'] = []
        # Read one number per line; the file is closed when the block exits
        with open(self.base_dir + os.sep + self.input_args['data_source']) as data_file:
            for line in data_file:
                self.cache['data'].append(float(line.strip()))

    def compute(self):
        out = {
            'vote': self.cache['data'][self.cache['data_index']] % 2 == 0,
        }
        self.cache['data_index'] += 1
        # Send a jump-to-next-phase signal once the last data point has been used
        out['jump_to_next'] = self.cache['data_index'] > len(self.cache['data']) - 1
        return out
- Add the phase as multi-iterations:
local.add_phase(PhaseSubmitVote, multi_iterations=True)
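To see when the jump signal fires, the compute logic of PhaseSubmitVote can be replayed outside COINSTAC with a plain dict standing in for self.cache. This is a sketch under assumptions: the function submit_vote and the driving while loop are hypothetical stand-ins for the repeated local-remote trips, and the four data values are the first four numbers from the sample log below.

```python
def submit_vote(cache):
    """Same logic as PhaseSubmitVote.compute, with a plain dict as the cache."""
    out = {"vote": cache["data"][cache["data_index"]] % 2 == 0}
    cache["data_index"] += 1
    # True only after the last data point has been consumed
    out["jump_to_next"] = cache["data_index"] > len(cache["data"]) - 1
    return out


cache = {"data_index": 0, "data": [712.0, 309.0, 574.0, 838.0]}

outputs = []
while True:
    out = submit_vote(cache)
    outputs.append(out)
    if out["jump_to_next"]:
        break  # the phase would end here and the next phase would start

for i, out in enumerate(outputs, start=1):
    print(i, out)
```

With four data points the loop runs exactly four times, and only the last output carries jump_to_next=True.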
2. Send data across local <---> remote example:
To keep it simple, each site sends a matrix of size 1000 by 1000 to the remote, which aggregates the matrices by averaging and returns the result to each site.
Local:
import os

import numpy as np
from coinstac_computation import COINSTACPyNode, ComputationPhase

class PhaseLoadData(ComputationPhase):
    def compute(self):
        out = {}
        # Generate a random matrix of the shape given in the input
        data = np.random.randn(*self.input['matrix_shape'])
        out.update(**self.send("site_matrix", data))
        return out

class PhaseSaveResult(ComputationPhase):
    def compute(self):
        data = self.recv('averaged_matrix')
        np.save(self.out_dir + os.sep + "averaged_matrix.npy", data)

local = COINSTACPyNode(mode='local', debug=True)
local.add_phase(PhaseLoadData)
local.add_phase(PhaseSaveResult)
Remote:
import numpy as np
from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess

class PhaseAggregateMatrix(ComputationPhase):
    def compute(self):
        out = {}
        data = self.recv("site_matrix")
        # Average across sites (axis 0)
        mean_data = np.array(data).mean(0)
        out.update(**self.send("averaged_matrix", mean_data))
        return out

remote = COINSTACPyNode(mode='remote', debug=True)
remote.add_phase(PhaseAggregateMatrix)
remote.add_phase(PhaseEndWithSuccess)
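The averaging step itself can be tried with NumPy alone. The sketch below assumes that the remote receives one matrix per site as a list, which is what np.array(data).mean(0) in PhaseAggregateMatrix suggests but is not confirmed here; the small shape, the number of sites, and the variable names are chosen only for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
shape = (4, 3)   # small stand-in for the 1000 by 1000 matrices
num_sites = 3    # assumed number of sites

# One random matrix per site, as each site's PhaseLoadData would produce.
site_matrices = [rng.standard_normal(shape) for _ in range(num_sites)]

# Stack into shape (num_sites, 4, 3) and average over the site axis.
mean_data = np.array(site_matrices).mean(0)

print(mean_data.shape)  # (4, 3)
```

Averaging over axis 0 keeps the matrix shape, so every site gets back a matrix of the same size it sent.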
Sample logs from local0
[INPUT] 14:27:36 02/04/2022
->{'data_source': 'data_file.txt'}
[CACHE] 14:27:36 02/04/2022
->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 0, 'PhaseSaveResult': 0}}}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:36 02/04/2022
<-{'output': {'vote': True, 'jump_to_next': False}}
[INPUT] 14:27:37 02/04/2022
->{}
[CACHE] 14:27:37 02/04/2022
->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:37 02/04/2022
<-{'output': {'vote': False, 'jump_to_next': False}}
[INPUT] 14:27:37 02/04/2022
->{}
[CACHE] 14:27:37 02/04/2022
->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 3, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 3, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:37 02/04/2022
<-{'output': {'vote': True, 'jump_to_next': False}}
...
Sample logs from remote
[INPUT] 14:27:37 02/04/2022
->{'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022
->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 0, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022
<-{'output': {}, 'success': False}
[INPUT] 14:27:37 02/04/2022
->{'local0': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022
->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022
<-{'output': {}, 'success': False}
[INPUT] 14:27:37 02/04/2022
->{'local0': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022
->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 3, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1], [4, 0]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022
<-{'output': {}, 'success': False}
...
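The vote_ballot entries in the remote cache above are consistent with a per-round tally of [positive votes, negative votes]. The code of PhaseCollectVote is not shown here, so the sketch below is only an assumption about what it computes; the function tally is hypothetical, and the three rounds are copied from the [INPUT] lines of the remote log with jump_to_next omitted.

```python
def tally(site_outputs):
    """Count positive and negative votes for one round: [num_true, num_false]."""
    positive = sum(1 for out in site_outputs.values() if out["vote"])
    return [positive, len(site_outputs) - positive]


# Votes per site for the three rounds shown in the remote log.
rounds = [
    {"local0": {"vote": True}, "local1": {"vote": False},
     "local2": {"vote": False}, "local3": {"vote": False}},
    {"local0": {"vote": False}, "local2": {"vote": True},
     "local1": {"vote": True}, "local3": {"vote": True}},
    {"local0": {"vote": True}, "local3": {"vote": True},
     "local2": {"vote": True}, "local1": {"vote": True}},
]

vote_ballot = [tally(site_outputs) for site_outputs in rounds]
print(vote_ballot)  # [[1, 3], [3, 1], [4, 0]]
```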
Development notes:
- Make sure you have:
  - docker installed and running.
  - nodejs installed.
  - coinstac-simulator package installed:
    npm install --global coinstac-simulator
- You must set debug=False while deploying.
- Backward compatible with the older library (compspecVersion=1):
  - Add the following snippet at the end of the local and remote pipeline scripts:
    if __name__ == "__main__": local.to_stdout()
  - Use the version 1.0 compspec format.
  - Comment out the line CMD ["python", "entry.py"] in the Dockerfile.
- You can also use a remote debugger in PyCharm.
Thanks!