R/Slurm_collect.R

Defines functions Slurm_collect.slurm_job Slurm_collect

Documented in Slurm_collect Slurm_collect.slurm_job

#' Collect the results of a slurm job
#'
#' Takes an object of class `slurm_job` and retrieves its results, that is,
#' it combines the R objects generated by each job.
#'
#' @param ... Further arguments passed to the method.
#' @param x An object of class [slurm_job].
#' @export
#' @family post submission
#' @details If the given job has hooks, which is a list of functions, these will
#' be applied sequentially to the set of retrieved results before returning.
#'
#' @return By default, it returns a concatenated list of the output files
#' generated by each job. If the job object has a hook, it will apply each hook
#' to the full list before returning. See [new_slurm_job].
#'
#' @examples
#' \dontrun{
#' # Collecting a job after calling it
#' job <- Slurm_EvalQ(slurmR::WhoAmI(), njobs = 4, plan = "wait")
#' Slurm_collect(job)
#'
#' # Collecting a job from a previous R session
#' job <- read_slurm_job("/path/to/a/job/tmp_dir")
#' Slurm_collect(job)
#' }
Slurm_collect <- function(...) {
  # S3 generic: the method is selected from the class of the first argument.
  UseMethod("Slurm_collect")
}

#' @export
#' @param any. Logical. When `TRUE`, will collect any output available regardless
#' of whether the job is completed or not. Output files that cannot be read are
#' kept in the result as error condition objects (one element per unreadable
#' file) and a warning is issued.
#' @param wait Integer scalar. Number of seconds to wait before checking the
#' state of a job if the first try returned `-1` (no job found).
#' @rdname Slurm_collect
Slurm_collect.slurm_job <- function(x, any. = FALSE, wait = 10L, ...) {

  # Getting coordinates
  tmp_path <- get_tmp_path(x)
  job_name <- get_job_name(x)

  res <- if (!opts_slurmR$get_debug()) {

    # Checking the state of the job
    S <- status(x)

    if (S == -1L) {
      message(
        attr(S, "description"), ". Waiting ", wait, " seconds before retry."
      )
      Sys.sleep(wait)
      S <- status(x)
    }

    # After the second try
    if (S == -1L) {
      stop(attr(S, "description"), call. = FALSE)
    }

    # A non-zero status can only be collected when explicitly requested
    if (S != 0L && !any.) {
      stop(
        "Nothing to retrieve (see ?status). ",
        "If this is a rerun, you can try using 'any. = TRUE' ",
        "to read-in any output file available in the folder.",
        call. = FALSE
      )
    }

    # Tolerant reader used when collecting whatever is available. The
    # condition is wrapped in a list because c() would otherwise splice it
    # into separate `message`/`call` elements and drop its class, making the
    # failure undetectable afterwards.
    readRDS_trying <- function(...) {
      tryCatch(
        suppressWarnings(readRDS(...)),
        error = function(e) list(e)
      )
    }

    # Getting the filenames
    rds_files <- snames(
      "rds",
      array_id = seq_len(x$njobs),
      tmp_path = tmp_path,
      job_name = job_name
    )

    # Status 0 reads every file strictly (a failed read is an error);
    # otherwise (any. = TRUE) failed reads are kept as error objects.
    reader <- if (S == 0L) readRDS else readRDS_trying

    do.call("c", lapply(rds_files, reader))

  } else {

    # In debug mode only the output of the first array element is read
    fn <- snames(
      "rds",
      array_id = 1L,
      tmp_path = tmp_path,
      job_name = job_name
    )

    if (file.exists(fn)) {
      readRDS(fn)
    } else {
      stop("No result yet from the script.", call. = FALSE)
    }

  }

  # If we are collecting any available output, flag the elements of the
  # result that are error objects
  test <- vapply(res, inherits, logical(1), what = "error")
  if (any. && any(test)) {
    warning(
      "One or more jobs of the array may have not completed yet: ",
      paste(which(test), collapse = ", "), ". You can check the log files of ",
      "a given job by using `Slurm_log()`."
    )
  }

  # Applying hooks sequentially: each one receives the current result and the
  # job object, and its return value replaces the result
  for (h in x$hooks) {
    res <- tryCatch(h(res, x), error = function(e) e)
    if (inherits(res, "error")) {
      # conditionMessage() is used since pasting the condition object itself
      # would concatenate its message and call without any separator
      stop(
        "An error ocurred while calling a hook after collection:\n",
        conditionMessage(res),
        "\nCheck the hook the slurm_job -x-.",
        call. = FALSE
      )
    }
  }

  res

}

Try the slurmR package in your browser

Any scripts or data that you put into this service are public.

slurmR documentation built on Aug. 30, 2023, 5:06 p.m.