Generic computation implementation on COINSTAC.

COINSTAC computations development made easy.

Express development (see the example folder for a use case):

1. mkdir -p example/dist
2. chmod u+x deploy.sh #(Needed only once)
3. ./deploy.sh example/dist

Deployment:

pip install coinstac-computation (or add coinstac-computation to your requirements.txt file)

A base computation framework for COINSTAC that supports:

  • Multiple computation phases.
  • Multiple iterations per phase.
  • Automatic phase transition.

Use case: Gather max even numbers from each site

A full working use case is in the example directory, where:

  • Each local site filters out the even numbers from its data and sends them to the remote.
  • The remote finds the maximum for each site and returns the results to all sites.
  • Each site saves the final result.

inputspec.json data:

[
  {
    "data": {
      "value": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441]
    }
  },
  {
    "data": {
      "value": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41]
    }
  }
]
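
As a sanity check on the use case, the expected result for this input can be worked out in plain Python, without COINSTAC. This is only a sketch of the data flow; the function names filter_even and max_per_site are illustrative and not part of coinstac-computation.

```python
# Plain-Python sketch of the use case; no COINSTAC imports needed.
# The two lists are the "value" entries from inputspec.json above.
site_inputs = [
    [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441],
    [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41],
]


def filter_even(values):
    """Local step: keep only the even numbers (hypothetical helper)."""
    return [v for v in values if v % 2 == 0]


def max_per_site(filtered_by_site):
    """Remote step: one maximum per site (hypothetical helper)."""
    return [max(values) for values in filtered_by_site]


filtered = [filter_even(values) for values in site_inputs]
aggregated = max_per_site(filtered)
print(aggregated)  # [776, 3386]
```

The pipelines below implement the same two steps as ComputationPhase classes.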

Local node pipeline:

import os
from coinstac_computation import COINSTACPyNode, ComputationPhase


class PhaseLoadData(ComputationPhase):
    def compute(self):
        data = []
        for d in self.input['data']:
            if d % 2 == 0:
                data.append(d)
        return {'filtered_data': data}


class PhaseSaveResult(ComputationPhase):
    def compute(self):
        out_path = os.path.join(self.state['outputDirectory'], 'results.txt')
        with open(out_path, 'w') as out:
            out.write(str(self.input['aggregated_data']))


local = COINSTACPyNode(mode='local', debug=True)
local.add_phase(PhaseLoadData)
local.add_phase(PhaseSaveResult)

Remote node pipeline:

from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess


class PhaseCollectMaxEvenData(ComputationPhase):
    def compute(self):
        data = []
        for site, site_vars in self.input.items():
            site_max = max(site_vars['filtered_data'])
            data.append(site_max)
        return {'aggregated_data': data}


remote = COINSTACPyNode(mode='remote', debug=True)
remote.add_phase(PhaseCollectMaxEvenData)
remote.add_phase(PhaseEndWithSuccess)
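
One edge case worth noting: Python's max() raises ValueError on an empty sequence, so the remote phase as written would fail if some site had no even numbers. A possible defensive variant is sketched below as a plain function over a dict shaped like the remote's self.input. The function name and the site names are hypothetical, and returning None for an empty site is an assumption about what you would want, not documented library behavior.

```python
def max_even_per_site(remote_input):
    """Return one maximum per site, or None for a site with no even numbers."""
    return [
        # default=None avoids the ValueError that max() raises on an empty list.
        max(site_vars['filtered_data'], default=None)
        for site, site_vars in remote_input.items()
    ]


# Hypothetical site names, for illustration only.
example_input = {
    'site_a': {'filtered_data': [10, 6, 8]},
    'site_b': {'filtered_data': []},
}
result = max_even_per_site(example_input)
print(result)  # [10, None]
```

Inside PhaseCollectMaxEvenData, the same default=None argument could be passed to the existing max() call.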

Entry point

import coinstac

from local_pipeline import local
from remote_pipeline import remote

coinstac.start(local.compute, remote.compute)

Run

cd example
docker build -t base . && coinstac-simulator

Extras:

  • Set debug=False when deploying.
  • The framework is backward compatible with compspecVersion=1 (now deprecated). To use it:
    • Use the compspecVersion=1 format.
    • Add the following snippet at the end of the local and remote pipeline scripts (shown here for the local script):
if __name__ == "__main__":
    local.to_stdout()

To run a phase for multiple iterations, register it with multi_iterations=True:

remote.add_phase(SomeIterativePhase, multi_iterations=True)

To stop iterating and move on to the next phase, return 'jump_to_next': True in the phase output:

class SomeIterativePhase(ComputationPhase):
    def compute(self):
        # All your computation goes here; collect this phase's outputs in `out`.
        out = {}

        # Check whether this phase is done and should jump to the next one.
        should_jump: bool = ...  # placeholder: replace with your own stopping condition
        out['jump_to_next'] = should_jump
        return out
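
The stopping contract can be illustrated with a toy, library-free loop. This is not how coinstac-computation is implemented; CountToThree and run_until_jump are hypothetical names, and the sketch only shows the idea of repeating compute() until its output carries a truthy 'jump_to_next'.

```python
class CountToThree:
    """Hypothetical iterative phase that signals completion on its third call."""

    def __init__(self):
        self.calls = 0

    def compute(self):
        self.calls += 1
        # Ask to jump to the next phase once three iterations have run.
        return {'calls': self.calls, 'jump_to_next': self.calls >= 3}


def run_until_jump(phase, max_iterations=100):
    """Toy driver: call compute() until the output has a truthy 'jump_to_next'."""
    outputs = []
    for _ in range(max_iterations):
        out = phase.compute()
        outputs.append(out)
        if out.get('jump_to_next'):
            break
    return outputs


history = run_until_jump(CountToThree())
print(len(history))  # 3
```

The max_iterations cap is a safety net in the toy driver only; whether the real framework imposes a similar limit is not stated here.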

Source Distribution

coinstac-computation-0.3.tar.gz (5.8 kB)
