R/submit_lpjml.R

#' Submit LPJmL model simulation to SLURM
#'
#' LPJmL simulations are submitted to SLURM using `"config_*.json"` files
#' written by [`write_config()`]. `write_config()` returns a
#' tibble that can be used as an input (see `x`). It provides the details needed
#' to submit single or multiple (dependent/subsequent) model simulations.
#'
#' @param x A \link[tibble]{tibble} with at least one column named `"sim_name"`.
#'   Each simulation gets a separate row. An optional run parameter
#'   `"dependency"` is used for subsequent simulations (see details).
#'   [`write_config()`] returns a tibble in the required
#'   format. Alternatively, provide a character string (or vector) with the file
#'   name(s) of one or multiple generated config file(s).
#'
#' @param model_path Character string providing the path to LPJmL
#'  (equal to `LPJROOT` environment variable).
#'
#' @param sim_path Character string defining the path where all simulation data
#'   are written, including output, restart and configuration files. If `NULL`,
#'   `model_path` is used. See also [`write_config()`].
#'
#' @param group Character string defining the user group for which the job is
#'   submitted. Defaults to `"lpjml"`.
#'
#' @param sclass Character string defining the job classification. Available
#'   options at PIK: `c("short", "medium", "long", "priority", "standby", "io")`.
#'   More information at <https://www.pik-potsdam.de/en>. Defaults to `"short"`.
#'
#' @param ntasks Integer defining the number of tasks/threads. More information
#'   at <https://www.pik-potsdam.de/en> and <https://slurm.schedmd.com>.
#'   Defaults to `256`.
#'
#' @param wtime Character string defining the time limit. Setting a lower time
#'   limit than the maximum runtime for `sclass` can reduce the wait time in the
#'   SLURM job queue. More information at <https://www.pik-potsdam.de/en> and
#'   <https://slurm.schedmd.com>.
#'
#' @param blocking Integer defining the number of cores to be blocked. More
#'   information at <https://www.pik-potsdam.de/en> and
#'   <https://slurm.schedmd.com>.
#'
#' @param no_submit Logical. Set to `TRUE` to test whether `x` is set up
#'   correctly, or `FALSE` to actually submit the job to SLURM.
#'
#' @param output_path Deprecated as of version 1.0; use `sim_path`
#'   instead.
#'
#' @return See `x`, extended by columns `"type"`, `"job_id"` and `"status"`.
#'
#' @details
#' A \link[tibble]{tibble} generated by [`write_config()`] can be supplied
#' as `x`. It can look like the following examples:
#'
#' | **sim_name**    |
#' |:--------------- |
#' | scen1_spinup    |
#' | scen2_transient |
#'
#' To perform subsequent or rather dependent simulations the optional run
#' parameter `"dependency"` needs to be provided within the initial
#' \link[tibble]{tibble} supplied as `param` to [`write_config()`].
#'
#' | **sim_name**    | **dependency** |
#' |:--------------- | :------------- |
#' | scen1_spinup    | NA             |
#' | scen2_transient | scen1_spinup   |
#'
#' To use different SLURM settings for each run the optional SLURM options
#' `"sclass"`, `"ntask"`, `"wtime"` or `"blocking"` can also be supplied to the
#' initial \link[tibble]{tibble} supplied as `param` to
#' [`write_config()`]. These overwrite the (default) SLURM
#' arguments (`sclass`, `ntask`, `wtime` or `blocking`) supplied to
#' `submit_lpjml`.
#'
#' | **sim_name**    | **dependency** | **wtime** |
#' |:--------------- |:-------------- |----------:|
#' | scen1_spinup    | NA             | "8:00:00" |
#' | scen2_transient | scen1_spinup   | "2:00:00" |
#'
#' As a shortcut it is also possible to provide the config file
#' `"config_*.json"` as a character string or multiple config files as a
#' character string vector directly as the `x` argument to `submit_lpjml`. \cr
#' With this approach, run parameters or SLURM options cannot be taken into
#' account. \cr
#'
#' @examples
#'
#' \dontrun{
#' library(tibble)
#'
#' model_path <- "./LPJmL_internal"
#' sim_path <- "./my_runs"
#'
#'
#' # Basic usage
#' my_params <- tibble(
#'   sim_name = c("scen1", "scen2"),
#'   random_seed = as.integer(c(42, 404)),
#'   pftpar.1.name = c("first_tree", NA),
#'   param.k_temp = c(NA, 0.03),
#'   new_phenology = c(TRUE, FALSE)
#' )
#'
#' config_details <- write_config(my_params, model_path, sim_path)
#'
#' run_details <- submit_lpjml(
#'   x = config_details,
#'   model_path = model_path,
#'   sim_path = sim_path
#' )
#'
#' run_details
#' #   sim_name      job_id   status
#' #   <chr>           <int>  <chr>
#' # 1 scen1        21235215  submitted
#' # 2 scen2        21235216  submitted
#'
#'
#' # With run parameter dependency and SLURM option wtime being
#' #   set (also with fewer parameters than in the previous example)
#' my_params2 <- tibble(
#'   sim_name = c("scen1_spinup", "scen1_transient"),
#'   random_seed = as.integer(c(42, 404)),
#'   dependency = c(NA, "scen1_spinup"),
#'   wtime = c("8:00:00", "4:00:00")
#' )
#'
#' config_details2 <- write_config(my_params2, model_path, sim_path)
#'
#' run_details2 <- submit_lpjml(config_details2, model_path, sim_path)
#'
#' run_details2
#' #   sim_name        order dependency   wtime   type       job_id   status
#' #   <chr>           <dbl> <chr>        <chr>   <chr>      <chr>    <chr>
#' # 1 scen1_spinup        1 NA           8:00:00 simulation 22910240 submitted
#' # 2 scen1_transient     2 scen1_spinup 4:00:00 simulation 22910241 submitted
#'
#'
#' # Same but by using the pipe operator
#' library(magrittr)
#'
#' run_details <- tibble(
#'   sim_name = c("scen1_spinup", "scen1_transient"),
#'   random_seed = as.integer(c(1, 42)),
#'   dependency = c(NA, "scen1_spinup"),
#'   wtime = c("8:00:00", "4:00:00")
#' ) %>%
#'   write_config(model_path, sim_path) %>%
#'   submit_lpjml(model_path, sim_path)
#'
#'
#' # Shortcut approach
#' run_details <- submit_lpjml(
#'   x = "./config_scen1_transient.json",
#'   model_path = model_path,
#'   sim_path = sim_path
#' )
#'
#' run_details <- submit_lpjml(
#'   c("./config_scen1_spinup.json", "./config_scen1_transient.json"),
#'   model_path,
#'   sim_path
#' )
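#'
#'
#' # Dry run (a minimal sketch, reusing config_details2 from above): with
#' #   no_submit = TRUE nothing is sent to SLURM, so this can be used to check
#' #   that x is set up correctly. Each row is returned with job_id NA and
#' #   status "not submitted"
#' dry_run <- submit_lpjml(
#'   x = config_details2,
#'   model_path = model_path,
#'   sim_path = sim_path,
#'   no_submit = TRUE
#' )
#'
#' dry_run$status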
#'
#' }
#'
#' @md
#' @export
submit_lpjml <- function(x, # nolint:cyclocomp_linter.
                         model_path,
                         sim_path = NULL,
                         group = "lpjml",
                         sclass = "short",
                         ntasks = 256,
                         wtime = "",
                         blocking = "",
                         no_submit = FALSE,
                         output_path = NULL) {

  warn_runner_os("submit_lpjml")

  # Check if SLURM is available
  if (!is_slurm_available() && !no_submit && !testthat::is_testing()) {
    stop(
      "submit_lpjml is only available on HPC cluster environments providing ",
      "a SLURM workload manager"
    )
  }

  # Check if the folder given by model_path exists
  if (!dir.exists(model_path)) {
    stop("Folder of model_path \"", model_path, "\" does not exist!")
  }

  sim_path <- deprecate_arg(new_arg = sim_path,
                            deprec_arg = output_path,
                            version = "1.0.0")

  if (is.null(sim_path)) sim_path <- model_path

  # Case if character vector with file names is supplied instead of tibble
  if (methods::is(x, "character")) {
    # Strip the directory, the "config_" prefix and the ".json" suffix to
    # recover the simulation name from each file name
    x <- tibble::tibble(
      sim_name = sub("^config_", "", sub("\\.json$", "", basename(x)))
    )
  }

  x$type <- "simulation"
  x$job_id <- NA
  x$status <- "failed"
  slurm_args <- c("sclass", "ntask", "wtime", "blocking")

  if ("order" %in% colnames(x)) {

    for (order in unique(sort(x$order))) {
      sim_names <- x$sim_name[
        which(x$order == order)
      ]

      for (sim_name in sim_names) {

        sim_idx <- which(x$sim_name == sim_name)

        # Look up the job ID of the simulation this run depends on (if any)
        dependency <- ifelse(!is.na(x$dependency[sim_idx]),
                             x$job_id[x$sim_name == x$dependency[sim_idx]],
                             NA)

        # Extract SLURM arguments from x and overwrite SLURM arguments supplied
        # to this function by mapply call
        slurm_param <- (
          x[slurm_args[slurm_args %in% colnames(x)]][
              sim_idx, ]
        )

        mapply( # nolint:undesirable_function_linter.
          function(x, xn) {
            if (!is.na(x)) {
              assign(xn, x, envir = parent.frame(n = 2))
            }
          },
          x = slurm_param,
          xn = colnames(slurm_param)
        )

        # No submit option for testing
        if (!no_submit) {
          job <- submit_run(sim_name,
                            model_path,
                            sim_path,
                            group,
                            sclass,
                            ntasks,
                            wtime,
                            blocking,
                            dependency)

          if (job$status == 0) {
            x$job_id[sim_idx] <- strsplit(
              strsplit(job$stdout, "Submitted batch job ")[[1]][2], "\n"
            )[[1]][1]

            x$status[sim_idx] <- "submitted"
          }

        } else {
          x$job_id[sim_idx] <- NA
          x$status[sim_idx] <- "not submitted"
        }
      }
    }

  } else {

    for (sim_name in x$sim_name) {

      sim_idx <- which(x$sim_name == sim_name)

      # Extract SLURM arguments from x and overwrite SLURM arguments supplied to
      # this function by mapply call
      slurm_param <- (
        x[slurm_args[slurm_args %in% colnames(x)]][
            sim_idx, ]
      )

      mapply( # nolint:undesirable_function_linter.
        function(x, xn) {
          if (!is.na(x)) {
            assign(xn, x, envir = parent.frame(n = 2))
          }
        },
        x = slurm_param,
        xn = colnames(slurm_param)
      )

      if (!no_submit) {
        job <- submit_run(sim_name,
                          model_path,
                          sim_path,
                          group,
                          sclass,
                          ntasks,
                          wtime,
                          blocking,
                          dependency = NA)

        if (job$status == 0) {
          x$job_id[sim_idx] <- strsplit(
            strsplit(job$stdout, "Submitted batch job ")[[1]][2], "\n"
          )[[1]][1]

          x$status[sim_idx] <- "submitted"

        } else {
          x$status[sim_idx] <- "failed"
        }

      } else {
        x$job_id[sim_idx] <- NA
        x$status[sim_idx] <- "not submitted"
      }
    }
  }

  attr(x, "stages") <- append(attr(x, "stages"), "lpjml")
  return(x)
}
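
# Illustrative sketch (not part of lpjmlkit): how the job ID is extracted from
# the submission output in submit_lpjml() above. `parse_job_id` is a
# hypothetical helper name, and the sketch assumes that stdout contains a line
# of the form "Submitted batch job <id>".
parse_job_id <- function(stdout) {
  # Keep everything after the marker text ...
  after_marker <- strsplit(
    stdout, "Submitted batch job ", fixed = TRUE
  )[[1]][2]
  # ... and cut it off at the first line break
  strsplit(after_marker, "\n", fixed = TRUE)[[1]][1]
}
# Usage: parse_job_id("Submitted batch job 22910240\n") returns "22910240"
# as a character string, which is why job_id is a <chr> column.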


# Internal submit run function
submit_run <- function(sim_name,
                       model_path,
                       sim_path,
                       group,
                       sclass,
                       ntasks,
                       wtime,
                       blocking,
                       dependency) {

  config_file <- paste0("config_",
                        sim_name,
                        ".json")

  timestamp <- format(Sys.time(), "%Y%m%d_%H%M")

  stdout <- paste0(sim_path,
                  "/output/",
                  sim_name,
                  "/",
                  "outfile_",
                  timestamp,
                  ".out")

  stderr <- paste0(sim_path,
                  "/output/",
                  sim_name,
                  "/",
                  "errfile_",
                  timestamp,
                  ".err")

  output_config <- paste0(sim_path,
                  "/output/",
                  sim_name,
                  "/",
                  "config_",
                  timestamp,
                  ".json")

  inner_command <- paste0(model_path, "/bin/lpjsubmit", # nolint:absolute_path_linter.
                           " -nocheck",
                           " -class ", sclass,
                           " -group ", group,
                           ifelse(wtime != "",
                                  paste0(" -wtime ", wtime),
                                  ""),
                           ifelse(blocking != "",
                                  paste0(" -blocking ", blocking),
                                  ""),
                           ifelse(!is.na(dependency),
                                  paste0(" -dependency ", dependency),
                                  ""),
                           " -o ", stdout,
                           " -e ", stderr,
                           " ",
                           ntasks,
                           " ",
                           sim_path,
                           "/configurations/",
                           config_file)

  # Get LPJROOT variable and set according to model_path
  pre_lpjroot <- Sys.getenv("LPJROOT")

  # tryCatch to be able to set it back to its original value in case something
  # fails.
  tryCatch({

    Sys.setenv(LPJROOT = model_path) # nolint:undesirable_function_linter.

    # Run lpjsubmit.
    submit_status <- processx::run(command = "bash",
                                   args = c("-c", inner_command),
                                   cleanup_tree = TRUE,
                                   error_on_status = FALSE,
                                   wd = sim_path)
    if (!testthat::is_testing()) {
      copied <- file.copy(from = paste(sim_path, # nolint:object_usage_linter.
                                       "configurations",
                                       config_file,
                                       sep = "/"),
                          to = output_config)
    }
  }, finally = {

    if (pre_lpjroot == "") {

      # Unset variable if it was not set before ("")
      Sys.unsetenv("LPJROOT") # nolint:undesirable_function_linter.
    } else {

      # Set back to its original value
      Sys.setenv(LPJROOT = pre_lpjroot) # nolint:undesirable_function_linter.
    }
  })

  return(submit_status)
}
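
# Illustrative sketch (not part of lpjmlkit): the set-and-restore pattern that
# submit_run() applies to LPJROOT, written with on.exit() instead of
# tryCatch(finally = ). `with_lpjroot` is a hypothetical helper name. Unlike
# the code above, this sketch distinguishes an unset variable from an empty
# one by using `unset = NA`.
with_lpjroot <- function(path, fun) {
  # Remember the previous value (NA if the variable was not set at all)
  previous <- Sys.getenv("LPJROOT", unset = NA)
  # Restore the previous state when the function exits, even on error
  on.exit(
    if (is.na(previous)) {
      Sys.unsetenv("LPJROOT")
    } else {
      Sys.setenv(LPJROOT = previous)
    },
    add = TRUE
  )
  Sys.setenv(LPJROOT = path)
  fun()
}
# Usage: with_lpjroot("./LPJmL_internal", function() Sys.getenv("LPJROOT"))
# returns "./LPJmL_internal", and LPJROOT has its former state afterwards.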
