# R/sge.R

#' Store the parameters of a job in an RDS file
#'
#' The whole `parameters` list is serialised with `saveRDS()` into a freshly
#' named file inside `wd`. The path of that file is the token that identifies
#' the job from then on.
#'
#' @param parameters A list with all parameters needed by the function
#'   to run within the job.
#' @param wd Directory in which the file is written. It is created when it
#'   does not exist yet.
#' @return The path of the RDS file that stores the parameters.
#' @export
#'
#' @examples
#' token <- packJobParameters(list(fun = sum, 1, 2), wd = tempdir())
#' readRDS(token)
packJobParameters <- function(parameters, wd = "~/tmp/") {
  if (!dir.exists(wd)) {
    dir.create(wd, recursive = TRUE, showWarnings = FALSE)
  }
  # The random part of the name is taken from tempfile() instead of runif():
  # it does not draw from R's random number stream, so the caller's
  # set.seed() is neither disturbed nor able to make two jobs share a file.
  repeat {
    fname <- paste0(wd, "/", basename(tempfile(pattern = "_", fileext = ".rds")))
    if (!file.exists(fname)) {
      break
    }
  }
  saveRDS(parameters, fname)
  fname
}

#' Run the job described by a parameter file
#'
#' Business logic of the job once it is put to work: the parameter list is
#' read from `token`, its `fun` element is called with all remaining elements
#' as arguments, and the outcome is saved next to the parameter file.
#'
#' @param token Path of the RDS file holding the parameter list. The list
#'   must contain an element named `fun`, either a function or the name of
#'   one.
#'
#' @return `NULL`, invisibly. The function is called for its side effect: a
#'   list with elements `result` (value returned by `fun`) and `params` (the
#'   arguments it was called with) is saved to `paste0(token, "_out.rds")`.
#' @export
#'
#' @examples
#' token <- tempfile(fileext = ".rds")
#' saveRDS(list(fun = sum, 1, 2), token)
#' runJob(token)
#' readRDS(paste0(token, "_out.rds"))$result
runJob <- function(token) {
  cat("Running job with token", token, "\n")
  myparams <- readRDS(token)

  # Exact lookup: `$fun` would partially match an argument such as `funs`.
  targetFunc <- myparams[["fun"]]
  if (is.null(targetFunc)) {
    stop("The parameters stored in '", token, "' have no 'fun' element",
         call. = FALSE)
  }
  # Accept a function name as well as a function object.
  targetFunc <- match.fun(targetFunc)
  myparams[["fun"]] <- NULL

  toSave <- list(result = do.call(what = "targetFunc", args = myparams),
                 params = myparams)

  # Write under a temporary name and rename afterwards, so that whoever polls
  # for the output file never sees a partially written one.
  outfile <- paste0(token, "_out.rds")
  partial <- paste0(outfile, ".tmp")
  saveRDS(toSave, partial)
  if (!file.rename(partial, outfile)) {
    stop("Could not move the results to '", outfile, "'", call. = FALSE)
  }
  invisible(NULL)
}


#' Check whether a job is listed by the SGE queue
#'
#' Runs `qstat` and looks for the job id in the first column of its output.
#'
#' @param job Job id as generated by the SGE system. Either the bare id or
#'   the message printed by `qsub` at submission time
#'   (`Your job <id> ("<name>") has been submitted`), from which the id is
#'   extracted.
#' @param wd Not used by this function; kept so that existing calls keep
#'   working.
#'
#' @return `TRUE` if the job id is found in the first column of a `qstat`
#'   command output, `FALSE` otherwise. When `qstat` prints nothing at all
#'   the function returns `TRUE`.
#' @export
#'
#' @examples
#' \dontrun{
#' jobRunning("12345", wd = "~/tmp/")
#' }
jobRunning <- function(job, wd) {
  queue <- system("qstat", intern = TRUE)
  if (length(queue) == 0) {
    # NOTE(review): no output at all is reported as "running"; this keeps the
    # long-standing behaviour -- confirm it is the intended reading of an
    # empty queue.
    return(TRUE)
  }

  id <- trimws(as.character(job))
  id <- id[!is.na(id) & nzchar(id)]
  if (length(id) == 0) {
    return(FALSE)
  }

  # The id is treated as literal text, never as a regular expression. When the
  # full qsub message is passed in, only the number it announces is used.
  announced <- grep("^Your job(-array)? [0-9]+", id, value = TRUE)
  if (length(announced) > 0) {
    id <- sub("^Your job(-array)? ([0-9]+).*$", "\\2", announced[1])
  } else {
    id <- id[1]
  }
  # Drop a task-range suffix such as ".1-10:1" that follows the numeric id.
  id <- sub("^([0-9]+)[.].*$", "\\1", id)

  # First whitespace-separated field of every qstat line.
  firstColumn <- sub("^[[:space:]]*([^[:space:]]*).*$", "\\1", queue)
  id %in% firstColumn
}

#' Send a job to the queue
#'
#' Packs the parameters into an RDS file, builds the `qsub` command line that
#' will call `runJob()` on that file and, unless told otherwise, submits it.
#'
#' @param parameters This is a list with the parameters, the name of the
#'   parameter is the key of the hash. The parameter contents are the content
#'   of the hash.
#' @param clParams Parameters which are specific to this job and to the
#'   cluster, e.g. asking for memory or dedicated hardware. The text is pasted
#'   as is into the `qsub` command line.
#' @param wd Directory where the parameter file, the results and the log
#'   files of the job are placed.
#' @param justPrepareForLaunch If `TRUE` the command is built and returned but
#'   not executed; the `jobid` of the handler is then `"none"`.
#' @param prefix A prefix to use in the files, may be useful for
#'   distinguishing between jobs.
#'
#' @return A handler to the job: it is a list with the following elements
#' - paramfile: the full path to the file with the RDS object storing the
#'   arguments to the function called when running the job
#' - jobname: the name of the job within the queue
#' - jobid: the id reported by `qsub` (`"none"` when nothing was submitted)
#' - wd: the working directory that was used
#' - command: the command line that submits the job
#' - outfile: the full path to the RDS file where results will be stored when
#'   the job finishes
#' - logfile/errfile: full paths to the logs and errors generated by the job
#' @export
#'
#' @examples
#' \dontrun{
#' handler <- launchJob(list(fun = sum, 1, 2), prefix = "demo")
#' }
launchJob <- function(parameters,
                      clParams = "",
                      wd = "~/tmp/",
                      justPrepareForLaunch = FALSE,
                      prefix = NULL) {

  token <- packJobParameters(parameters, wd = wd)

  # Random part of the job name: taken from tempfile() so that the caller's
  # random number stream is left alone and a repeated set.seed() cannot make
  # two jobs share a name.
  tag <- if (is.null(prefix)) "U" else prefix[1]
  expid <- paste0("J_", tag, "_", sub("^file", "", basename(tempfile())))
  logfile.log <- paste0(wd, "/", expid, ".log")
  logfile.e <- paste0(wd, "/", expid, ".e")

  command <- paste0("echo \"Rscript -e \\\"library(sgefacilities);",
                    "runJob(token=\\\\\\\"", token, "\\\\\\\")",
                    "\\\"\" | qsub -S /bin/bash -N ", expid, " ",
                    clParams,
                    " -o ", logfile.log, " -e ", logfile.e)
  cat("The command is\n", command, "\n")

  if (justPrepareForLaunch) {
    jobid <- "none"
  } else {
    submitted <- system(command, intern = TRUE)
    # qsub answers with 'Your job <id> ("<name>") has been submitted'; keep
    # only the id. Any other kind of answer is stored untouched.
    announced <- grep("^Your job(-array)? [0-9]+", submitted, value = TRUE)
    if (length(announced) > 0) {
      jobid <- sub("^Your job(-array)? ([0-9]+).*$", "\\2", announced[1])
    } else {
      jobid <- submitted
    }
    # No answer at all means the submission itself failed.
    if (length(jobid) == 0 || !jobRunning(jobid, wd)) {
      cat("Something went wrong with the Job????\n")
    } else {
      cat("Job", jobid, "is queued\n")
    }
  }

  list(paramfile = token,
       jobname = expid,
       jobid = jobid,
       wd = wd,
       command = command,
       outfile = paste0(token, "_out.rds"),
       logfile = logfile.log,
       errfile = logfile.e)
}

#' Submit prepared jobs to the queue in batches
#'
#' Executes the `command` stored in each handler, `batchSize` handlers at a
#' time, pausing `timePerBatch` seconds between consecutive batches.
#'
#' @param handlers A list of job handlers. Each one must provide the elements
#'   `command`, `wd` and `jobname`. `NULL` entries are skipped.
#' @param batchSize Number of jobs submitted before pausing.
#' @param timePerBatch Seconds to sleep between two batches. There is no
#'   pause after the last batch.
#'
#' @return A list with the submitted handlers, each with its `jobid` element
#'   set from the answer of the submission command, indexed by the `jobname`
#'   of the handler. `NULL` when there was nothing to submit.
submitJobs <- function(handlers, batchSize = 1000, timePerBatch = 1800) {
  newhandlers <- NULL
  total <- length(handlers)
  if (total == 0) {
    # seq(1, 0, batchSize) would fail; nothing to submit.
    return(newhandlers)
  }

  packs <- seq(1, total, batchSize)
  for (k in seq_along(packs)) {
    pack <- packs[k]
    right <- min(pack + batchSize - 1, total)
    cat("Qsubmitting from", pack, "to", right, "jobs\n")

    # Walk the handlers by position so that unnamed lists are submitted too.
    for (i in pack:right) {
      handler <- handlers[[i]]
      if (is.null(handler)) {
        next
      }
      cat("The command is\n", handler$command, "\n")
      submitted <- system(handler$command, intern = TRUE)
      # qsub answers with 'Your job <id> ("<name>") has been submitted'; keep
      # only the id. Any other kind of answer is stored untouched.
      announced <- grep("^Your job(-array)? [0-9]+", submitted, value = TRUE)
      if (length(announced) > 0) {
        jobid <- sub("^Your job(-array)? ([0-9]+).*$", "\\2", announced[1])
      } else {
        jobid <- submitted
      }
      # No answer at all means the submission itself failed.
      if (length(jobid) == 0 || !jobRunning(jobid, handler$wd)) {
        cat("Something went wrong with the Job????\n")
      } else {
        cat("Job", jobid, "is queued\n")
      }
      handler$jobid <- jobid
      newhandlers[[handler$jobname]] <- handler
    }

    if (k < length(packs)) {
      Sys.sleep(timePerBatch)
    }
  }
  newhandlers
}

#' Wait for jobs to finish and collect results
#'
#' Polls for the output file of every handler until all of them were read or
#' the time limit is reached.
#'
#' @param handlers The launchJob function will return a handler to each job.
#'   Handlers is a named list with all the job handlers we want to wait for.
#'   `NULL` entries are ignored.
#' @param timeLimit Wait for all to finish no longer than timeLimit seconds.
#' @param increment Test whether some/all jobs finished each increment
#'   seconds.
#' @param removeLogs Clean up the log files after reading the results of a
#'   job.
#' @param removeData Remove the output and parameter files after reading the
#'   results of a job.
#' @param wd Passed on to `jobRunning()` when `qstatworks` is `TRUE`.
#' @param qstatworks If `TRUE`, a job without output file is looked up with
#'   `jobRunning()` and given up when it is not found there. If `FALSE` such
#'   a job is simply polled again later.
#'
#' @return A list with the results of the jobs that finished, indexed by the
#'   same names as `handlers`.
#' @export
#'
#' @examples
#' \dontrun{
#' handlers <- list(a = launchJob(list(fun = sum, 1, 2)))
#' results <- waitForJobs(handlers)
#' }
waitForJobs <- function(handlers,
                        timeLimit = 24 * 3600,
                        increment = 30,
                        removeLogs = TRUE,
                        removeData = TRUE,
                        wd = "~/tmp/",
                        qstatworks = FALSE) {

  # Remove the given files, silently skipping those that are not there.
  removeIfPresent <- function(...) {
    paths <- as.character(c(...))
    paths <- paths[file.exists(paths)]
    if (length(paths) > 0) {
      file.remove(paths)
    }
    invisible(NULL)
  }

  # A NULL handler can never produce a result; leaving it in the waiting list
  # would keep the loop alive until the time limit.
  waitForReady <- names(handlers)[!vapply(handlers, is.null, logical(1))]
  elapsed <- 0
  responses <- NULL

  while (length(waitForReady) > 0 && elapsed < timeLimit) {
    for (index in waitForReady) {
      handler <- handlers[[index]]
      cat("Checking for", handler$outfile, "\n")
      if (file.exists(handler$outfile)) {
        cat("Job from", handler$outfile, "finished\n")
        # Grace period in case the file is still being written.
        Sys.sleep(5)
        responses[[index]] <- readRDS(handler$outfile)
        waitForReady <- waitForReady[waitForReady != index]

        if (removeData) {
          removeIfPresent(handler$outfile, handler$paramfile)
        }
        if (removeLogs) {
          removeIfPresent(handler$logfile, handler$errfile)
        }
      } else {
        cat("Checking state of", handler$jobname, "\n")
        if (qstatworks) {
          if (!jobRunning(handler$jobid, wd)) {
            cat("No results and no job running:", handler$jobname, "\n")
            responses[[index]] <- NULL
            waitForReady <- waitForReady[waitForReady != index]
          }
        } else {
          cat("We know nothing about", handler$jobname, "so far\n")
        }
      }
    }

    # Everything accounted for: do not sleep one more increment for nothing.
    if (length(waitForReady) == 0) {
      break
    }
    Sys.sleep(increment)
    elapsed <- elapsed + increment
    cat("Waking up, still", length(waitForReady), "files to read out of", length(handlers),
        " and ", timeLimit - elapsed, " seconds to go\n")
    flush.console()
  }
  cat("Done with the", length(handlers), "handlers\n")
  responses
}

#' Collect return values from all jobs that finished
#'
#' Reads the output file of every handler whose output file exists. Nothing
#' is deleted and nothing is waited for.
#'
#' @param handlers The launchJob function will return a handler to each job.
#'   Handlers is a named list with all the job handlers we want to collect
#'   data from, or the path of an RDS file that stores such a list.
#' @param removeLogs Accepted but not used by this function.
#' @param removeData Accepted but not used by this function.
#'
#' @return A list with an 'indexes' element with the names of the jobs that
#'   finished and 'results' with the contents of their output files.
#' @export
#'
#' @examples
#' \dontrun{
#' finished <- collectResults(handlers)
#' finished$indexes
#' }
collectResults <- function(handlers,
                           removeLogs = T,
                           removeData = T) {

  # A character argument is the path of a stored handler list.
  if (identical(typeof(handlers), "character")) {
    handlers <- readRDS(handlers)
  }

  results <- NULL
  for (key in names(handlers)) {
    entry <- handlers[[key]]
    if (is.null(entry)) {
      next
    }
    if (!file.exists(entry$outfile)) {
      next
    }
    results[[key]] <- readRDS(entry$outfile)
  }

  list(indexes = names(results), results = results)
}

#' Report how many jobs are finished, running or lost
#'
#' A handler counts as finished when its output file exists, as running when
#' there is no output file but `jobRunning()` finds the job, and as wrong
#' otherwise. A summary is printed to the console.
#'
#' @param handlers A named list of job handlers, or the path of an RDS file
#'   that stores such a list.
#' @param wd Passed on to `jobRunning()`.
#'
#' @return A list with elements `finished`, `running` and `wrong`, each
#'   holding the handlers that fall in that group.
reportOnJobs <- function(handlers, wd = "~/tmp/") {
  # A character argument is the path of a stored handler list.
  if (identical(typeof(handlers), "character")) {
    handlers <- readRDS(handlers)
  }

  finished <- NULL
  running <- NULL
  wrong <- NULL

  for (key in names(handlers)) {
    entry <- handlers[[key]]
    if (is.null(entry)) {
      next
    }
    if (file.exists(entry$outfile)) {
      finished <- c(finished, key)
    } else if (jobRunning(entry$jobid, wd)) {
      running <- c(running, key)
    } else {
      wrong <- c(wrong, key)
    }
  }

  total <- length(handlers)
  cat("Total Jobs", total, "were launched\n")

  if (is.null(running)) {
    cat("No jobs running\n")
  } else {
    cat(length(running), "jobs running\n")
  }

  if (is.null(wrong)) {
    cat("No wrong jobs so far\n")
  } else {
    cat(length(wrong), "jobs wrong, neither running nor results available\n")
  }

  if (is.null(finished)) {
    cat("No jobs finished\n")
  } else {
    cat(length(finished), "jobs done\n")
  }

  # Handlers that ended up in none of the three groups (NULL entries).
  unaccounted <- total - length(finished) - length(running) - length(wrong)
  if (unaccounted != 0) {
    cat(unaccounted, "jobs are missing\n")
  }

  cat("Done with the", total, "handlers\n")
  list(finished = handlers[finished],
       running = handlers[running],
       wrong = handlers[wrong])
}

#' Remove the files left behind by a set of jobs
#'
#' For every handler the output file, the parameter file, the log file and
#' the error file are removed with `file.remove()`.
#'
#' @param handlers A named list of job handlers providing the elements
#'   `outfile`, `paramfile`, `logfile` and `errfile`.
#' @param wd Accepted but not used by this function.
#'
#' @return `NULL`, invisibly. Called for its side effects.
#' @export
#'
#' @examples
#' \dontrun{
#' cleanUpJobs(handlers)
#' }
cleanUpJobs <- function(handlers,
                        wd = "~/tmp/") {

  leftovers <- c("outfile", "paramfile", "logfile", "errfile")

  for (key in names(handlers)) {
    entry <- handlers[[key]]
    for (field in leftovers) {
      # exact = FALSE gives the same name lookup as the `$` operator.
      file.remove(entry[[field, exact = FALSE]])
    }
  }

}
# juanbot/sgefacilities documentation built on May 24, 2019, 9:54 a.m.