R/lsf.R

Defines functions run_lsf

Documented in run_lsf

#' Submit array jobs to LSF clusters.
#' 
#' @description This function submits array jobs to LSF clusters.
#' 
#' The input has to be a function that can carry out a full computation
#' itself, plus a data.frame where each row represents the inputs that
#' this function is expecting. The input data.frame is dumped to a file,
#' the input function is wrapped inside an automatically generated R
#' script that gathers inputs from the command line. A LSF submission
#' script is generated for a bash shell. The function can also run the
#' LSF command `bsub` to submit the job, or just generate the required 
#' files and prompt the user to submit the job via the shell. LSF parameters
#' can be provided as a list of parameters, similarly modules and custom
#' filenames for the generated scripts.
#' 
#' @note  The queue and the project ID in `BSUB_config` should  
#' always be provided as they are cluster-specific. Default values will
#' prompt errors submitting the job. Besides, we have found that automatic job submission
#' can sometimes generate some `command not found` types of errors. Manual 
#' submission seems generally the safest option to submit LSF jobs.
#'
#' @param FUN A function that takes any arguments in input, and performs
#' a computation. This function should be runnable as a standalone R script.
#' In the generated script the arguments are read from the command line, so
#' \code{FUN} receives each of them as a character string.
#' @param PARAMS A data.frame where each row represents inputs for \code{FUN}.
#' An array job with as many rows as \code{PARAMS} is generated. It must have
#' at least one row and one column, and its column names cannot contain
#' spaces or dots.
#' @param BSUB_config A list of BSUB commands for the LSF cluster should
#' be provided. The default input is obtained from a call to \code{default_BSUB_config()}.
#' The queue and the project ID should always be provided as they are cluster-specific. 
#' Otherwise, default values will prompt errors submitting the job. 
#' @param modules A list of modules that will be added as dependencies of the
#' LSF submission script. For instance \code{modules = 'R/3.5.0'} will generate
#' the dependency for a specific R version as \code{"module load R/3.5.0"}.
#' @param extra_commands Extra set of commands that will be executed in the submission
#' script right after modules declaration. A character vector with several
#' commands is written one command per line.
#' @param input_file The name of the data.frame input file that is generated
#' from \code{PARAMS}. This file is tab-separated, and contains no header
#' and no row names.
#' @param R_script The name of the R script file that contains the definition
#' of \code{FUN}, and some other autogenerated R code to call the function
#' with input parameters from the command line. Function \code{FUN} is given
#' a fake name in this script.
#' @param Submission_script The name of the LSF script file that contains the 
#' submission routines.
#' @param output_folder The output of this function will be sent to this folder.
#' The folder is created if it does not exist. 
#' 
#' @param run If `TRUE`, the function will attempt invoking `bsub` and submit
#' the array jobs. In an interactive session a confirmation is requested first
#' (declining it deletes the generated files); in a non-interactive session
#' the jobs are submitted without asking. Otherwise it will print to screen 
#' the instructions to run the job manually through the console. 
#'
#' @seealso See \code{default_BSUB_config} that is used to generate
#' default parameters for LSF jobs.
#' 
#' @return Invisibly, the value \code{1}. This function is called for its side
#' effects: it generates the required inputs to submit an array job via the 
#' LSF clusters. If required, it also attempts submitting the jobs.
#' 
#' @export
#' 
#'
#' @examples
#' # very dummy example function
#' FUN = function(x, y){ print(c(x, y)) }
#' 
#' # input for 25 array jobs
#' PARAMS = data.frame(x = runif(25), y = runif(25))
#' 
#' \dontrun{
#' # call - not run since it's cluster-specific
#' run_lsf(FUN, PARAMS)
#' }
run_lsf = function(FUN,
               PARAMS,
               BSUB_config = default_BSUB_config(),
               modules = c('R/3.5.0'),
               extra_commands = NULL,
               input_file = 'EASYPAR_LSF_input_jobarray.csv',
               R_script = 'EASYPAR_LSF_Run.R',
               Submission_script =  'EASYPAR_LSF_submission.sh',
               output_folder = '.',
               run = FALSE
               )
{
  # =-=-=-=-=-=-=-=-=-=-=-
  # Stop on error if input is not correct. All the checks are done before
  # touching the file system, so invalid inputs leave nothing behind.
  # =-=-=-=-=-=-=-=-=-=-=-
  stopifnot(is.function(FUN))
  stopifnot(is.data.frame(PARAMS))
  
  # An empty PARAMS would generate the invalid job array "[1-0]"
  if(nrow(PARAMS) == 0 || ncol(PARAMS) == 0)
    stop(
      'PARAMS should be a data.frame with at least one row and one column. Aborting.'
    )
  
  # Check column names (they become shell and R variable names)
  variables = colnames(PARAMS)
  
  has_nocols = is.null(variables)
  has_dots = any(grepl('\\.', variables))
  has_space = any(grepl(' ', variables))
  
  if(has_nocols || has_dots || has_space) 
    stop(
      'PARAMS should be a data.frame with column names, without spaces or dots. Aborting.'
    )
  
  # =-=-=-=-=-=-=-=-=-=-=-
  # Prepare output folder
  # =-=-=-=-=-=-=-=-=-=-=-
  if(output_folder != ".") 
  {
    dir.create(output_folder, showWarnings = FALSE, recursive = TRUE)
    
    # setwd() returns the previous directory; on.exit() restores it also
    # when the function is interrupted by an error
    current_wd = setwd(output_folder)
    on.exit(setwd(current_wd), add = TRUE)
  } 
  
  cli::cli_rule("easypar: LSF array jobs generator")
  cli::cli_alert("Destination folder: {.field {output_folder}}\n")
  
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  # PARAMS go into an output file
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  write.table(
    PARAMS,
    input_file,
    quote = FALSE, 
    row.names = FALSE,
    col.names = FALSE, 
    sep = '\t'
  )
  
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  # FUN goes into a file, as string
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  info = paste0(
    "# =-=-=-=-=-=-=-=-=-=-=-=-=-==-=-=-=-=-=-=-=-=\n",
    "# Automatic R script generated via easypar\n",
    '# ', Sys.time(), '\n',
    "# =-=-=-=-=-=-=-=-=-=-=-=-=-==-=-=-=-=-=-=-=-=\n"
  )
  
  FUN_str = capture.output(print(FUN))
  
  # Printing a closure can append "<bytecode: ...>" and "<environment: ...>"
  # lines, which are not valid R source
  non_source_code = startsWith(FUN_str, "<bytecode:") | startsWith(FUN_str, "<environment:")
  FUN_str = FUN_str[!non_source_code]
  
  FUN_str = paste0(FUN_str, collapse = '\n')
  
  # Runner code: read one command line argument per variable, print them,
  # and call the function
  FUN_closing = paste0(
    "# EASYPAR Autogenerated R-code\n",
    "if(sys.nframe() == 0L) {\n",
    "   args = commandArgs(trailingOnly = TRUE)\n",
    paste0("   ", variables, ' = args[', seq_along(variables), ']\n', collapse = ''),
    "print(\"\\nInput parameters for the function\\n\")\n",
    paste0("   print(", variables, ')\n', collapse = ''),
    '   easypar_generated_function(', paste(variables, collapse = ', '), ')\n',
    "}"
  )
  
  FUN_str = 
    paste(
      info, '\n',
      'easypar_generated_function', '=', FUN_str,
          '\n',
          FUN_closing
          )
  
  write(FUN_str, R_script)
  
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  # Assemble input script file
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  separator = '\n'
  shell = '#!/bin/bash\n'
  
  # Header script for job submission -- special handling for -J, which
  # gets the array range appended
  BSUB_config_header = lapply(names(BSUB_config),
                              function(x)
                              {
                                bsub = paste0("#BSUB ", x, ' ', BSUB_config[[x]])
                                
                                if (x == '-J')
                                  bsub = paste0(bsub, '[1-', nrow(PARAMS), ']')
                                
                                paste0(bsub, '\n')
                              })
  
  BSUB_config_header = paste(unlist(BSUB_config_header), collapse = '')
  
  # Info easypar
  info = paste0(
    "# =-=-=-=-=-=-=-=-=-=-=-=-=-==-=-=-=-=-=-=-=-=\n",
    "# Automatic LSF script generated via easypar\n",
    '# ', Sys.time(), '\n',
    "# =-=-=-=-=-=-=-=-=-=-=-=-=-==-=-=-=-=-=-=-=-=\n"
  )
  
  # Required modules
  modules = lapply(
    modules, 
    function(m){
      paste0('module load ', m, ' ', '\n')
    })
  modules = paste(unlist(modules), collapse = '')
  
  # Extra commands are collapsed into a single string (empty if NULL), so
  # that a vector of commands does not replicate the whole script
  extra_commands = paste(extra_commands, collapse = '\n')
  
  # Assemble LSF commands, info and modules
  header = paste0(
    shell, separator,
    BSUB_config_header,
    separator,
    info,
    separator,
    "# Required modules\n",
    modules,
    extra_commands,
    separator
  )
  
  # Assemble variables
  core_script = 
    paste0(
      'file_input=',input_file, '\n',
      'R_script=',R_script, '\n',
      'line=$LSB_JOBINDEX\n'
    )
  
  # Assemble awk to load input: one shell variable per column, read from
  # the line of the input file that matches the job index
  core_script_awk = paste0(
    variables, "=$( awk -v line=$line 'BEGIN {FS=\"\\t\"}; FNR==line ",
    "{print $", seq_along(variables), "}' ",
    "$file_input)\n",
    collapse = ''
  )
  
  # Assemble call the R script
  core_script_launch = paste0(
    'Rscript $R_script ', paste0('$', variables, collapse = ' ')
  )
  
  lsf_script = paste0(
      header,
      separator,
      "# Input file and R script\n",
      core_script,
      separator,
      "# Data loading\n",
      core_script_awk,
      separator,
      "# Job run\n",
      core_script_launch
    )
  
  write(lsf_script, Submission_script)
  
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  # For Logs prepare output
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  log_target = BSUB_config[['-o']]
  
  # The `-o` entry may be missing; only a single path with a folder
  # component requires a folder to be created
  if(
    is.character(log_target) && 
    length(log_target) == 1 && 
    grepl('/', log_target, fixed = TRUE)
  )
  {
    folder = dirname(log_target)
    
    if(!dir.exists(folder)) {
      cat("Creating folder for output and error logs: ", folder, '\n')
      dir.create(folder, recursive = TRUE)
    }
  }
  
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  # Final confirmation
  # =-=-=-=-=-=-=-=-=-=-=-=-=-=
  
  # Notification
  cli::cli_alert_success("LSF submission script: {.field {Submission_script}} (R runner: {.field {R_script}})")
  cli::cli_alert_success(" Input file (head of): {.field {input_file}}")
  writeLines(readLines(input_file, n = 10L))
  
  if(run)
  {
    if(interactive())
    {
      # query for submission confirmation
      cat(separator, separator)
      repeat{
        flush.console()
        cat(paste0('Submit N = ', nrow(PARAMS), ' job(s) ? [Yes/no] '))
        answer = readline()
        
        if(answer %in% c('Yes', "Y", 'y', 'yes', 'No', 'N', 'n', 'no')) break;
      }
    }
    else
    {
      # readline() always returns "" in non-interactive sessions, which would
      # loop forever: `run = TRUE` is taken as the confirmation
      answer = 'yes'
    }
    
    if(answer %in% c('Yes', "Y", 'y', 'yes'))
    {
      message("\nSubmission confirmed, submitting jobs.\n")
      system(paste0('bsub < ', shQuote(Submission_script)))
      system('bjobs')
      
    } 
    else
    {
      message("\nSubmission cancelled, deleting generated files.\n")
      
      file.remove(input_file)
      file.remove(R_script)
      file.remove(Submission_script)
    }
  }
  else{
    cli::cli_h2(paste0(crayon::white('Scripts generated')))
    cat('\n')
    
    # First row of the inputs, as it is written in the input file (factors
    # are converted to their labels, not to their integer codes)
    first_inputs = vapply(
      PARAMS[1, , drop = FALSE], 
      function(column) as.character(column), 
      character(1)
    )
    
    cli::cat_line(paste0(crayon::yellow('Job submission:'),  ' bsub < ', Submission_script))
    cli::cat_line(paste0(crayon::yellow('   Job testing:'),  ' Rscript ', 
                         R_script, ' ', paste0(first_inputs, collapse = ' ')))
    
  }
  
  # The working directory is restored by the on.exit() handler
  invisible(1)
}
caravagnalab/easypar documentation built on June 10, 2022, 6:05 a.m.