R/multi-process.R

Defines functions executeMultiProcess defMultiProcErrorHandler

Documented in executeMultiProcess

defMultiProcErrorHandler <- function(cmd, exitStatus, ...)
{
    return(sprintf("Failed to run command '%s' with args: '%s'. Exit code: %d",
                   cmd$command, paste0(cmd$args, collapse = " "), exitStatus))
}

#' Simultaneous execution of system commands.
#'
#' Execute a queue of system commands in parallel.
#'
#' This function executes a given queue with system commands in parallel to
#' speed up computation. Commands are executed in the background using the
#' \pkg{processx} package. A configurable maximum amount of processes are
#' created to execute multiple commands in parallel.
#'
#' Multiple commands may be executed in sequence that are launched from a single
#' parent process (as part of a batch script on Windows or combined with the
#' shell AND operator otherwise). Note that in this scenario still multiple
#' processes are spawned. Each of these processes will manage a chunk of the
#' command queue (size defined by \code{batchSize} argument). This approach is
#' typically suitable for fast running commands: the overhead of spawning a new
#' process for each command from R would in this case be significant enough to
#' loose most of the speedup otherwise gained with parallel execution. Note that
#' the actual batch size may be adjusted to ensure that a maximum number of
#' processes are running simultaneously.
#'
#' Other functionalities of this function include timeout and error handling.
#'
#' @param commandQueue A list with commands. Should contain \code{command}
#'   (scalar string) and \code{args} (\code{character} vector). More user
#'   defineds fields are allowed and useful to attach command information that
#'   can be used in the finish, timeout and error handlers.
#' @param finishHandler A function that is called when a command has finished.
#'   This function is typically used to process any results generated by the
#'   command. The function is called right after spawning a new process, hence
#'   processing results can occur while the next command is running in the
#'   background. The function signature should be \code{function(cmd)} where
#'   \code{cmd} is the queue data (from \code{commandQueue}) of the command that
#'   has finished.
#' @param timeoutHandler A function that is called whenever a timeout for a
#'   command occurs. Should return \code{TRUE} if execution of the command
#'   should be retried. The function signature should be \code{function(cmd,
#'   retries)} where \code{cmd} is the queue data for that command and
#'   \code{retries} the number of times the command has been retried.
#' @param errorHandler Similar to \code{timeoutHandler}, but called whenever a
#'   command has failed. The signature should be \code{function(cmd, exitStatus,
#'   retries)}. The \code{exitStatus} argument is the exit code of the command
#'   (may be \code{NA} in rare cases this is unknown). Other arguments are as
#'   \code{timeoutHandler}. The return value should be as \code{timeoutHandler}
#'   or a \code{character} with an error message which will be thrown with
#'   \code{\link{stop}}.
#' @param prepareHandler A function that is called prior to execution of the
#'   command. The function signature should be \code{function(cmd)} where
#'   \code{cmd} is the queue data (from \code{commandQueue}) of the command to
#'   be started. The return value must be (an updated) \code{cmd}.
#' @param cacheName,setHash Used for caching results. Set to \code{NULL} to
#'   disable caching.
#' @param procTimeout The maximum time a process may consume before a timeout
#'   occurs (in seconds). Set to \code{NULL} to disable timeouts. Ignored if
#'   \code{patRoon.MP.method="future"}.
#' @param printOutput,printError Set to \code{TRUE} to print stdout/stderr
#'   output to the console. Ignored if \option{patRoon.MP.method="future"}.
#' @param logSubDir The sub-directory used for log files. The final log file
#'   path is constructed from \option{patRoon.MP.logPath}, \code{logSubDir} and
#'   \code{logFile} set in the \code{commandQueue}.
#' @param showProgress Set to \code{TRUE} to display a progress bar. Ignored if
#'   \option{patRoon.MP.method="future"}.
#' @param waitTimeout Number of milliseconds to wait before checking if a new
#'   process should be spawned. Ignored if \option{patRoon.MP.method="future"}.
#' @param batchSize Number of commands that should be executed in sequence per
#'   processes. See details. Ignored if \option{patRoon.MP.method="future"}.
#' @param delayBetweenProc Minimum number of milliseconds to wait before
#'   spawning a new process. Might be needed to workaround errors. Ignored if
#'   \option{patRoon.MP.method="future"}.
#' @param method Overrides \option{patRoon.MP.method} if not \code{NULL}.
#'
#' @keywords internal
executeMultiProcess <- function(commandQueue, finishHandler,
                                timeoutHandler = function(...) TRUE,
                                errorHandler = defMultiProcErrorHandler,
                                prepareHandler = NULL, cacheName = NULL, setHash = NULL,
                                procTimeout = NULL, printOutput = FALSE, printError = FALSE,
                                logSubDir = NULL, showProgress = TRUE, waitTimeout = 50,
                                batchSize = 1, delayBetweenProc = 0, method = NULL)
{
    if (!checkmate::testNamed(commandQueue))
        names(commandQueue) <- seq_along(commandQueue) # setting names makes it easier to split/merge cached results later
    
    cmdNames <- names(commandQueue)
    cacheDB <- openCacheDBScope()
    
    if (!is.null(cacheName))
    {
        if (!is.null(setHash))
            cachedSet <- loadCacheSet(cacheName, setHash, cacheDB)
        else
            cachedSet <- NULL
        
        allHashes <- sapply(commandQueue, "[[", "hash")
        cachedResults <- lapply(allHashes, function(hash)
        {
            res <- NULL
            if (!is.null(cachedSet))
                res <- cachedSet[[hash]]
            if (is.null(res))
                res <- loadCacheData(cacheName, hash, cacheDB)
            return(res)
        })
        names(cachedResults) <- cmdNames
        cachedResults <- pruneList(cachedResults)
        
        # remove cached results
        commandQueue <- commandQueue[setdiff(cmdNames, names(cachedResults))]
    }
    
    if (length(commandQueue) > 0)
    {
        if (is.null(method))
            method <- getOption("patRoon.MP.method", "classic")
        f <- switch(method,
                    classic = executeMultiProcessClassic,
                    future = executeMultiProcessFuture)
        if (is.null(f))
            stop("Wrong value set for patRoon.MP.method! Should be \"classic\" or \"future\"")
        
        results <- f(commandQueue = commandQueue, finishHandler = finishHandler, timeoutHandler = timeoutHandler,
                     errorHandler = errorHandler, prepareHandler = prepareHandler, cacheName = cacheName,
                     procTimeout = procTimeout, printOutput = printOutput, printError = printError,
                     logSubDir = logSubDir, showProgress = showProgress, waitTimeout = waitTimeout,
                     batchSize = batchSize, delayBetweenProc = delayBetweenProc)
    }
    else
        results <- list()
    
    if (!is.null(cacheName))
    {
        if (length(cachedResults) > 0)
        {
            # merge cached results
            results <- c(results, cachedResults)
            results <- results[intersect(cmdNames, names(results))] # re-order
        }
    
        if (!is.null(setHash) && is.null(cachedSet))
            saveCacheSet(cacheName, allHashes, setHash, cacheDB)
    }
    
    return(results)
}
rickhelmus/patRoon documentation built on April 3, 2024, 6:56 p.m.