Nothing
#' Apply, but in parallel
#' @param x vector, list
#' @param FUN function to apply on each element of \code{x}
#' @param FUN.args more arguments to feed into \code{FUN}
#' @param plan logical, or character or \code{future} plan; see Details.
#' @param callback function to run after each iteration
#' @param ... passed to \code{\link[future]{plan}}
#' @param future.chunk.size,future.seed see also \code{future_lapply}.
#' If you want the callbacks
#' to be called immediately after each loop, then set it to \code{1},
#' which is not optimal but the only way right now.
#' @details
#' When \code{plan} is logical, \code{FALSE} means use current plan.
#' If \code{plan=TRUE}, then it equals to \code{plan='multicore'}. For
#' characters, \code{plan} can be \code{'multicore'}, \code{'callr'},
#' \code{'sequential'}, \code{'multisession'}, \code{'multiprocess'},
#' etc. Alternatively, you could pass future \code{\link[future]{plan}}
#' objects.
#' @return same as
#' \code{with(FUN.args, lapply(x, function(el){eval(body(FUN))}))}
#' @examples
#'
#' library(future)
#' plan(sequential)
#'
#' # Use sequential plan
#' # 1. Change `plan` to 'multicore', 'multisession', or TRUE to enable
#' # multi-core, but still with progress information
#' # 2. Change plan=FALSE will use current future plan
#' res <- lapply_async2(100:200, function(x){
#' return(x+1)
#' }, callback = function(e){
#' sprintf('Input=%d', e)
#' }, plan = 'sequential')
#'
#' # Disable callback message, then the function reduce to
#' # normal `future.apply::future_lapply`
#' res <- lapply_async2(100:200, function(x){
#' return(x+1)
#' }, callback = NULL, plan = FALSE)
#'
#' if(interactive()) {
#'
#' # PID are different, meaning executing in different sessions
#' lapply_async2(1:4, function(x){
#' Sys.getpid()
#' })
#' }
#'
#' @seealso \code{\link{make_forked_clusters}}
#' @export
lapply_async2 <- function(x, FUN, FUN.args = list(),
callback = NULL, plan = TRUE,
future.chunk.size = NULL, future.seed = sample.int(1, n = 1e5 - 1), ...){
if(length(plan) && !isFALSE(plan)){
if(isTRUE(plan)){
make_forked_clusters(..., clean = TRUE)
} else {
current_plan <- future::plan("list")
on.exit({
# Restore plan might encounter errors, use try-catch
tryCatch({
future::plan(
current_plan,
substitute = FALSE,
.call = NULL,
.cleanup = FALSE,
.init = FALSE
)
}, error = function(e){
future::plan('sequential')
})
}, add = TRUE, after = TRUE)
if(is.character(plan) && plan == 'callr'){
if (!requireNamespace("future.callr", quietly = TRUE)) {
stop("Package \"future.callr\" is needed to set plan as 'callr'. Please install it.", call. = FALSE)
}
future::plan(future.callr::callr, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}else{
future::plan(plan, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}
}
}
if(inherits(callback, "formula")) {
penv <- parent.frame()
callback <- rlang::as_function(callback, env = penv)
}
if( is.function(callback) ){
callback_formals <- formals(callback)
if (length(callback_formals)){
callback_call <- quote(callback(el))
}else{
callback_call <- quote(callback())
}
}else{
callback_call <- NULL
}
# fun_body <- body(FUN)
# fun_param <- str2lang(names(formals(FUN)[1]))
call <- as_call(quote(FUN), quote(el), .list = FUN.args)
if(is.null(callback_call)){
f <- dipsaus::new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment())
fs <- future.apply::future_lapply(
x, f, future.seed = future.seed,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size)
}else{
old.handlers <- progressr::handlers(handler_dipsaus_progress())
on.exit({
try({
progressr::handlers(old.handlers)
}, silent = TRUE)
}, add = TRUE)
# if(is.null(shiny::getDefaultReactiveDomain())){
progressr::with_progress({
p <- progressr::progressor(steps = 2 * length(x) + 1)
f <- dipsaus::new_function2(alist(el = ), body = bquote({
...msg... <- .(callback_call)
if(is.character(...msg...)) {
...msg... <- paste(...msg..., collapse = "")
} else {
...msg... <- deparse(el, width.cutoff = 30)
if(length(...msg...) > 1){
...msg... <- ...msg...[[1]]
}
if(nchar(...msg...) >= 10){
...msg... <- sprintf("%s...", substr(...msg..., stop = 7, start = 1))
}
}
p(message = sprintf("%s (started)", ...msg...), )
on.exit({
p(message = sprintf("%s (end)", ...msg...))
}, add = TRUE, after = TRUE)
.(call)
}), quote_type = "quote", env = environment())
# f <- dipsaus::new_function2(alist(el = ), body = rlang::quo({
# p(message = sprintf("%s (started)", ...msg...), )
# eval(!!call)
# p(message = sprintf("%s (end)", ...msg...))
# }), quote_type = 'quote', env = environment())
fs <- future.apply::future_lapply(x, f,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size,
future.seed = future.seed)
p("Results collected\n")
# fs <- future.apply::future_lapply(x, function(el) {
# p(message = eval(callback_call))
# eval(call)
# },
# future.scheduling = TRUE,
# future.chunk.size = future.chunk.size,
# future.seed = future.seed)
}, enable = interactive() || shiny_is_running())
# }else{
# progressr::withProgressShiny({
# p <- progressr::progressor(along = x)
# fs <- future.apply::future_lapply(x, function(el){
# p(message = eval(callback_call))
# eval(call)
# })
# }, handlers = handler_dipsaus_progress)
# }
}
# fs <- lapply(seq_along(x), function(ii){
#
# on.exit(eval(callback_call))
#
# call <- as_call(
# quote(future::future),
# as_call(
# quote(`{`),
# as_call(quote(`<-`), fun_param, x[[ii]]),
# fun_body
# ),
# substitute = TRUE,
# globals = TRUE
# )
#
# eval_dirty(call, env = env, data = FUN.args)
# })
# fs = future::value(fs)
fs
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.