R/clusterFunctions.R

Defines functions findTemplateFile getBatchIds cfKillJob cfHandleUnknownSubmitError cfBrewTemplate cfReadBrewTemplate print.SubmitJobResult makeSubmitJobResult print.ClusterFunctions makeClusterFunctions

Documented in cfBrewTemplate cfHandleUnknownSubmitError cfKillJob cfReadBrewTemplate findTemplateFile makeClusterFunctions makeSubmitJobResult

#' @title ClusterFunctions Constructor
#'
#' @description
#' This is the constructor used to create \emph{custom} cluster functions.
#' Note that some standard implementations for TORQUE, Slurm, LSF, SGE, etc. ship
#' with the package.
#'
#' @param name [\code{character(1)}]\cr
#'   Name of cluster functions.
#' @param submitJob [\code{function(reg, jc, ...)}]\cr
#'   Function to submit new jobs. Must return a \code{\link{SubmitJobResult}} object.
#'   The arguments are \code{reg} (\code{\link{Registry}}) and \code{jobs} (\code{\link{JobCollection}}).
#' @param killJob [\code{function(reg, batch.id)}]\cr
#'   Function to kill a job on the batch system. Make sure that you definitely kill the job! Return
#'   value is currently ignored. Must have the arguments \code{reg} (\code{\link{Registry}}) and
#'   \code{batch.id} (\code{character(1)} as returned by \code{submitJob}).
#'   Note that there is a helper function \code{\link{cfKillJob}} to repeatedly try to kill jobs.
#'   Set \code{killJob} to \code{NULL} if killing jobs cannot be supported.
#' @param listJobsQueued [\code{function(reg)}]\cr
#'   List all queued jobs on the batch system for the current user.
#'   Must return an character vector of batch ids, same format as they
#'   are returned by \code{submitJob}.
#'   Set \code{listJobsQueued} to \code{NULL} if listing of queued jobs is not supported.
#' @param listJobsRunning [\code{function(reg)}]\cr
#'   List all running jobs on the batch system for the current user.
#'   Must return an character vector of batch ids, same format as they
#'   are returned by \code{submitJob}. It does not matter if you return a few job ids too many (e.g.
#'   all for the current user instead of all for the current registry), but you have to include all
#'   relevant ones. Must have the argument are \code{reg} (\code{\link{Registry}}).
#'   Set \code{listJobsRunning} to \code{NULL} if listing of running jobs is not supported.
#' @param array.var [\code{character(1)}]\cr
#'   Name of the environment variable set by the scheduler to identify IDs of job arrays.
#'   Default is \code{NA} for no array support.
#' @param store.job.collection [\code{logical(1)}]\cr
#'   Flag to indicate that the cluster function implementation of \code{submitJob} can not directly handle \code{\link{JobCollection}} objects.
#'   If set to \code{FALSE}, the \code{\link{JobCollection}} is serialized to the file system before submitting the job.
#' @param store.job.files [\code{logical(1)}]\cr
#'   Flag to indicate that job files need to be stored in the file directory.
#'   If set to \code{FALSE} (default), the job file is created in a temporary directory, otherwise (or if the debug mode is enabled) in
#'   the subdirectory \code{jobs} of the \code{file.dir}.
#' @param scheduler.latency [\code{numeric(1)}]\cr
#'   Time to sleep after important interactions with the scheduler to ensure a sane state.
#'   Currently only triggered after calling \code{\link{submitJobs}}.
#' @param fs.latency [\code{numeric(1)}]\cr
#'   Expected maximum latency of the file system, in seconds.
#'   Set to a positive number for network file systems like NFS which enables more robust (but also more expensive) mechanisms to
#'   access files and directories.
#'   Usually safe to set to \code{0} to disable the heuristic, e.g. if you are working on a local file system.
#' @param hooks [\code{list}]\cr
#'   Named list of functions which will we called on certain events like \dQuote{pre.submit} or \dQuote{post.sync}.
#'   See \link{Hooks}.
#' @export
#' @aliases ClusterFunctions
#' @family ClusterFunctions
#' @family ClusterFunctionsHelper
makeClusterFunctions = function(name, submitJob, killJob = NULL, listJobsQueued = NULL, listJobsRunning = NULL,
  array.var = NA_character_, store.job.collection = FALSE, store.job.files = FALSE, scheduler.latency = 0,
  fs.latency = 0, hooks = list()) {
  assertList(hooks, types = "function", names = "unique")
  assertSubset(names(hooks), unlist(batchtools$hooks, use.names = FALSE))

  setClasses(list(
      name = assertString(name, min.chars = 1L),
      submitJob = assertFunction(submitJob, c("reg", "jc"), null.ok = TRUE),
      killJob = assertFunction(killJob, c("reg", "batch.id"), null.ok = TRUE),
      listJobsQueued = assertFunction(listJobsQueued, "reg", null.ok = TRUE),
      listJobsRunning = assertFunction(listJobsRunning, "reg", null.ok = TRUE),
      array.var = assertString(array.var, na.ok = TRUE),
      store.job.collection = assertFlag(store.job.collection),
      store.job.files = assertFlag(store.job.files),
      scheduler.latency = assertNumber(scheduler.latency, lower = 0),
      fs.latency = assertNumber(fs.latency, lower = 0),
      hooks = hooks),
    "ClusterFunctions")
}

#' @export
print.ClusterFunctions = function(x, ...) {
  catf("ClusterFunctions for mode: %s", x$name)
  catf("  List queued Jobs : %s", !is.null(x$listJobsQueued))
  catf("  List running Jobs: %s", !is.null(x$listJobsRunning))
  catf("  Kill Jobs        : %s", !is.null(x$killJob))
  catf("  Hooks            : %s", if (length(x$hooks)) stri_flatten(names(x$hooks), ",") else "-")
}

#' @title Create a SubmitJobResult
#'
#' @description
#' This function is only intended for use in your own cluster functions implementation.
#'
#' Use this function in your implementation of \code{\link{makeClusterFunctions}} to create a return
#' value for the \code{submitJob} function.
#'
#' @param status [\code{integer(1)}]\cr
#'   Launch status of job. 0 means success, codes between 1 and 100 are temporary errors and any
#'   error greater than 100 is a permanent failure.
#' @param batch.id [\code{character()}]\cr
#'   Unique id of this job on batch system, as given by the batch system.
#'   Must be globally unique so that the job can be terminated using just this information.
#'   For array jobs, this may be a vector of length equal to the number of jobs in the array.
#' @param log.file [\code{character()}]\cr
#'   Log file. If \code{NA}, defaults to \code{[job.hash].log}.
#'   Some cluster functions set this for array jobs.
#' @param msg [\code{character(1)}]\cr
#'   Optional error message in case \code{status} is not equal to 0. Default is \dQuote{OK},
#'   \dQuote{TEMPERROR}, \dQuote{ERROR}, depending on \code{status}.
#' @return [\code{\link{SubmitJobResult}}]. A list, containing
#'   \code{status}, \code{batch.id} and \code{msg}.
#' @family ClusterFunctionsHelper
#' @aliases SubmitJobResult
#' @export
makeSubmitJobResult = function(status, batch.id, log.file = NA_character_, msg = NA_character_) {
  status = asInt(status)
  if (is.na(msg)) {
    msg = if (status == 0L)
      "OK"
    else if (status <= 100L)
      "TEMPERROR"
    else
      "ERROR"
  }
  "!DEBUG [makeSubmitJobResult]: Result for batch.id '`paste0(batch.id, sep = ',')`': `status` (`msg`)"

  setClasses(list(status = status, batch.id = batch.id, log.file = log.file, msg = msg), "SubmitJobResult")
}

#' @export
print.SubmitJobResult = function(x, ...) {
  cat("Job submission result\n")
  catf("  ID    : %s", stri_flatten(x$batch.id, ","))
  catf("  Status: %i", x$status)
  catf("  Msg   : %s", x$msg)
}

#' @title Cluster Functions Helper to Parse a Brew Template
#'
#' @description
#' This function is only intended for use in your own cluster functions implementation.
#'
#' This function is only intended for use in your own cluster functions implementation.
#' Simply reads your template file and returns it as a character vector.
#'
#' @param template [\code{character(1)}]\cr
#'   Path to template file which is then passed to \code{\link[brew]{brew}}.
#' @param comment.string [\code{character(1)}]\cr
#'   Ignore lines starting with this string.
#' @return [\code{character}].
#' @family ClusterFunctionsHelper
#' @export
cfReadBrewTemplate = function(template, comment.string = NA_character_) {
  "!DEBUG [cfReadBrewTemplate]: Parsing template file '`template`'"
  lines = stri_trim_both(readLines(template))

  lines = lines[!stri_isempty(lines)]
  if (!is.na(comment.string))
    lines = lines[!stri_startswith_fixed(lines, comment.string)]
  if (length(lines) == 0L)
    stopf("Error reading template '%s' or empty template", template)
  return(stri_flatten(lines, "\n"))
}

#' @title Cluster Functions Helper to Write Job Description Files
#'
#' @description
#' This function is only intended for use in your own cluster functions implementation.
#'
#' Calls brew silently on your template, any error will lead to an exception.
#' The file is stored at the same place as the corresponding job file in the \dQuote{jobs}-subdir
#' of your files directory.
#'
#' @template reg
#' @param text [\code{character(1)}]\cr
#'   String ready to be brewed. See \code{\link{cfReadBrewTemplate}} to read a template from the file system.
#' @param jc [\code{\link{JobCollection})}]\cr
#'   Will be used as environment to brew the template file in. See \code{\link{JobCollection}} for a list of all
#'   available variables.
#' @return [\code{character(1)}]. File path to brewed template file.
#' @family ClusterFunctionsHelper
#' @export
cfBrewTemplate = function(reg, text, jc) {
  assertString(text)
  outfile = fs::path(dir(reg, "jobs"), sprintf("%s.job", jc$job.hash))

  parent.env(jc) = asNamespace("batchtools")
  on.exit(parent.env(jc) <- emptyenv())
  "!DEBUG [cfBrewTemplate]: Brewing template to file '`outfile`'"

  z = try(brew(text = text, output = outfile, envir = jc), silent = TRUE)
  if (is.error(z))
    stopf("Error brewing template: %s", as.character(z))
  waitForFile(outfile, reg$cluster.functions$fs.latency)
  return(outfile)
}

#' @title Cluster Functions Helper to Handle Unknown Errors
#'
#' @description
#' This function is only intended for use in your own cluster functions implementation.
#'
#' Simply constructs a \code{\link{SubmitJobResult}} object with status code 101, NA as batch id and
#' an informative error message containing the output of the OS command in \code{output}.
#'
#' @param cmd [\code{character(1)}]\cr
#'   OS command used to submit the job, e.g. qsub.
#' @param exit.code [\code{integer(1)}]\cr
#'   Exit code of the OS command, should not be 0.
#' @param output [\code{character}]\cr
#'   Output of the OS command, hopefully an informative error message.
#'   If these are multiple lines in a vector, they are automatically joined.
#' @return [\code{\link{SubmitJobResult}}].
#' @family ClusterFunctionsHelper
#' @export
cfHandleUnknownSubmitError = function(cmd, exit.code, output) {
  assertString(cmd, min.chars = 1L)
  exit.code = asInt(exit.code)
  assertCharacter(output, any.missing = FALSE)
  msg = sprintf("Command '%s' produced exit code %i. Output: '%s'", cmd, exit.code, stri_flatten(output, "\n"))
  makeSubmitJobResult(status = 101L, batch.id = NA_character_, msg = msg)
}

#' @title Cluster Functions Helper to Kill Batch Jobs
#'
#' @description
#' This function is only intended for use in your own cluster functions implementation.
#'
#' Calls the OS command to kill a job via \code{\link[base]{system}} like this: \dQuote{cmd batch.job.id}. If the
#' command returns an exit code > 0, the command is repeated after a 1 second sleep
#' \code{max.tries-1} times. If the command failed in all tries, an error is generated.
#'
#' @template reg
#' @param cmd [\code{character(1)}]\cr
#'   OS command, e.g. \dQuote{qdel}.
#' @param args [\code{character}]\cr
#'   Arguments to \code{cmd}, including the batch id.
#' @param max.tries [\code{integer(1)}]\cr
#'   Number of total times to try execute the OS command in cases of failures.
#'   Default is \code{3}.
#' @inheritParams runOSCommand
#' @return \code{TRUE} on success. An exception is raised otherwise.
#' @family ClusterFunctionsHelper
#' @export
cfKillJob = function(reg, cmd, args = character(0L), max.tries = 3L, nodename = "localhost") {
  assertString(cmd, min.chars = 1L)
  assertCharacter(args, any.missing = FALSE)
  assertString(nodename)
  max.tries = asCount(max.tries)

  for (i in seq_len(max.tries)) {
    res = runOSCommand(cmd, args, nodename = nodename)
    if (res$exit.code == 0L)
      return(TRUE)
    Sys.sleep(1)
  }

  stopf("Really tried to kill job, but failed %i times with '%s'.\nMessage: %s",
    max.tries, stri_flatten(c(cmd, args), " "), stri_flatten(res$output, "\n"))
}

getBatchIds = function(reg, status = "all") {
  cf = reg$cluster.functions
  tab = data.table(batch.id = character(0L), status = character(0L))
  batch.id = NULL

  if (status %chin% c("all", "running") && !is.null(cf$listJobsRunning)) {
    "!DEBUG [getBatchIds]: Getting running Jobs"
    x = unique(cf$listJobsRunning(reg))
    if (length(x) > 0L)
      tab = rbind(tab, data.table(batch.id = x, status = "running"))
  }

  if (status %chin% c("all", "queued") && !is.null(cf$listJobsQueued)) {
    "!DEBUG [getBatchIds]: Getting queued Jobs"
    x = chsetdiff(cf$listJobsQueued(reg), tab$batch.id)
    if (length(x) > 0L)
      tab = rbind(tab, data.table(batch.id = unique(x), status = "queued"))
  }

  submitted = done = batch.id = NULL
  batch.ids = reg$status[!is.na(submitted) & is.na(done) & !is.na(batch.id), unique(batch.id)]
  tab[batch.id %in% batch.ids]
}


#' @title Find a batchtools Template File
#'
#' @description
#' This functions returns the path to a template file on the file system.
#' @template template
#' @return [\code{character}] Path to the file or \code{NA} if no template template file was found.
#' @keywords internal
#' @export
findTemplateFile = function(template) {
  assertString(template, min.chars = 1L)

  if (stri_endswith_fixed(template, ".tmpl")) {
    assertFileExists(template, access = "r")
    return(fs::path_abs(template))
  }

  x = Sys.getenv("R_BATCHTOOLS_SEARCH_PATH")
  if (nzchar(x)) {
    x = fs::path(x, sprintf("batchtools.%s.tmpl", template))
    if (fs::file_access(x, "read"))
      return(fs::path_abs(x))
  }

  x = sprintf("batchtools.%s.tmpl", template)
  if (fs::file_access(x, "read"))
    return(fs::path_abs(x))

  x = fs::path(user_config_dir("batchtools", expand = FALSE), sprintf("%s.tmpl", template))
  if (fs::file_access(x, "read"))
    return(x)

  x = fs::path("~", sprintf(".batchtools.%s.tmpl", template))
  if (fs::file_access(x, "read"))
    return(fs::path_abs(x))

  x = fs::path(site_config_dir("batchtools"), sprintf("%s.tmpl", template))
  if (fs::file_access(x, "read"))
    return(x)

  x = system.file("templates", sprintf("%s.tmpl", template), package = "batchtools")
  if (fs::file_access(x, "read"))
    return(x)

  return(NA_character_)
}
mllg/batchtools documentation built on April 21, 2023, 11:49 a.m.