PyE2 is an SDK used to communicate with the AiXpand network

PyE2 SDK

This Python package enables low-code development and deployment of end-to-end AI cooperative application pipelines within the AiXpand Execution Engine processing nodes ecosystem. For further information, please see "AiXpand - Decentralized ubiquitous computing MLOps execution engine".

Installation
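
Install the package from PyPI with pip:

pip install pye2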

Quick start "Hello world!"

Below is a simple "Hello world!"-style application that creates a session by connecting to a known communication broker, listens for processing node heartbeats, and displays basic compute capabilities of the discovered nodes, such as CPU and RAM.

Importing and configuration

from PyE2 import Session

Preparing callbacks

def on_hb(session, data):
  # each heartbeat identifies the node and describes its hardware
  print(data['EE_ID'], 'has a', data['CPU'])
  return
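
The heartbeat payload is a plain dict; besides EE_ID and CPU it carries further node telemetry. The exact schema comes from the node, so the shape and values below are purely illustrative:

data = {
  'EE_ID' : 'e2-node-01',            # node identifier (illustrative value)
  'CPU'   : 'Intel(R) Core(TM) i7',  # CPU descriptor reported by the node
  # ... other telemetry fields (e.g. RAM) omitted here
}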

Running a simple main loop

if __name__ == '__main__':
  sess = Session(
    host="hostname",
    port=8888,
    user="username",
    pwd="password",
    on_heartbeat=on_hb
  )
  sess.run(wait=10)

Advanced quick-start with decentralized distributed jobs

For a more advanced quick start we are going to create an execution pipeline on a target processing node that requests a specific number of workers in the network (including the target node itself) to run a brute-force prime number search job. The initiator job creates the request for the required number of discovered worker peers, then listens for their results. Finally, after a configurable amount of time, it closes its own execution pipeline as well as each worker pipeline.

Worker code

The worker randomly generates numbers and checks whether they are prime. When it finds a prime, it assigns it to the _result variable.

_result = None
for _ in range(plugin.cfg_max_tries):
  # generate up to `max_tries` numbers in this call
  num = plugin.np.random.randint(1, 10_000)
  # reset the primality flag for each candidate
  skip = False
  for n in range(2, int(num**0.5) + 1):
    if num % n == 0:
      # the generated number is not a prime
      skip = True
      break
    # endif
  # endfor
  if not skip:
    _result = num
    break
  # endif
# endfor
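
To experiment with this search outside the Execution Engine, the same logic can be written as a standalone function; numpy and the max_tries argument stand in for the plugin namespace (a local sketch, not SDK code):

import numpy as np

def find_prime(max_tries, upper=10_000):
  # same brute-force search as the worker code above, minus the plugin wrapper
  for _ in range(max_tries):
    num = np.random.randint(1, upper)
    if num < 2:
      continue  # 0 and 1 are not prime
    if all(num % n != 0 for n in range(2, int(num**0.5) + 1)):
      return num
  return None

print(find_prime(max_tries=100))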

Initiator node code

The initiator searches for available workers in the network and sends them the custom job, then collects results for a configurable amount of time, after which it closes the worker pipelines as well as its own.

result=None
if plugin.int_cache['run_first_time'] == 0:
  # this is the first run, consider this the setup

  plugin.int_cache['run_first_time'] = 1

  worker_code = plugin.cfg_worker_code
  n_workers = plugin.cfg_n_workers
  # we use the DeAPI `plugin.deapi_get_wokers` call to get the needed workers
  plugin.obj_cache['lst_workers'] = plugin.deapi_get_wokers(n_workers)
  plugin.obj_cache['dct_workers'] = {}
  plugin.obj_cache['dct_worker_progress'] = {}
  plugin.P(plugin.obj_cache['lst_workers'])

  # for each worker we symmetrically launch the same job
  for worker in plugin.obj_cache['lst_workers']:
    plugin.obj_cache['dct_worker_progress'][worker] = []
    pipeline_name = plugin.cmdapi_start_simple_custom_pipeline(
      base64code=worker_code,
      dest=worker,
      instance_config={
        'MAX_TRIES': plugin.cfg_max_tries,
      }
    )
    plugin.obj_cache['dct_workers'][worker] = pipeline_name
  # endfor

  plugin.obj_cache["start_time"] = plugin.datetime.now()
  # endfor
elif (plugin.datetime.now() - plugin.obj_cache["start_time"]).seconds > plugin.cfg_max_run_time:
  # if the configured time has elapsed we stop all the worker pipelines
  # as well as stop this pipeline itself

  for ee_id, pipeline_name in plugin.obj_cache['dct_workers'].items():
    plugin.cmdapi_archive_pipeline(dest=ee_id, name=pipeline_name)
  # now archive own pipeline
  plugin.cmdapi_archive_pipeline()
  result = {
    'STATUS'  : 'DONE',
    'RESULTS' : plugin.obj_cache['dct_worker_progress']
  }
else:
  # here are the operations we are running periodically
  payload = plugin.dataapi_struct_data() # we use the DataAPI to get upstream data
  if payload is not None:

    ee_id = payload.get('EE_ID', payload.get('SB_ID'))
    pipeline_name = payload.get('STREAM_NAME')

    # accept only payloads originating from the worker pipelines we launched
    if (ee_id, pipeline_name) in plugin.obj_cache['dct_workers'].items():
      # now we extract result from the result key of the payload JSON
      # this also can be configured to another name
      num = payload.get('EXEC_RESULT', payload.get('EXEC_INFO'))
      if num is not None:
        plugin.obj_cache['dct_worker_progress'][ee_id].append(num)
        result = {
          'STATUS'  : 'IN_PROGRESS',
          'RESULTS' : plugin.obj_cache['dct_worker_progress']
        }
  # endif
# endif
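
For reference, the keys read above suggest worker payloads shaped roughly like the dict below; the actual schema is defined by the platform, so treat this as an assumption for illustration:

payload = {
  'EE_ID'       : 'worker-e2-01',  # id of the worker node that emitted the payload
  'STREAM_NAME' : 'pipeline_xyz',  # name of the worker pipeline launched above
  'EXEC_RESULT' : 9973,            # the _result produced by the worker code (a prime)
}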

The local code

from PyE2 import Session, code_to_base64

SERVER_CONFIG = {
    'host': "****************",
    'port': 8888,
    'user': "****************",
    'pwd': "****************"
}


def instance_on_data(pipeline, data):
  print(data)
  return


def pipeline_on_data(pipeline, signature, instance, data):
  # aggregated payloads from all plugin instances on the pipeline; unused in this example
  pass

if __name__ == '__main__':

  WORKER_CODE_PATH = 'xperimental/dedist_example_worker.py'
  INITIATOR_CODE_PATH = 'xperimental/dedist_example_initiator.py'

  with open(WORKER_CODE_PATH, 'rt') as fh:
    worker_code = fh.read()

  e2id = 'e2id' # provide a known EE id
  sess = Session(**SERVER_CONFIG, silent=True)
  sess.connect()

  listener_params = {k.upper(): v for k, v in SERVER_CONFIG.items()}
  listener_params["PASS"] = listener_params["PWD"]
  listener_params["TOPIC"] = "lummetry/payloads"

  pipeline = sess.create_pipeline(
      e2id=e2id,
      name='test_dist_jobs',
      # data_source='Void',
      # config={},
      data_source='IotQueueListener', # this DCT allows data acquisition from MQTT brokers
      config={
          'STREAM_CONFIG_METADATA': listener_params,
          "RECONNECTABLE": True,
      },
      plugins=None,
      on_data=pipeline_on_data,
  )


  pipeline.start_custom_plugin(
      instance_id='inst02',
      plain_code_path=INITIATOR_CODE_PATH,
      params={
        'MAX_TRIES': 10, # this will be used within plugin as `plugin.cfg_max_tries`
        'MAX_RUN_TIME': 60, # this will be used within plugin as `plugin.cfg_max_run_time`
        'N_WORKERS': 2, # this will be used within plugin as `plugin.cfg_n_workers`

        # this will be used within plugin as `plugin.cfg_worker_code`
        'WORKER_CODE': code_to_base64(worker_code)
        },
      on_data=instance_on_data,
      process_delay=0.2
  )

  sess.wait_until_sigint(close_session=True, close_pipelines=True)
