# R/runQSLIMFinder.R

##' Run a list of QSLIMFinder jobs
##'
##' Submits the QSLIMFinder commands to an LSF cluster, waits for them to finish,
##' then resubmits the commands reported as crashed by \code{\link{jobsCrashed}}
##' with increasing memory: \code{memory_start}, \code{memory_start + memory_step}, ...
##' up to \code{max_memory}.
##' @rdname runQSLIMFinder
##' @name runQSLIMFinder
##' @author Vitalii Kleshchevnikov
##' @param file_list data.table containing path to files and directories for QSLIMFinder: fastafile, queryfile, outputdir, outputfile. Currently not used by \code{runQSLIMFinder()}.
##' @param commands character vector of commands generated by \code{\link{QSLIMFinderCommand}} that will launch QSLIMFinder jobs, \code{commands_list$run}
##' @param commands_list list that must contain the elements \code{set_env_var} (command to set up the environment variable IUPred_PATH) and \code{run} (character vector of bash commands that will launch QSLIMFinder as jobs on an LSF cluster); may also contain \code{log_dirfull}, \code{log_dirlog} and \code{log_direrror}, directories where LSF should write stdout and stderr
##' @param max_memory max memory to use for a job (upper bound of the retry sequence)
##' @param onLSF use LSF cluster to run QSLIMFinder jobs; \code{FALSE} is not implemented and raises an error
##' @param recursive passed to \code{\link[base]{dir.create}} when creating the log directories
##' @param memory_step increment between the memory values tried when resubmitting crashed jobs
##' @param memory_start first memory value tried when resubmitting crashed jobs
##' @param Njobs_limit integer, the number of LSF jobs allowed to run simultaneously
##' @return \code{runQSLIMFinder()}: character vector, the elements of \code{commands_list$run} that are still reported as crashed after the last retry
##' @import data.table
##' @export runQSLIMFinder
##' @seealso \code{\link{QSLIMFinderCommand}}
runQSLIMFinder = function(commands_list, file_list, max_memory = 5000, onLSF = TRUE,
                          recursive = FALSE, lsf_keyword = "TERM_MEMLIMIT", rm_log = TRUE,
                          memory_step = 100, memory_start = 200, Njobs_limit = 490){
  if (!all(c("set_env_var", "run") %in% names(commands_list))) {
    stop("`commands_list` doesn't contain `set_env_var` and/or `run`, check that `commands_list` is an output of mQSLIMFinderCommand")
  }
  if (!onLSF) {
    stop("onLSF = F not implemented")
  }

  # create dirs for stdout and stderr (only those that are specified and missing)
  for (dir_name in c("log_dirfull", "log_dirlog", "log_direrror")) {
    log_dir <- commands_list[[dir_name]]
    if (!is.null(log_dir) && !dir.exists(log_dir)) {
      dir.create(log_dir, recursive = recursive)
    }
  }

  # set up environment variable IUPred_PATH
  # NOTE(review): system() runs the command in its own shell process; if
  # `set_env_var` is a plain `export`, it will presumably not be visible to the
  # later system() calls - confirm against mQSLIMFinderCommand.
  system(commands_list$set_env_var)

  # run all jobs and wait until they finish
  LSFrunQSLIMFinder(commands_list$run, Njobs_limit = Njobs_limit)
  # find which jobs have crashed
  commands_crashed <- jobsCrashed(commands_list, rm_log = rm_log, lsf_keyword = lsf_keyword)

  # seq() raises an error when memory_start > max_memory; there is nothing to
  # retry within the allowed memory in that case
  memory_vals <- if (memory_start <= max_memory) {
    seq(memory_start, max_memory, memory_step)
  } else {
    numeric(0)
  }

  for (memory_val in memory_vals) {
    # stop as soon as no job is reported as crashed
    if (length(commands_crashed) == 0) break
    commands_crashed <- modifyMemoryInBsub(commands_crashed, memory = memory_val)
    LSFrunQSLIMFinder(commands_crashed, Njobs_limit = Njobs_limit)
    # logs of all jobs are re-checked; the returned commands are the original
    # elements of commands_list$run, memory is rewritten on the next iteration
    commands_crashed <- jobsCrashed(commands_list, rm_log = rm_log, lsf_keyword = lsf_keyword)
  }
  commands_crashed
}


##' @rdname runQSLIMFinder
##' @name modifyMemoryInBsub
##' @param memory integer, how much memory should the job use
##' @return \code{modifyMemoryInBsub()}: \code{commands} with every \code{-M <digits> } and \code{rusage[mem=<digits>]} rewritten to use \code{memory}
##' @import data.table
##' @export modifyMemoryInBsub
modifyMemoryInBsub = function(commands, memory){
  # paste0() would render large round numbers in scientific notation
  # (paste0(1e5) is "1e+05"), which is not a valid memory value; digits = 15
  # keeps the same precision paste0() uses
  memory_chr <- format(memory, scientific = FALSE, trim = TRUE, digits = 15)
  # memory limit option: "-M <digits> " (the trailing space is part of the match)
  commands <- gsub("-M [[:digit:]]+ ", paste0("-M ", memory_chr, " "), commands)
  # resource requirement: "rusage[mem=<digits>]"; brackets are literal in a replacement
  commands <- gsub("rusage\\[mem=[[:digit:]]+\\]", paste0("rusage[mem=", memory_chr, "]"), commands)
  commands
}

##' @rdname runQSLIMFinder
##' @name LSFrunQSLIMFinder
##' @param job_name_sig string common to all job names; used as a regular expression matched against the lines of \code{bjobs -w} output to decide whether jobs are still present
##' @return \code{LSFrunQSLIMFinder()}: \code{NULL}, invisibly; called for its side effect of submitting the commands and blocking until no line of \code{bjobs -w} matches \code{job_name_sig}
##' @import data.table
##' @export LSFrunQSLIMFinder
LSFrunQSLIMFinder = function(commands, job_name_sig = "batch_", Njobs_limit = 490) {
  # number of lines printed by `bjobs` minus one
  # (presumably the header line - confirm against the local LSF output format)
  count_jobs <- function() length(system("bjobs", intern = TRUE)) - 1
  # TRUE while at least one line of `bjobs -w` matches the job name signature
  jobs_present <- function() any(grepl(job_name_sig, system("bjobs -w", intern = TRUE)))

  for (command in commands) {
    # throttle: wait until the queue has room for one more job
    while (count_jobs() >= Njobs_limit) {
      Sys.sleep(100)
    }
    # NOTE(review): submission is asynchronous (wait = FALSE), so the polling
    # below can start before the last job is visible to bjobs - verify whether
    # the commands use a blocking bsub before switching to wait = TRUE.
    system(command, wait = FALSE)
  }

  # block until no matching job is listed; sleep only while jobs are present
  while (jobs_present()) {
    Sys.sleep(10)
  }
  invisible(NULL)
}

##' @rdname runQSLIMFinder
##' @name jobsCrashed
##' @param lsf_keyword character, LSF termination reason keyword (https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/termination_reasons_lsf.html). Defaults to "TERM_MEMLIMIT"
##' @param rm_log remove the log of a job after it was found to contain \code{lsf_keyword}
##' @return \code{jobsCrashed()}: vector of commands that crashed, i.e. the elements of \code{commands_list$run} whose log file contains exactly one line matching \code{lsf_keyword}
##' @import data.table
##' @export jobsCrashed
jobsCrashed = function(commands_list, rm_log = TRUE, lsf_keyword = "TERM_MEMLIMIT") {
  # the log path is what remains after dropping everything up to " -o " and
  # everything from " -e" onwards
  logs <- gsub("bsub.+ -o | -e.+$", "", commands_list$run)
  # vapply() always returns a logical vector, also for zero commands
  # (sapply() returns list() then, which is not a valid subscript)
  crashed <- vapply(logs, function(log) {
    # grep exits with non-zero status when nothing matches or the file is
    # missing; R reports that as a warning, which is expected here
    matching_lines <- suppressWarnings(
      system(paste0("grep ", lsf_keyword, " ", log), intern = TRUE)
    )
    # NOTE(review): exactly one matching line counts as crashed; a log with
    # several matching lines (e.g. an appended log) is treated as not crashed -
    # confirm this is intended.
    has_reason <- length(matching_lines) == 1
    if (rm_log && has_reason) {
      suppressWarnings(system(paste0("rm ", log)))
    }
    has_reason
  }, logical(1))
  commands_list$run[crashed]
}
# vitkl/SLIMFinderR documentation built on May 3, 2019, 8:08 p.m.