#' Store the parameters of a job in an RDS file
#'
#' Serialises `parameters` into a randomly named `.rds` file inside `wd`, so
#' that the job can read them back later (see `runJob()`).
#'
#' @param parameters A list with all parameters needed by the function
#'   to run within the job.
#' @param wd Directory in which the parameter file is written. It is created
#'   (recursively) when it does not exist yet.
#' @return The path of the RDS file that stores the parameters.
#' @export
packJobParameters <- function(parameters, wd = "~/tmp/") {
  # saveRDS() does not create missing directories.
  if (!dir.exists(wd)) {
    dir.create(wd, recursive = TRUE, showWarnings = FALSE)
  }
  # The random part of the name only has five significant digits, so two jobs
  # packed into the same directory can draw the same name. Draw again until
  # the name is unused instead of overwriting another job's parameters.
  repeat {
    fname <- paste0(wd, "/_", as.character(signif(runif(1), 5)), ".rds")
    if (!file.exists(fname)) {
      break
    }
  }
  saveRDS(parameters, fname)
  fname
}
#' Run the business logic of a job
#'
#' Reads the parameter list stored in `token`, calls the function found in its
#' `fun` element with all remaining elements as arguments, and saves the
#' outcome in a file next to the parameter file.
#'
#' @param token Path of the RDS file holding the job parameters. The stored
#'   list must contain an element named `fun`, which is either a function or
#'   the name of a function; every other element is passed to that function
#'   as an argument.
#'
#' @return Called for its side effect: a list with the elements `result` (the
#'   value returned by the function) and `params` (the arguments it was called
#'   with) is written to `paste0(token, "_out.rds")`.
#' @export
runJob <- function(token) {
  cat("Running job with token", token, "\n")
  myparams <- readRDS(token)
  # Exact lookup: `$` would also pick up a partially matching name such as
  # `funs` when no element is called exactly `fun`.
  targetFunc <- myparams[["fun"]]
  if (is.character(targetFunc) && length(targetFunc) == 1L) {
    # Accept the name of a function as well as a function object.
    targetFunc <- get(targetFunc, mode = "function")
  }
  if (!is.function(targetFunc)) {
    stop("The job parameters in '", token,
         "' must contain a function (or the name of one) in element 'fun'",
         call. = FALSE)
  }
  myparams[["fun"]] <- NULL
  # The function is called through its local name rather than as an object,
  # so the call seen inside the target reads `targetFunc(...)` instead of
  # embedding the whole function definition.
  toSave <- list(
    result = do.call(what = "targetFunc", args = myparams),
    params = myparams
  )
  saveRDS(toSave, paste0(token, "_out.rds"))
}
#' Check whether a job shows up in the queue
#'
#' Runs `qstat` and looks for `job` in its output.
#'
#' @param job Job identifier to look for. It is used as a regular expression
#'   and matched against every line printed by `qstat`.
#' @param wd Not used; kept so that existing callers keep working.
#'
#' @return `TRUE` if at least one line of the `qstat` output matches `job`,
#'   `FALSE` if `qstat` printed something but no line matches. When `qstat`
#'   prints nothing at all the function returns `TRUE` as well.
#' @export
jobRunning <- function(job, wd) {
  queue <- system("qstat", intern = TRUE)
  if (length(queue) > 0) {
    return(any(grepl(job, queue)))
  }
  # NOTE(review): an empty qstat output is reported as "running". Presumably
  # this treats "qstat told us nothing" as "unknown, assume alive" - confirm
  # that this is intended, since an empty queue looks the same.
  TRUE
}
#' Send a job to the queue
#'
#' Packs `parameters` into an RDS file with `packJobParameters()`, builds a
#' `qsub` command line that runs `runJob()` on that file through `Rscript`,
#' and, unless told otherwise, submits it.
#'
#' @param parameters A named list with the parameters of the job; each name is
#'   a parameter name and each element its value. It is stored as is with
#'   `packJobParameters()`; `runJob()` expects an element named `fun`.
#' @param clParams Parameters specific to this job and to the cluster, e.g.
#'   asking for memory or dedicated hardware. They are pasted verbatim into
#'   the `qsub` command line.
#' @param wd Directory that receives the parameter file and the log and error
#'   files of the job.
#' @param justPrepareForLaunch If true, the command is built and printed but
#'   not run, and the returned `jobid` is `"none"`.
#' @param prefix A prefix to use in the job name, which may be useful for
#'   distinguishing between jobs. `"U"` is used when it is `NULL`.
#'
#' @return A handler to the job: a list with the following elements
#' - `paramfile`: path of the RDS file storing the arguments of the function
#'   called when running the job
#' - `jobname`: the name of the job within the queue
#' - `jobid`: the output of the submission command, or `"none"` when the job
#'   was only prepared
#' - `wd`: the `wd` argument
#' - `command`: the complete command line
#' - `outfile`: path of the RDS file where results will be stored when the
#'   job finishes
#' - `logfile`/`errfile`: paths of the logs and errors generated by the job
#' @export
launchJob <- function(parameters,
                      clParams="",
                      wd="~/tmp/",
                      justPrepareForLaunch=F,
                      prefix=NULL){
  paramFile <- packJobParameters(parameters, wd = wd)

  # Job name: "J_<prefix or U>_<random number>".
  jobName <- paste0(
    "J_",
    ifelse(is.null(prefix), "U", prefix),
    "_",
    as.character(signif(runif(1), 5))
  )
  stdoutPath <- paste0(wd, "/", jobName, ".log")
  stderrPath <- paste0(wd, "/", jobName, ".e")

  # The Rscript call is echoed into qsub, hence the nested quote escaping.
  command <- paste0(
    "echo \"Rscript -e \\\"library(sgefacilities);",
    "runJob(token=\\\\\\\"", paramFile, "\\\\\\\")",
    "\\\"\" | qsub -S /bin/bash -N ", jobName, " ",
    clParams,
    " -o ", stdoutPath, " -e ", stderrPath
  )
  cat("The command is\n", command, "\n")

  jobid <- "none"
  if (!justPrepareForLaunch) {
    jobid <- system(command, intern = TRUE)
    if (jobRunning(jobid, wd)) {
      cat("Job", jobid, "is queued\n")
    } else {
      cat("Something went wrong with the Job????\n")
    }
  }

  list(
    paramfile = paramFile,
    jobname = jobName,
    jobid = jobid,
    wd = wd,
    command = command,
    outfile = paste0(paramFile, "_out.rds"),
    logfile = stdoutPath,
    errfile = stderrPath
  )
}
#' Submit prepared jobs to the queue in batches
#'
#' Runs the `command` stored in every handler, `batchSize` handlers at a time,
#' and sleeps `timePerBatch` seconds between two consecutive batches.
#'
#' @param handlers A list of job handlers. Each handler must provide the
#'   elements `command`, `wd` and `jobname`, as the handlers returned by
#'   `launchJob()` do. The list may be unnamed; `NULL` entries are skipped.
#' @param batchSize Maximum number of jobs submitted per batch.
#' @param timePerBatch Seconds to wait after each batch except the last one.
#'
#' @return A list with the submitted handlers, indexed by their `jobname`,
#'   each with `jobid` set to the output of its submission command. `NULL`
#'   when `handlers` is empty.
submitJobs <- function(handlers, batchSize = 1000, timePerBatch = 1800) {
  stopifnot(batchSize >= 1)
  total <- length(handlers)
  if (total == 0) {
    # seq(1, 0, by = batchSize) would fail; there is nothing to submit.
    return(NULL)
  }
  newhandlers <- list()
  starts <- seq(1, total, by = batchSize)
  for (batch in seq_along(starts)) {
    first <- starts[batch]
    last <- min(first + batchSize - 1, total)
    cat("Qsubmitting from", first, "to", last, "jobs\n")
    # Walk the handlers by position so that unnamed lists are submitted too.
    for (position in first:last) {
      handler <- handlers[[position]]
      if (is.null(handler)) {
        next
      }
      cat("The command is\n", handler$command, "\n")
      jobid <- system(handler$command, intern = TRUE)
      if (jobRunning(jobid, handler$wd)) {
        cat("Job", jobid, "is queued\n")
      } else {
        cat("Something went wrong with the Job????\n")
      }
      handler$jobid <- jobid
      newhandlers[[handler$jobname]] <- handler
    }
    if (batch < length(starts)) {
      Sys.sleep(timePerBatch)
    }
  }
  newhandlers
}
#' Wait for jobs to finish and collect results
#'
#' Polls the output file of every handler until all jobs have delivered a
#' result, were found not to be running, or `timeLimit` has elapsed.
#'
#' @param handlers The launchJob function will return a handler to each job.
#'   Handlers is a named list with all the job handlers we want to wait for.
#'   `NULL` entries are ignored.
#' @param timeLimit Wait for all to finish no longer than timeLimit seconds.
#'   Only the `increment` pauses between polling rounds are counted.
#' @param increment Test whether some/all jobs finished each increment seconds.
#' @param removeLogs Remove the log and error files of a job once its result
#'   has been read.
#' @param removeData Remove the output and parameter files of a job once its
#'   result has been read.
#' @param wd Passed on to `jobRunning()` when `qstatworks` is `TRUE`.
#' @param qstatworks If `TRUE`, a job without output file is looked up with
#'   `jobRunning()` and given up on when it is not running. If `FALSE`, such a
#'   job is simply polled again in the next round.
#'
#' @return A list with the job results that could be read, indexed by the same
#'   names as in `handlers`; `NULL` when no result was read.
#' @export
waitForJobs <- function(handlers,
                        timeLimit = 24 * 3600,
                        increment = 30,
                        removeLogs = TRUE,
                        removeData = TRUE,
                        wd = "~/tmp/",
                        qstatworks = FALSE) {
  waitForReady <- names(handlers)
  # A NULL handler can never deliver a result; waiting for it would block the
  # call until timeLimit expires.
  isEmpty <- vapply(waitForReady,
                    function(index) is.null(handlers[[index]]),
                    logical(1))
  waitForReady <- waitForReady[!isEmpty]

  elapsed <- 0
  responses <- NULL
  while (length(waitForReady) > 0 && elapsed < timeLimit) {
    for (index in waitForReady) {
      handler <- handlers[[index]]
      cat("Checking for", handler$outfile, "\n")
      if (file.exists(handler$outfile)) {
        cat("Job from", handler$outfile, "finished\n")
        # Short pause before reading, presumably to let the job finish
        # writing the file.
        Sys.sleep(5)
        responses[[index]] <- readRDS(handler$outfile)
        waitForReady <- waitForReady[waitForReady != index]
        if (removeData) {
          file.remove(handler$outfile)
          file.remove(handler$paramfile)
        }
        if (removeLogs) {
          file.remove(handler$logfile)
          file.remove(handler$errfile)
        }
      } else {
        cat("Checking state of", handler$jobname, "\n")
        if (qstatworks) {
          if (!jobRunning(handler$jobid, wd)) {
            # Neither a result nor a running job: stop waiting for it.
            cat("No results and no job running:", handler$jobname, "\n")
            waitForReady <- waitForReady[waitForReady != index]
          }
        } else {
          cat("We know nothing about", handler$jobname, "so far\n")
        }
      }
    }
    if (length(waitForReady) == 0) {
      # Everything is accounted for; do not sleep another increment.
      break
    }
    Sys.sleep(increment)
    elapsed <- elapsed + increment
    cat("Waking up, still", length(waitForReady), "files to read out of", length(handlers),
        " and ", timeLimit - elapsed, " seconds to go\n")
    flush.console()
  }
  cat("Done with the", length(handlers), "handlers\n")
  responses
}
#' Collect return values from all jobs that finished
#'
#' Reads the output file of every handler whose output file exists. Nothing is
#' waited for and no file is removed.
#'
#' @param handlers The launchJob function will return a handler to each job.
#'   Handlers is a named list with all the job handlers we want to collect
#'   data from, or the path of an RDS file that stores such a list.
#' @param removeLogs Currently not used by this function.
#' @param removeData Currently not used by this function.
#'
#' @return A list with an 'indexes' element with the names of the jobs that
#'   finished and 'results' with the return values for these jobs.
#' @export
collectResults <- function(handlers,
                           removeLogs=T,
                           removeData=T){
  # A character argument is taken as the path of a stored handler list.
  if (identical(typeof(handlers), "character")) {
    handlers <- readRDS(handlers)
  }

  results <- NULL
  for (jobKey in names(handlers)) {
    job <- handlers[[jobKey]]
    if (is.null(job)) {
      next
    }
    if (!file.exists(job$outfile)) {
      next
    }
    results[[jobKey]] <- readRDS(job$outfile)
  }

  list(indexes = names(results), results = results)
}
#' Report the state of a set of jobs
#'
#' Sorts the handlers into finished (output file exists), running (no output
#' file, but `jobRunning()` finds the job) and wrong (neither), and prints a
#' summary.
#'
#' @param handlers A named list of job handlers, or the path of an RDS file
#'   that stores such a list. `NULL` entries are reported as missing.
#' @param wd Passed on to `jobRunning()`.
#'
#' @return A list with the elements `finished`, `running` and `wrong`, each
#'   holding the handlers in that state.
reportOnJobs <- function(handlers,wd="~/tmp/"){
  if (identical(typeof(handlers), "character")) {
    handlers <- readRDS(handlers)
  }
  ids <- names(handlers)

  # State of one handler; the queue is only consulted when there is no
  # output file.
  stateOf <- function(id) {
    job <- handlers[[id]]
    if (is.null(job)) {
      return("skipped")
    }
    if (file.exists(job$outfile)) {
      return("finished")
    }
    if (!jobRunning(job$jobid, wd)) "wrong" else "running"
  }
  states <- vapply(ids, stateOf, character(1), USE.NAMES = FALSE)

  # Names of the handlers in a given state, NULL when there are none.
  idsIn <- function(state) {
    hit <- ids[states == state]
    if (length(hit) > 0) hit else NULL
  }
  finished <- idsIn("finished")
  running <- idsIn("running")
  wrong <- idsIn("wrong")

  cat("Total Jobs", length(handlers), "were launched\n")
  if (is.null(running)) {
    cat("No jobs running\n")
  } else {
    cat(length(running), "jobs running\n")
  }
  if (is.null(wrong)) {
    cat("No wrong jobs so far\n")
  } else {
    cat(length(wrong), "jobs wrong, neither running nor results available\n")
  }
  if (is.null(finished)) {
    cat("No jobs finished\n")
  } else {
    cat(length(finished), "jobs done\n")
  }

  unaccounted <- length(handlers) - length(finished) - length(running) - length(wrong)
  if (unaccounted != 0) {
    cat(unaccounted, "jobs are missing\n")
  }
  cat("Done with the", length(handlers), "handlers\n")

  list(
    finished = handlers[finished],
    running = handlers[running],
    wrong = handlers[wrong]
  )
}
#' Remove the files left behind by a set of jobs
#'
#' Deletes the output, parameter, log and error file of every handler.
#'
#' @param handlers A named list of job handlers; each handler provides the
#'   paths `outfile`, `paramfile`, `logfile` and `errfile`.
#' @param wd Not used by this function.
#'
#' @return `NULL`, invisibly; the function is called for its side effect.
#' @export
cleanUpJobs <- function(handlers,
                        wd="~/tmp/"){
  for (jobKey in names(handlers)) {
    job <- handlers[[jobKey]]
    leftovers <- list(job$outfile, job$paramfile, job$logfile, job$errfile)
    for (path in leftovers) {
      file.remove(path)
    }
  }
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.