

Project daskcheck

Tools to simplify the use of Dask. The Dask scheduler is defined in the ~/.dask_server file; its content is just an IP address.

Work is in progress...

The idea

  1. define the core function properly (xcorefunc in the example) with THE proper return value; see the sketch after this list
  2. daskcheck will then take care of:
    • sending the task to the scheduler (but this is natural)
    • uploading a single-file (python) module to the workers (via .upload)
    • sending the parameters
    • collecting the results and saving them to a local json file
    • in the future: parsing the local json file
    • in the future: sending local (bash) scripts to the workers... (?)
    • in the future: managing the workers' folders with data output (if the output is too large)...
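
A minimal sketch of that return contract, modeled on the xcorefunc example near the end of this file (the (order, result) return shape is taken from there; everything else is illustrative):

import platform

def xcorefunc(order, param):
    # runs on a worker: 'order' numbers the call, 'param' is the input
    result = param * 2   # the actual work goes here
    # return the order together with the result so daskcheck can
    # match collected results back to parameters in the json log
    return order, [platform.node(), result]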

Files in the repo

It is becoming a bit messy, so for reference:


  • batchforworker - X - BATCH TEST in remote ~/sand
  • bin_daskcheck.py - X - nothing, will be main script
  • config.py - X - module configs
  • convjson2spectra.py - exodask output conversion to spectra
  • daskcheck - FOLDER
  • daskcheck.py - module OPERATIONS
  • dask_results_log_20230510_142235.json - log file
  • docextr.py - X - attempt to autocreate
  • exo_dask.py - ??? - work on exogam
  • f.py - ?? - resulting autogenerate
  • OldLogs - previous files
  • README.md - MD
  • README.org - this file
  • run_scheduler - RUN SCHEDULER
  • run_sync_versions - ?? - KEEP dask sched-workers in sync
  • run_worker - RUN WORKER
  • setup.py - python setup
  • singlemod.py - MODULE for import
  • singlexec.py - X - OBSOLETE, remote exec BASH
  • unitname.py - generic module
  • version.py - version is here


WAITING Installation of daskcheck

pip install daskcheck

Installation of dask 2023

See https://docs.dask.org/en/stable/install.html

pip install "dask[complete]"

Launching dask scheduler/workers

  • Pay attention to correct/compatible libraries on the different workers; see the version-check sketch after this list
  • open ports 8786 and 8787 on the scheduler and a long range of ports on the workers...
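
One way to check library compatibility is dask.distributed's own version check; a sketch using plain distributed (this is not a daskcheck feature):

from dask.distributed import Client

# connect to the scheduler and compare package versions across
# client, scheduler and workers; check=True raises on a mismatch
client = Client("127.0.0.1:8786")
versions = client.get_versions(check=True)
print(versions)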

Testing basics on localhost - the simplest in-terminal way

  • run_scheduler
    • you should see it running
  • dask worker 127.0.0.1:8786 --nworkers 1 --nthreads 1
    • at this stage, do not run run_worker; it looks for the server in ~/.dask_server
    • Starting established connection to tcp://127.0.0.1:8786
  • ./daskcheck.py loc
    • see the local function output; it should work in any case
  • ./daskcheck.py test -s 127.0.0.1
    • override the server to 127.0.0.1 to check that everything works

Set your server on worker(s)

nano ~/.dask_server and put 10.10.0.2 or whatever address your server has
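
For reference, reading the address back in Python looks like this (a sketch; how daskcheck itself resolves the file is an assumption):

from pathlib import Path

# ~/.dask_server holds just the scheduler's IP address
ip = (Path.home() / ".dask_server").read_text().strip()
print(f"scheduler expected at {ip}:8786")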

Test basics on network

  • go to server and run run_scheduler
  • go to worker and run run_worker
  • CHECK LIBRARIES and upgrade what is needed e.g.:
    • pip3 install tornado --upgrade
  • TEST with ./daskcheck.py test from the worker

Launching worker from cmdline with local scheduler

dask worker 127.0.0.1:8786 --nworkers 5 --nthreads 1

Testing dask

Just a local run of get_cpu_info

./daskcheck.py loc

This runs get_cpu_info 40 times on the cluster (the scheduler and workers must be running); a sketch of the submission pattern follows the command.

./daskcheck.py net
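
What ./daskcheck.py net amounts to can be sketched with plain dask.distributed; this shows only the submission pattern, not daskcheck's actual internals:

from dask.distributed import Client
import platform

def probe(order):
    # stand-in for get_cpu_info: report which worker ran the call
    return order, platform.node()

client = Client("127.0.0.1:8786")        # your scheduler address
futures = client.map(probe, range(40))   # 40 tasks over the cluster
print(client.gather(futures))            # collect (order, node) pairs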

DOING Run a single-file (python) module OR a (bash) batch

./daskcheck.py file - the file parameter gives a filename, and the parameters come after it...

# python function with main - example
./daskcheck.py file singlemod.py  11..33
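
The 11..33 argument presumably expands into a list of parameters, one per task; a hypothetical illustration (daskcheck.prepare_params' real parsing is not shown here):

def expand_range(spec):
    # hypothetical "11..33" -> [11, 12, ..., 33] expansion;
    # daskcheck.prepare_params may behave differently
    if ".." in spec:
        lo, hi = spec.split("..")
        return list(range(int(lo), int(hi) + 1))
    return [spec]   # a single value becomes a one-item list

print(expand_range("11..33"))   # 23 parameters, one task each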

How is that done?

  • importlib is used to get the module's MAIN,
  • UPLOAD singlemod.py to the scheduler,
  • chdir to /dask_sendbox and
  • run the tell() and main() functions of singlemod.py:
    • singlemod.py - writes a file to the worker's ~/dask_sendbox
    • deprecated: singlexec.py - launches ./runme; if that is not present in the worker's ~/dask_sendbox, it crashes
    • batchforworker - bash script, writes a file to the worker's ~/dask_sendbox
      • Previously, batch_for_worker LOAD and RUN was hardwired in singlexec.py.

singlemod.py - whatever it returns goes to the json and csv files; a minimal sketch of such a module follows.
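
A minimal sketch of such a single-file module, following the description above (the exact tell()/main() signatures daskcheck expects are assumptions):

# singlemod.py - sketch of a module daskcheck can upload and run
import platform
from pathlib import Path

def tell():
    # identify the worker this module landed on
    return platform.node()

def main(param):
    # do the work and drop output into the worker's ~/dask_sendbox;
    # whatever is returned ends up in the json/csv results
    outdir = Path.home() / "dask_sendbox"
    outdir.mkdir(exist_ok=True)
    (outdir / f"result_{param}.txt").write_text(f"param={param}\n")
    return param, platform.node()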

Monitoring dask

Have the dashboard port (8787) open on the scheduler:

xdg-open http://localhost:8787

TODO Recollection of the data from json

to recover...

TODO Python native check ?what?

It must be checked that python works too, as before.

DEVELOPMENT

  • restart scheduler after updating daskcheck
  • check open port when running run_worker [ok]

Appendix

run_worker - environment needed and command

#!/bin/bash

# PYTHON bin exports
export PATH=$PATH:$HOME/.local/bin

# ROOT exports
export PYTHONPATH=$HOME/root/lib/
export ROOTSYS=$HOME/root
export PATH=$ROOTSYS/bin:~/bin:$PATH
export LD_LIBRARY_PATH=$ROOTSYS/lib:$ROOTSYS/lib/root:$LD_LIBRARY_PATH

source $HOME/root/bin/thisroot.sh

# CONFIG FOR WORKER
export DISPLAY=:0
export DS=$HOME/.dask_server  # file with the IP of the scheduler
export HOST=`hostname`
export workers=2 # how many cores to run
export PORT=8786
cd /tmp

# check that the scheduler address file exists before reading it
if [ -f "$DS" ]; then
    echo ... OK $DS exists
else
    echo ... NO, $DS does not exist
    sleep 5
    echo ...
    exit 1
fi
export DSER=`cat $DS`  # IP of the scheduler


echo ... I am on $HOST and trying to connect to /$DSER/ one thread per worker
dask worker ${DSER}:${PORT}      --nworkers $workers --nthreads 1

run_scheduler - script

#!/bin/bash


#dask scheduler --port 8786
export PATH=$PATH:$HOME/.local/bin

export PORT=8786
export HOST=`hostname`

cd /tmp

# start the scheduler only on the designated host (here core6a)
if [ "$HOST" = "core6a" ]; then
    echo ... starting scheduler
    dask scheduler --port ${PORT}   #  --bokeh-port 8787
fi
echo ... scheduler ended or was not launched at all
sleep 5
exit 0

Run a (python) function from python code

This must be updated...

exo_dask.py contains a (previously working) example using ROOT.

This is (or should be) python code that uses daskcheck to send a function.

It is evidently crippled at the moment...

from daskcheck import daskcheck

from fire import Fire
import time
import platform
import datetime as dt
import json

def main( parlist ):
    """
    Initiated by Fire. If one parameter, runs locally with local xcorefunc
    """
    parameters = daskcheck.prepare_params( parlist )

    if isinstance(parameters, (list, tuple)):
        print("i... viable for DASK ....")
        daskcheck.submit( daskcheck.get_cpu_info , parameters)
    else:
        print("i... running only locally")
        my_results = xcorefunc( 1 , parameters ) # order = 1, just arbitrary number
        # Write LOG file.
        now = dt.datetime.now()
        stamp = now.strftime("%Y%m%d_%H%M%S")
        with open(f"dask_results_log_{stamp}.json", "w") as fp:
            json.dump( my_results , fp, sort_keys=True, indent='\t', separators=(',', ': '))
    return

def xcorefunc( order, param):
    """
    CORE function to be sent to dask schedule.

    :param order: order number of the call
    :param param: parameter to be sent
    """
    import ROOT # *TRICK* I need to import here to avoid breaking pickle
    start_time = time.perf_counter()

    return order, [platform.node(),  f"{time.perf_counter() - start_time:.1f} s" ]


if __name__=="__main__":
    Fire(main)
