
Defines functions parallelize

Documented in parallelize

#' @title Setup parallel processing using snow.
#' @description
#' Creates a socket cluster and registers it as the foreach backend. A
#' multinode cluster is used when more than one machine is detected (and
#' allowed); otherwise a cluster is started on the local machine. Libraries
#' required: parallel, doSNOW, RhpcBLASctl, foreach. As of Feb. 2020 doMC is no
#' longer supported: requesting "doMC" sets up a local cluster instead.
#' @param type "any", "cluster"/"doSNOW"/"snow", "doParallel", "doMC", or
#'   "seq". "seq" (or NULL) disables parallelization unless multiple nodes
#'   are in use; every other value starts a cluster.
#' @param max_cores Restrict to this many cores, even if more are available.
#'   For multinode use this caps the number of processes per node.
#' @param allow_multinode If TRUE will use multiple nodes if detected. If
#'   FALSE will not use multiple machines even if they are available.
#' @param machine_list Networked computers for multinode computation, either
#'   as one comma-separated string or as a character vector of names.
#' @param cpus_per_node Number of processes to run on each node, if using
#'   multinode parallelization.
#' @param outfile File to collect output across workers. If "" then results are
#'   printed to the console.
#' @param verbose Intended to toggle additional output during execution; it is
#'   currently not consulted by the function body.
#' @return The cluster object (returned invisibly), which can be passed to
#'   stop_cluster(); NA when running sequentially.
#' @importFrom utils installed.packages
#' @seealso stop_cluster
#' @export
parallelize =
  function(type = "any", max_cores = NULL, allow_multinode = TRUE,
           machine_list = Sys.getenv("SLURM_NODELIST"),
           cpus_per_node = as.numeric(Sys.getenv("SLURM_CPUS_ON_NODE")),
           outfile = "", verbose = FALSE) {

  # TRUE when `type` is one of `choices`; a NULL type never matches.
  type_is = function(choices) {
    !is.null(type) && any(type %in% choices)
  }

  # Indicator for multi-node parallelism.
  multinode = FALSE

  # Check if we have multiple machine access.
  if (allow_multinode) {
    # Accept either one comma-separated string or a vector of machine names;
    # drop surrounding whitespace and empty entries.
    machines = unlist(strsplit(as.character(machine_list), ",", fixed = TRUE))
    machines = trimws(machines)
    machines = machines[nzchar(machines)]

    if (length(machines) > 1L) {
      cat("Have multi-node access for parallelism with", length(machines),
          "machines:", machines, "\n")

      # The default is NA when SLURM_CPUS_ON_NODE is unset; fail with a clear
      # message instead of an obscure error from rep() below.
      if (length(cpus_per_node) != 1L || is.na(cpus_per_node) ||
          cpus_per_node < 1) {
        stop("cpus_per_node must be a single positive number when using ",
             "multiple nodes.", call. = FALSE)
      }

      # Restrict the number of cores used, e.g. if we need a lot of memory per
      # core.
      if (!is.null(max_cores)) {
        cpus_per_node = min(cpus_per_node, max_cores)
      }

      # One cluster worker per (machine, cpu) pair.
      # NOTE: this may be a bad config if the nodes have different core counts.
      cores = rep(machines, each = cpus_per_node)
      cat("Multi-node cores enabled:", cores, "\n")
      multinode = TRUE
    }
  }

  if (!multinode) {
    # Count of physical cores, unlike parallel::detectCores() which is logical
    # cores (threads).
    cores = RhpcBLASctl::get_num_cores()
    cat("Local physical cores detected:", cores, "\n")

    # Restrict to max_cores if needed, e.g. high memory usage per core.
    if (!is.null(max_cores) && max_cores < cores) {
      cat("Restricting usage to", max_cores, "cores.\n")
      cores = max_cores
    }
  }

  if (multinode || type_is(c("cluster", "doSNOW", "snow"))) {
    # Outfile = "" allows output from within foreach to be displayed while in
    # RStudio.
    # TODO: figure out how to suppress the output from makeCluster()
    cat("Starting multinode cluster with cores:", cores, "\n")
    cl = parallel::makeCluster(cores, type = "SOCK", outfile = outfile)
    # NOTE(review): the backend registration was missing from this branch;
    # doSNOW is assumed because it is the documented requirement - confirm.
    doSNOW::registerDoSNOW(cl)
  } else if (is.null(type) || type_is("seq")) {
    # Don't use any parallelization.
    # NOTE(review): registering the sequential backend is reconstructed so
    # that the report below reflects this choice - confirm.
    foreach::registerDoSEQ()
    options(mc.cores = 1)
    cl = NA
  } else {
    # Every remaining type ("any", "doParallel", "doMC", ...) gets a cluster on
    # the local machine, so `cl` is always defined. doMC itself is no longer
    # supported.
    # Outfile = "" allows output from within foreach to be displayed.
    # TODO: figure out how to suppress the output from makeCluster()
    cl = parallel::makeCluster(cores, outfile = outfile)
    # NOTE(review): registration reconstructed as in the first branch - confirm.
    doSNOW::registerDoSNOW(cl)
  }

  # Make sure the BLAS is not competing with the SL parallelism.
  omp_threads = RhpcBLASctl::omp_get_max_threads()
  if (!is.null(omp_threads) && isTRUE(omp_threads > 1)) {
    # NOTE(review): the body of this guard was missing; capping OpenMP at one
    # thread is assumed from the comment above - confirm.
    RhpcBLASctl::omp_set_num_threads(1)
  }

  # TODO: need to figure out difference between get_max_threads and
  # get_num_procs.
  # They are not always both consistently set to 1 (i.e. on Benten).
  omp_threads = RhpcBLASctl::omp_get_max_threads()
  # If omp_threads is NULL we can safely plan on using 1 thread.
  if (is.null(omp_threads)) {
    omp_threads = 1
  }
  cat("Our BLAS is setup for", RhpcBLASctl::blas_get_num_procs(),
      "threads and OMP is", omp_threads, "threads.\n")

  cat("doPar backend registered:", foreach::getDoParName(), "\n")
  cat("Workers enabled:", foreach::getDoParWorkers(), "\n")

  # Return invisibly so that NA is not printed.
  invisible(cl)
}

Try the ck37r package in your browser

Any scripts or data that you put into this service are public.

ck37r documentation built on Feb. 6, 2020, 5:09 p.m.