defMultiProcErrorHandler <- function(cmd, exitStatus, ...)
{
return(sprintf("Failed to run command '%s' with args: '%s'. Exit code: %d",
cmd$command, paste0(cmd$args, collapse = " "), exitStatus))
}
#' Simultaneous execution of system commands.
#'
#' Execute a queue of system commands in parallel.
#'
#' This function executes a given queue with system commands in parallel to
#' speed up computation. Commands are executed in the background using the
#' \pkg{processx} package. A configurable maximum amount of processes are
#' created to execute multiple commands in parallel.
#'
#' Multiple commands may be executed in sequence that are launched from a single
#' parent process (as part of a batch script on Windows or combined with the
#' shell AND operator otherwise). Note that in this scenario still multiple
#' processes are spawned. Each of these processes will manage a chunk of the
#' command queue (size defined by \code{batchSize} argument). This approach is
#' typically suitable for fast running commands: the overhead of spawning a new
#' process for each command from R would in this case be significant enough to
#' loose most of the speedup otherwise gained with parallel execution. Note that
#' the actual batch size may be adjusted to ensure that a maximum number of
#' processes are running simultaneously.
#'
#' Other functionalities of this function include timeout and error handling.
#'
#' @param commandQueue A list with commands. Should contain \code{command}
#' (scalar string) and \code{args} (\code{character} vector). More user
#' defineds fields are allowed and useful to attach command information that
#' can be used in the finish, timeout and error handlers.
#' @param finishHandler A function that is called when a command has finished.
#' This function is typically used to process any results generated by the
#' command. The function is called right after spawning a new process, hence
#' processing results can occur while the next command is running in the
#' background. The function signature should be \code{function(cmd)} where
#' \code{cmd} is the queue data (from \code{commandQueue}) of the command that
#' has finished.
#' @param timeoutHandler A function that is called whenever a timeout for a
#' command occurs. Should return \code{TRUE} if execution of the command
#' should be retried. The function signature should be \code{function(cmd,
#' retries)} where \code{cmd} is the queue data for that command and
#' \code{retries} the number of times the command has been retried.
#' @param errorHandler Similar to \code{timeoutHandler}, but called whenever a
#' command has failed. The signature should be \code{function(cmd, exitStatus,
#' retries)}. The \code{exitStatus} argument is the exit code of the command
#' (may be \code{NA} in rare cases this is unknown). Other arguments are as
#' \code{timeoutHandler}. The return value should be as \code{timeoutHandler}
#' or a \code{character} with an error message which will be thrown with
#' \code{\link{stop}}.
#' @param prepareHandler A function that is called prior to execution of the
#' command. The function signature should be \code{function(cmd)} where
#' \code{cmd} is the queue data (from \code{commandQueue}) of the command to
#' be started. The return value must be (an updated) \code{cmd}.
#' @param cacheName,setHash Used for caching results. Set to \code{NULL} to
#' disable caching.
#' @param procTimeout The maximum time a process may consume before a timeout
#' occurs (in seconds). Set to \code{NULL} to disable timeouts. Ignored if
#' \code{patRoon.MP.method="future"}.
#' @param printOutput,printError Set to \code{TRUE} to print stdout/stderr
#' output to the console. Ignored if \option{patRoon.MP.method="future"}.
#' @param logSubDir The sub-directory used for log files. The final log file
#' path is constructed from \option{patRoon.MP.logPath}, \code{logSubDir} and
#' \code{logFile} set in the \code{commandQueue}.
#' @param showProgress Set to \code{TRUE} to display a progress bar. Ignored if
#' \option{patRoon.MP.method="future"}.
#' @param waitTimeout Number of milliseconds to wait before checking if a new
#' process should be spawned. Ignored if \option{patRoon.MP.method="future"}.
#' @param batchSize Number of commands that should be executed in sequence per
#' processes. See details. Ignored if \option{patRoon.MP.method="future"}.
#' @param delayBetweenProc Minimum number of milliseconds to wait before
#' spawning a new process. Might be needed to workaround errors. Ignored if
#' \option{patRoon.MP.method="future"}.
#' @param method Overrides \option{patRoon.MP.method} if not \code{NULL}.
#'
#' @keywords internal
executeMultiProcess <- function(commandQueue, finishHandler,
timeoutHandler = function(...) TRUE,
errorHandler = defMultiProcErrorHandler,
prepareHandler = NULL, cacheName = NULL, setHash = NULL,
procTimeout = NULL, printOutput = FALSE, printError = FALSE,
logSubDir = NULL, showProgress = TRUE, waitTimeout = 50,
batchSize = 1, delayBetweenProc = 0, method = NULL)
{
if (!checkmate::testNamed(commandQueue))
names(commandQueue) <- seq_along(commandQueue) # setting names makes it easier to split/merge cached results later
cmdNames <- names(commandQueue)
cacheDB <- openCacheDBScope()
if (!is.null(cacheName))
{
if (!is.null(setHash))
cachedSet <- loadCacheSet(cacheName, setHash, cacheDB)
else
cachedSet <- NULL
allHashes <- sapply(commandQueue, "[[", "hash")
cachedResults <- lapply(allHashes, function(hash)
{
res <- NULL
if (!is.null(cachedSet))
res <- cachedSet[[hash]]
if (is.null(res))
res <- loadCacheData(cacheName, hash, cacheDB)
return(res)
})
names(cachedResults) <- cmdNames
cachedResults <- pruneList(cachedResults)
# remove cached results
commandQueue <- commandQueue[setdiff(cmdNames, names(cachedResults))]
}
if (length(commandQueue) > 0)
{
if (is.null(method))
method <- getOption("patRoon.MP.method", "classic")
f <- switch(method,
classic = executeMultiProcessClassic,
future = executeMultiProcessFuture)
if (is.null(f))
stop("Wrong value set for patRoon.MP.method! Should be \"classic\" or \"future\"")
results <- f(commandQueue = commandQueue, finishHandler = finishHandler, timeoutHandler = timeoutHandler,
errorHandler = errorHandler, prepareHandler = prepareHandler, cacheName = cacheName,
procTimeout = procTimeout, printOutput = printOutput, printError = printError,
logSubDir = logSubDir, showProgress = showProgress, waitTimeout = waitTimeout,
batchSize = batchSize, delayBetweenProc = delayBetweenProc)
}
else
results <- list()
if (!is.null(cacheName))
{
if (length(cachedResults) > 0)
{
# merge cached results
results <- c(results, cachedResults)
results <- results[intersect(cmdNames, names(results))] # re-order
}
if (!is.null(setHash) && is.null(cachedSet))
saveCacheSet(cacheName, allHashes, setHash, cacheDB)
}
return(results)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.