makeCommandList <- function(commandQueue, cmdInds, sucDir)
{
    # Build the processx arguments (a `command` element and, where needed, an `args` element) that execute
    # the given queued command(s).
    #
    # commandQueue: list of commands; each is a list with `command` and `args` elements.
    # cmdInds: indices of the commands in commandQueue. They are used to name the per-command success
    #          marker files created in sucDir.
    # sucDir: directory in which an empty marker file (named after the command index) is created for every
    #         command that exited successfully.
    #
    # Returns a list with `command` and (except for Windows batch files) `args`. Multiple commands are
    # chained so that execution stops at the first failing command, exiting with its status. On Windows they
    # are written to a temporary batch file; elsewhere they are run through `/bin/sh -c`. A single command on
    # Windows is returned as-is, without a success marker.

    ncmd <- length(commandQueue)
    isWindows <- Sys.info()[["sysname"]] == "Windows"
    ret <- list()

    quoteCmd <- function(cmd) paste(shQuote(cmd$command), paste0(shQuote(cmd$args), collapse = " "), sep = " ")

    # NOTE: marker paths are quoted as well; otherwise a sucDir containing spaces (or other shell
    # metacharacters) breaks the generated command lines. shQuote() uses cmd.exe style quoting on Windows.
    sucFiles <- shQuote(file.path(sucDir, cmdInds))
    if (isWindows)
    {
        # fail with last exit code if a command failed: see https://stackoverflow.com/questions/734598/how-do-i-make-a-batch-file-terminate-upon-encountering-an-error
        # NOTE: need to enable delayed expansion and use exclamation marks for current errorlevel expansion.
        ORDoExit <- "|| exit /b !errorlevel!"
        # mark success of a command by creating an empty file named after the command index
        ANDMarkSucceed <- paste("&& type NUL >", sucFiles)
    }
    else
    {
        ORDoExit <- "|| exit $?"
        ANDMarkSucceed <- paste("&& touch", sucFiles)
    }

    if (ncmd > 1)
    {
        # execute multiple processes at once
        cmdList <- vapply(commandQueue, quoteCmd, character(1), USE.NAMES = FALSE)
        if (isWindows)
        {
            # on Windows we easily reach the commandline text limit --> execute as batch file
            # use call in order to be able to execute batch files
            shFile <- tempfile(fileext = ".bat")
            cat("SETLOCAL EnableDelayedExpansion",
                paste("call", cmdList, ANDMarkSucceed, ORDoExit, collapse = "\n"),
                sep = "\n", file = shFile)
            ret$command <- shFile
        }
        else
        {
            # a plain commandline is not supported anymore by processx --> call sh instead
            ret$command <- "/bin/sh"
            ret$args <- c("-c", paste(cmdList, ANDMarkSucceed, ORDoExit, collapse = " && "))
        }
    }
    else if (isWindows)
        ret[c("command", "args")] <- commandQueue[[1]][c("command", "args")]
    else
    {
        # HACK: on Linux (and others?) sometimes NA is returned for status messages, use success markers there
        # like batch mode above
        ret$command <- "/bin/sh"
        ret$args <- c("-c", paste(quoteCmd(commandQueue[[1]]), ANDMarkSucceed, ORDoExit))
    }

    return(ret)
}
initCommand <- function(commandQueue, cmdInds, workDir, sucDir, doLog, printOutput, printError)
{
    # Prepare the processx arguments and the bookkeeping state for a (batch of) queued command(s).
    #
    # commandQueue/cmdInds/sucDir: passed on to makeCommandList().
    # workDir: working directory for the process, or NULL to leave it unset.
    # doLog/printOutput/printError: control which standard streams are piped ("|") so they can be read.
    #
    # Returns a list with the processx arguments (`procArgs`), the first/last command index
    # (`cmdIndRange`), per-command status vectors and retry counters, the start time and a running flag.

    nCommands <- length(commandQueue)

    pArgs <- c(makeCommandList(commandQueue, cmdInds, sucDir), list(cleanup_tree = TRUE, supervise = TRUE))
    if (!is.null(workDir))
        pArgs <- c(pArgs, list(wd = workDir))
    if (printOutput)
        pArgs[["stdout"]] <- "|"
    if (printError)
        pArgs[["stderr"]] <- "|"
    # logging is only possible for a single (non-batched) command
    if (nCommands == 1 && doLog && !is.null(commandQueue[[1]]$logFile))
        pArgs[c("stdout", "stderr")] <- "|"

    perCommand <- function(value) rep(value, nCommands)

    list(procArgs = pArgs,
         cmdIndRange = c(cmdInds[1], cmdInds[nCommands]),
         finished = perCommand(FALSE), # only used for timeout checking
         failed = perCommand(FALSE),
         timedOut = perCommand(FALSE),
         timeOutRetries = perCommand(0),
         startTime = Sys.time(),
         noResRetries = perCommand(0),
         running = TRUE)
}
maybeRestartCommand <- function(commandQueue, procInfo, sucDir, exitStatus, timeoutHandler, errorHandler)
{
    # Decide whether a finished (batch) process that (may have) failed should be restarted.
    #
    # commandQueue: the commands executed by the process (local indices 1..n).
    # procInfo: bookkeeping list created by initCommand().
    # sucDir: directory holding the per-command success marker files.
    # exitStatus: exit status of the process (may be NA).
    # timeoutHandler(cmd, retries): called for a timed out command; TRUE to retry.
    # errorHandler(cmd, exitStatus, retries): called for a failed command; TRUE to retry, FALSE to give up,
    #                                         or a character string to stop() with that message.
    #
    # Only the first command that neither succeeded nor was marked failed earlier is examined. If it is
    # retried, the process restarts at that command; otherwise it is marked failed and the process restarts
    # at the next command (if any). Returns a list with the (updated) `procInfo` and a `restart` flag.

    restart <- FALSE
    cmdInds <- seq(procInfo$cmdIndRange[1], procInfo$cmdIndRange[2])
    ncmd <- length(cmdInds)
    starti <- tail(which(procInfo$failed), 1)
    if (length(starti) == 0)
        starti <- 0

    # Nothing left to examine if the last command was already marked failed. NOTE: without this guard
    # seq(starti + 1, ncmd) would count downwards and index past the end.
    if (starti >= ncmd)
        return(list(procInfo = procInfo, restart = FALSE))

    # single commands on Windows are not wrapped with success markers (see makeCommandList())
    useSucMarkers <- ncmd > 1 || Sys.info()[["sysname"]] != "Windows"
    for (i in seq(starti + 1, ncmd))
    {
        # succeeded?
        if (useSucMarkers && file.exists(file.path(sucDir, cmdInds[i])))
            next
        # already marked as failed during a previous restart?
        if (procInfo$failed[i])
            next
        # timed out?
        if (procInfo$timedOut[i])
            retry <- timeoutHandler(cmd = commandQueue[[i]],
                                    retries = procInfo$timeOutRetries[i])
        else
        {
            # we can assume that only the last process in sequence
            # has failed. Hence the exit status must correspond to
            # this process.
            retry <- errorHandler(cmd = commandQueue[[i]], exitStatus = exitStatus,
                                  retries = procInfo$noResRetries[i])
            if (is.character(retry))
                stop(retry) # special case: stop with error message given by the errorHandler
        }

        if (retry)
        {
            # Prevents occasional Windows error: "The requested operation cannot be performed on a file with a user-mapped section open"
            Sys.sleep(1)
            if (procInfo$timedOut[i])
            {
                procInfo$timeOutRetries[i] <- procInfo$timeOutRetries[i] + 1
                procInfo$timedOut[i] <- FALSE
            }
            else
                procInfo$noResRetries[i] <- procInfo$noResRetries[i] + 1
            reStartInd <- i
        }
        else
        {
            procInfo$failed[i] <- TRUE
            reStartInd <- i + 1
        }

        if (reStartInd <= ncmd)
        {
            if (reStartInd != 1)
            {
                # adjust command list if not all commands have to be restarted
                pa <- makeCommandList(commandQueue[seq(reStartInd, ncmd)],
                                      seq(cmdInds[reStartInd], cmdInds[ncmd]), sucDir)
                procInfo$procArgs <- modifyList(procInfo$procArgs, pa)
            }
            # Always reset the timeout clock for the restarted process. Previously this was only done when
            # reStartInd != 1, so a retried first command kept its stale start time and was immediately
            # considered timed out again.
            procInfo$startTime <- Sys.time()
            restart <- TRUE
        }
        break
    }

    return(list(procInfo = procInfo, restart = restart))
}
executeMultiProcessClassic <- function(commandQueue, finishHandler,
                                       timeoutHandler, errorHandler,
                                       prepareHandler, cacheName, procTimeout,
                                       printOutput, printError,
                                       logSubDir, showProgress, waitTimeout,
                                       batchSize, delayBetweenProc)
{
    # Execute a queue of commands with a pool of (at most getOption("patRoon.MP.maxProcs")) processx
    # processes, optionally batching several commands into one process.
    #
    # commandQueue: list of commands (lists with at least `command`/`args`; optionally `workDir`, `logFile`,
    #               `hash`).
    # finishHandler(cmd): called for every command that did not fail; its result is stored in the returned
    #                     list (and cached under cacheName if that is not NULL).
    # timeoutHandler/errorHandler: see maybeRestartCommand().
    # prepareHandler: NULL or a function applied to every command before execution.
    # procTimeout: NULL or the timeout (in seconds) for a single command.
    # printOutput/printError: print standard output/error of the processes (batchSize must then be 1).
    # logSubDir: NULL or a sub-directory of getOption("patRoon.MP.logPath") to write per-command logs to.
    # showProgress: show a text progress bar.
    # waitTimeout: timeout passed to processx poll()/wait() while waiting for processes.
    # batchSize: maximum number of commands executed sequentially by a single process.
    # delayBetweenProc: minimum delay (ms) between starting processes.
    #
    # Returns a list (named as commandQueue) with the finishHandler results; entries of failed commands
    # remain NULL.

    if (length(commandQueue) == 0)
        return(list())

    cacheDB <- openCacheDBScope()

    maxProcAmount <- getOption("patRoon.MP.maxProcs")
    runningProcs <- vector("list", maxProcAmount)
    runningProcInfo <- vector("list", maxProcAmount)

    if (!is.null(prepareHandler))
        commandQueue <- lapply(commandQueue, prepareHandler)

    workDir <- unique(unlist(lapply(commandQueue, "[[", "workDir")))
    if (length(workDir) > 1)
        stop("Different work directories not yet supported")

    totCmdCount <- length(commandQueue)
    ret <- vector("list", totCmdCount)
    names(ret) <- names(commandQueue)

    if (showProgress)
        prog <- openProgBar(0, totCmdCount)

    nextCommand <- 1
    finishedCommands <- 0
    lastCommandTime <- 0 # at which time (in ms) the last command was started

    logPath <- getOption("patRoon.MP.logPath", FALSE)
    if (!is.null(logSubDir) && !isFALSE(logPath))
    {
        logPath <- file.path(logPath, logSubDir)
        mkdirp(logPath)
        commandQueue <- lapply(commandQueue, function(cmd) { cmd$logFile <- file.path(logPath, cmd$logFile); return(cmd) })
    }
    else
        logPath <- NULL
    doLog <- !is.null(logPath)

    stopifnot(batchSize == 1 || (!doLog && !printOutput && !printError))

    # clear up stale processes: see https://github.com/r-lib/processx/issues/171
    on.exit({
        for (pi in seq_along(runningProcs))
        {
            if (!is.null(runningProcs[[pi]]) && runningProcInfo[[pi]]$running)
                runningProcs[[pi]]$kill()
        }

        if (doLog)
        {
            for (cmd in commandQueue)
            {
                if (!is.null(cmd$logFile))
                {
                    tryCatch({
                        fprintf(cmd$logFile, "command: %s\nargs: %s\n", cmd$command, paste0(cmd$args, collapse = " "))
                        fprintf(cmd$logFile, "\n---\n\noutput:\n%s\n\nstandard error output:\n%s\n",
                                cmd$stdoutLog, cmd$stderrLog, append = TRUE)
                    }, error = function(e) "")
                }
            }
        }
    }, add = TRUE)

    sucDir <- tempfile("suc")
    dir.create(sucDir)
    # get rid of potentially large amount of temporary files, also when exiting with an error. Registered
    # after the handler above, so it runs after the remaining processes were killed.
    on.exit(unlink(sucDir, recursive = TRUE), add = TRUE)

    procFinished <- function(pi) !is.null(runningProcs[[pi]]) && !runningProcs[[pi]]$is_alive() && runningProcInfo[[pi]]$running
    isRunning <- function(pinfo) !is.null(pinfo) && pinfo$running

    doProcessOut <- function(txt, print)
    {
        if (print)
            cat(txt)
        if (doLog)
            return(txt)
    }
    # reading process output might fail sometimes(?)
    emptyStrOnErr <- function(expr) tryCatch(expr, error = function(e) "")

    while (nextCommand <= totCmdCount || any(vapply(runningProcInfo, isRunning, logical(1))))
    {
        for (pi in seq_along(runningProcs))
        {
            finishedRunning <- procFinished(pi)

            if (!is.null(runningProcs[[pi]]))
            {
                cmdInds <- seq(runningProcInfo[[pi]]$cmdIndRange[1], runningProcInfo[[pi]]$cmdIndRange[2])

                # NOTE: logging/printing currently doesn't work in batch mode
                if (printOutput || printError || doLog)
                {
                    # read from this slot's own process. NOTE: this previously used a pruned (and possibly
                    # stale or not yet defined) process list, whose indices do not necessarily match pi.
                    proc <- runningProcs[[pi]]
                    cind <- runningProcInfo[[pi]]$cmdIndRange[1]
                    commandQueue[[cind]]$stdoutLog <- paste0(commandQueue[[cind]]$stdoutLog,
                                                             emptyStrOnErr(doProcessOut(proc$read_output(), printOutput)))
                    commandQueue[[cind]]$stderrLog <- paste0(commandQueue[[cind]]$stderrLog,
                                                             emptyStrOnErr(doProcessOut(proc$read_error(), printError)))
                }
            }

            if (finishedRunning)
            {
                # NOTE: as per docs get_exit_status() might return NA, in this case check if a command failed (by
                # checking for missing success marker files)
                exitStatus <- runningProcs[[pi]]$get_exit_status()
                if (is.na(exitStatus) || exitStatus != 0) # something (may have) failed?
                {
                    maybe <- maybeRestartCommand(commandQueue[cmdInds], runningProcInfo[[pi]], sucDir, exitStatus,
                                                 timeoutHandler, errorHandler)
                    runningProcInfo[[pi]] <- maybe$procInfo # might have been updated
                    if (maybe$restart)
                    {
                        runningProcs[[pi]] <- do.call(processx::process$new, runningProcInfo[[pi]]$procArgs)
                        finishedRunning <- FALSE
                    }
                }
            }

            if (is.null(runningProcs[[pi]]) || finishedRunning)
            {
                finishedProc <- runningProcs[[pi]]
                finishedProcInfo <- runningProcInfo[[pi]]

                # start next command first, so we can process results while the next is already running
                if (nextCommand <= totCmdCount)
                {
                    if (delayBetweenProc > 0)
                    {
                        diffTime <- curTimeMS() - lastCommandTime
                        if (diffTime < delayBetweenProc)
                            Sys.sleep((delayBetweenProc - diffTime) / 1000)
                    }

                    if (batchSize == 1)
                        ncmd <- 1
                    else
                    {
                        cmdLeft <- totCmdCount - (nextCommand - 1)
                        freeSlots <- sum(vapply(seq_along(runningProcs),
                                                function(i) is.null(runningProcs[[i]]) || procFinished(i),
                                                logical(1)))
                        freeSize <- freeSlots * batchSize
                        if (freeSlots > 1 && cmdLeft < freeSize)
                            ncmd <- ceiling(cmdLeft / freeSlots) # divide between free slots
                        else
                            ncmd <- min(batchSize, cmdLeft)
                    }

                    cs <- seq(nextCommand, nextCommand + (ncmd - 1))
                    runningProcInfo[[pi]] <- initCommand(commandQueue[cs], cs, workDir, sucDir, doLog, printOutput, printError)
                    runningProcs[[pi]] <- do.call(processx::process$new, runningProcInfo[[pi]]$procArgs)
                    lastCommandTime <- curTimeMS()
                    nextCommand <- nextCommand + ncmd
                }
                else if (!is.null(runningProcs[[pi]]))
                    runningProcInfo[[pi]]$running <- FALSE

                if (!is.null(finishedProc))
                {
                    # count the commands of the *finished* process (cmdInds still refers to it). NOTE: this
                    # previously used the range of the newly started batch, and was skipped when no new command
                    # was started.
                    finishedCommands <- finishedCommands + length(cmdInds)
                    if (showProgress)
                        setTxtProgressBar(prog, finishedCommands)

                    inds <- cmdInds[!finishedProcInfo$failed]
                    if (length(inds) > 0)
                    {
                        ret[inds] <- lapply(inds, function(ci) finishHandler(cmd = commandQueue[[ci]]))
                        if (!is.null(cacheName))
                        {
                            for (i in inds)
                                saveCacheData(cacheName, ret[[i]], commandQueue[[i]]$hash, cacheDB)
                        }
                    }
                }
            }
            else if (!is.null(procTimeout) && runningProcInfo[[pi]]$running)
            {
                # check for timeouts
                kill <- FALSE
                if (length(cmdInds) > 1)
                {
                    # for batch execution: update start time if a new command was started
                    for (i in seq_along(cmdInds))
                    {
                        if (runningProcInfo[[pi]]$failed[i] || runningProcInfo[[pi]]$finished[i])
                            next
                        if (file.exists(file.path(sucDir, cmdInds[i])))
                        {
                            runningProcInfo[[pi]]$finished[i] <- TRUE # now finished
                            runningProcInfo[[pi]]$startTime <- Sys.time()
                        }
                        else if (difftime(Sys.time(), runningProcInfo[[pi]]$startTime, units = "secs")[[1]] > procTimeout)
                            runningProcInfo[[pi]]$timedOut[i] <- kill <- TRUE
                        # only the first unfinished command can currently be running
                        break
                    }
                }
                else
                    runningProcInfo[[pi]]$timedOut[1] <- kill <-
                        difftime(Sys.time(), runningProcInfo[[pi]]$startTime, units = "secs") > procTimeout

                if (kill)
                {
                    runningProcs[[pi]]$kill()
                    runningProcs[[pi]]$wait()
                }
            }
        }

        if (printOutput || printError || doLog)
        {
            # wait until output of any process is available (or waitTimeout passes); the output itself is
            # read at the start of the next iteration
            processx::poll(pruneList(runningProcs), waitTimeout)
        }
        else
        {
            # just wait for one of the running processes
            for (pi in seq_along(runningProcs))
            {
                if (isRunning(runningProcInfo[[pi]]))
                {
                    runningProcs[[pi]]$wait(waitTimeout)
                    break
                }
            }
        }
    }

    if (showProgress)
    {
        setTxtProgressBar(prog, totCmdCount)
        close(prog)
    }

    return(ret)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.