R/run_lpjml.R

Defines functions do_parallel do_sequential do_run run_lpjml

Documented in run_lpjml

#' Run LPJmL model
#'
#' Runs LPJmL using `"config_*.json"` files written by
#' [`write_config()`]. `write_config()` returns a tibble
#' that can be used as an input (see `x`). It contains the details to run single
#' or multiple (dependent/subsequent) model runs.
#'
#' @param x A \link[tibble]{tibble} with at least one column named `"sim_name"`.
#'   Each simulation gets a separate row. Optional run parameters are `"order"`
#'   and `"dependency"` which are used for subsequent simulations (see details).
#'   [`write_config()`] returns a tibble in the required format.
#'   OR provide a character string (vector) with the file name of one or
#'   multiple generated configuration file(s).
#'
#' @param model_path Character string providing the path to LPJmL
#'   (equal to `LPJROOT` environment variable). Defaults to "."
#'
#' @param sim_path Character string defining path where all simulation data are
#'   written, including output, restart and configuration files. If `NULL`,
#'   `model_path` is used. See also [write_config]
#'
#' @param parallel_cores Integer defining the number of available CPU
#'   cores/nodes for parallelization. Defaults to `1` (no parallelization).
#'   Please note that parallelization is only supported for SLURM jobs and not
#'   for interactive runs.
#'
#' @param write_stdout Logical. If `TRUE`, `stdout` as well as `stderr` files
#'   are written. If `FALSE` (default), these are printed instead. Within a
#'   SLURM job `write_stdout` is automatically set to `TRUE`.
#'
#' @param raise_error Logical. Whether to raise an error if sub-process has
#'   non-zero exit status. Defaults to `TRUE`.
#'
#' @param output_path Argument is deprecated as of version 1.0; use sim_path
#'   instead.
#'
#' @return See `x`, extended by columns `"type"`, `"job_id"` and `"status"`.
#'
#' @details
#' A \link[tibble]{tibble} for `x` that has been generated by
#' [`write_config()`] and can look like the following examples can
#' supplied:
#'
#' | **sim_name**    |
#' |:--------------- |
#' | scen1_spinup    |
#' | scen2_transient |
#'
#' To perform subsequent or rather dependent runs the optional run parameter
#' `"dependency"` needs to be provided within the initial
#' \link[tibble]{tibble} supplied as `param` to [`write_config()`].
#'
#' | **sim_name**    | **order** | **dependency** |
#' |:--------------- | ---------:|:-------------- |
#' | scen1_spinup    | 1         | NA             |
#' | scen2_transient | 2         | scen1 _spinup  |
#'
#'
#' As a shortcut it is also possible to provide the config file
#' `"config_*.json"` as a character string or multiple config files as a
#' character string vector directly as the `x` argument to `run_lpjml`. \cr
#' Also be aware that the order of the supplied config files is important
#' (e.g. make sure the spin-up run is run before the transient one).
#'
#' @examples
#'
#' \dontrun{
#' library(tibble)
#'
#' model_path <- "./LPJmL_internal"
#' sim_path <-"./my_runs"
#'
#' # Basic usage
#' my_params1 <- tibble(
#'   sim_name = c("scen1", "scen2"),
#'   startgrid = c(27410, 27410),
#'   river_routing = c(FALSE, FALSE),
#'   random_seed = c(42, 404),
#'   pftpar.1.name = c("first_tree", NA),
#'   param.k_temp = c(NA, 0.03),
#'   new_phenology = c(TRUE, FALSE)
#' )
#'
#' config_details1 <- write_config(my_params1, model_path, sim_path)
#'
#'  run_details1 <- run_lpjml(
#'   x = config_details1,
#'   model_path = model_path,
#'   sim_path = sim_path
#' )
#'
#' run_details1
#' #   sim_name      job_id   status
#' #   <chr>           <int>  <chr>
#' # 1 scen1              NA  run
#' # 2 scen2              NA  run
#'
#'
#' # With run parameters dependency and order being set (also less other
#' #   parameters than in previous example)
#' my_params2 <- tibble(
#'   sim_name = c("scen1", "scen2"),
#'   startgrid = c(27410, 27410),
#'   river_routing = c(FALSE, FALSE),
#'   random_seed = c(42, 404),
#'   dependency = c(NA, "scen1_spinup")
#' )
#'
#' config_details2 <- write_config(my_params2, model_path, sim_path)
#'
#' run_details2 <- run_lpjml(config_details2, model_path, sim_path)
#'
#' run_details2
#' #   sim_name        order dependency   type       job_id   status
#' #   <chr>           <dbl> <chr>        <chr>      <chr>    <chr>
#' # 1 scen1_spinup        1 NA           simulation NA       run
#' # 2 scen1_transient     2 scen1_spinup simulation NA       run
#'
#'
#' # Same but by using the pipe operator
#' library(magrittr)
#'
#' run_details2 <- tibble(
#'   sim_name = c("scen1_spinup", "scen1_transient"),
#'   random_seed = as.integer(c(1, 42)),
#'   dependency = c(NA, "scen1_spinup")
#' ) %>%
#'   write_config(model_path, sim_path) %>%
#'   run_lpjml(model_path, sim_path)
#'
#'
#' # Shortcut approaches
#' run_details3 <- run_lpjml(
#'   x = "./config_scen1_transient.json",
#'   model_path = model_path,
#'   sim_path = sim_path
#' )
#'
#' run_details4 <- run_lpjml(
#'   c("./config_scen1_spinup.json", "./config_scen1_transient.json"),
#'   model_path,
#'   sim_path
#' )
#'
#' }
#'
#' @md
#' @export
run_lpjml <- function(x,
                      model_path = ".",
                      sim_path = NULL,
                      parallel_cores = 1,
                      write_stdout = FALSE,
                      raise_error = TRUE,
                      output_path = NULL) {

  warn_runner_os("run_lpjml")

  # Check if model_path is set or unit test flag provided
  if (!dir.exists(model_path)) {
    stop("Folder of model_path \"", model_path, "\" does not exist.")
  }

  sim_path <- deprecate_arg(new_arg = sim_path,
                            deprec_arg = output_path,
                            version = "1.0.0")

  if (is.null(sim_path)) sim_path <- model_path

  # Case if character vector with file names is supplied instead of tibble
  if (methods::is(x, "character")) {
    x <- tibble::tibble(sim_name = sapply( # nolint:undesirable_function_linter.
      x,
      function(x) {
        strsplit(
          strsplit(rev(strsplit(x, "/")[[1]])[1], "config_")[[1]][2],
          ".json"
        )[[1]]
      }
    ))
  }

  x$type <- "simulation"
  x$job_id <- NA
  x$status <- "failed"

  if ("order" %in% colnames(x)) {

    for (order in unique(sort(x$order))) {
      sim_names <- x$sim_name[which(x$order == order)]

      if (parallel_cores == 1) {
        do_sequential(
          sim_names, model_path, sim_path, write_stdout, raise_error
        )
      } else if (parallel_cores > 1 && Sys.getenv("SLURM_JOB_ID") != "") {
        do_parallel(
          sim_names, model_path, sim_path, parallel_cores, raise_error
        )
      } else {
        stop(
          "Parallelization is only supported for slurm jobs. Also",
          " please set parallel_cores to a value between 1 (non",
          " parallel) and n parallel cores/nodes."
        )
      }
    }

  } else {
    if (parallel_cores == 1) {
      do_sequential(
        x$sim_name, model_path, sim_path, write_stdout, raise_error
      )
    } else if (parallel_cores > 1 && Sys.getenv("SLURM_JOB_ID") != "") {
      do_parallel(
        x$sim_name, model_path, sim_path, parallel_cores, raise_error
      )
    } else {
      stop(
        "Parallelization is only supported for slurm jobs. Also",
        " please set parallel_cores to a value between 1 (non",
        " parallel) and n parallel cores/nodes."
      )
    }
  }

  x$status[x$type == "simulation"] <- "run"
  x
}


# Inner run function
do_run <- function(sim_name,
                   model_path,
                   sim_path,
                   write_stdout,
                   raise_error) {

  config_file <- paste0("config_",
                        sim_name,
                        ".json")

  timestamp <- format(Sys.time(), "%Y%m%d_%H%M")

  if (Sys.getenv("SLURM_JOB_ID") != "") {
    write_stdout <- TRUE
  }

  # When running inside a slurm job it ensures to propagate ressources
  inner_command <-  paste0(ifelse(Sys.getenv("SLURM_JOB_ID") == "",
                                  "",
                                  "srun --propagate "),
                           model_path,
                           "/bin/lpjml ", # nolint:absolute_path_linter.
                           sim_path,
                           "/configurations/",
                           config_file)

  stdout_file <- ifelse(write_stdout,
                        paste0(sim_path,
                               "/output/",
                               sim_name,
                               "/",
                               "outfile_",
                               timestamp,
                               ".out"),
                        "|")

  if (!testthat::is_testing()) {
    cat(paste0("\nRunning LPJmL for config: ", config_file, "...\n"))
  }

  if (write_stdout) {
    cat(paste0("View output at \"", stdout_file, "\"\n"))
  }

  processx::run(command = "bash",
                args = c("-c", inner_command),
                stdout = stdout_file,
                stderr = ifelse(write_stdout,
                                paste0(sim_path,
                                       "/output/",
                                       sim_name,
                                       "/",
                                       "errfile_",
                                       timestamp,
                                       ".err"),
                                "|"),
                echo = !testthat::is_testing(),
                cleanup_tree = TRUE,
                spinner = ifelse(write_stdout &&
                                 Sys.getenv("SLURM_JOB_ID") == "",
                                            TRUE,
                                            FALSE),
                error_on_status = raise_error,
                wd = sim_path)
}


# Conduct sequential runs (no parallelization), also switch off MPI on login
# node.
do_sequential <- function(sim_names,
                          model_path,
                          sim_path,
                          write_stdout,
                          raise_error) {

  # tryCatch to unset and set MPI for function call outside of slurm job on
  #   an HPC cluster even when function call is interrupted or has thrown
  #   an error.
  tryCatch({

    # Check if slurm is available
    if (is_slurm_available() && Sys.getenv("SLURM_JOB_ID") == "") {
      Sys.setenv(I_MPI_DAPL_UD = "disable", # nolint:undesirable_function_linter.
                 I_MPI_FABRICS = "shm:shm",
                 I_MPI_DAPL_FABRIC = "shm:sh")
    }
    for (sim_name in sim_names) {
      do_run(sim_name, model_path, sim_path, write_stdout, raise_error)
    }
  }, finally = {
    # Check if slurm is available
    if (is_slurm_available() && Sys.getenv("SLURM_JOB_ID") == "") {
      Sys.setenv(I_MPI_DAPL_UD = "enable", # nolint:undesirable_function_linter.
                 I_MPI_FABRICS = "shm:dapl")
      Sys.unsetenv("I_MPI_DAPL_FABRIC")
    }
  })
}


# Conduct parallel runs.
do_parallel <- function(sim_names,
                        model_path,
                        sim_path,
                        parallel_cores,
                        raise_error) {

  # Create temporary file to store stdout and stderr within parallel mode
  error_file <- tempfile(fileext = ".txt")

  # Create and register cluster based on available CPU cores/nodes
  cl <- parallel::makeCluster(parallel_cores, outfile = error_file)
  doParallel::registerDoParallel(cl)
  sim_name <- NULL

  # Parallel foreach sim_name.
  job_details <- foreach::foreach(sim_name = sim_names, # nolint:object_usage_linter.
                                  .errorhandling = "stop"
  ) %dopar% {

    # Write single call
    tryCatch({
      do_run(
        sim_name, model_path, sim_path, write_stdout = TRUE, raise_error
      )

    # Stop when error occures
    }, error = function(e) {

      # Check if error is returned
      if (e != "") {

        # Error with hint to deactivate parallelization
        stop(
          e,
          " - Please set parallel_cores=1 for traceback ",
          "functionality (only available without",
          " parallelization)",
          call. = FALSE
        )
      } else {

        # Hint to deactivate parallelization
        stop(
          "This is not a common error,",
          " please set parallel_cores=1 for traceback ",
          "functionality (only available without",
          " parallelization"
        )
      }
    })
  }

  # Close cluster
  parallel::stopCluster(cl)
}

Try the lpjmlkit package in your browser

Any scripts or data that you put into this service are public.

lpjmlkit documentation built on March 31, 2023, 9:35 p.m.