R/parallelize.R

Defines functions parallelize

Documented in parallelize

#' @title Setup parallel processing using snow.
#'
#' @description
#' Registers a foreach backend and returns the cluster that backs it. If
#' multinode use is allowed and more than one machine is listed, a socket
#' cluster spanning those machines is started. Otherwise a local cluster sized
#' to the number of physical cores is started, or the sequential backend is
#' registered when \code{type} is "seq" or NULL. Libraries required: parallel,
#' doParallel, RhpcBLASctl, foreach. As of Feb. 2020 doMC is no longer
#' supported; requesting it sets up a local cluster instead.
#'
#' Side effects: registers the foreach backend, sets the default cluster of
#' the parallel package (cluster backends), sets \code{options(mc.cores = 1)}
#' (sequential backend), and sets the OpenMP thread count to 1 when more than
#' one thread is reported, so the BLAS does not compete with the workers.
#'
#' @param type "any", "cluster"/"doSNOW"/"snow", "doParallel", "doMC", or
#'   "seq". "cluster", "doSNOW" and "snow" start a socket ("SOCK") cluster;
#'   "seq" (or NULL) disables parallelization; every other value, including
#'   the default "any", starts a local cluster via parallel::makeCluster().
#' @param max_cores Restrict to this many cores, even if more are available.
#'   For multinode clusters this caps the number of processes per node.
#' @param allow_multinode If TRUE will use multiple nodes if detected. If
#'   FALSE will not use multiple machines even if they are available.
#' @param machine_list Networked computers for multinode computation, either
#'   as one comma-separated string or as a character vector / list of names.
#' @param cpus_per_node Number of processes to run on each node, if using
#'   multinode parallelization. Must resolve to a positive number in that
#'   case; if it is missing, \code{max_cores} is used when supplied.
#' @param outfile File to collect output across workers. If "" then results
#'   are printed to the console.
#' @param verbose If TRUE display additional output during execution.
#'   Currently not consulted by the function body.
#'
#' @return The cluster object (invisibly), which can be passed to
#'   stop_cluster(); NA when the sequential backend is registered.
#'
#' @importFrom utils capture.output
#' @seealso stop_cluster
#' @export
parallelize =
  function(type = "any", max_cores = NULL, allow_multinode = TRUE,
           machine_list = Sys.getenv("SLURM_NODELIST"),
           cpus_per_node = as.numeric(Sys.getenv("SLURM_CPUS_ON_NODE")),
           outfile = "", verbose = FALSE) {

  # TRUE when `type` is non-NULL and matches at least one of `choices`.
  # Checking for NULL first keeps the scalar `||` / `&&` conditions below
  # well-defined for every input.
  type_in = function(choices) {
    !is.null(type) && any(type %in% choices)
  }

  # Indicator for multi-node parallelism.
  multinode = FALSE

  # Check if we have multiple machine access.
  if (allow_multinode) {
    # Accept a single comma-separated string as well as a vector/list of
    # names; ignore blank entries such as those produced by "a,,b".
    machines = trimws(unlist(strsplit(as.character(machine_list), ",",
                                      fixed = TRUE)))
    machines = machines[nzchar(machines)]

    if (length(machines) > 1L) {
      cat("Have multi-node access for parallelism with", length(machines),
          "machines:", machines, "\n")

      # Restrict the number of cores used, e.g. if we need a lot of memory per
      # core. na.rm lets max_cores stand in when cpus_per_node is NA (e.g.
      # SLURM_CPUS_ON_NODE is unset).
      if (!is.null(max_cores)) {
        cpus_per_node = suppressWarnings(
          min(cpus_per_node, max_cores, na.rm = TRUE)
        )
      }

      # Fail with an actionable message rather than letting rep() choke on NA.
      if (length(cpus_per_node) != 1L || !is.numeric(cpus_per_node) ||
          !is.finite(cpus_per_node) || cpus_per_node < 1) {
        stop("cpus_per_node must be a single positive number for multinode ",
             "parallelization (is SLURM_CPUS_ON_NODE set?).", call. = FALSE)
      }

      # One entry per worker process: each machine name repeated per CPU.
      # NOTE: this may be a bad config if the nodes have different core counts.
      cores = rep(machines, each = cpus_per_node)
      cat("Multi-node cores enabled:", cores, "\n")
      multinode = TRUE
    }
  }

  if (!multinode) {
    # Count of physical cores, unlike parallel::detectCores() which is logical
    # cores (threads).
    cores = RhpcBLASctl::get_num_cores()
    cat("Local physical cores detected:", cores, "\n")

    # Restrict to max_cores if needed, e.g. high memory usage per core.
    if (!is.null(max_cores) && max_cores < cores) {
      cat("Restricting usage to", max_cores, "cores.\n")
      cores = max_cores
    }
  }

  if (multinode || type_in(c("cluster", "doSNOW", "snow"))) {
    # Outfile = "" allows output from within foreach to be displayed while in
    # RStudio.
    # TODO: figure out how to suppress the output from makeCluster()
    cat("Starting multinode cluster with cores:", cores, "\n")
    # capture.output() evaluates in this frame, so `cl` is assigned here.
    utils::capture.output({
      cl = parallel::makeCluster(cores, type = "SOCK", outfile = outfile)
    })
    # doParallel supports multicore and multinode parallelism, but may require
    # explicitly exporting functions and libraries across the cluster.
    doParallel::registerDoParallel(cl)
    parallel::setDefaultCluster(cl)
  } else if (is.null(type) || type_in("seq")) {
    # Don't use any parallelization.
    options(mc.cores = 1)
    foreach::registerDoSEQ()
    cl = NA
  } else {
    # Every remaining type ("any", "doParallel", "doMC", ...) gets a local
    # cluster. Making this the final branch guarantees `cl` is always defined;
    # doMC itself is no longer supported.
    # Outfile = "" allows output from within foreach to be displayed.
    # TODO: figure out how to suppress the output from makeCluster()
    utils::capture.output({
      cl = parallel::makeCluster(cores, outfile = outfile)
    })
    doParallel::registerDoParallel(cl)
    parallel::setDefaultCluster(cl)
  }

  # Make sure the BLAS is not competing with the SL parallelism.
  # isTRUE() treats a NULL or NA thread count as "nothing to reduce" instead
  # of raising an error inside if().
  omp_threads = RhpcBLASctl::omp_get_max_threads()
  if (isTRUE(omp_threads > 1)) {
    RhpcBLASctl::omp_set_num_threads(1)
  }

  # TODO: need to figure out difference between get_max_threads and
  # get_num_procs.
  # They are not always both consistently set to 1 (i.e. on Benten).
  omp_threads = RhpcBLASctl::omp_get_max_threads()
  # If the thread count is unavailable we can safely plan on using 1 thread.
  if (length(omp_threads) != 1L || is.na(omp_threads)) {
    omp_threads = 1
  }
  cat("Our BLAS is setup for", RhpcBLASctl::blas_get_num_procs(),
      "threads and OMP is", omp_threads, "threads.\n")

  cat("doPar backend registered:", foreach::getDoParName(), "\n")
  cat("Workers enabled:", foreach::getDoParWorkers(), "\n")

  # Return invisibly so that NA is not printed.
  invisible(cl)
}
ck37/ckTools documentation built on April 29, 2023, 11:47 p.m.