R/cluster_job_submit.R

Defines functions cluster_submit_shell_jobs cluster_job_submit

Documented in cluster_job_submit cluster_submit_shell_jobs

#' This function submits a single script to a high-performance cluster using a scheduler (Slurm or TORQUE).
#' It accepts a vector of arguments to be passed to the scheduler and
#' a vector of environment variables that should be passed to the compute node at job execution.
#'
#' The function returns the jobid of the scheduled job.
#'
#' @param script A script that should be executed by the scheduler. This can contain scheduler directives, but in the
#'      case of conflicts, the directives passed with \code{sched_args} will take precedence.
#' @param scheduler Which scheduler to use for job submission. Options are 'qsub', 'torque', 'sbatch', 'slurm', or 'sh'.
#'      The terms 'qsub' and 'torque' are aliases (where 'torque' submits via the qsub command). Likewise for 'sbatch'
#'      and 'slurm'. The scheduler 'sh' does not submit to any scheduler at all, but instead executes the command
#'      immediately via sh.
#' @param sched_args A character vector of arguments to be included in the scheduling command. On TORQUE, these
#'      will typically begin with '-l' such as '-l wall_time=10:00:00'.
#' @param env_variables A named character vector containing environment variables and their values to be passed
#'      to the \code{script} at execution time. This is handled by the '-v' directive on TORQUE clusters and
#'      by '--export' on Slurm clusters. The names of this vector are the environment variable names and
#'      the values of the vector are the environment variable values to be passed in.
#'      If you want to propagate the current value of an environment variable to the compute node at runtime,
#'      use NA as the value of the element in \code{env_variables}. See examples.
#' @param export_all Whether to export all environment variables to the compute node at runtime. Default: FALSE
#' @param echo Whether to echo the job submission command to the terminal at the time it is scheduled. Default: TRUE.
#' @param fail_on_error Whether to stop execution of the script (TRUE), or issue a warning (FALSE) if the job
#'      submission fails. Defaults to FALSE (i.e., issue a warning).
#' @param wait_jobs a character string of jobs or process ids that should complete before this job is executed
#' @param wait_signal on torque or slurm clusters, the signal that should indicate that parent jobs have finished.
#' @param repolling_interval The number of seconds to wait before rechecking job status (used only for local scheduler)
#'
#' @return A character string containing the jobid of the scheduled job.
#'
#' @examples
#' \dontrun{
#'   #simple PBS submission
#'   cluster_job_submit('myscript.bash', scheduler="torque", sched_args=c('-l walltime=10:00:00', '-l nodes=1:ppn=20'),
#'      env_variables=c(RUN_INDEX=2, MODEL_NAME='FSE21'))
#'
#'   #To forward environment variables without explicitly providing values. Note that these must
#'   #  be in R's system environment (cf. Sys.getenv) at execution time to forward as expected.
#'   cluster_job_submit('myscript.sbatch', scheduler="slurm",
#'      sched_args=c('-p general', '-N 1', '-n 12', '--mem=10g', '-t 02-00:00:00'),
#'      env_variables=c(RUN_INDEX=2, R_HOME=NA, JAVA_HOME=NA))
#' }
#'
#' @author Michael Hallquist
#' @importFrom tools file_path_sans_ext
#' @importFrom checkmate assert_character assert_subset
#' @export
cluster_job_submit <- function(script, scheduler="slurm", sched_args=NULL,
                           env_variables=NULL, export_all=FALSE, echo=TRUE, fail_on_error=FALSE, 
                           wait_jobs=NULL, wait_signal="afterok", repolling_interval=60, 
                           tracking_sqlite_db=NULL, tracking_args=list()) {

  checkmate::assert_string(script)
  checkmate::assert_file_exists(script)
  checkmate::assert_string(scheduler)
  checkmate::assert_subset(scheduler, c("qsub", "torque", "sbatch", "slurm", "sh", "local"))
  checkmate::assert_logical(export_all, max.len = 1L)
  checkmate::assert_logical(echo, max.len = 1L)
  checkmate::assert_logical(fail_on_error, max.len=1L)
  if (is.character(tracking_args) || is.numeric(tracking_args)) tracking_args <- as.list(tracking_args) # coerce tracking args to list
  if (length(tracking_args) > 0 && is.null(tracking_sqlite_db)) {
    warning("Tracking arguments provided to `cluster_job_submit` but `tracking_sqlite_db` is NULL")
  }

  if (scheduler %in% c("torque", "qsub")) {
    scheduler <- "qsub" # simpler internal tracking
    if (isTRUE(export_all)) {
      sched_args <- c("-V", sched_args)
    } # directive to export all environment variables to script
  } else if (scheduler %in% c("slurm", "sbatch")) {
    scheduler <- "sbatch"
    if (isTRUE(export_all)) {
      env_variables <- c(ALL = NA, env_variables)
    } # directive to export all environment variables to script
  } else if (scheduler %in% c("sh", "local")) {
    scheduler <- "sh"
    if (!is.null(sched_args)) {
      message("Omitting scheduler arguments for sh/local execution")
    }
    sched_args <- NULL # not relevant
  }

  # Scheduler arguments are just pasted together with spaces.
  # Thus, arguments like '--mem=5g' and '-n 12' are not handled differently
  if (!is.null(sched_args)) { sched_args <- paste(sched_args, collapse=" ") }

  #subfunction to handle variable=value and variable combinations
  paste_args <- function(str_vec) {
    nms <- names(str_vec)
    sapply(seq_along(str_vec), function(x) {
      if (is.na(str_vec[x])) {
        return(nms[x]) #just the name of the env variable (forwards from environment)
      } else {
        #force argument to be quoted to avoid problems with spaces
        val <- ifelse(grepl("^[\"'].*[\"']$", str_vec[x], perl=TRUE), str_vec[x], paste0("\"", str_vec[x], "\""))
        return(paste0(nms[x], "=", val))
      }
    })
  }

  #pass through environment variables
  if (!is.null(env_variables)) {
    env_variables <- paste_args(env_variables) #convert to appropriate name-value pairs
    if (scheduler == "qsub") {
      env_variables <- paste("-v", paste(env_variables, collapse=","))
    } else if (scheduler == "sbatch") {
      env_variables <- paste0("--export=", paste(env_variables, collapse=","))
    } else if (scheduler == "sh") {
      env_variables <- paste(sapply(env_variables, function(x) {
        ifelse(grepl("=", x, fixed = TRUE), x, paste0(x, "=\"$", x, "\""))
      }), collapse = " ")
    }

    sched_args <- paste(sched_args, env_variables)
  }

  if (!is.null(wait_jobs)) {
    jcomb <- paste(wait_jobs, collapse = ":") # multiple jobs are separated by colons
    if (scheduler == "qsub") {
      sched_args <- paste0(sched_args, " -W depend=", wait_signal, ":", jcomb)
    } else if (scheduler == "sbatch") {
      sched_args <- paste0(sched_args, " --dependency=", wait_signal, ":", jcomb)
    }
  }

  # use unique temp files to avoid parallel collisions in job tracking
  sub_stdout <- paste0(tempfile(), "_", tools::file_path_sans_ext(basename(script)), "_stdout") 
  sub_stderr <- paste0(tempfile(), "_", tools::file_path_sans_ext(basename(script)), "_stderr")
  sub_pid <- paste0(tempfile(), "_", tools::file_path_sans_ext(basename(script)), "_pid")

  if (scheduler == "sh") {
    # if an R script is provided, execute with Rscript --vanilla as command
    if (grepl(".+\\.R$", script, ignore.case = TRUE)) {
      bin <- "Rscript --vanilla"
    } else {
      bin <- "sh"
    }

    # for local scheduler, we need to hold jobs manually by waiting for relevant parents to complete
    if (!is.null(wait_jobs)) {
      message("Waiting for the following jobs to finish: ", paste(wait_jobs, collapse=","))
      wait_for_job(wait_jobs, repolling_interval = repolling_interval, scheduler = scheduler)
      # ZV: grab return code here for parent job failing?
    }

    # for direct execution, need to pass environment variables by prepending
    if (isTRUE(echo)) cat(paste(env_variables, bin, script), "\n")
    # submit the job script and return the jobid by forking to background and returning PID
    jobres <- system(paste(env_variables, bin, script, ">", sub_stdout, "2>", sub_stderr, "& echo $! >", sub_pid), wait = FALSE)
    Sys.sleep(.05) #sometimes the pid file is not in place when file.exists executes -- add a bit of time to ensure that it reads
    jobid <- if (file.exists(sub_pid)) scan(file = sub_pid, what = "char", sep = "\n", quiet = TRUE) else ""
  } else {
    if (isTRUE(echo)) cat(paste(scheduler, sched_args, script), "\n")
    # submit the job script and return the jobid
    jobres <- system2(scheduler, args = paste(sched_args, script), stdout = sub_stdout, stderr = sub_stderr)
    jobid <- if (file.exists(sub_stdout)) scan(file = sub_stdout, what = "char", sep = "\n", quiet = TRUE) else ""
  }

  joberr <- if (file.exists(sub_stderr)) {
    paste(scan(file = sub_stderr, what = "char", sep = "\n", quiet = TRUE), collapse = ". ")
  } else {
    ""
  }

  if (jobres != 0) {
    jobid <- NULL
    if (isTRUE(fail_on_error)) {
      stop("Job submission failed: ", script, ", error: ", joberr, ", errcode: ", jobres)
    } else {
      warning("Job submission failed: ", script, ", error: ", joberr, ", errcode: ", jobres)
    }
  } else {
    jobid <- sub("Submitted batch job ", "", jobid, fixed = TRUE) # replace irrelevant details if needed
    if (!is.null(tracking_args)) tracking_args$status <- "QUEUED" # on successful submission, tracking status defaults to "QUEUED"
  }

  if (!is.null(tracking_args)) {
    # populate tracking arguments that are undefined and can be defined at this point
    tracking_args <- populate_list_arg(tracking_args, "batch_file", script)
    tracking_args <- populate_list_arg(tracking_args, "scheduler", scheduler)
    tracking_args <- populate_list_arg(tracking_args, "scheduler_options", sched_args, append = TRUE)
  }

  # once a job_id has been generated, we add it to the tracking db
  # if a job has a NULL id or the tracking_sqlite_db arg is NULL, the function will return invisible NULL
  insert_tracked_job(sqlite_db = tracking_sqlite_db, job_id = jobid, tracking_args = tracking_args)

  if (!is.null(wait_jobs)) {
    # add any parent jobs using the wait_jobs argument (defaults to last parent id in list)
    add_tracked_job_parent(sqlite_db = tracking_sqlite_db, job_id = jobid, parent_job_id = wait_jobs[length(wait_jobs)]) 
  } else if (!is.null(tracking_args$parent_job_id)) {
    # in the case that a parent job id is passed in through the tracking_args list
    add_tracked_job_parent(sqlite_db = tracking_sqlite_db, job_id = jobid, parent_job_id = tracking_args$parent_job_id) 
  }

  return(jobid)
}



#' helper function to submit a set of shell jobs that are independent of one another
#'
#' @param job_list a list or character vector where each element represents an independent job to execute in a shell environment
#' @param commands_per_cpu how many elements from \code{job_list} are executed by each core within a single job
#' @param cpus_per_job how many cpus/cores are requested for each job
#' @param memgb_per_command amount of memory (RAM) requested for each command (in GB)
#' @param time_per_job amount of time requested for each job
#' @param fork_jobs if TRUE, all jobs within a single batch will run simultaneously using the fork (&) approach.
#' @param pre user-specified code to include in the job script prior to job_list (e.g., module load commands)
#' @param job_out_dir the directory where job scripts should be written
#' @param job_script_prefix the filename prefix for each job script
#' @param log_file a csv log file containing the job ids and commands that were executed
#' @param debug a logical indicating whether to actually submit the jobs (TRUE) or just create the scripts for inspection (FALSE)
#'
#' @importFrom tidyr unnest
#' @importFrom checkmate assert_multi_class
#'
#' @export
cluster_submit_shell_jobs <- function(job_list, commands_per_cpu = 1L, cpus_per_job = 8L, memgb_per_command = 8, time_per_job="1:00:00",
  time_per_command = NULL, fork_jobs = TRUE, pre=NULL, post=NULL, sched_args = NULL, env_variables = NULL, wait_jobs = NULL, scheduler="slurm",
  job_out_dir=getwd(), job_script_prefix="job", log_file="cluster_submit_jobs.csv", debug = FALSE)
{
  checkmate::assert_multi_class(job_list, c("list", "character"))
  checkmate::assert_integerish(commands_per_cpu, len = 1L, lower = 1, upper = 1e4)
  checkmate::assert_integerish(cpus_per_job, len=1L, lower=1, upper=1e3)
  checkmate::assert_number(memgb_per_command, lower = 0.01, upper = 1e3) # max 1TB
  checkmate::assert_logical(fork_jobs, len = 1L)
  if (!is.null(time_per_command)) {
    if (!is.null(time_per_job)) {
      message("Using time_per_command specification, even though time_per_job was also provided")
    }

    # convert to lubridate object for math
    time_per_command <- dhms(time_per_command)

    if (isTRUE(fork_jobs)) {
      # if we fork, then the total job duration is just the number of commands per cpu (default 1) multiplied
      # by the per-command time. Note that at present, the forking does not respect a job limit equal to cpus_per_job.
      # So, if you say commands_per_cpu=3 and cpus_per_job=8, then 24 jobs will launch at once. Need a jobs | wc -l waiting approach
      time_per_job <- time_per_command * commands_per_cpu
    } else {
      # if we do not fork, the total time increases by a factor of cpus_per_job
      # this is useful if each command is a multithreaded job (e.g., MCMC sampling multiple chains).
      time_per_job <- time_per_command * commands_per_cpus * cpus_per_job
    }

    # convert back to time string from period
    time_per_job <- as_dhms(time_per_job)
  } else {
    time_per_job <- validate_dhms(time_per_job)
  }

  n_jobs <- ceiling(length(job_list) / (cpus_per_job * commands_per_cpu))

  # how every job submission script begins
  preamble <- c(
    "#!/bin/bash",
    ""
  )

  if (scheduler == "slurm") {
    file_suffix <- ".sbatch"
    preamble <- c(preamble,
      "#SBATCH -N 1", #always single node for now
      paste0("#SBATCH -n ", cpus_per_job),
      paste0("#SBATCH --time=", time_per_job),
      paste0("#SBATCH --mem-per-cpu=", memgb_per_command, "G"),
      "cd $SLURM_SUBMIT_DIR",
      ""
    )
  } else if (scheduler == "torque") {
    file_suffix <- ".pbs"
    preamble <- c(
      preamble,
      paste0("#PBS -l nodes=1:ppn=", cpus_per_job),
      paste0("#PBS -l pmem=", memgb_per_command, "gb"),
      paste0("#PBS -l walltime=", time_per_job),
      "cd $PBS_O_WORKDIR",
      ""
    )
  } else if (scheduler == "local") {
    file_suffix <- ".bash"
  }

  # add any user-specified code that should precede each job execution
  preamble <- c(preamble, pre, "")

  job_df <- data.frame(
    job_number = rep(seq_len(n_jobs), each = cpus_per_job * commands_per_cpu, length.out = length(job_list)),
    job_id = NA_character_,
    stringsAsFactors = FALSE
  )

  job_df$cmd <- job_list # post-assignment keeps as list, if relevant

  submitted_jobs <- rep(NA_character_, n_jobs)

  for (j in seq_len(n_jobs)) {
    this_run <- job_df$cmd[job_df$job_number == j]

    outfile <- file.path(job_out_dir, paste0(job_script_prefix, "_", j, file_suffix))
    cat(preamble, file = outfile, sep = "\n")

    if (is.list(this_run)) {
      # for list inputs, each job consists of multiple commands. make sure these are executed together using a subshell
      job_str <- sapply(this_run, function(x) {
        c("(", x, paste0(")", ifelse(fork_jobs == TRUE, " &", "")))
      })
      cat(job_str, file = outfile, sep = "\n", append = TRUE)
    } else {
      cat(paste(this_run, ifelse(fork_jobs == TRUE, "&", "")), file = outfile, sep = "\n", append = TRUE)
    }

    # make sure parent script waits for forked jobs to complete before exiting
    if (isTRUE(fork_jobs)) {
      cat("wait\n\n", file = outfile, append = TRUE)
    }

    if (!is.null(post)) {
      cat(post, file=outfile, sep="\n", append = TRUE)
    }

    # in debug mode, we only create the scripts, but we do not submit them
    if (isTRUE(debug)) {
      job_id <- paste0("dummy", j)
    } else {
      job_id <- cluster_job_submit(outfile, scheduler = scheduler, sched_args = sched_args, env_variables = env_variables, wait_jobs = wait_jobs)
    }

    job_df$job_id[job_df$job_number == j] <- job_id
    submitted_jobs[j] <- job_id

  }

  # write job log to file
  if (is.list(job_list)) {
    write.csv(job_df %>% unnest(cmd), file = log_file, row.names = FALSE)
  } else {
    write.csv(job_df, file = log_file, row.names = FALSE)
  }

  return(submitted_jobs)
}
UNCDEPENdLab/fmri.pipeline documentation built on April 3, 2025, 3:21 p.m.