Automatically created environment for a Python package
Project description
Project daskcheck
Tools for simpler use of Dask. Work is in progress...
The Dask scheduler is defined in the ~/.dask_server file; the content is just an IP address.
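For illustration, reading that file in Python is a one-liner; `read_scheduler_address` below is a hypothetical helper, not necessarily the name used inside daskcheck:

```python
from pathlib import Path

def read_scheduler_address(port=8786):
    # Hypothetical sketch: read the scheduler IP from ~/.dask_server
    # and build the address string that dask.distributed.Client expects.
    ip = (Path.home() / ".dask_server").read_text().strip()
    return f"tcp://{ip}:{port}"
```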
The idea
- properly define the core function (xcorefunc in the example) with the proper return value

daskcheck will then take care of (a sketch of the underlying round trip follows this list):
- sending the task to the scheduler (but this is natural)
- uploading a single-file (Python) module to the workers (via .upload)
- sending the parameters
- collecting the results and saving them to a local JSON file
- in the future, parsing the local JSON file
- in the future, sending local (bash) scripts to the workers... (?)
- in the future, managing the (worker's) folders with data output (if the output is too large)...
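A minimal sketch of that round trip written against plain dask.distributed (this is not daskcheck's actual code; the scheduler address, the module name, and `corefunc` are assumptions for illustration):

```python
import json
from dask.distributed import Client

client = Client("tcp://10.10.0.2:8786")   # assumed scheduler address
client.upload_file("singlemod.py")        # ship a single-file module to all workers

def corefunc(order, param):
    # placeholder core function; the convention here is to return (order, result)
    return order, param * 2

futures = [client.submit(corefunc, i, p) for i, p in enumerate(range(11, 34))]
results = client.gather(futures)          # collect the results from the workers

with open("dask_results_log.json", "w") as fp:   # save them to a local JSON file
    json.dump(results, fp, indent="\t")
```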
Files in the repo
It is becoming a bit messy, so for reference:

| File | Note |
|---|---|
| batch_for_worker | X BATCH TEST in remote ~/sand |
| bindaskcheck.py | X nothing; will be the main script |
| config.py | X module configs |
| convjson2spectra.py | exo_dask output conversion to spectra |
| daskcheck | FOLDER |
| daskcheck.py | module OPERATIONS |
| dask_results_log_20230510_142235.json | log file |
| docextr.py | X attempt to autocreate |
| exo_dask.py | ??? work on exogam |
| f.py | ?? resulting autogenerate |
| OldLogs | previous log files |
| README.md | MD |
| README.org | this file |
| run_scheduler | RUN SCHEDULER |
| runsyncversions | ?? KEEP dask scheduler and workers in sync |
| run_worker | RUN WORKER |
| setup.py | Python setup |
| singlemod.py | MODULE for import |
| singlexec.py | X OBSOLETE - remote exec BASH |
| unitname.py | generic module |
| version.py | version is here |
WAITING Installation of daskcheck
pip install daskcheck
Installation of dask 2023
See https://docs.dask.org/en/stable/install.html
pip install "dask[complete]"
Launching dask scheduler/workers
- Pay attention to correct/compatible library versions on the different workers
- open ports 8786 and 8787 on the scheduler, and a long range of ports on the workers...
Testing basics on localhost - the simplest in-terminal way
- run_scheduler - you see it running
- dask worker 127.0.0.1:8786 --nworkers 1 --nthreads 1 - at this stage do not run run_worker, as it looks for the server; you should see "Starting established connection to tcp://127.0.0.1:8786"
- ./daskcheck.py loc - see the local function output; this should work in any case
- ./daskcheck.py test -s 127.0.0.1 - override the server to 127.0.0.1 to see if everything works (a Python-level smoke test follows this list)
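The same smoke test can be done from Python with plain dask.distributed, assuming the scheduler from the steps above is running:

```python
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")             # connect to the local scheduler
print(client.submit(lambda x: x + 1, 41).result())  # prints 42 if the round trip works
```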
Set your server on worker(s)
nano ~/.dask_server
and put 10.10.0.2 or whatever address your server has.
Test basics on the network
- go to the server and run run_scheduler
- go to the worker and run run_worker
- CHECK LIBRARIES and upgrade what is needed, e.g. pip3 install tornado --upgrade
- TEST with ./daskcheck.py test from the worker
Launching a worker from the command line with a local scheduler
dask worker 127.0.0.1:8786 --nworkers 5 --nthreads 1
Testing dask
Just a local run of get_cpu_info:
./daskcheck.py loc
This runs get_cpu_info 40x in the cluster (the scheduler and workers should be running):
./daskcheck.py net
DOING Run a single file - a (Python) module OR a (bash) batch
./daskcheck.py file
- the file argument takes a filename followed by its parameters; this is coming...

```
# python function with main - example
./daskcheck.py file singlemod.py 11..33
```
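The 11..33 argument looks like an inclusive integer range. How prepare_params really expands it is not shown in this README; a plausible sketch of such an expansion:

```python
def expand_range(arg):
    # Hypothetical parser: "11..33" -> [11, 12, ..., 33]; anything else passes through.
    if isinstance(arg, str) and ".." in arg:
        lo, hi = arg.split("..")
        return list(range(int(lo), int(hi) + 1))
    return arg

print(expand_range("11..33")[:3])  # [11, 12, 13]
```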
How is that done?
- importlib is used to get the module's MAIN,
- singlemod.py is UPLOADED to the scheduler,
- chdir to ~/dask_sendbox, and
- the tell() and main() functions of singlemod.py are run.

Notes on the individual files (a hypothetical skeleton of such a module follows this list):
- singlemod.py - writes a file to the (worker's) ~/dask_sendbox; whatever it returns goes to the JSON and CSV
- deprecated singlexec.py - launches ./runme; if that is not present in the (worker's) ~/dask_sendbox, it crashes
- batch_for_worker - a bash script that writes a file to the (worker's) ~/dask_sendbox; previously, its LOAD and RUN were hardwired in singlexec.py
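The exact interface daskcheck expects is not spelled out here; the tell()/main() names and the ~/dask_sendbox path below come from the text above, the rest is an assumption:

```python
# hypothetical singlemod-style module for upload to the workers
import os
import platform

def tell():
    # report which worker picked the module up
    return platform.node()

def main(order, param):
    # write an output file into the worker's ~/dask_sendbox, as described above
    sendbox = os.path.expanduser("~/dask_sendbox")
    os.makedirs(sendbox, exist_ok=True)
    with open(os.path.join(sendbox, f"out_{order}.txt"), "w") as fp:
        fp.write(f"{param}\n")
    return order, param
```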
Monitoring dask
Have the port open on the scheduler machine:
xdg-open http://localhost:8787
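A connected dask.distributed client can also tell you where the dashboard lives:

```python
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")
print(client.dashboard_link)  # typically http://<scheduler>:8787/status
```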
TODO Recollection of the data from JSON
to recover...
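Until that is done, a minimal sketch of re-reading the log files, assuming they hold a list of (order, result) pairs as the core functions in this README return (the real on-disk layout may differ):

```python
import glob
import json

for path in sorted(glob.glob("dask_results_log_*.json")):
    with open(path) as fp:
        results = json.load(fp)      # assumed: a list of [order, result] pairs
    for order, result in results:
        print(path, order, result)
```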
TODO Python native check ?what?
It must be checked that Python works too - as before.
DEVELOPMENT
- restart the scheduler after updating daskcheck
- check the open ports when running the worker [ok]
Appendix
run_worker - environment needed and command
```bash
#!/bin/bash
# PYTHON bin exports
export PATH=$PATH:$HOME/.local/bin
# ROOT exports
export PYTHONPATH=$HOME/root/lib/
export ROOTSYS=$HOME/root
export PATH=$ROOTSYS/bin:~/bin:$PATH
export LD_LIBRARY_PATH=$ROOTSYS/lib:$ROOTSYS/lib/root:$LD_LIBRARY_PATH
source $HOME/root/bin/thisroot.sh
# CONFIG FOR WORKER
export DISPLAY=:0
export DS=$HOME/.dask_server   # file holding the IP of the scheduler
export HOST=`hostname`
export workers=2               # how many cores to run
export PORT=8786
cd /tmp
if [ -f "$DS" ]; then
    echo ... OK $DS exists
else
    echo ... NO $DS does not exist
    sleep 5
    echo ...
    exit 1
fi
export DSER=`cat $DS`          # read the IP only after the check above
echo ... I am on $HOST and trying to connect to /$DSER/ with one thread per worker
dask worker ${DSER}:${PORT} --nworkers $workers --nthreads 1
```
run_scheduler - script
```bash
#!/bin/bash
# dask scheduler --port 8786
export PATH=$PATH:$HOME/.local/bin
export PORT=8786
export HOST=`hostname`
cd /tmp
if [ "$HOST" = "core6a" ]; then
    echo ... starting scheduler
    dask scheduler --port ${PORT}   # --bokeh-port 8787
fi
echo ... scheduler ended or was not launched at all
sleep 5
exit 0
```
Run a (Python) function from Python code
This must be updated...
exo_dask.py contains an example, using ROOT, that worked in the past. This is (or should be) Python code that uses daskcheck to send a function. It is evidently crippled for the moment...
```python
from daskcheck import daskcheck
from fire import Fire
import time
import platform
import datetime as dt
import json


def main(parlist):
    """
    Initiated by Fire. If one parameter, runs locally with the local xcorefunc.
    """
    parameters = daskcheck.prepare_params(parlist)
    if type(parameters) == list:
        print("i... viable for DASK ....")
        daskcheck.submit(daskcheck.get_cpu_info, parameters)
    elif type(parameters) == tuple:
        print("i... viable for DASK ....")
        daskcheck.submit(daskcheck.get_cpu_info, parameters)
    else:
        print("i... running only locally")
        my_results = xcorefunc(1, parameters)  # order = 1, just an arbitrary number
        # Write LOG file.
        now = dt.datetime.now()
        stamp = now.strftime("%Y%m%d_%H%M%S")
        with open(f"dask_results_log_{stamp}.json", "w") as fp:
            json.dump(my_results, fp, sort_keys=True, indent='\t', separators=(',', ': '))
    return


def xcorefunc(order, param):
    """
    CORE function to be sent to the dask scheduler.
    :param order: order number of the call
    :param param: parameter to be sent
    """
    import ROOT  # *TRICK* I need to import here to avoid breaking pickle
    start_time = time.perf_counter()
    return order, [platform.node(), f"{time.perf_counter() - start_time:.1f} s"]


if __name__ == "__main__":
    Fire(main)
```
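With Fire mapping the command line onto main(), an invocation could look like `python3 exo_dask.py 11..33` (assuming prepare_params accepts the same range syntax as above); a single plain argument should fall through to the local branch and write a dask_results_log_*.json file.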