# runParallel
#' Run a list of functions in parallel
#'
#' \code{runParallel} runs its input tasks in parallel until all complete
#' and returns either a named list (on error \code{NULL}) or the value
#' of a given callback.
#'
#' @param tasks List of functions (anonymous and named) \strong{required}.
#' @param cb Anonymous or named function with signature
#' \code{cb(error, data)} \strong{optional}.
#' @return If \code{cb} is \code{NULL} the tasks' return values are returned
#' in a named list (on error \code{NULL}). If \code{cb} is a function it is
#' called upon completion of all tasks and gets passed an error value
#' (default \code{NULL}) as first parameter and a named list of the tasks'
#' return values (on error \code{NULL}) as second parameter.
#'
#' @details If an error is encountered while calling the tasks without a
#' callback \code{runParallel} immediately stops execution and returns
#' \code{NULL}. If an error is encountered and a callback is defined
#' \code{runParallel} immediately stops execution and calls the callback with
#' the \code{data} parameter set to \code{NULL} and the \code{error} parameter
#' set to the encountered error. Thus, the callback will always have only one
#' non-\code{NULL} argument. Within the callback simply check for an error
#' with \code{is.null(error)}. If the \code{error} object is not \code{NULL}
#' it has a property \code{$task} indicating the function that failed.
#'
#' Each task is deparsed into a script inside a temporary \code{.runr}
#' directory (relative to the working directory) and executed by a separate
#' \code{R --vanilla} child process. The parent's global environment and the
#' environments of bound tasks (see \pkg{bounds}) are handed over via
#' \pkg{sharedata}; an unbound task is re-created from its source in the
#' child and therefore only sees the cloned global environment. The
#' \code{.runr} directory, the child processes and the shared result objects
#' are cleaned up before the callback is called, and also when
#' \code{runParallel} is interrupted or fails.
#'
#' @seealso \code{\link{runSeries}} \code{\link{runWaterfall}}
#' \code{\link{runRace}}
#' \url{https://github.com/feross/run-parallel}
#'
#' @examples
#' \dontrun{
#' callback <- function(err, d) {
#'   if (is.null(err)) d else stop(err$task, ': ', conditionMessage(err))
#' }
#' runParallel(list(function(i=0L) while (i < 1e6L) i <- i + 1L,
#'                  function() {Sys.sleep(5L); return(419L)}),
#'             callback)
#' }
#'
#' @export
runParallel <- function(tasks=list(NULL), cb=NULL) {
  stopifnot(isNonEmptyListofFuncs(tasks), is.null(cb) || is.function(cb))
  if (is.function(cb) && length(formals(cb)) != 2L) {
    stop('callback must have two parameters: 1st error, 2nd data')
  }
  # input function names
  games <- getFuncNames(tasks, cb)  # returns the names of tasks only
  # All processes, shared objects and files are released by the time the
  # worker returns (its on.exit handler also covers errors and interrupts),
  # so the callback runs after cleanup, exactly as before.
  res <- runParallelWorker(tasks, games)
  if (is.function(cb)) cb(res$err, res$data) else res$data
}

# Internal worker for runParallel (not exported, no Rd).
# Writes one script per task into .runr, starts each script in a background
# `R --vanilla` process, and polls the shared LOG/OUT objects until every task
# has finished or one has failed.
# @param tasks non-empty list of functions (validated by the caller).
# @param games task names as returned by getFuncNames(), one per task.
# @return list(err=, data=): on success err is NULL and data is the named list
#   of return values; on failure data is NULL and err is the condition object
#   raised in the child, with $task set to the failing task's name.
runParallelWorker <- function(tasks, games) {
  # pause between full polling passes so the parent does not spin a core
  POLL_SECS <- 0.05
  # filenames and shared-object names, one per task
  FLNMS_R <- file.path('.runr', paste0('xp.', games, '.R'))
  MMNMS_LOG <- nameI('LOG', seq_along(FLNMS_R))
  MMNMS_OUT <- nameI('OUT', seq_along(FLNMS_R))
  MMNMS_BND <- nameI('BND', seq_along(FLNMS_R))
  PIDS <- list()  # PIDs of started child processes, keyed by task name
  # Release children, shared objects and the io dir on ANY exit path
  # (normal return, early error, user interrupt). PIDS is looked up when the
  # handler runs, so it covers every process started so far.
  on.exit({
    if (length(PIDS) > 0L) killEmAll(PIDS)
    lapply(list(MMNMS_LOG, MMNMS_OUT, MMNMS_BND), sharedata::unshare)
    unlink('.runr', recursive=TRUE)
  }, add=TRUE)
  # io setup
  if (!dir.exists('.runr')) {
    dir.create('.runr')  # root for all tasks
  } else {
    unlink(file.path('.runr', '*'))  # clear old stuff
  }
  # Execution script template for children. Slots: function source, optional
  # bound-environment restore, OUT name, LOG name. The child shares its return
  # value (or the caught condition) as OUT first, then shares LOG last:
  # 'runr_EOF' on success, otherwise the error message.
  TMPL <- paste0('%s\n',  # for function object
                 '%s\n',  # for bound data
                 'RTN <- NULL\n',
                 'runr_END <- \'runr_EOF\'\n',
                 'sharedata::clone_environment(\'CLONE\', envir=.GlobalEnv)\n',
                 'list(\n',
                 'tryCatch(\n',
                 'assign(\'RTN\', (FUN)(), envir=.GlobalEnv),\n',
                 'error=function(e) {\n',
                 'assign(\'runr_END\',',
                 'geterrmessage(), envir=.GlobalEnv)\n',
                 'assign(\'RTN\', e, envir=.GlobalEnv)',
                 '}\n',
                 '),\n',
                 'sharedata::share_object(RTN, \'%s\')\n',
                 ')\n',
                 'sharedata::share_object(runr_END, \'%s\')')
  # clone parent's global environment data for the children
  # NOTE(review): 'CLONE' is never unshared here; confirm whether sharedata
  # needs an explicit release for shared environments.
  sharedata::share_environment('CLONE', envir=.GlobalEnv)
  for (i in seq_along(tasks)) {
    bound <- bounds::isBound(tasks[[i]])
    # bound environments cannot survive deparse(), so ship them separately
    if (bound) {
      sharedata::share_object(environment(tasks[[i]]), MMNMS_BND[[i]])
    }
    fun_src <- paste0('FUN <- ',
                      paste0(deparse(tasks[[i]]), '\n', collapse=''))
    bnd_src <- if (bound) {
      paste0('environment(FUN) <- sharedata::clone_object(\'',
             MMNMS_BND[[i]], '\')')
    } else {
      ''
    }
    script <- sprintf(TMPL, fun_src, bnd_src, MMNMS_OUT[[i]], MMNMS_LOG[[i]])
    # export prepared task
    cat(script, file=FLNMS_R[[i]])
  }
  # start child processes non-blocking and record their pids
  for (i in seq_along(tasks)) {
    PIDS[[games[i]]] <- sys::exec_background(
      cmd='R',
      args=c('--vanilla', '--slave', '-f', FLNMS_R[[i]]),
      std_out=FALSE, std_err=FALSE
    )
  }
  # blocking poll loop until all tasks are done or one has failed
  err <- NULL
  status <- lapply(PIDS, function(p) FALSE)  # task completed: TRUE/FALSE
  x <- lapply(status, function(s) NULL)      # return object
  i <- 1L
  repeat {
    if (!status[[games[i]]]) {
      # Cloning LOG fails while the child has not shared it yet, i.e. the
      # task is presumably still running; NULL marks "not available".
      flag <- tryCatch(sharedata::clone_object(MMNMS_LOG[[i]]),
                       error=function(e) NULL)
      if (!is.null(flag)) {
        Sys.sleep(.5)  # wait 4 OS to commit
        if (!isTRUE(flag == 'runr_EOF')) {
          # child caught an error: OUT holds the condition object
          x <- NULL  # set data to NULL
          err <- sharedata::clone_object(MMNMS_OUT[[i]])
          err$task <- games[i]  # add info
          break  # early exit; on.exit stops the remaining children
        }
        RTN <- sharedata::clone_object(MMNMS_OUT[[i]])
        # not assigning NULL to prevent deleting named list item
        if (!is.null(RTN)) x[[games[i]]] <- RTN
        status[[games[i]]] <- TRUE  # mark current task as completed
      }
    }
    # check if all tasks completed
    if (all(unlist(status))) break
    i <- i + 1L
    if (i > length(tasks)) {
      i <- 1L  # rewind
      Sys.sleep(POLL_SECS)  # avoid busy-waiting between passes
    }
  }
  list(err=err, data=x)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.