Nothing
#' A batchtools future is a future whose value will be resolved via batchtools
#'
#' @param expr The R expression to be evaluated
#'
#' @param envir The environment in which global environment
#' should be located.
#'
#' @param substitute Controls whether `expr` should be
#' `substitute()`:d or not.
#'
#' @param globals (optional) a logical, a character vector, a named list, or a
#' [Globals][globals::Globals] object. If TRUE, globals are identified by code
#' inspection based on `expr` and `tweak` searching from environment
#' `envir`. If FALSE, no globals are used. If a character vector, then
#' globals are identified by lookup based their names `globals` searching
#' from environment `envir`. If a named list or a Globals object, the
#' globals are used as is.
#'
#' @param label (optional) Label of the future (where applicable, becomes the
#' job name for most job schedulers).
#'
#' @param resources (optional) A named list passed to the \pkg{batchtools}
#' template (available as variable `resources`). See Section 'Resources'
#' in [batchtools::submitJobs()] more details.
#'
#' @param workers (optional) The maximum number of workers the batchtools
#' backend may use at any time. Interactive and "local" backends can only
#' process one future at the time (`workers = 1L`), whereas HPC backends,
#' where futures are resolved via separate jobs on a scheduler, can have
#' multiple workers. In the latter, the default is `workers = NULL`, which
#' will resolve to
#' \code{getOption("\link{future.batchtools.workers}")}.
#' If neither are specified, then the default is `100`.
#'
#' @param finalize If TRUE, any underlying registries are
#' deleted when this object is garbage collected, otherwise not.
#'
#' @param conf.file (optional) A batchtools configuration file.
#'
#' @param cluster.functions (optional) A batchtools
#' [ClusterFunctions][batchtools::ClusterFunctions] object.
#'
#' @param registry (optional) A named list of settings to control the setup
#' of the batchtools registry.
#'
#' @param \ldots Additional arguments passed to [future::Future()].
#'
#' @return A BatchtoolsFuture object
#'
#' @export
#' @importFrom future Future getGlobalsAndPackages
#' @keywords internal
BatchtoolsFuture <- function(expr = NULL, envir = parent.frame(),
substitute = TRUE,
globals = TRUE, packages = NULL,
label = NULL,
resources = list(),
workers = NULL,
finalize = getOption("future.finalize", TRUE),
conf.file = findConfFile(),
cluster.functions = NULL,
registry = list(),
...) {
if (substitute) expr <- substitute(expr)
assert_no_positional_args_but_first()
## Record globals
gp <- getGlobalsAndPackages(expr, envir = envir, globals = globals)
future <- Future(expr = gp$expr, envir = envir, substitute = FALSE,
globals = gp$globals,
packages = unique(c(packages, gp$packages)),
label = label,
...)
future <- as_BatchtoolsFuture(future,
resources = resources,
workers = workers,
finalize = finalize,
conf.file = conf.file,
cluster.functions = cluster.functions,
registry = registry)
future
}
## Helper function to create a BatchtoolsFuture from a vanilla Future
#' @importFrom utils file_test
as_BatchtoolsFuture <- function(future,
resources = list(),
workers = NULL,
finalize = getOption("future.finalize", TRUE),
conf.file = findConfFile(),
cluster.functions = NULL,
registry = list(),
...) {
if (is.function(workers)) workers <- workers()
if (is.null(workers)) {
workers <- getOption("future.batchtools.workers", default = 100)
stop_if_not(
is.numeric(workers),
length(workers) == 1,
!is.na(workers), workers >= 1
)
} else {
stop_if_not(length(workers) >= 1)
if (is.numeric(workers)) {
stop_if_not(length(workers) == 1, !is.na(workers), workers >= 1)
} else if (is.character(workers)) {
stop_if_not(length(workers) >= 0, !anyNA(workers))
} else {
stop("Argument 'workers' should be either a numeric or a function: ",
mode(workers))
}
}
future$workers <- workers
if (!is.null(cluster.functions)) {
stop_if_not(is.list(cluster.functions))
stop_if_not(inherits(cluster.functions, "ClusterFunctions"))
} else if (missing(conf.file)) {
## BACKWARD COMPATILITY: Only when calling BatchtoolsFuture() directly
cluster.functions <- makeClusterFunctionsInteractive(external = FALSE)
} else {
## If 'cluster.functions' is not specified, then 'conf.file' must
## exist
if (!file_test("-f", conf.file)) {
stop("No such batchtools configuration file: ", sQuote(conf.file))
}
}
stop_if_not(is.list(registry))
if (length(registry) > 0L) {
stopifnot(!is.null(names(registry)), all(nzchar(names(registry))))
}
stop_if_not(is.list(resources))
## batchtools configuration
future$config <- list(
reg = NULL,
jobid = NA_integer_,
resources = resources,
conf.file = conf.file,
cluster.functions = cluster.functions,
registry = registry,
finalize = finalize
)
structure(future, class = c("BatchtoolsFuture", class(future)))
}
#' Prints a batchtools future
#'
#' @param x An BatchtoolsFuture object
#' @param \ldots Not used.
#'
#' @export
#' @keywords internal
print.BatchtoolsFuture <- function(x, ...) {
NextMethod()
## batchtools specific
config <- x$config
conf.file <- config$conf.file
printf("batchtools configuration file: %s\n", file_info(conf.file))
reg <- config$reg
if (inherits(reg, "Registry")) {
cluster.functions <- reg$cluster.functions
printf("batchtools cluster functions: %s\n",
sQuote(cluster.functions$name))
template <- attr(cluster.functions, "template")
printf("batchtools cluster functions template: %s\n", file_info(template))
} else {
printf("batchtools cluster functions: <none>\n")
}
## Ask for status once
status <- status(x)
printf("batchtools status: %s\n", paste(sQuote(status), collapse = ", "))
if ("error" %in% status) {
printf("Error captured by batchtools: %s\n", loggedError(x))
}
if (is_na(status)) {
printf("batchtools %s: Not found (happens when finished and deleted)\n",
class(reg))
} else {
if (inherits(reg, "Registry")) {
printf("batchtools Registry:\n")
printf(" File dir exists: %s\n", file_test("-d", reg$file.dir))
printf(" Work dir exists: %s\n", file_test("-d", reg$work.dir))
try(print(reg))
} else {
printf("batchtools Registry: <NA>\n")
}
}
invisible(x)
}
#' @importFrom batchtools getStatus
status <- function(future, ...) {
debug <- getOption("future.debug", FALSE)
if (debug) {
mdebug("status() for ", class(future)[1], " ...")
on.exit(mdebug("status() for ", class(future)[1], " ... done"), add = TRUE)
}
## WORKAROUND: Avoid warnings on partially matched arguments
get_status <- function(...) {
## Temporarily disable batchtools output?
## (i.e. messages and progress bars)
debug <- getOption("future.debug", FALSE)
batchtools_output <- getOption("future.batchtools.output", debug)
if (!batchtools_output) {
oopts <- options(batchtools.verbose = FALSE, batchtools.progress = FALSE)
} else {
oopts <- list()
}
on.exit(options(oopts))
## WORKAROUND: batchtools::getStatus() updates the RNG state,
## which we must make sure to undo.
with_stealth_rng({
batchtools::getStatus(...)
})
} ## get_status()
## Known to be in its final state?
if (getOption("future.batchtools.status.cache", TRUE)) {
status <- future$.status
if (identical(status, c("defined", "finished", "started", "submitted"))) {
return(status)
}
}
config <- future$config
reg <- config$reg
if (!inherits(reg, "Registry")) return(NA_character_)
## Closed and deleted?
if (!file_test("-d", reg$file.dir)) return(NA_character_)
jobid <- config$jobid
if (is.na(jobid)) return("not submitted")
status <- get_status(reg = reg, ids = jobid)
status <- (unlist(status) == 1L)
status <- status[status]
status <- sort(names(status))
status <- setdiff(status, c("n"))
status[status == "done"] <- "finished"
result <- future$result
if (inherits(result, "FutureResult")) {
if (result_has_errors(result)) status <- unique(c("error", status))
}
## Cache result
future$.status <- status
if (debug) mdebug("- status: ", paste(sQuote(status), collapse = ", "))
status
}
finished <- function(future, ...) {
status <- status(future)
if (is_na(status)) return(NA)
any(c("finished", "error", "expired") %in% status)
}
#' Logged output of batchtools future
#'
#' @param future The future.
#' @param \ldots Not used.
#'
#' @return A character vector or a logical scalar.
#'
#' @aliases loggedOutput loggedError
#'
#' @export loggedError
#' @export loggedOutput
#' @keywords internal
loggedOutput <- function(...) UseMethod("loggedOutput")
loggedError <- function(...) UseMethod("loggedError")
#' @importFrom batchtools getErrorMessages
#' @rdname loggedOutput
#' @export
loggedError.BatchtoolsFuture <- function(future, ...) {
stat <- status(future)
if (is_na(stat)) return(NULL)
if (!finished(future)) {
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("%s ('%s') has not finished yet", class(future)[1L], label)
stop(BatchtoolsFutureError(msg, future = future))
}
if (!"error" %in% stat) return(NULL)
config <- future$config
reg <- config$reg
if (!inherits(reg, "Registry")) return(NULL)
jobid <- config$jobid
res <- getErrorMessages(reg = reg, ids = jobid) ### CHECKED
msg <- res$message
msg <- paste(sQuote(msg), collapse = ", ")
msg
} # loggedError()
#' @importFrom batchtools getLog
#' @export
loggedOutput.BatchtoolsFuture <- function(future, ...) {
stat <- status(future)
if (is_na(stat)) return(NULL)
if (!finished(future)) {
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("%s ('%s') has not finished yet", class(future)[1L], label)
stop(BatchtoolsFutureError(msg, future = future))
}
config <- future$config
reg <- config$reg
if (!inherits(reg, "Registry")) return(NULL)
jobid <- config$jobid
getLog(id = jobid, reg = reg)
} # loggedOutput()
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Future API
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
#' @importFrom future resolved
#' @export
#' @keywords internal
resolved.BatchtoolsFuture <- function(x, ...) {
signalEarly <- import_future("signalEarly")
## Is value already collected?
if (!is.null(x$result)) {
## Signal conditions early?
signalEarly(x, ...)
return(TRUE)
}
## Assert that the process that created the future is
## also the one that evaluates/resolves/queries it.
assertOwner <- import_future("assertOwner")
assertOwner(x)
## If not, checks the batchtools registry status
resolved <- finished(x)
if (is.na(resolved)) return(FALSE)
## Signal conditions early? (happens only iff requested)
if (resolved) signalEarly(x, ...)
resolved
}
#' @importFrom future result
#' @export
#' @keywords internal
result.BatchtoolsFuture <- function(future, cleanup = TRUE, ...) {
debug <- getOption("future.debug", FALSE)
if (debug) {
mdebug("result() for BatchtoolsFuture ...")
on.exit(mdebug("result() for BatchtoolsFuture ... done"), add = TRUE)
}
## Has the value already been collected?
result <- future$result
if (inherits(result, "FutureResult")) {
if (debug) mdebug("- FutureResult already collected")
return(result)
}
## Has the value already been collected? - take two
if (future$state %in% c("finished", "failed", "interrupted")) {
if (debug) mdebug("- FutureResult already collected - take 2")
return(NextMethod())
}
if (future$state == "created") {
if (debug) mdebug("- starting future ...")
future <- run(future)
if (debug) mdebug("- starting future ... done")
}
if (debug) mdebug("- getting batchtools status")
stat <- status(future)
if (is_na(stat)) {
label <- future$label
if (is.null(label)) label <- "<none>"
stopf("The result no longer exists (or never existed) for Future ('%s') of class %s", label, paste(sQuote(class(future)), collapse = ", ")) #nolint
}
if (debug) mdebug("- waiting for batchtools job to finish ...")
result <- await(future, cleanup = FALSE)
if (debug) mdebug("- waiting for batchtools job to finish ... done")
stop_if_not(inherits(result, "FutureResult"))
future$result <- result
future$state <- "finished"
if (cleanup) {
if (debug) mdebugf("- delete %s ...", class(future)[1])
delete(future)
if (debug) mdebugf("- delete %s ... done", class(future)[1])
}
if (debug) mdebug("- NextMethod()")
NextMethod()
}
#' @importFrom future run getExpression
#' @importFrom batchtools batchExport batchMap saveRegistry setJobNames submitJobs
#' @importFrom utils capture.output str
#' @export
run.BatchtoolsFuture <- function(future, ...) {
if (future$state != "created") {
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("A future ('%s') can only be launched once.", label)
stop(FutureError(msg, future = future))
}
## Assert that the process that created the future is
## also the one that evaluates/resolves/queries it.
assertOwner <- import_future("assertOwner")
assertOwner(future)
## Temporarily disable batchtools output?
## (i.e. messages and progress bars)
debug <- getOption("future.debug", FALSE)
batchtools_output <- getOption("future.batchtools.output", debug)
if (!batchtools_output) {
oopts <- options(batchtools.verbose = FALSE, batchtools.progress = FALSE)
} else {
oopts <- list()
}
on.exit(options(oopts))
expr <- getExpression(future)
## Always evaluate in local environment
expr <- substitute(local(expr), list(expr = expr))
## (i) Create batchtools registry
reg <- future$config$reg
stop_if_not(is.null(reg) || inherits(reg, "Registry"))
if (is.null(reg)) {
if (debug) mprint("- Creating batchtools registry")
config <- future$config
stop_if_not(is.list(config))
## Create batchtools registry
reg <- temp_registry(
label = future$label,
conf.file = config$conf.file,
cluster.functions = config$cluster.functions,
config = config$registry
)
if (debug) mprint(reg)
future$config$reg <- reg
## Register finalizer?
if (config$finalize) future <- add_finalizer(future)
config <- NULL
}
stop_if_not(inherits(reg, "Registry"))
## (ii) Attach packages that needs to be attached
packages <- future$packages
if (length(packages) > 0) {
mdebugf("Attaching %d packages (%s) ...",
length(packages), hpaste(sQuote(packages)))
## Record which packages in 'pkgs' that are loaded and
## which of them are attached (at this point in time).
is_loaded <- is.element(packages, loadedNamespaces())
is_attached <- is.element(packages, attached_packages())
## FIXME: Update the expression such that the new session
## will have the same state of (loaded, attached) packages.
reg$packages <- packages
with_stealth_rng({
saveRegistry(reg = reg)
})
mdebugf("Attaching %d packages (%s) ... DONE",
length(packages), hpaste(sQuote(packages)))
}
## Not needed anymore
packages <- NULL
## (iii) Export globals?
if (length(future$globals) > 0) {
batchExport(export = future$globals, reg = reg)
}
## 1. Add to batchtools for evaluation
mdebug("batchtools::batchMap()")
## WORKAROUND: batchtools::batchMap() updates the RNG state,
## which we must make sure to undo.
with_stealth_rng({
jobid <- batchMap(fun = geval, list(expr),
more.args = list(substitute = TRUE), reg = reg)
})
## 2. Set job name, if specified
label <- future$label
if (!is.null(label)) {
setJobNames(ids = jobid, names = label, reg = reg)
}
## 3. Update
future$config$jobid <- jobid
mdebugf("Created %s future #%d", class(future)[1], jobid$job.id)
## WORKAROUND: (For multicore and macOS only)
if (reg$cluster.functions$name == "Multicore") {
## On some macOS systems, a system call to 'ps' may output an error message
## "dyld: DYLD_ environment variables being ignored because main executable
## (/bin/ps) is setuid or setgid" to standard error that is picked up by
## batchtools which incorrectly tries to parse it. By unsetting all DYLD_*
## environment variables, we avoid this message. For more info, see:
## * https://github.com/tudo-r/BatchJobs/issues/117
## * https://github.com/HenrikBengtsson/future.BatchJobs/issues/59
## /HB 2016-05-07
dyld_envs <- tryCatch({
envs <- list()
res <- system2("ps", stdout = TRUE, stderr = TRUE)
if (any(grepl("DYLD_", res))) {
envs <- Sys.getenv()
envs <- envs[grepl("^DYLD_", names(envs))]
if (length(envs) > 0L) lapply(names(envs), FUN = Sys.unsetenv)
}
envs
}, error = function(ex) list())
}
## 4. Wait for an available worker
waitForWorker(future, workers = future$workers)
## 5. Submit
future$state <- "running"
resources <- future$config$resources
if (is.null(resources)) resources <- list()
## WORKAROUND: batchtools::submitJobs() updates the RNG state,
## which we must make sure to undo.
tryCatch({
with_stealth_rng({
submitJobs(reg = reg, ids = jobid, resources = resources)
})
}, error = function(ex) {
msg <- conditionMessage(ex)
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("Failed to submit %s (%s). The reason was: %s", class(future)[1], label, msg)
info <- capture.output(str(resources))
info <- paste(info, collapse = "\n")
msg <- sprintf("%s\nTROUBLESHOOTING INFORMATION:\nbatchtools::submitJobs() was called with the following 'resources' argument:\n%s\n", msg, info)
stop(BatchtoolsFutureError(msg, future = future))
})
mdebugf("Launched future #%d", jobid$job.id)
## 6. Rerserve worker for future
registerFuture(future)
## 7. Trigger early signalling
if (inherits(future, "BatchtoolsUniprocessFuture")) {
resolved(future)
}
invisible(future)
} ## run()
#' @importFrom batchtools loadResult waitForJobs
#' @importFrom utils tail
await <- function(future, cleanup = TRUE,
timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
delta = getOption("future.wait.interval", 1.0),
alpha = getOption("future.wait.alpha", 1.01),
...) {
stop_if_not(is.finite(timeout), timeout >= 0)
stop_if_not(is.finite(alpha), alpha > 0)
debug <- getOption("future.debug", FALSE)
if (debug) mdebug("future.batchtools:::await() ...")
expr <- future$expr
config <- future$config
reg <- config$reg
stop_if_not(inherits(reg, "Registry"))
jobid <- config$jobid
mdebug("batchtools::waitForJobs() ...")
## Control batchtools info output
oopts <- options(batchtools.verbose = debug)
on.exit(options(oopts))
## Sleep function - increases geometrically as a function of iterations
sleep_fcn <- function(i) delta * alpha ^ (i - 1)
res <- waitForJobs(ids = jobid, timeout = timeout, sleep = sleep_fcn,
stop.on.error = FALSE, reg = reg)
if (debug) mdebugf("- batchtools::waitForJobs(): %s", res)
stat <- status(future)
if (debug) {
mdebugf("- status(): %s", paste(sQuote(stat), collapse = ", "))
mdebug("batchtools::waitForJobs() ... done")
}
finished <- is_na(stat) || any(c("finished", "error", "expired") %in% stat)
## PROTOTYPE RESULTS BELOW:
prototype_fields <- NULL
result <- NULL
if (finished) {
mdebug("Results:")
label <- future$label
if (is.null(label)) label <- "<none>"
if ("finished" %in% stat) {
if (debug) mdebug("- batchtools::loadResult() ...")
result <- loadResult(reg = reg, id = jobid)
if (debug) mdebug("- batchtools::loadResult() ... done")
if (inherits(result, "FutureResult")) {
prototype_fields <- c(prototype_fields, "batchtools_log")
result[["batchtools_log"]] <- try(local({
if (debug) {
mdebug("- batchtools::getLog() ...")
on.exit(mdebug("- batchtools::getLog() ... done"))
}
## Since we're already collected the results, the log file
## should already exist, if it exists. Because of this,
## only poll for the log file for a second before giving up.
reg$cluster.functions$fs.latency <- 1.0
getLog(id = jobid, reg = reg)
}), silent = TRUE)
if (result_has_errors(result)) cleanup <- FALSE
}
} else if ("error" %in% stat) {
cleanup <- FALSE
msg <- sprintf(
"BatchtoolsFutureError for %s ('%s') captured by batchtools: %s",
class(future)[1], label, loggedError(future))
stop(BatchtoolsFutureError(msg, future = future))
} else if ("expired" %in% stat) {
cleanup <- FALSE
msg <- sprintf("BatchtoolsExpiration: Future ('%s') expired (registry path %s).", label, reg$file.dir)
output <- loggedOutput(future)
hint <- unlist(strsplit(output, split = "\n", fixed = TRUE))
hint <- hint[nzchar(hint)]
hint <- tail(hint, n = getOption("future.batchtools.expiration.tail", 48L))
if (length(hint) > 0) {
hint <- paste(hint, collapse = "\n")
msg <- paste(msg, ". The last few lines of the logged output:\n",
hint, sep="")
} else {
msg <- sprintf("%s. No logged output exist.", msg)
}
stop(BatchtoolsFutureError(msg, future = future))
} else if (is_na(stat)) {
msg <- sprintf("BatchtoolsDeleted: Cannot retrieve value. Future ('%s') deleted: %s", label, reg$file.dir) #nolint
stop(BatchtoolsFutureError(msg, future = future))
}
if (debug) { mstr(result) }
} else {
cleanup <- FALSE
msg <- sprintf("AsyncNotReadyError: Polled for results for %s seconds every %g seconds, but asynchronous evaluation for future ('%s') is still running: %s", timeout, delta, label, reg$file.dir) #nolint
stop(BatchtoolsFutureError(msg, future = future))
}
if (length(prototype_fields) > 0) {
result$PROTOTYPE_WARNING <- sprintf("WARNING: The fields %s should be considered internal and experimental for now, that is, until the Future API for these additional features has been settled. For more information, please see https://github.com/HenrikBengtsson/future/issues/172", hpaste(sQuote(prototype_fields), max_head = Inf, collapse = ", ", last_collapse = " and "))
}
## Cleanup?
if (cleanup) {
delete(future, delta = 0.5 * delta, ...)
}
if (debug) mdebug("future.batchtools:::await() ... done")
result
} # await()
delete <- function(...) UseMethod("delete")
#' Removes a batchtools future
#'
#' @param future The future.
#' @param onRunning Action if future is running or appears to run.
#' @param onFailure Action if failing to delete future.
#' @param onMissing Action if future does not exist.
#' @param times The number of tries before giving up.
#' @param delta The delay interval (in seconds) between retries.
#' @param alpha A multiplicative penalty increasing the delay
#' for each failed try.
#' @param \ldots Not used.
#'
#' @return (invisibly) TRUE if deleted and FALSE otherwise.
#'
#' @export
#' @importFrom batchtools clearRegistry removeRegistry
#' @importFrom utils file_test
#' @keywords internal
delete.BatchtoolsFuture <- function(future,
onRunning = c("warning", "error", "skip"),
onFailure = c("error", "warning", "ignore"),
onMissing = c("ignore", "warning", "error"),
times = 10L,
delta = getOption("future.wait.interval", 1.0),
alpha = getOption("future.wait.alpha", 1.01),
...) {
onRunning <- match.arg(onRunning)
onMissing <- match.arg(onMissing)
onFailure <- match.arg(onFailure)
debug <- getOption("future.debug", FALSE)
## Identify registry
config <- future$config
reg <- config$reg
## Trying to delete a non-launched batchtools future?
if (!inherits(reg, "Registry")) return(invisible(TRUE))
path <- reg$file.dir
## Already deleted?
if (is.null(path) || !file_test("-d", path)) {
if (onMissing %in% c("warning", "error")) {
msg <- sprintf("Cannot remove batchtools registry, because directory does not exist: %s", sQuote(path)) #nolint
mdebugf("delete(): %s", msg)
if (onMissing == "warning") {
warning(msg)
} else if (onMissing == "error") {
stop(BatchtoolsFutureError(msg, future = future))
}
}
return(invisible(TRUE))
}
## Is the future still not resolved? If so, then...
if (!resolved(future)) {
if (onRunning == "skip") return(invisible(TRUE))
status <- status(future)
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("Will not remove batchtools registry, because is appears to hold a non-resolved future (%s; state = %s; batchtools status = %s): %s", sQuote(label), sQuote(future$state), paste(sQuote(status), collapse = ", "), sQuote(path)) #nolint
mdebugf("delete(): %s", msg)
if (onRunning == "warning") {
warning(msg)
return(invisible(TRUE))
} else if (onRunning == "error") {
stop(BatchtoolsFutureError(msg, future = future))
}
}
## Make sure to collect the results before deleting
## the internal batchtools registry
result <- result(future, cleanup = FALSE)
stop_if_not(inherits(result, "FutureResult"))
## Free up worker
unregisterFuture(future)
## To simplify post mortem troubleshooting in non-interactive sessions,
## should the batchtools registry files be removed or not?
mdebugf("delete(): Option 'future.delete = %s",
sQuote(getOption("future.delete", "<NULL>")))
if (!getOption("future.delete", interactive())) {
status <- status(future)
res <- future$result
if (inherits(res, "FutureResult")) {
if (result_has_errors(res)) status <- unique(c("error", status))
}
mdebugf("delete(): status(<future>) = %s",
paste(sQuote(status), collapse = ", "))
if (any(c("error", "expired") %in% status)) {
msg <- sprintf("Will not remove batchtools registry, because the status of the batchtools was %s and option 'future.delete' is FALSE or running in an interactive session: %s", paste(sQuote(status), collapse = ", "), sQuote(path)) #nolint
mdebugf("delete(): %s", msg)
warning(msg)
return(invisible(FALSE))
}
}
## Have user disabled deletions?
if (!getOption("future.delete", TRUE)) {
msg <- sprintf("Option 'future.delete' is FALSE - will not delete batchtools registry: %s", sQuote(path))
mdebugf("delete(): %s", msg)
return(invisible(FALSE))
}
## Control batchtools info output
oopts <- options(batchtools.verbose = debug)
on.exit(options(oopts))
## Try to delete registry
## WORKAROUND: batchtools::clearRegistry() and
## batchtools::removeRegistry() update the RNG state,
## which we must make sure to undo.
with_stealth_rng({
interval <- delta
for (kk in seq_len(times)) {
try(unlink(path, recursive = TRUE), silent = FALSE)
if (!file_test("-d", path)) break
try(removeRegistry(wait = 0.0, reg = reg), silent = FALSE)
if (!file_test("-d", path)) break
try(clearRegistry(reg = reg), silent = TRUE)
try(removeRegistry(wait = 0.0, reg = reg), silent = FALSE)
if (!file_test("-d", path)) break
Sys.sleep(interval)
interval <- alpha * interval
}
})
## Success?
if (file_test("-d", path)) {
if (onFailure %in% c("warning", "error")) {
msg <- sprintf("Failed to remove batchtools registry: %s", sQuote(path))
mdebugf("delete(): %s", msg)
if (onMissing == "warning") {
warning(msg)
} else if (onMissing == "error") {
stop(BatchtoolsFutureError(msg, future = future))
}
}
return(invisible(FALSE))
}
mdebugf("delete(): batchtools registry deleted: %s", sQuote(path))
invisible(TRUE)
} # delete()
add_finalizer <- function(...) UseMethod("add_finalizer")
add_finalizer.BatchtoolsFuture <- function(future, debug = FALSE, ...) {
## Register finalizer (will clean up registries etc.)
if (debug) {
mdebug("add_finalizer() for ", sQuote(class(future)[1]), " ...")
on.exit(mdebug("add_finalizer() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
}
reg.finalizer(future, f = function(f) {
if (debug) {
if (!exists("mdebug", mode = "function")) mdebug <- message
mdebug("Finalize ", sQuote(class(f)[1]), " ...")
on.exit(mdebug("Finalize ", sQuote(class(f)[1]), " ... done"), add = TRUE)
}
if (inherits(f, "BatchtoolsFuture") && "future.batchtools" %in% loadedNamespaces()) {
if (debug) {
mdebug("- attempting to delete future")
if (requireNamespace("utils", quietly = TRUE)) {
mdebug(utils::capture.output(utils::str(as.list(f))))
}
}
res <- try({
delete(f, onRunning = "skip", onMissing = "ignore", onFailure = "warning")
})
if (debug) {
if (inherits(res, "try-error")) {
mdebug("- Failed to delete: ", sQuote(res))
} else {
mdebug("- deleted: ", res)
}
}
}
}, onexit = TRUE)
invisible(future)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.