R/backend_api-11.ClusterFutureBackend-class.R

Defines functions cluster getFutureBackendConfigs.ClusterFuture assertValidConnection psockImmediateConditionHandler cluster_call_blocking requestNode result.ClusterFuture resolved.ClusterFuture getSocketSelectTimeout .makeCluster nbrOfFreeWorkers.ClusterFutureBackend nbrOfWorkers.ClusterFutureBackend stopWorkers.ClusterFutureBackend interruptFuture.ClusterFutureBackend launchFuture.ClusterFutureBackend print.ClusterFutureBackend

Documented in cluster

#' A ClusterFutureBackend resolves futures in parallel using any PSOCK cluster
#'
#' @inheritParams FutureBackend
#'
#' @param workers ...
#'
#' @param persistent (deprecated) ...
#'
#' @details
#' The `ClusterFutureBackend` is selected by
#' `plan(cluster, workers = workers)`.
#'
#' @keywords internal
#' @rdname FutureBackend
#'
#' @importFrom parallelly as.cluster availableWorkers
#' @export
ClusterFutureBackend <- local({
  getDefaultCluster <- import_parallel_fcn("getDefaultCluster")

  ## Most recent 'workers' set up
  last <- NULL

  function(workers = availableWorkers(), gc = TRUE, earlySignal = TRUE, interrupts = FALSE, persistent = FALSE, ...) {
    debug <- isTRUE(getOption("future.debug"))

    if (debug) {
      mdebugf_push("ClusterFutureBackend(..., persistent = %s, gc = %s, earlySignal = %s) ...", persistent, gc, earlySignal)
      on.exit(mdebugf_pop("ClusterFutureBackend(..., persistent = %s, gc = %s, earlySignal = %s) ... done", persistent, gc, earlySignal))
    }
    
    if (is.function(workers)) workers <- workers()
    if (is.null(workers)) {
      clusterRegistry$stopCluster(debug = debug)
      last <<- NULL
      workers <- getDefaultCluster()
      workers <- addCovrLibPath(workers)
    } else if (is.numeric(workers) || is.character(workers)) {
      if (is.numeric(workers)) {
        if (debug) mdebugf("workers: %g", workers)
        ## Preserve class attributes, especially "AsIs"
        clazz <- class(workers)
        workers <- as.integer(workers)
        class(workers) <- clazz
        stop_if_not(length(workers) == 1, is.finite(workers))
      } else {
        stop_if_not(length(workers) >= 1, !anyNA(workers))
        workers <- sort(workers)
        if (debug) mdebugf("workers: [n=%d] %s", length(workers), commaq(workers))
      }

      ## Already setup?
      cluster <- clusterRegistry$getCluster(debug = debug)
      if (is.null(cluster) || !identical(workers, last)) {
        clusterRegistry$stopCluster(debug = debug)
        last <<- NULL
        
        if (debug) mdebug_push("Starting new cluster ...")
        cluster <- clusterRegistry$startCluster(workers, makeCluster = .makeCluster, ..., debug = debug)
        if (debug) {
          mprint(cluster)
          mdebug_pop("Starting new cluster ... done")
        }
        last <<- workers
      } else {
        if (debug) {
          mprint(cluster)
          mdebug("Cluster already existed")
        }
      }
      workers <- cluster
    } else {
      ## A pre-created cluster?
      ## FIXME: Don't stop it if already in place, or ...?
      clusterRegistry$stopCluster(debug = debug)
      last <<- NULL
      workers <- as.cluster(workers)
      workers <- addCovrLibPath(workers)
    }
    if (!inherits(workers, "cluster")) {
      stopf("Argument 'workers' is not of class 'cluster': %s", commaq(class(workers)))
    }
    if (debug) mdebugf("Number of workers: %d", length(workers))
    stop_if_not(length(workers) > 0)


    ## Attached workers' session information, unless already done.
    ## FIXME: We cannot do this here, because it introduces a race condition
    ## where multiple similar requests may appear at the same time bringing
    ## the send/receive data to be out of sync and therefore corrupt the
    ## futures' values.
    ##  workers <- add_cluster_session_info(workers)
  
    ## Attach name to cluster?
    name <- attr(workers, "name", exact = TRUE)
    if (is.null(name)) {
      name <- uuid(workers)
      stop_if_not(length(name) > 0, nzchar(name))
      attr(workers, "name") <- name
      if (debug) mdebug("Generated workers UUID")
    }
    if (debug) mdebugf("Workers UUID: %s", sQuote(name))
      
    ## Name of the FutureRegistry
    reg <- sprintf("workers-%s", name)
  
    core <- MultiprocessFutureBackend(
      workers = workers,
      reg = reg,
      earlySignal = earlySignal,
      interrupts = interrupts,
      persistent = persistent,
      ...
    )
    core[["futureClasses"]] <- c("ClusterFuture", core[["futureClasses"]])
    core <- structure(core, class = c("ClusterFutureBackend", class(core)))
    core
  }
})
tweakable(ClusterFutureBackend) <- list(MultiprocessFutureBackend, makeClusterPSOCK_args())

#' @importFrom parallelly isConnectionValid isNodeAlive
#' @importFrom utils capture.output
#' @export
print.ClusterFutureBackend <- function(x, details = c("workers"), validate = FALSE, ...) {
  backend <- NextMethod()
  workers <- backend[["workers"]]

  if ("workers" %in% details) {
    s <- character(0L)
    
    s <- c(s, sprintf("Workers of type %s:", class(workers)[1]))
    info <- capture.output(print(workers))
    info <- paste(info, collapse = "; ")
    s <- c(s, sprintf("- Summary: %s", info))
    cat(s, sep = "\n")
  
    ## Validate connections
    for (kk in seq_along(workers)) {
      node <- workers[[kk]]
      con <- node[["con"]]
  
      info <- capture.output(print(node))
      info <- paste(info, collapse = "; ")
      status <- " OK "
      details <- c()
      
      if (!is.null(con)) {
        isValid <- isConnectionValid(con)
        if (isValid) {
          details <- c(details, "valid connection")
        } else {
          details <- c(details, "invalid connection")
          status <- "FAIL"
        }
      }

      if (validate) {
        alive <- isNodeAlive(node)
        if (alive) {
          details <- c(details, "alive")
        } else {
          details <- c(details, "not alive")
          status <- "FAIL"
        }
      }

      details <- paste(details, collapse = ", ")    
      cat(sprintf("- [%s] Node %d/%d: %s [%s]\n", status, kk, length(workers), details, info))
    }
  } ## ("workers" %in% details)

  invisible(x)
}


#' @export
launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
  debug <- isTRUE(getOption("future.debug"))
  if (debug) {
    mdebug_push("launchFuture() for ClusterFutureBackend ...")
    on.exit(mdebug_pop("launchFuture() for ClusterFutureBackend ... done"))
  }

  ## Record 'backend' in future for now
  future[["backend"]] <- backend

  workers <- backend[["workers"]]
  stop_if_not(inherits(workers, "cluster"))
  
  reg <- backend[["reg"]]
  stop_if_not(is.character(reg), length(reg) == 1L)
  
  if (debug) {
    mdebugf("Workers: [n=%d]", length(workers))
    mprint(workers)
    mdebug("FutureRegistry: ", sQuote(reg))
  }
  
  ## Next available cluster node
  t_start <- Sys.time()

  ## (1) Get a free worker. This will block until one is available
  if (debug) mdebug_push("requestWorker() ...")

  timeout <- backend[["wait.timeout"]]
  delta <- backend[["wait.interval"]]
  alpha <- backend[["wait.alpha"]]

  ## Get the index of a free cluster node, which has been validated to
  ## be functional. If the existing worker was found to be non-functional,
  ## it was re-launched by requestNode()
  node_idx <- requestNode(await = function() {
    FutureRegistry(reg, action = "collect-first", earlySignal = TRUE, debug = debug)
  }, backend = backend, timeout = timeout, delta = delta, alpha = alpha)
  workers <- backend[["workers"]] ## Backend might have been updated
  future[["node"]] <- node_idx
  
  if (inherits(future[[".journal"]], "FutureJournal")) {
    appendToFutureJournal(future,
         event = "getWorker",
      category = "overhead",
        parent = "launch",
         start = t_start,
          stop = Sys.time()
    )
  }
  if (debug) mdebugf("cluster node index: %d", node_idx)

  cl <- workers[node_idx]
  stop_if_not(length(cl) == 1L, inherits(cl, "cluster"))
  node <- cl[[1]]
  
  ## Does the cluster node communicate with a connection?
  ## (if not, it's likely via MPI)
  stop_if_not(inherits(node, c("SOCK0node", "SOCKnode")))
  con <- node[["con"]]
  future[["nodeHasConnection"]] <- !is.null(con)

  if (debug) mdebug_pop("requestWorker() ... done")


  ## (2) Attach packages that needs to be attached
  ##     NOTE: Already take care of by evalFuture().
  ##     However, if we need to get an early error about missing packages,
  ##     we can get the error here before launching the future.
  if (future[["earlySignal"]]) {
    if (debug) mdebug_push("requirePackages() ...")
    
    packages <- future[["packages"]]
    if (debug) mdebug("packages: [n=%d] %s", length(packages), commaq(packages))
    
    ## Nothing to do?
    if (length(packages) > 0L) { 
      t_start <- Sys.time()
  
      ## (ii) Attach packages that needs to be attached
      ##      NOTE: Already take care of by evalFuture().
      ##      However, if we need to get an early error about missing packages,
      ##      we can get the error here before launching the future.
      if (debug) mdebug_push("Attaching packages on worker ...")
      ## Blocking cluster-node call
      cluster_call_blocking(cl, fun = function(pkgs) { requirePackages(pkgs); "future-requirePackages" }, packages, future = future, when = "call requirePackages() on", expected = "future-requirePackages")
      if (debug) mdebug_pop("Attaching packages on worker ... done")
      
      ## Add event to future journal?
      if (inherits(future[[".journal"]], "FutureJournal")) {
        appendToFutureJournal(future,
             event = "attachPackages",
          category = "overhead",
            parent = "launch",
             start = t_start,
              stop = Sys.time()
        )
      }
    }
    
    if (debug) mdebug_pop("requirePackages() ... done")
  }

  ## (2) Reset global environment of cluster node such that
  ##     previous futures are not affecting this one, which
  ##     may happen even if the future is evaluated inside a
  ##     local, e.g. local({ a <<- 1 }).
  ##     If the persistent = TRUE, this will be skipped.
  persistent <- isTRUE(future[["persistent"]])
  if (!persistent) {
    if (debug) mdebug_push("eraseGlobalEnvironment() ...")
    
    t_start <- Sys.time()
    
    ## (i) Reset global environment of cluster node such that
    ##     previous futures are not affecting this one, which
    ##     may happen even if the future is evaluated inside a
    ##     local, e.g. local({ a <<- 1 }).
    ## Blocking cluster-node call
    cluster_call_blocking(cl, fun = grmall, future = future, when = "call grmall() on", expected = "future-grmall")

    ## Add event to future journal
    if (inherits(future[[".journal"]], "FutureJournal")) {
      appendToFutureJournal(future,
           event = "eraseWorker",
        category = "overhead",
          parent = "launch",
           start = t_start,
            stop = Sys.time()
      )
    }
    if (debug) mdebug_pop("eraseGlobalEnvironment() ... done")
  }


  ## (3) Garbage collection
  ## FIXME: This is _before_, not _after_ as documented
  ##        Should use gc[1] for before and gc[2] for after
  if (isTRUE(future[["gc"]])) {
    cluster_call_blocking(cl, fun = function() { gc(); "future-gc" }, future = future, when = "call gc() on", expected = "future-gc")
  }


  ## (4) Launch future
  if (debug) mdebug_push("launchFuture() ...")
  worker <- future[["node"]]
  stop_if_not(
    length(worker) == 1L, is.integer(worker), !is.na(worker),
    worker >= 1L, worker <= length(workers)
  ) 
  if (debug) mdebugf("cluster node index: %d", worker)
  data <- getFutureData(future, mc.cores = 1L, debug = debug)
  node <- workers[[worker]]
  ## Non-blocking cluster-node call
  node_call_nonblocking(node, fun = evalFuture, args = list(data), future = future, when = "launch future on")
  FutureRegistry(reg, action = "add", future = future, earlySignal = FALSE, debug = debug)
  if (debug) mdebug_pop("launchFuture() ... done")

  future[["state"]] <- "running"

  if (debug) mdebugf("%s started", class(future)[1])
  
  future
}


#' @importFrom parallelly killNode
#' @export
interruptFuture.ClusterFutureBackend <- function(backend, future, ...) {
  ## Has interrupts been disabled by user?
  if (!backend[["interrupts"]]) return(future)
  
  workers <- backend[["workers"]]
  node_idx <- future[["node"]]
  node <- workers[[node_idx]]
  void <- suppressWarnings(killNode(node))
  future[["state"]] <- "interrupted"
  
  future
}

#' @export
stopWorkers.ClusterFutureBackend <- function(backend, interrupt = TRUE, ...) {
  debug <- isTRUE(getOption("future.debug"))
  if (debug) {
    mdebugf_push("stopWorkers() for %s ...", class(backend)[1])
    on.exit(mdebugf_pop("stopWorkers() for %s ... done", class(backend)[1]))
  }  
  
  ## Interrupt all futures
  if (interrupt) {
    mdebugf_push("Interrupt active futures ...")
    futures <- listFutures(backend)[["future"]]
    mdebugf("Number of futures: %d", length(futures))
    futures <- lapply(futures, FUN = interrupt, ...)
    mdebugf_pop("Interrupt active futures ... done")
  }

  ## Clear registry
  mdebugf_push("Clear future registry ...")
  reg <- backend[["reg"]]
  FutureRegistry(reg, action = "reset")
  ## Assert that reset worked
  ## FIXME: Remove this later; it should never fail
  futures <- listFutures(backend)[["future"]]
  if (length(futures) > 0L) {
    stop(FutureError(sprintf("Failed to erase futures for %s. There are still %d active futures", class(backend)[1], length(futures))))
  }
  mdebugf_pop("Clear future registry ... done")
  
  ## Stop workers
  mdebugf_push("Stop cluster workers ...")
  clusterRegistry$stopCluster(debug = debug)
  env <- environment(ClusterFutureBackend)
  env[["last"]] <- NULL
  mdebugf_pop("Stop cluster workers ... done")
  
  TRUE
}



#' @export
nbrOfWorkers.ClusterFutureBackend <- function(evaluator) {
  backend <- evaluator
  workers <- backend[["workers"]]
  stop_if_not(length(workers) > 0L, inherits(workers, "cluster"))
  workers <- length(workers)
  stop_if_not(length(workers) == 1L, !is.na(workers), workers >= 1L, is.finite(workers))
  workers
}


#' @export
nbrOfFreeWorkers.ClusterFutureBackend <- function(evaluator, ...) {
  debug <- isTRUE(getOption("future.debug"))
  backend <- evaluator
  workers <- backend[["workers"]]
  stop_if_not(length(workers) > 0L, inherits(workers, "cluster"))
  workers <- length(workers)
  reg <- backend[["reg"]]
  stop_if_not(length(reg) == 1L, is.character(reg), nzchar(reg))

  ## Number of unresolved cluster futures
  usedNodes <- length(FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug))
  
  workers <- workers - usedNodes
  stop_if_not(length(workers) == 1L, !is.na(workers), workers >= 0L, is.finite(workers))
  
  workers
}


.makeCluster <- function(workers, ...) {
  if (length(workers) == 0L) return(NULL)
  oenv <- Sys.getenv("R_FUTURE_PLAN", NA_character_)
  Sys.unsetenv("R_FUTURE_PLAN")
  on.exit({
    if (!is.na(oenv)) Sys.setenv(R_FUTURE_PLAN = oenv)
  })
  
  args <- list(...)
  
  ## Ignore non-recognized arguments
  keep <- intersect(names(args), makeClusterPSOCK_args())
  args <- args[keep]

  args <- c(list(workers), args)
  cl <- do.call(makeClusterPSOCK, args = args, quote = TRUE)
  cl <- addCovrLibPath(cl)
  cl
} ## .makeCluster()


#' @importFrom parallel clusterCall
addCovrLibPath <- local({
  is_covr <- NULL
  
  function(cl) {
    if (!is.null(is_covr)) {
      if (!is_covr) return(cl)
    } else {
      is_covr <<- is.element("covr", loadedNamespaces())
      if (!is_covr) return(cl)
    }
    debug <- isTRUE(getOption("future.debug"))
    
    ## WORKAROUND: When running covr::package_coverage(), the
    ## package being tested may actually not be installed in
    ## library path used by covr.  We here add that path iff
    ## covr is being used. /HB 2016-01-15
    if (debug) mdebug_push("covr::package_coverage() workaround ...")
    libPath <- .libPaths()[1]
    clusterCall(cl, fun = function() .libPaths(c(libPath, .libPaths())))
    if (debug) mdebug_pop("covr::package_coverage() workaround ... DONE")
  
    cl
  }
})


getSocketSelectTimeout <- function(future, timeout = NULL) {
  if (!is.null(timeout)) return(timeout)
  
  ## FIXME: This should be memoized per plan, when setting up
  ## the plan /HB 2025-02-18
  timeout <- future[["resolved.timeout"]]
  if (!is.null(timeout)) return(timeout)

  timeout <- getOption("future.cluster.resolved.timeout")
  if (is.null(timeout)) {
    if (is.null(timeout)) {
      timeout <- getOption("future.resolved.timeout")
      if (is.null(timeout)) {
        timeout <- 0.01
      }
    }
      
    if (timeout < 0) {
      warning("Secret option 'future.resolved.timeout' is negative, which causes resolved() to wait until the future is resolved. This feature is only used for testing purposes of the future framework and must not be used elsewhere", immediate. = TRUE)
      timeout <- NULL
    }
  }

  ## WORKAROUND: Non-integer timeouts (at least < 2.0 seconds) may result
  ## in infinite waiting (PR17203).  Fixed in R devel r73470 (2017-10-05)
  ## and R 3.4.3
  ## Source: https://github.com/HenrikBengtsson/Wishlist-for-R/issues/35
  if (.Platform[["OS.type"]] != "windows" && getRversion() < "3.4.3") {
    timeout <- round(timeout, digits = 0L)
  }
  attr(timeout, "validated") <- TRUE
    
  ## Memoize 'timeout' in Future object
  if (!is.null(timeout)) {
    future[["resolved.timeout"]] <- timeout
  }

  timeout
} ## getSocketSelectTimeout()


#' @importFrom parallelly connectionId isConnectionValid 
#' @export
resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
  debug <- isTRUE(getOption("future.debug"))
  
  future <- x
  backend <- future[["backend"]]
  stop_if_not(inherits(backend, "FutureBackend"))
  workers <- backend[["workers"]]
  reg <- backend[["reg"]]
  
  ## A lazy future not even launched?
  if (future[["state"]] == "created") {
    if (run) {
      ## Can we launch it?  Are there available workers?
      
      ## Collect one resolved future, if one exists
      FutureRegistry(reg, action = "collect-first", earlySignal = TRUE, debug = debug)

      ## Find which nodes are available
      avail <- rep(TRUE, times = length(workers))
      futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug)
      nodes <- unlist(lapply(futures, FUN = function(f) f[["node"]]), use.names = FALSE)
      avail[nodes] <- FALSE
      if (debug) mdebugf("avail: [n=%d] %s", length(avail), commaq(avail))
      
      ## If at least one is available, then launch this lazy future
      if (any(avail)) future <- run(future)
    }
    return(FALSE)
  }

  ## Is value already collected?
  if (!is.null(future[["result"]])) {
    ## Signal conditions early?
    signalEarly(future, ...)
    return(TRUE)
  }

  ## Assert that the process that created the future is
  ## also the one that evaluates/resolves/queries it.
  assertOwner(future)

  if (debug) mdebug_push("isFutureResolved() ...")

  node_idx <- future[["node"]]
  cl <- workers[node_idx]
  node <- cl[[1]]

  
  ## Check if workers socket connection is available for reading
  if (!is.null(con <- node[["con"]])) {
    ## AD HOC/SPECIAL CASE: Skip if connection has been serialized and lacks
    ## internal representation. /HB 2018-10-27
    connId <- connectionId(con)
    if (!is.na(connId) && connId < 0L) return(FALSE)

    ## Broken connection due to interruption?
    isValid <- isConnectionValid(con)
    if (!isValid && future[["state"]] %in% c("interrupted", "running")) {
      ## Did it fail because we interrupted a future, which resulted in the
      ## worker also shutting done? If so, turn the error into a run-time
      ## FutureInterruptError and revive the worker
      future <- handleInterruptedFuture(backend, future = future)
      return(TRUE)
    }
    
    assertValidConnection(future)

    if (is.null(timeout)) {
      timeout <- getSocketSelectTimeout(future, timeout = timeout)
    } else {
      ## WORKAROUND: Non-integer timeouts (at least < 2.0 seconds) may result
      ## n infinite waiting (PR17203).  Fixed in R devel r73470 (2017-10-05)
      ## and R 3.4.3
      ## Source: https://github.com/HenrikBengtsson/Wishlist-for-R/issues/35
      if (!isTRUE(attr(timeout, "validated", exact = TRUE)) && .Platform[["OS.type"]] != "windows" && getRversion() < "3.4.3") {
        timeout <- round(timeout, digits = 0L)
      }
    }

    ## Number of non-FutureResult objects to receive, before giving up
    maxCount <- 100L

    count <- 0L
    while (count < maxCount) {
      ## Is there a message from the worker waiting?
      res <- socketSelect(list(con), write = FALSE, timeout = timeout)
      if (!res) break

      ## Receive it
      msg <- receiveMessageFromWorker(future, debug = debug)

      ## If the message contains a FutureResult, then the future is resolved
      ## and we are done here
      res <- inherits(msg, "FutureResult")
      if (res) break

      ## If the message contains a FutureInterruptError, then the future
      ## was interrupted and we are done here
      res <- inherits(msg, "FutureInterruptError")
      if (res) break

      msg <- NULL

      ## If not, we received a condition that has already been signaled
      ## by receiveMessageFromWorker().  However, it could be that there is
      ## another condition messages available, so lets check again
      count <- count + 1L
    } ## while()
  } else if (inherits(node, "MPInode")) {
    res <- resolveMPI(future)
  } else {
    warnf("resolved() is not yet implemented for workers of class %s. Will use value() instead and return TRUE", sQuote(class(node)[1]))
    value(future, stdout = FALSE, signal = FALSE)
    res <- TRUE
  }

  if (debug) mdebug_pop("isFutureResolved() ... done")

  ## Signal conditions early? (happens only iff requested)
  if (res) signalEarly(x, ...)

  res
}


#' @importFrom parallelly isConnectionValid 
#' @export
result.ClusterFuture <- function(future, ...) {
  debug <- isTRUE(getOption("future.debug"))
  if (debug) {
    mdebug_push("result() for ClusterFuture ...")
    on.exit(mdebug_pop("result() for ClusterFuture ... done"))
  }

  ## Has the result already been collected?
  result <- future[["result"]]
  if (!is.null(result)) { 
    if (debug) mdebugf("result already collected: %s", class(result)[1])
    
    if (inherits(result, "FutureError")) {
      stop(result)
    }
    
    return(result)
  }

  ## Assert that the process that created the future is
  ## also the one that evaluates/resolves/queries it.
  assertOwner(future)

  backend <- future[["backend"]]  
  workers <- backend[["workers"]]  
  worker <- future[["node"]]
  node <- workers[[worker]]
  if (!is.null(con <- node[["con"]])) {
    ## Broken connection due to interruption?
    isValid <- isConnectionValid(con)
    if (!isValid && future[["state"]] %in% c("interrupted", "running")) {
      ## Did it fail because we interrupted a future, which resulted in the
      ## worker also shutting done? If so, turn the error into a run-time
      ## FutureInterruptError and revive the worker
      future <- handleInterruptedFuture(backend, future = future)
      return(future)
    }
    assertValidConnection(future)
  }

  repeat({
    result <- receiveMessageFromWorker(future, debug = debug)
    if (inherits(result, "FutureResult")) {
      return(result)
    } else if (inherits(result, "FutureInterruptError")) {
      stop(result)
    } else if (inherits(result, "FutureLaunchError")) {
      future[["result"]] <- result
      ## Remove future from registry
      backend <- future[["backend"]]
      reg <- backend[["reg"]]
      FutureRegistry(reg, action = "remove", future = future, earlySignal = FALSE, debug = debug)
      stop(result)
    }
  })
}



#' @importFrom parallelly isNodeAlive
receiveMessageFromWorker <- local({
  recvData <- import_parallel_fcn("recvData")
  
  closeNode <- import_parallel("closeNode", default = function(node) {
    con <- node[["con"]]
    if (inherits(con, "connection")) close(con)
  })
  
  function(future, debug = FALSE, ...) {
    if (debug) {
      mdebug_push("receiveMessageFromWorker() for ClusterFuture ...")
      on.exit(mdebug_pop("receiveMessageFromWorker() for ClusterFuture ... done"))
    }
    
    if (future[["state"]] == "created") {
      if (debug) mdebug("starting non-launched future")
      future <- run(future)
    }
  
    backend <- future[["backend"]]
    if (!inherits(backend, "FutureBackend") && !is.list(backend)) {
      stop(sprintf("[INTERNAL ERROR] receiveMessageFromWorker(): the 'backend' element of the %s object is neither a FutureBackend object nor a list: %s", class(future)[1], class(backend)[1]))
    }
    workers <- backend[["workers"]]
    reg <- backend[["reg"]]
  
    node_idx <- future[["node"]]
    if (debug) mdebugf("cluster node index: %d", node_idx)
    cl <- workers[node_idx]
    node <- cl[[1]]
  
    t_start <- Sys.time()

    ## If not, wait for process to finish, and
    ## then collect and record the value
    msg <- NULL
    ack <- tryCatch({
      data <- recvData(node)
      TRUE    
    }, error = function(ex) ex)
    if (debug) mprint(ack)
  
    if (inherits(ack, "error")) {
      if (debug) mdebugf("- parallel::recvData() produced an error: %s", conditionMessage(ack))

      ## Did it fail because we interrupted a future, which resulted in the worker
      ## also shutting done? If so, turn the error into a run-time FutureInterruptError
      ## and revive the worker
      if (future[["state"]] %in% c("interrupted", "running")) {
        future <- handleInterruptedFuture(backend, future = future)
        return(future[["result"]])
      }

      msg <- post_mortem_cluster_failure(ack, when = "receive message results from", node = node, future = future)
      ex <- FutureError(msg, call = ack[["call"]], future = future)
      future[["result"]] <- ex
      stop(ex)          
    } ## if (inherits(ack, "error"))
    
    stop_if_not(isTRUE(ack))
    if (debug) {
      mdebug("received data:")
      mstr(data)
    }

    msg <- data[["value"]]

    ## Non-expected message from worker?
    if (!inherits(msg, "FutureResult") && !inherits(msg, "condition")) {
      ## If parallel:::slaveLoop() ends up capturing the error, which should
      ## not happen unless there is a critical error, then it'll be of captured
      ## by try().
      if (inherits(msg, "try-error")) {
        ex <- FutureError(msg, future = future)
        future[["result"]] <- ex
        stop(ex)
      }

      str(list(data = data, msg = msg))
      
      node_info <- sprintf("%s #%d", sQuote(class(node)[1]), node_idx)
      if (inherits(node, "RichSOCKnode")) {
        specs <- summary(node)
        alive <- isNodeAlive(node)
        if (is.na(alive)) {
          alive <- "unknown if it is alive"
        } else if (alive) {
          alive <- "alive"
        } else {
          alive <- "not alive"
        }
        node_info <- sprintf("%s (PID %s; %s) on host %s (%s, platform %s)",
                             node_info,
                             specs[["pid"]], alive, sQuote(specs[["host"]]),
                             specs[["r_version"]], specs[["platform"]])
      }
      
      hint <- sprintf("This suggests that the communication with %s is out of sync.", node_info)
      ex <- UnexpectedFutureResultError(future, hint = hint)
      future[["result"]] <- ex
      stop(ex)
    }
  
    if (inherits(msg, "FutureResult")) {
      result <- msg
  
      if (inherits(future[[".journal"]], "FutureJournal")) {
        appendToFutureJournal(future,
             event = "receiveResult",
          category = "overhead",
            parent = "gather",
             start = t_start,
              stop = Sys.time()
        )
      }
  
      ## Add back already signaled and muffled conditions so that also
      ## they will be resignaled each time value() is called.
      signaled <- future[[".signaledConditions"]]
      if (length(signaled) > 0) {
        result[["conditions"]] <- c(future[[".signaledConditions"]], result[["conditions"]])
        future[[".signaledConditions"]] <- NULL
      }
  
      future[["result"]] <- result
      future[["state"]] <- "finished"
      if (debug) mdebug("Received FutureResult")
    
      ## Remove from backend
      FutureRegistry(reg, action = "remove", future = future, earlySignal = FALSE, debug = debug)
      if (debug) mdebug("Erased future from future backend")
  
      ## Always signal immediateCondition:s and as soon as possible.
      ## They will always be signaled if they exist.
      signalImmediateConditions(future)
    
      ## Garbage collect cluster worker?
      if (future[["gc"]]) {
        if (debug) mdebug_push("Garbage collecting worker ...")
        ## Cleanup global environment while at it
        if (!isTRUE(future[["persistent"]])) {
          ## Blocking cluster-node call
          cluster_call_blocking(cl[1], fun = grmall, future = future, when = "call grmall() on", expected = "future-grmall")
        }
        
        ## WORKAROUND: Need to clear cluster worker before garbage collection.
        ## This is needed for workers running R (<= 3.3.1). It will create
        ## another teeny, dummy object on the worker allowing any previous
        ## objects to be garbage collected.  For more details, see
        ## https://github.com/HenrikBengtsson/Wishlist-for-R/issues/27.
        ## (We return a value identifiable for troubleshooting purposes)
        ## Blocking cluster-node call
        cluster_call_blocking(cl[1], function() "future-clearing-cluster-worker", future = future, when = "call dummy() on", expected = "future-clearing-cluster-worker")
        
        ## Blocking cluster-node call
        cluster_call_blocking(cl[1], function() { gc(); "future-gc" }, verbose = FALSE, reset = FALSE, future = future, when = "call gc() on", expected = "future-gc")
        if (debug) mdebug_pop("Garbage collecting worker ... done")
      }
    } else if (inherits(msg, "FutureLaunchError")) {
    } else if (inherits(msg, "condition")) {
      condition <- msg
      
      if (debug) {
        mdebug("Received condition:")
        mstr(condition)
      }
  
      ## Sanity check
      if (inherits(condition, "error")) {
        label <- sQuoteLabel(future[["label"]])
        stop(FutureError(sprintf("Received a %s condition from the %s worker for future (%s), which is not possible to relay because that would break the internal state of the future-worker communication. The condition message was: %s", class(condition)[1], class(future)[1], label, sQuote(conditionMessage(condition))), future = future))
      }
  
      ## Resignal condition
      if (inherits(condition, "warning")) {
        warning(condition)
      } else if (inherits(condition, "message")) {
        message(condition)
      } else {
        signalCondition(condition)
      }
  
      ## Increment signal count
      signaled <- condition[["signaled"]]
      if (is.null(signaled)) signaled <- 0L
      condition[["signaled"]] <- signaled + 1L
      
      ## Record condition as signaled
      signaled <- future[[".signaledConditions"]]
      if (is.null(signaled)) signaled <- list()
      signaled <- c(signaled, list(condition))
      future[[".signaledConditions"]] <- signaled
    }
  
    msg
  }
}) ## receiveMessageFromWorker()


## Returns the index of an available cluster node among the
## backend[["workers"]] nodes. It validates that the node is
## functional, by to a quick parallel::clusterCall(). If that
## fails, it re-launches the cluster node and re-assigns it
## to backend[["workers"]], before returning.
#' @importFrom parallelly isConnectionValid isNodeAlive cloneNode
#' @importFrom parallel clusterCall
requestNode <- function(await, backend, timeout, delta, alpha) {
  debug <- isTRUE(getOption("future.debug"))
  if (debug) {
    mdebug_push("requestNode() ...")
    on.exit(mdebug_pop("requestNode() ... done"), add = TRUE)
  }

  stop_if_not(inherits(backend, "FutureBackend"))
  workers <- backend[["workers"]]
  stop_if_not(inherits(workers, "cluster"))
  stop_if_not(is.function(await))
  stop_if_not(is.finite(timeout), timeout >= 0)
  stop_if_not(is.finite(alpha), alpha > 0)

  ## Maximum number of nodes available
  total <- length(workers)
  if (debug) mdebugf("Number of workers: %d", total)

  ## FutureRegistry to use
  name <- attr(workers, "name", exact = TRUE)
  stop_if_not(is.character(name), length(name) == 1L)
  reg <- sprintf("workers-%s", name)
  
  usedNodes <- function() {
    ## Number of unresolved cluster futures
    length(FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug))
  }

  if (debug) mdebug_push("Polling for a free worker ...")
  t0 <- Sys.time()
  dt <- 0
  iter <- 1L
  interval <- delta
  finished <- FALSE
  while (dt <= timeout) {
    ## Check for available nodes
    used <- usedNodes()
    finished <- (used < total)
    if (finished) break

    if (debug) mdebugf("Poll #%d (%s): usedNodes() = %d, workers = %d", iter, format(round(dt, digits = 2L)), used, total)

    ## Wait
    Sys.sleep(interval)
    interval <- alpha * interval
    
    ## Finish/close workers, iff possible
    await()

    iter <- iter + 1L
    dt <- difftime(Sys.time(), t0)
  }
  if (debug) {
    mdebugf("Total time: %s", dt)
    mdebug_pop("Polling for a free worker ... done")
  }

  if (!finished) {
    msg <- sprintf("TIMEOUT: All %d cluster nodes are still occupied after %s (polled %d times)", total, format(round(dt, digits = 2L)), iter)
    if (debug) mdebug(msg)
    ex <- FutureError(msg, future = future)
    stop(ex)
  }

  ## Find which node is available
  avail <- rep(TRUE, times = length(workers))
  futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug)
  nodes <- unlist(lapply(futures, FUN = function(f) f[["node"]]), use.names = FALSE)
  avail[nodes] <- FALSE

  ## Sanity check
  stop_if_not(any(avail))

  node_idx <- which(avail)[1L]
  stop_if_not(is.numeric(node_idx), is.finite(node_idx), node_idx >= 1)
  if (debug) mdebugf("Index of first available worker: %d", node_idx)
  
  ## Validate that the cluster node is working
  if (debug) mdebug_push("Validate that the worker is functional ...")
  cl <- workers[node_idx]
  stop_if_not(length(cl) == 1L, inherits(cl, "cluster"))

  maxTries <- 3L
  for (kk in maxTries:1) {
    okay <- TRUE
    truth <- "future:::requestNode() validation call"
    res <- tryCatch({
      clusterCall(cl = cl, function(x) x, truth)[[1]]
    }, error = identity)
    ## If not working, investigate why, and relaunch a new worker
    if (inherits(res, "error") || !identical(res, truth)) {
      if (debug) {
        mdebug("Worker is non-functional")
        if (inherits(res, "error")) {
          mdebug("Error received: ", conditionMessage(res))
        } else {
          mdebug("Result received: ", sQuote(res))
        }
      }
      okay <- FALSE
  
      ## Is the connection working?
      node <- cl[[1]]
      con <- node[["con"]]
      connectionOkay <- NA
      if (inherits(con, "connection")) {
        connectionOkay <- isConnectionValid(con)
        if (debug) mdebug("Connection is valid: ", connectionOkay)
      }
  
      if (is.na(connectionOkay) || connectionOkay) {
        ## If the node does not use a connection, or the  connection is working,
        ## we can only assume the worker is also alive.  If so, we should try to
        ## kill the worker.
        res <- suppressWarnings(killNode(node))
        if (debug) mdebugf("Killed %s: %s", class(node)[1], res)
      } else {
        ## If connection is not working, we could assume the worker is no longer
        ## alive, but it could also be a network issues. In either case, we
        ## should try to kill it, just in case.
        res <- suppressWarnings(killNode(node))
        if (debug) mdebugf("Killed %s: %s", class(node)[1], res)
      }
      if (kk == 1L) {
        stop(FutureError(sprintf("Failed to find a functional cluster workers, after attempting to relaunch the parallel worker %d times", maxTries)))
      }
    } else {
      if (debug) mdebug("Worker is functional")
      break
    }
    if (debug) mdebug_pop("Validate that the worker is functional ... done")
    
    ## Relaunch worker?
    if (!okay) {
      if (debug) mdebugf_push("Restarting non-alive cluster node %d ...", node_idx)
      node2 <- tryCatch({
        cloneNode(node)
      }, error = identity)
      if (inherits(node2, "error")) {
        msg <- sprintf("One of the future workers of class %s, part of a cluster of class %s, was interrupted and attempts to relaunch it failed", sQuote(class(node)[1]), sQuote(class(cl)[1]))
        if (inherits(node, c("SOCKnode", "SOCK0node")) &&
            !inherits(node, c("RichSOCKnode"))) {
          msg <- sprintf("%s. If you created your cluster with parallel::makeCluster(), try with parallelly::makeClusterPSOCK() instead", msg)
        }
        msg <- sprintf("%s. The reported reason was: %s", msg, conditionMessage(node2))
        stop(FutureError(msg))
      } else {
        node <- node2
      }
      
      cl[[1]] <- node
      
      workers[[node_idx]] <- node
      backend[["workers"]] <- workers
      
      if (debug) {
        mdebug("Re-launched cluster node:")
        mprint(node)
        mdebugf_pop("Restarting non-alive cluster node %d ... done", node_idx)
      }
    }
    
    Sys.sleep(0.1)
  } ##  for (kk in maxTries:1)
  
  node_idx
} ## requestNode()



node_call_nonblocking <- local({
  sendCall <- import_parallel_fcn("sendCall")
  
  function(node, ..., when = "send call to", future) {
    tryCatch({
      sendCall(node, ...)
    }, error = function(ex) {
      msg <- post_mortem_cluster_failure(ex, when = when, node = node, future = future)
      ex <- FutureError(msg, future = future)
      stop(ex)          
    })
  }
})

#' @importFrom parallel clusterCall
cluster_call_blocking <- function(cl, ..., when = "call function on", future, expected = NULL) {
  stop_if_not(inherits(cl, "cluster"), length(cl) == 1L)
  stop_if_not(inherits(future, "Future"))
  
  ans <- tryCatch({
    clusterCall(cl = cl, ...)
  }, error = function(ex) {
    msg <- post_mortem_cluster_failure(ex, when = when, node = cl[[1]], future = future)
    ex <- FutureError(msg, future = future)
    future[["result"]] <- ex
    stop(ex)          
  })

  with_assert({
    stop_if_not(length(ans) == 1L, is.list(ans))
    if (!is.null(expected)) {
      value <- ans[[1]]
      if (length(value) != 1L || !is.character(value) || 
          is.na(value) || value != "future-grmall") {
        utils::str(list(ans = ans, expected = expected))
        stop(sprintf("parallel::clusterCall() did not return string %s as expected. Received a %s object instead: %s", sQuote(expected), class(value)[1], paste(deparse(value), collapse = "; ")))
      }
    }
  })
} ## cluster_call_blocking()


#' @importFrom parallelly isNodeAlive
post_mortem_cluster_failure <- local({
  pid_exists <- import_parallelly("pid_exists")
  
  function(ex, when, node, future) {
    stop_if_not(inherits(ex, "error"))
    stop_if_not(length(when) == 1L, is.character(when))
    stop_if_not(inherits(future, "Future"))
    
    node_idx <- future[["node"]]
    if (is.null(node_idx)) {
      node_idx <- NA_integer_
    } else {
      stop_if_not(length(node_idx) == 1L, is.numeric(node_idx))
      node_idx <- as.integer(node_idx)
    }
    
    ## (1) Trimmed error message
    reason <- conditionMessage(ex)
  
    ## (2) Information on the cluster node
    
    ## (a) Process information on the worker, if available
    pid <- node[["session_info"]][["process"]][["pid"]]
    pid_info <- if (is.numeric(pid)) sprintf("PID %.0f", pid) else NULL
  
    ## (b) Host information on the worker, if available
    ##     AD HOC: This assumes that the worker has a hostname, which is not
    ##     the case for MPI workers. /HB 2017-03-07
    host <- node[["host"]]
    localhost <- isTRUE(attr(host, "localhost", exact = TRUE))
    host_info <- if (!is.null(host)) {
      sprintf("on %s%s", if (localhost) "localhost " else "", sQuote(host))
    } else NULL
    
    node_info <- sprintf("cluster %s #%d (%s)",
                         class(node)[1], node_idx,
                         paste(c(pid_info, host_info), collapse = " "))
    stop_if_not(length(node_info) == 1L)
    
    ## (3) Information on the future
    label <- sQuoteLabel(future[["label"]])
  
    ## (4) POST-MORTEM ANALYSIS:
    postmortem <- list()
  
    ## (a) Inspect the 'reason' for known clues
    if (grepl("ignoring SIGPIPE signal", reason)) {
      postmortem[["sigpipe"]] <- "The SIGPIPE error suggests that the R socket connection to the parallel worker broke, which can happen for different reasons, e.g. the parallel worker crashed"
    }
  
    ## (a) Did the worker process terminate?
    if (!is.null(host) && is.numeric(pid)) {
      if (localhost) {
        alive <- pid_exists(pid)
        if (is.na(alive)) {
          msg2 <- "Failed to determined whether a process with this PID exists or not, i.e. cannot infer whether localhost worker is alive or not"
        } else if (alive) {
          msg2 <- "A process with this PID exists, which suggests that the localhost worker is still alive"
        } else {
          msg2 <- "No process exists with this PID, i.e. the localhost worker is no longer alive"
        }
      } else {
        ## Checking remote workers on hosts requires parallelly (>= 1.36.0)
        alive <- isNodeAlive(node, timeout = getOption("future.alive.timeout", 30.0))
        if (is.na(alive)) {
          msg2 <- "Failed to determined whether the process with this PID exists or not on the remote host, i.e. cannot infer whether remote worker is alive or not"
        } else if (alive) {
          msg2 <- "A process with this PID exists on the remote host, which suggests that the remote worker is still alive"
        } else {
          msg2 <- "No process exists with this PID on the remote host, i.e. the remote worker is no longer alive"
        }
      }
      postmortem[["alive"]] <- msg2
    }
  
    ## (b) Did the worker use a connection that changed?
    if (inherits(node[["con"]], "connection")) {
      postmortem[["connection"]] <- check_connection_details(node, future = future)
    }
  
    ## (c) Any non-exportable globals?
    globals <- future[["globals"]]
    postmortem[["non_exportable"]] <- assert_no_references(globals, action = "string")
  
    ## (d) Size of globals
    postmortem[["global_sizes"]] <- summarize_size_of_globals(globals)
  
    ## (5) The final error message
    msg <- sprintf("%s (%s) failed to %s %s. The reason reported was %s",
                   class(future)[1], label, when, node_info, sQuote(reason))
    stop_if_not(length(msg) == 1L)
    if (length(postmortem) > 0) {
      postmortem <- unlist(postmortem, use.names = FALSE)
      msg <- sprintf("%s. Post-mortem diagnostic: %s",
                     msg, paste(postmortem, collapse = ". "))
      stop_if_not(length(msg) == 1L)
    }
  
    msg
  } # post_mortem_cluster_failure()
})



getPsockImmediateConditionHandler <- local({
  sendCondition <- NULL

  function(frame = 1L) {
    if (is.function(sendCondition)) return(sendCondition)

    ns <- getNamespace("parallel")
    if (exists("sendData", mode = "function", envir = ns)) {
      parallel_sendData <- get("sendData", mode = "function", envir = ns)

      ## Find the 'master' argument of the worker's {slave,work}Loop()
      envir <- sys.frame(frame)
      master <- NULL
      while (!identical(envir, .GlobalEnv) && !identical(envir, emptyenv())) {
        if (exists("master", mode = "list", envir = envir, inherits = FALSE)) {
          master <- get("master", mode = "list", envir = envir, inherits = FALSE)
          if (inherits(master, c("SOCKnode", "SOCK0node"))) {
            sendCondition <<- function(cond) {
              data <- list(type = "VALUE", value = cond, success = TRUE)
              parallel_sendData(master, data)
            }
            return(sendCondition)
          }
        }
        frame <- frame + 1L
        envir <- sys.frame(frame)
      }
    }  

    ## Failed to locate 'master' or 'parallel:::sendData()',
    ## so just ignore immedicate conditions
    sendCondition <<- function(cond) NULL
  }
}) ## getPsockImmediateConditionHandler()


psockImmediateConditionHandler <- function(cond) {
  handler <- getPsockImmediateConditionHandler()
  handler(cond)
}


#' @importFrom parallelly connectionId isConnectionValid
assertValidConnection <- function(future) {
  debug <- isTRUE(getOption("future.debug"))
  if (debug) {
    mdebug_push("assertValidConnection() ...")
    on.exit(mdebug_pop("assertValidConnection() ... done"))
  }

  backend <- future[["backend"]]

  node_idx <- future[["node"]]
  if (debug) mdebugf("cluster node index: %d", node_idx)

  cl <- backend[["workers"]][node_idx]
  node <- cl[[1]]

  ## Nothing to do?
  if (is.null(con <- node[["con"]])) return()

  ## AD HOC/SPECIAL CASE: Skip if connection has been serialized and lacks internal representation. /HB 2018-10-27
  connId <- connectionId(con)
  if (!is.na(connId) && connId < 0L) return()

  isValid <- isConnectionValid(con)
  if (!isValid) {
    ex <- simpleError("Connection to the worker is corrupt")
    msg <- post_mortem_cluster_failure(ex, when = "receiving message from", node = node, future = future)
    stop(FutureError(msg, future = future))
  }
} ## assertValidConnection()



#' @export
getFutureBackendConfigs.ClusterFuture <- function(future, ..., debug = isTRUE(getOption("future.debug"))) {
  resignalImmediateConditions <- getOption("future.psock.relay.immediate")
  if (isFALSE(resignalImmediateConditions)) return(list())

  conditionClasses <- future[["conditions"]]
  if (is.null(conditionClasses)) return(list())
  
  immediateConditionClasses <- attr(conditionClasses, "immediateConditionClasses", exact = TRUE)
  if (is.null(immediateConditionClasses)) {
    immediateConditionClasses <- "immediateCondition"
  } else if (length(immediateConditionClasses) == 0L) {
    return(list())
  }
  
  ## Does the cluster node communicate with a connection?
  ## (if not, it's via MPI)
  if (!future[["nodeHasConnection"]]) return(list())

  capture <- list(
    immediateConditionHandlers = list(
      immediateCondition = psockImmediateConditionHandler
    )
  )

  list(
    capture = capture
  )
}





#' @importFrom parallelly cloneNode isNodeAlive
handleInterruptedFuture <- local({
  closeNode <- import_parallel("closeNode", default = function(node) {
    con <- node[["con"]]
    if (inherits(con, "connection")) close(con)
  })
  
  function(backend, future, relaunchWorker = TRUE, ...) {
    debug <- isTRUE(getOption("future.debug"))
    if (debug) {
      mdebug_push("handleInterruptedFuture() for ClusterFutureBackend ...")
      on.exit(mdebug_pop("handleInterruptedFuture() for ClusterFutureBackend ... done"))
    }
  
    stop_if_not(future[["state"]] %in% c("interrupted", "running"))
  
    label <- sQuoteLabel(future[["label"]])
    workers <- backend[["workers"]]
    node_idx <- future[["node"]]
    cl <- workers[node_idx]
    node <- cl[[1]]
    host <- node[["host"]]
    msg <- sprintf("A future (%s) of class %s was interrupted, while running on %s", label, class(future)[1], sQuote(host))
    if (inherits(node, "RichSOCKnode")) {
      pid <- node[["session_info"]][["process"]][["pid"]]
      if (!is.null(pid)) msg <- sprintf("%s (pid %s)", msg, pid)
    }
    result <- FutureInterruptError(msg, future = future)
    future[["result"]] <- result
    future[["state"]] <- "interrupted"
  
    ## Remove from backend
    reg <- backend[["reg"]]
    exists <- FutureRegistry(reg, action = "contains", future = future, debug = debug)
    if (exists) {
      FutureRegistry(reg, action = "remove", future = future, earlySignal = FALSE, debug = debug)
      if (debug) mdebug("Erased future from future backend")
    }
  
    ## Try to relaunch worker, if it is no longer running?
    if (relaunchWorker) {
      alive <- isNodeAlive(node)
      if (debug) mdebugf("cluster node is alive: %s", alive)
      ## It is not possible to check if a node is alive on all types of clusters.
      ## If that is the case, the best we can do is to assume it is alive
      if (!is.na(alive) && !alive) {
        ## Launch a new cluster node, by cloning the old one
        node2 <- cloneNode(node)
        
        ## Add to cluster
        workers[[node_idx]] <- node2
    
        ## Update backend
        backend[["workers"]] <- workers
        
        ## Make sure to close the old cluster node (including any connection)
        tryCatch(closeNode(node), error = identity)
      }
    } ## if (relaunchWorker)
  
    future
  } ## handleInterruptedFuture()
})



#' Create a cluster future whose value will be resolved asynchronously in a parallel process
#'
#' A cluster future is a future that uses cluster evaluation,
#' which means that its _value is computed and resolved in
#' parallel in another process_.
#'
#' @details
#' This function is _not_ meant to be called directly.  Instead, the
#' typical usages are:
#'
#' ```r
#' # Evaluate futures via a single background R process on the local machine
#' plan(cluster, workers = 1)
#'
#' # Evaluate futures via two background R processes on the local machine
#' plan(cluster, workers = 2)
#'
#' # Evaluate futures via a single R process on another machine on on the
#' # local area network (LAN)
#' plan(cluster, workers = "raspberry-pi")
#'
#' # Evaluate futures via a single R process running on a remote machine
#' plan(cluster, workers = "pi.example.org")
#'
#' # Evaluate futures via four R processes, one running on the local machine,
#' # two running on LAN machine 'n1' and one on a remote machine
#' plan(cluster, workers = c("localhost", "n1", "n1", "pi.example.org"))
#' ```
#'
#' @inheritParams Future-class
#' @inheritParams future
#' @inheritParams FutureBackend
#'
#' @param workers A \code{\link[parallel:makeCluster]{cluster}} object,
#' a character vector of host names, a positive numeric scalar,
#' or a function.
#' If a character vector or a numeric scalar, a `cluster` object
#' is created using \code{\link[parallelly:makeClusterPSOCK]{makeClusterPSOCK}(workers)}.
#' If a function, it is called without arguments _when the future
#' is created_ and its value is used to configure the workers.
#' The function should return any of the above types.
#'
#' @param persistent If FALSE, the evaluation environment is cleared
#' from objects prior to the evaluation of the future.
#' 
#' @param \ldots Additional named elements passed to [Future()].
#'
#' @return
#' A ClusterFuture.
#'
#' @example incl/cluster.R
#'
#' @export
cluster <- function(..., workers = availableWorkers(), gc = FALSE, earlySignal = FALSE, persistent = FALSE, envir = parent.frame()) {
  ## WORKAROUNDS:
  ## (1) promises::future_promise() calls the "evaluator" function directly
  if ("promises" %in% loadedNamespaces()) {
    return(future(..., gc = gc, earlySignal = earlySignal, envir = envir))
  }
  
  stop("INTERNAL ERROR: The future::cluster() function implements the FutureBackend and should never be called directly")
}
class(cluster) <- c("cluster", "multiprocess", "future", "function")
attr(cluster, "init") <- TRUE
attr(cluster, "factory") <- ClusterFutureBackend
attr(cluster, "tweakable") <- tweakable(attr(cluster, "factory"))


## NOTE, we must not memoize the cluster as part of the ClusterFutureBackend
## function, because that function is set as attribute "factory" of the
## 'cluster' function, which will be passed along to parallel workers
## as part of plan("tail").
#' @importFrom parallel stopCluster
clusterRegistry <- local({
  ## We only allow one parallel 'cluster' per session
  cluster <- NULL

  getCluster <- function(..., debug = FALSE) {
    if (debug) {
      mdebug_push("getCluster() ...")
      mdebug_pop("getCluster() ... done")
    }
    cluster
  }

  startCluster <- function(workers, makeCluster, ..., debug = FALSE) {
    if (debug) {
      mdebug_push("makeCluster(workers, ...) ...")
      on.exit(mdebug_pop("makeCluster(workers, ...) ... done"))
      mdebug("Arguments:")
      mstr(list(workers, ...))
    }
    
    cl <- makeCluster(workers, ...)
    
    ## Attach name to cluster?
    name <- attr(cl, "name", exact = TRUE)
    if (is.null(name)) {
      name <- uuid(cl)
      stop_if_not(length(name) > 0, nzchar(name))
      attr(cl, "name") <- name
      if (debug) mdebug("Generated cluster UUID")
    }
    if (debug) {
      mdebugf("Cluster UUID: %s", sQuote(name))
      mprint(cl)
    }

    ## Memoize
    cluster <<- cl

    cluster
  } ## startCluster()

  stopCluster <- function(debug = FALSE) {
    if (debug) {
      mdebug_push("Stopping existing cluster ...")
      on.exit(mdebug_pop("Stopping existing cluster ... done"))
    }
    ## Nothing to do?
    if (is.null(cluster)) {
      if (debug) mdebug("No pre-existing cluster. Skipping")
      return(NULL)
    }

    ## ROBUSTNESS:
    ## Clear memoization of cluster, regardless of success or failure
    on.exit({
      cluster <<- NULL
    }, add = TRUE)

    if (debug) {
      mdebug("Cluster to shut down:")
      mprint(cluster)
    }
   
    res <- tryCatch({
      parallel::stopCluster(cluster)
    }, error = identity)
    if (debug) mdebugf("Stopped cluster: %s", commaq(deparse(res)))

    NULL
  } ## stopCluster()

  list(
      getCluster = getCluster,
    startCluster = startCluster,
     stopCluster = stopCluster
  )
}) ## clusterRegistry()

Try the future package in your browser

Any scripts or data that you put into this service are public.

future documentation built on April 12, 2025, 1:25 a.m.