# R/useNThreads.R
#
# Defines functions: allocateJobs, WGCNAnThreads, enableWGCNAThreads, blockSize, .checkAvailableMemory, disableWGCNAThreads, allowWGCNAThreads, .nProcessorsOnline, .useNThreads
#
# Documented in: allocateJobs, allowWGCNAThreads, blockSize, .checkAvailableMemory, disableWGCNAThreads, enableWGCNAThreads, .nProcessorsOnline, .useNThreads, WGCNAnThreads

# Function to control the number of threads to use in threaded calculations.

#' @title .useNThreads
#'
#' @description Resolves the number of threads to use. A non-zero `nThreads`
#'   is handed back unchanged; otherwise the value is looked up in the
#'   environment variable whose name is stored in `.threadAllowVar`.
#'
#' @param nThreads Requested number of threads. `0` (the default) means
#'   "consult the environment variable named by `.threadAllowVar`".
#'
#' @return `nThreads` itself when it is non-zero. Otherwise `1` when the
#'   environment variable is unset or empty, the result of
#'   `.nProcessorsOnline()` when it equals `"ALL_PROCESSORS"`, its numeric
#'   value when it parses to a finite number, and `2` when it does not.
#'
.useNThreads <- function(nThreads = 0) {
  # An explicit, non-zero request wins over the environment.
  if (nThreads != 0) {
    return(nThreads)
  }

  envValue <- Sys.getenv(.threadAllowVar, unset = NA)

  # Unset and empty are treated alike: single-threaded.
  if (is.na(envValue) || envValue == "") {
    return(1)
  }

  if (envValue == "ALL_PROCESSORS") {
    return(.nProcessorsOnline())
  }

  # Non-numeric text becomes NA here; the conversion warning is not wanted.
  parsed <- suppressWarnings(as.numeric(envValue))

  if (is.finite(parsed)) parsed else 2
}

#' @title .nProcessorsOnline
#'
#' @description Reports the number of cores found by `detectCores()`,
#'   substituting `2` whenever that result is not a finite number of at
#'   least 1.
#'
#' @importFrom parallel detectCores
#' @return The value of `detectCores()` when it is numeric, finite and at
#'   least 1; otherwise `2`.
#'
.nProcessorsOnline <- function() {
  nCores <- detectCores()
  # && short-circuits, so the numeric comparisons are only reached for a
  # numeric, finite value.
  usable <- is.numeric(nCores) && is.finite(nCores) && nCores >= 1
  if (usable) nCores else 2
}

#' @title allowWGCNAThreads
#'
#' @description Records the permitted number of threads in the environment
#'   variable whose name is stored in `.threadAllowVar`. `disableWGCNAThreads()`
#'   is called first, and a notice is printed when more threads are requested
#'   than `.nProcessorsOnline()` reports.
#'
#' @param nThreads Number of threads to allow; must be numeric and at least 2.
#'   `NULL` (the default) uses the value of `.nProcessorsOnline()`.
#'
#' @return `nThreads`, invisibly. Stops with an error when `nThreads` is not
#'   numeric or is smaller than 2.
#' @export
#'
allowWGCNAThreads <- function(nThreads = NULL) {
  # Reset first: this also stops any cluster that may still be running.
  disableWGCNAThreads()

  if (is.null(nThreads)) {
    nThreads <- .nProcessorsOnline()
  }
  if (!(is.numeric(nThreads) && nThreads >= 2)) {
    stop("nThreads must be numeric and at least 2.")
  }

  if (nThreads > .nProcessorsOnline()) {
    oversubscribed <- paste(
      "Warning in allowWGCNAThreads: Requested number of threads is higher than number\n",
      "of available processors (or cores). Using too many threads may degrade code",
      "performance. It is recommended that the number of threads is no more than number\n",
      "of available processors.\n"
    )
    printFlush(oversubscribed)
  }

  printFlush(paste("Allowing multi-threading with up to", nThreads, "threads."))

  # Sys.setenv() takes the variable name as an argument name, so the call is
  # assembled from a named list.
  envArgs <- structure(list(nThreads), names = .threadAllowVar)
  do.call(Sys.setenv, envArgs)

  invisible(nThreads)
}

#' @title disableWGCNAThreads
#'
#' @description Sets the environment variable whose name is stored in
#'   `.threadAllowVar` to `1`, stops the cluster held in the global-environment
#'   object `.revoDoParCluster` when such an object exists, and then calls
#'   `registerDoSEQ()`.
#'
#' @return The value returned by `registerDoSEQ()`.
#' @export
#'
disableWGCNAThreads <- function() {
  # Clear the variable, then pin it to a single thread.
  Sys.unsetenv(.threadAllowVar)
  do.call(Sys.setenv, structure(list(1), names = .threadAllowVar))

  clusterName <- ".revoDoParCluster"
  if (exists(clusterName, where = ".GlobalEnv")) {
    stopCluster(get(clusterName, pos = ".GlobalEnv"))
  }

  registerDoSEQ()
}

#' @title .checkAvailableMemory
#'
#' @description Calls the compiled routine `checkAvailableMemoryForR` from the
#'   WGCNA package and passes back the `size` value it fills in.
#'
#' @return The `size` element returned by the compiled routine (a double).
#'   NOTE(review): `blockSize()` divides a byte count by 8 to obtain a
#'   comparable figure, so this is presumably a count of doubles -- confirm
#'   against the C source.
#'
.checkAvailableMemory <- function() {
  # `size` is passed in as 0 and read back from the result of the call.
  out <- .C("checkAvailableMemoryForR", size = as.double(0), PACKAGE = "WGCNA")
  out[["size"]]
}

#' @title blockSize
#' @description Function to calculate an appropriate blocksize
#'
#' @param matrixSize Size of the matrix being processed; also the upper bound
#'   of the returned block size.
#' @param rectangularBlocks If `TRUE`, the block size is the allocation limit
#'   divided by `matrixSize`; if `FALSE`, it is the square root of the
#'   allocation limit.
#' @param maxMemoryAllocation Optional memory limit. It is divided by 8 before
#'   use (presumably bytes converted to doubles -- confirm). When `NULL`, the
#'   limit is taken from `.checkAvailableMemory()`.
#' @param overheadFactor The allocation limit is divided by this factor before
#'   the block size is derived.
#'
#' @return The smaller of `matrixSize` and the computed block size, rounded
#'   down.
#' @export
#'
blockSize <- function(matrixSize,
                      rectangularBlocks = TRUE,
                      maxMemoryAllocation = NULL,
                      overheadFactor = 3) {
  allocLimit <- if (is.null(maxMemoryAllocation)) {
    .checkAvailableMemory()
  } else {
    maxMemoryAllocation / 8
  }
  usable <- allocLimit / overheadFactor

  candidate <- if (rectangularBlocks) {
    floor(usable / matrixSize)
  } else {
    floor(sqrt(usable))
  }

  # Never report a block larger than the matrix itself.
  min(matrixSize, candidate)
}

#' @title enableWGCNAThreads
#'
#' @description Records the permitted number of worker processes in the
#'   environment variable whose name is stored in `.threadAllowVar` and
#'   registers a parallel backend with `registerDoParallel()`.
#'
#' @param nThreads Number of worker processes; must be a single numeric value
#'   of at least 2. `NULL` (the default) uses all cores when fewer than 4 are
#'   reported by `.nProcessorsOnline()`, and one fewer than that otherwise.
#' 
#' @importFrom parallel detectCores
#' @importFrom doParallel registerDoParallel
#' 
#' @return `nThreads`, invisibly. Stops with an error when `nThreads` is not a
#'   single non-missing numeric value of at least 2.
#' @export
#'
enableWGCNAThreads <- function(nThreads = NULL) {
  # detectCores() may return NA, which would break the comparisons below.
  # .nProcessorsOnline() wraps it and substitutes 2 when the count is not a
  # finite number >= 1, matching what allowWGCNAThreads() relies on.
  nCores <- .nProcessorsOnline()

  if (is.null(nThreads)) {
    nThreads <- if (nCores < 4) nCores else nCores - 1
  }

  # Length and NA are checked explicitly so that bad input produces the
  # validation message rather than a failure inside `if`.
  if (!is.numeric(nThreads) || length(nThreads) != 1 ||
    is.na(nThreads) || nThreads < 2) {
    stop("nThreads must be numeric and at least 2.")
  }

  if (nThreads > nCores) {
    printFlush(paste(
      "Warning in enableWGCNAThreads: Requested number of threads is higher than number\n",
      "of available processors (or cores). Using too many threads may degrade code",
      "performance. It is recommended that the number of threads is no more than number\n",
      "of available processors.\n"
    ))
  }

  printFlush(paste("Allowing parallel execution with up to", nThreads, "working processes."))

  # Sys.setenv() takes the variable name as an argument name, so the call is
  # assembled from a named list.
  pars <- list(nThreads)
  names(pars) <- .threadAllowVar
  do.call(Sys.setenv, pars)

  # Register a parallel backend for foreach
  registerDoParallel(nThreads)

  # Return the number of threads invisibly
  invisible(nThreads)
}

#' @title WGCNAnThreads
#'
#' @description Reads the number of allowed threads from the environment
#'   variable whose name is stored in `.threadAllowVar`.
#'
#' @return The numeric value of that environment variable, or `1` when the
#'   variable is unset, cannot be parsed as a number, or yields no value.
#' @export
#'
WGCNAnThreads <- function() {
  # Unparsable text becomes NA; the conversion warning is not wanted.
  n <- suppressWarnings(as.numeric(Sys.getenv(.threadAllowVar, unset = "1")))

  # The length test has to come first: `if (is.na(n))` fails on a zero-length
  # value, so a length check placed after it can never be reached.
  if (length(n) == 0 || is.na(n)) {
    n <- 1
  }
  n
}

#' @title allocateJobs
#' @description Facilitates multi-threading by producing an even allocation of jobs
#'   Works even when number of jobs is less than number of threads in which case some components of the
#'   returned allocation will have length 0.
#'
#' @param nTasks Total number of tasks; tasks are numbered `1..nTasks`.
#' @param nWorkers Number of workers to split the tasks between. `NA` triggers
#'   a warning and is replaced by 1. A fractional value is rounded down. Any
#'   other value that is not a single finite number of at least 1 is an error.
#'
#' @return A list with one element per worker. Each element holds the
#'   consecutive task indices assigned to that worker, or `numeric(0)` when
#'   the worker receives no tasks. The first `nTasks %% nWorkers` workers get
#'   one task more than the rest.
#' @export
#'
allocateJobs <- function(nTasks, nWorkers) {
  if (length(nWorkers) == 1 && is.na(nWorkers)) {
    warning("In function allocateJobs: 'nWorkers' is NA. Will use 1 worker.")
    nWorkers <- 1
  }
  # Zero, negative, infinite or non-scalar worker counts used to fail deep in
  # the loop with an unrelated message; reject them up front instead.
  if (length(nWorkers) != 1 || !is.finite(nWorkers) || nWorkers < 1) {
    stop("In function allocateJobs: 'nWorkers' must be a single finite number that is at least 1.")
  }
  # The loop runs over whole workers, so the arithmetic must use the same
  # whole number; otherwise the trailing tasks are never assigned.
  nWorkers <- floor(nWorkers)

  perWorker <- floor(nTasks / nWorkers)
  leftover <- nTasks - nWorkers * perWorker

  allocation <- vector("list", nWorkers)
  start <- 1
  for (t in seq_len(nWorkers)) {
    # The first `leftover` workers each take one extra task.
    end <- start + perWorker - 1 + as.numeric(t <= leftover)
    allocation[[t]] <- if (start > end) numeric(0) else start:end
    start <- end + 1
  }

  allocation
}
# milescsmith/WGCNA documentation built on April 11, 2024, 1:26 a.m.