R/utils.R

Defines functions hline silent_system2 stopifnot_submitted stopifnot_slurm check_full_path check_sbatch_opt WhoAmI Slurm_clean Slurm_env snames parse_flags.list parse_flags.default parse_flags random_job_name

Documented in parse_flags parse_flags.default parse_flags.list random_job_name Slurm_clean Slurm_env snames

#' Generate a random job name
#'
#' Asks [tempfile()] for a unique file name located in
#' `opts_slurmR$get_tmp_path()` and then drops everything that precedes the
#' `slurmr-job-` prefix, so only the name itself is returned.
#'
#' @return A character scalar that can be used as a job name. It always starts
#' with the prefix `slurmr-job-`, followed by the random string generated by
#' [tempfile()].
#' @examples
#' random_job_name()
#' @export
random_job_name <- function() {

  # Full (path + name) candidate generated in the package's temp directory
  candidate <- tempfile(
    pattern = "slurmr-job-",
    tmpdir  = opts_slurmR$get_tmp_path()
  )

  # Greedy match + lookahead: removes all the text before the prefix
  gsub(
    pattern     = ".+(?=slurmr-job-)",
    replacement = "",
    x           = candidate,
    perl        = TRUE
  )

}

#' Turn named R values into command line flags
#'
#' S3 generic. The method is selected using the first argument passed
#' through `...`.
#' @param ... Options to be parsed as bash flags.
#' @examples
#' cat(parse_flags(a=1, b=TRUE, hola=2, y="I have spaces", ms=2, `cpus-per-task`=4))
#' # -a 1 -b --hola=2 -y "I have spaces" --ms=2 --cpus-per-task=4
#' @return A character vector with the processed flags.
#' @export
#' @family utilities
parse_flags <- function(...) {
  UseMethod("parse_flags")
}

#' @export
#' @rdname parse_flags
parse_flags.default <- function(...) {

  # Collect the (named) arguments and hand them over to the list method
  flags <- list(...)
  parse_flags.list(flags)

}

#' @export
#' @param x A named list. Each element must be either `NULL` or a scalar
#' (length one) value.
#' @details
#' The list method applies the following rules to each element of `x`:
#' - `NULL` and `NA` values are dropped.
#' - One-character names become short flags (`-a value`); longer names become
#'   long flags (`--name=value`).
#' - `TRUE` yields the flag alone, `FALSE` yields an empty string.
#' - Character values containing white space are wrapped in double quotes.
#'
#' If nothing is left after dropping `NULL`/`NA` values, `character(0)` is
#' returned.
#' @rdname parse_flags
parse_flags.list <- function(x, ...) {

  # Each flag takes a single value. Checking this up front avoids handing a
  # vector to `&&`/`if` below, which is an error in recent versions of R.
  too_long <- which(lengths(x) > 1L)
  if (length(too_long)) {
    labels <- if (is.null(names(x))) as.character(too_long)
      else names(x)[too_long]
    stop(
      "Flags must be scalars (length one). The following have more than ",
      "one element: ", paste(labels, collapse = ", "), ".",
      call. = FALSE
    )
  }

  # Skipping NULL and NAs
  keep <- vapply(x, function(z) (length(z) > 0L) && !is.na(z), logical(1L))
  x    <- x[keep]

  # If no flags are left, then return an empty character vector
  if (!length(x))
    return(character(0L))

  # Short flags (-a) vs long flags (--abc)
  single_char <- nchar(names(x)) == 1
  option      <- paste0(ifelse(single_char, "-", "--"), names(x))

  vals <- character(length(option))
  for (i in seq_along(x)) {

    value <- x[[i]]

    # Short flags are separated from their value by a space, long ones by "="
    equal_sign <- if (isTRUE(single_char[i])) " " else "="

    if (is.logical(value)) {

      # TRUE: the flag alone. FALSE: the flag is blanked out.
      if (!value)
        option[i] <- ""

    } else if (is.character(value) && grepl("\\s+", value)) {

      # Values with white space need to be quoted
      vals[i] <- sprintf("%s\"%s\"", equal_sign, value)

    } else {

      vals[i] <- paste0(equal_sign, value)

    }

  }

  sprintf("%s%s", option, vals)
}

#' Full path names for Slurm jobs
#'
#' Creates file names of the form `tmp_path/job_name/<file>` for the objects
#' used by a job. This function is intended for internal use only.
#'
#' @param type Character scalar. Can be any of `r`, `sh`, `out`, `rds`, or
#' `job`.
#' @param tmp_path Character scalar. Path to the temp directory used by the job
#' to write files.
#' @param job_name Character scalar. Name of the job.
#' @param array_id Integer. ID of the array to create the name. Only used when
#' `type = "rds"`. If it has more than one element, one file name per element
#' is returned.
#' @family utilities
#' @details
#' The file name is built with [sprintf()], so `tmp_path` and `job_name`
#' should be supplied by the caller (see [opts_slurmR] for the session-wide
#' values): if either is left as `NULL`, the result is `character(0)`.
#'
#' When `type = "rds"` and `array_id` is `NULL`, the returned name keeps the
#' `%03i` placeholder so that the array ID can be filled in later.
#' @return A character vector with the path to the corresponding file(s).
#' @export
snames <- function(
  type,
  array_id = NULL,
  tmp_path = NULL,
  job_name = NULL
  ) {

  # NOTE(review): earlier documentation stated that `tmp_path` and `job_name`
  # default to the values stored in `opts_slurmR`. Nothing here does that;
  # confirm whether the fallback should be restored.

  # Several array ids: one file name per id
  if (length(array_id) > 1L)
    return(
      vapply(
        array_id, snames, FUN.VALUE = character(1L),
        type = type, tmp_path = tmp_path, job_name = job_name
      )
    )

  file_name <- switch(
    type,
    r   = "00-rscript.r",
    sh  = "01-bash.sh",
    out = "02-output-%A-%a.out",
    # `is.null()` rather than `missing()`: the default value of `array_id` is
    # NULL, and callers may also pass it explicitly.
    rds = if (is.null(array_id))
      "03-answer-%03i.rds"
    else sprintf("03-answer-%03i.rds", array_id),
    job = "job.rds",
    stop(
      "Invalid type, the only valid types are `r`, `sh`, `out`, `rds`, and `job`.",
      call. = FALSE
    )
  )

  # `file_name` is passed as data, so any `%` it contains is kept verbatim
  sprintf(
    "%s/%s/%s",
    tmp_path,
    job_name,
    file_name
  )

}

#' A wrapper of [Sys.getenv]
#'
#' This function is used within the R script written by `slurmR` to get the
#' current value of `SLURM_ARRAY_TASK_ID`, an environment variable that Slurm
#' creates when running an array.
#'
#' @return The value of the environment variable `x` as returned by
#' [Sys.getenv()]. The only exception is `SLURM_ARRAY_TASK_ID`: if that
#' variable is unset (or empty), the function returns the number `1` instead
#' of an empty string.
#'
#' @param x Character scalar. Environment variable to get.
#' @family utilities
#' @export
Slurm_env <- function(x = "SLURM_ARRAY_TASK_ID") {

  value <- Sys.getenv(x)

  # Outside of a job array the task id is not defined; fall back to 1
  task_id_unset <- (x == "SLURM_ARRAY_TASK_ID") && value == ""
  if (!task_id_unset)
    return(value)

  1

}

#' Clean a session.
#'
#' The functions of the family [Slurm_*apply][Slurm_lapply] generate a set of
#' temporary files that are used for the job design, submission and collection.
#' This function will remove all the contents of the directory created by
#' calling those functions.
#'
#' @param x An object of class `slurm_job`.
#' @export
#' @family post submission
#' @family utilities
#' @details
#' Unless debug mode is on or Slurm is not available, the job's status is
#' checked first and an error is raised if it is 1, 2 or 3 (jobs still
#' running/pending).
#'
#' @return If the job's directory exists, whatever [unlink] returns after
#' trying to remove it (0 for success). If the directory does not exist,
#' `0`, invisibly.
#' @examples
#' \dontrun{
#'
#' job <- Slurm_EvalQ(1 + 1, 2, plan = "collect")
#'
#' # This will remove all the files generated by Slurm_EvalQ
#' Slurm_clean(job)
#'
#' }
Slurm_clean <- function(x) {

  # Checking if the job is running. In debug mode, or without Slurm, there is
  # nothing to query, so the status is taken to be 0.
  s <- if (opts_slurmR$get_debug() || !slurm_available()) 0
    else status(x)

  if (s %in% 1L:3L)
    stop("Some jobs are still running/pending (",
         paste(attr(s, "pending"), collapse=", "), ").", call. = FALSE)

  # Path specification
  path <- sprintf("%s/%s", get_tmp_path(x), get_job_name(x))

  if (dir.exists(path))
    unlink(path, recursive = TRUE, force = TRUE)
  else
    invisible(0)

}

#' Information about where jobs are submitted
#'
#' This returns a named vector with the following variables:
#' \Sexpr[stage=build]{paste(names(slurmR::WhoAmI()), collapse = ", ")}
#' @export
#' @return A character vector with the corresponding system environment variables'
#' values. When Slurm is not available, or debug mode is on, the process id,
#' the node name and the array task id are replaced by the current R process
#' id, `"localhost"` and `"1"` respectively.
#' @family utilities
WhoAmI <- function() {

  env_names <- c(
    "SLURM_LOCALID",
    "SLURMD_NODENAME",
    "SLURM_ARRAY_TASK_ID",
    "SLURM_CLUSTER_NAME",
    "SLURM_JOB_PARTITION",
    "SLURM_TASK_PID"
    )

  # One call retrieves all the variables; unset ones come back as ""
  info <- Sys.getenv(env_names, names = TRUE)

  # I only do this b/c I may need to use this in other context
  no_slurm <- !slurm_available()
  debug_on <- opts_slurmR$get_debug()

  if (no_slurm | debug_on) {
    info[c("SLURM_TASK_PID", "SLURMD_NODENAME", "SLURM_ARRAY_TASK_ID")] <-
      c(Sys.getpid(), "localhost", 1)
  }

  info

}

#' @export
#' @rdname WhoAmI
#' @details `whoami` is just an alias of `WhoAmI`.
whoami <- WhoAmI

#' Validates and completes the list of `sbatch` options.
#'
#' Fails if `x` is not a list or if it already holds a `job-name` entry.
#' Otherwise stores `job_name` as `job-name` and adds every named argument in
#' `...` whose name is not yet present in `x` (values already in `x` win).
#' Returns the resulting list.
#' @noRd
check_sbatch_opt <- function(x, job_name = NULL, ...) {

  if (!is.list(x)) {
    stop(
      "`sbatch_opt` should be an object of class `list`. Right now, the ",
      "passed object is of class: ", paste(class(x), collapse = ", "), ".",
      call. = FALSE
      )
  }

  if ("job-name" %in% names(x)) {
    stop(
      "`job-name` must be passed directly via `job_name` in the function, ",
      "not via `sbatch_opt`.", call. = FALSE
      )
  }

  x$`job-name` <- job_name

  # Extra options only fill in the gaps, they never overwrite
  extras <- list(...)
  for (opt in names(extras)) {
    if (!(opt %in% names(x)))
      x[[opt]] <- extras[[opt]]
  }

  x

}

#' Check whether the file path exists, if not, create
#'
#' Builds the path `tmp_path/job_name`. If that directory already exists it is
#' removed when `overwrite = TRUE` (with a message), otherwise an error is
#' raised. The directory is then (re)created and its path returned invisibly.
#' @noRd
check_full_path <- function(tmp_path, job_name, overwrite = FALSE) {

  # For a new job the directory does not exist yet, so `mustWork = FALSE`
  # keeps normalizePath() from warning about it.
  path <- normalizePath(file.path(tmp_path, job_name), mustWork = FALSE)
  test <- dir.exists(path)

  # Checking if the thing exists
  if (test) {

    if (overwrite)
      message("The path ", path, " already exists. Since `overwrite = TRUE`, ",
        "slurmR will remove the previous data.")
    else
      stop(
        "The path ", path, " already exists. To overwrite a previously used ",
        "path (tmp_path/job_name) use the option `overwrite = TRUE`",
        call. = FALSE
      )

    unlink(path, recursive = TRUE)

  }

  # Creating the folder
  dir.create(path, recursive = TRUE)

  invisible(path)

}

#' Raises an error when `slurm_available()` is not `TRUE`; otherwise does
#' nothing and returns `NULL` invisibly.
#' @noRd
stopifnot_slurm <- function() {

  # Nothing to complain about
  if (slurm_available())
    return(invisible(NULL))

  stop(
    "Slurm is not available on this system. If you are trying to debug or ",
    "run some tests, you should set the debug mode on (see opts_slurmR).",
    call. = FALSE
  )

}

#' Raises an error when `x` is `NA`, which is read as "the job has not been
#' submitted yet"; otherwise returns `NULL` invisibly.
#' @noRd
stopifnot_submitted <- function(x) {

  # Anything that is not NA counts as submitted
  if (!is.na(x))
    return(invisible(NULL))

  stop(
    "This job hasn't started/been submitted yet. Nothing to do.",
    call. = FALSE
  )

}

#' Runs [system2()] with warnings suppressed and turns a non-zero exit status
#' into an R error.
#'
#' All arguments are passed to `system2()` as is. If the returned object has a
#' `"status"` attribute different from zero, an error is raised showing the
#' call, the captured output, the status and, when available, the `"errmsg"`
#' attribute. If `system2()` itself signals an error, the condition object is
#' returned rather than raised. In any other case the value of `system2()` is
#' returned.
#' @noRd
silent_system2 <- function(...) {

  # Getting the call (as text, for the error message). deparse() may split a
  # long call over several strings, so these are joined back together.
  call_str       <- sys.call()
  call_str[[1L]] <- quote(system2)
  call_str       <- paste(deparse(call_str), collapse = "\n")

  # Making the call
  ans <- suppressWarnings({
    tryCatch(system2(...), error = function(e) e)
  })

  status <- attr(ans, "status")

  if (length(status) && (status != 0)) {

    # If either is null, then these will be character(0), in which case
    # sprintf() contributes nothing to the message
    status. <- as.character(status)
    errmsg. <- as.character(attr(ans, "errmsg"))

    stop(
      "An error has occurred when calling\n", call_str,"\n",
      paste(ans, collapse="\n"),
      sprintf("\nReturn code (status): %s\n", status.),
      sprintf("\nError msg   (errmsg): %s\n", errmsg.),
      call. = FALSE
      )

  }

  ans

}

#' Emits, via [message()], the text in `...` between two horizontal rules as
#' wide as `getOption("width")`.
#'
#' The pieces in `...` are put together with [paste()] using its default
#' separator, and the elements of the result are joined by new lines.
#' @noRd
hline <- function(..., sep="\n") {

  # NOTE(review): `sep` is accepted but never used below -- confirm whether
  # it was meant to be forwarded to paste().
  rule <- strrep("-", getOption("width"))

  message(rule)
  message(paste(..., collapse = "\n"))
  message(rule)

}

Try the slurmR package in your browser

Any scripts or data that you put into this service are public.

slurmR documentation built on Aug. 30, 2023, 5:06 p.m.