Generic computation implementation on COINSTAC.

Project description

COINSTAC computations development made easy.

An intuitive wrapper for writing COINSTAC-based computations:

  • Break down your computations into simple phases with automatic transition between phases.
  • Add as many phases as you want.
  • Run a phase over multiple local-remote round trips (iterations). See the advanced use case example below.
  • Installation: pip install coinstac-computation
  • Add entry coinstac-computation to the requirements.txt.

Example: Gather max even numbers from each site

A full working use case is in the examples/basic directory where:

  • Each local site keeps only its even numbers and sends them to the remote.
  • Remote finds the max even number for each site and returns the final result to every site.
  • Each site saves the final result.

inputspec.json data:

[
  {
    "data": {
      "value": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441]
    }
  },
  {
    "data": {
      "value": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41]
    }
  }
]
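
To make the expected result concrete, the round trip can be traced in plain Python without COINSTAC. This is only a sketch: the site names site0 and site1 and the helper functions keep_even and max_per_site are hypothetical, and the numbers are the ones from the inputspec above.

```python
# Plain-Python trace of the basic example; no COINSTAC involved.
# Site names and helper names are hypothetical, chosen for illustration.
site_inputs = {
    "site0": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441],
    "site1": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41],
}


def keep_even(values):
    """Local step: keep only the even numbers."""
    return [v for v in values if v % 2 == 0]


def max_per_site(filtered_by_site):
    """Remote step: take the largest even number reported by each site."""
    return [max(values) for values in filtered_by_site.values()]


filtered = {site: keep_even(values) for site, values in site_inputs.items()}
aggregated = max_per_site(filtered)
print(aggregated)  # [776, 3386]
```

With this input, each site should end up saving the list [776, 3386].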

Local node pipeline:

import os
from coinstac_computation import COINSTACPyNode, ComputationPhase


class PhaseLoadData(ComputationPhase):
    def compute(self):
        data = []
        for d in self.input['data']:
            if d % 2 == 0:
                data.append(d)
        return {'filtered_data': data}


class PhaseSaveResult(ComputationPhase):
    def compute(self):
        with open(os.path.join(self.state['outputDirectory'], 'results.txt'), 'w') as out:
            out.write(f"{self.input['aggregated_data']}")


local = COINSTACPyNode(mode='local', debug=True)
local.add_phase(PhaseLoadData)
local.add_phase(PhaseSaveResult)

Remote node pipeline:

from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess


class PhaseCollectMaxEvenData(ComputationPhase):
    def compute(self):
        data = []
        for site, site_vars in self.input.items():
            site_max = max(site_vars['filtered_data'])
            data.append(site_max)
        return {'aggregated_data': data}


remote = COINSTACPyNode(mode='remote', debug=True)
remote.add_phase(PhaseCollectMaxEvenData)
remote.add_phase(PhaseEndWithSuccess)
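
One edge case worth noting: Python's built-in max raises ValueError on an empty list, so a site with no even numbers would make the remote phase fail. A possible guard, sketched outside COINSTAC with a hypothetical collect_max_even helper and made-up site names, is to pass default=None. Unlike the phase above, this sketch returns a dict keyed by site so that a missing value stays attributable.

```python
def collect_max_even(site_outputs):
    """Return each site's largest even number, or None if the site sent none.

    `site_outputs` mimics the shape of the remote input in the example above:
    a dict keyed by site id, each value holding a 'filtered_data' list.
    """
    return {
        site: max(site_vars["filtered_data"], default=None)
        for site, site_vars in site_outputs.items()
    }


result = collect_max_even({
    "site0": {"filtered_data": [10, 6, 8]},
    "site1": {"filtered_data": []},  # no even numbers at this site
})
print(result)  # {'site0': 10, 'site1': None}
```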

Entry point:

import coinstac

from local_pipeline import local
from remote_pipeline import remote

coinstac.start(local, remote)

Run:

cd examples/basic/
docker build -t base . && coinstac-simulator

Advanced use case example with multiple iterations where:

  • Each site casts a vote (positive if the number is even) multiple times (default=51).
  • Remote gathers the votes and returns the final voting result to all sites at the end.
  • Sites save the final result.
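
The remote side of this example is not shown here, so the following is only a guess at what gathering the votes from one iteration could look like. The tally_votes function and the site names are hypothetical; the input shape (a dict keyed by site id, each with a boolean 'vote') is an assumption based on the local phase output.

```python
from collections import Counter


def tally_votes(site_outputs):
    """Count the True/False votes sent by the sites in one iteration."""
    counts = Counter(site_vars["vote"] for site_vars in site_outputs.values())
    return {"positive": counts[True], "negative": counts[False]}


one_round = {
    "site0": {"vote": True},
    "site1": {"vote": False},
    "site2": {"vote": True},
}
print(tally_votes(one_round))  # {'positive': 2, 'negative': 1}
```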

Overview:

  1. Specify when to end the iterative phase with a phase jump signal:
import os
from coinstac_computation import ComputationPhase


class PhaseSubmitVote(ComputationPhase):

    def _initialize(self):
        """This method runs only once"""
        self.cache['data_index'] = 0
        self.cache['data'] = []
        # Read one number per line; the context manager closes the file afterwards.
        with open(self.state['baseDirectory'] + os.sep + self.input_args['data_source']) as src:
            for line in src:
                self.cache['data'].append(float(line.strip()))

    def compute(self):
        out = {
            'vote': self.cache['data'][self.cache['data_index']] % 2 == 0,
        }
        self.cache['data_index'] += 1

        # Send a jump-to-next-phase signal once every data point has been used.
        out['jump_to_next'] = self.cache['data_index'] > len(self.cache['data']) - 1
        return out
  2. Add the phase as multi-iterations:
local.add_phase(PhaseSubmitVote, multi_iterations=True)
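
To see when the jump signal fires, the counter logic can be traced outside COINSTAC. This is a minimal sketch, assuming the phase is simply re-run until 'jump_to_next' is true; the submit_votes generator is hypothetical and only mirrors the index arithmetic of compute above.

```python
def submit_votes(data):
    """Yield one output dict per iteration, mimicking PhaseSubmitVote.compute."""
    index = 0
    while index < len(data):
        out = {"vote": data[index] % 2 == 0}
        index += 1
        # Same condition as in the phase above: True once the data is exhausted.
        out["jump_to_next"] = index > len(data) - 1
        yield out


outputs = list(submit_votes([2.0, 3.0, 4.0]))
for out in outputs:
    print(out)
```

For three data points the signal stays False for the first two iterations and turns True on the third, so the last vote is sent together with the jump.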

Development notes:

  • Make sure you have:
    • docker installed and running.
    • nodejs installed.
    • coinstac-simulator package installed. npm install --global coinstac-simulator
  • Set debug=False when deploying.
  • Backward compatible with the older library (compspecVersion=1):
    • Add the following snippet at the end of the local and remote pipeline scripts.
    if __name__ == "__main__":
        local.to_stdout()
    
    • Use version 1.0 compspec format.
    • Comment out line CMD ["python", "entry.py"] in the Dockerfile.
    • You can also use a remote debugger in PyCharm.

Thanks!
