inst/testme/test-resolved-non-blocking-test.R

#' @tags resolved
#' @tags sequential multisession multicore
#' @tags skip_on_cran

library(future)
options(future.debug = FALSE)

## Don't poll too frequently in order for these tests to not fail
options(future.wait.interval = 0.1)

message("*** resolved() - assert non-blocking while launching lazy futures ...")

for (cores in 1:availCores) {
  message(sprintf("Testing with %d cores ...", cores))
  options(mc.cores = cores)

  strategies <- supportedStrategies(cores)
  print(strategies)
  
  for (strategy in strategies) {
    message(sprintf("- plan('%s') with cores=%d ...", strategy, cores))
    plan(strategy)

    ## This test requires that all workers are free when tests start
    resetWorkers(plan())
  
    message("Creating lazy futures:")
    xs <- as.list(1:3)
    fs <- lapply(xs, FUN = function(kk) {
      future({
        Sys.sleep(kk/2)
        kk
      }, lazy = TRUE)
    })
    vs <- vector("list", length = length(fs))
    ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
    print(ss)
    stopifnot(all(ss == "created"))
    rs <- rep(NA, times = length(fs))
    
    for (ff in seq_along(fs)) {
      for (kk in ff:length(fs)) {
        message(sprintf("Checking if future #%d is resolved:", kk))
        rs[[kk]] <- resolved(fs[[kk]])
        ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
        print(ss)
        if (inherits(fs[[kk]], "UniprocessFuture")) {
          stopifnot(rs[[kk]])
          stopifnot(ss[[kk]] == "finished")
        } else if (inherits(fs[[kk]], "MultiprocessFuture")) {
          if (nbrOfWorkers() + ff - 1L >= kk) {
            stopifnot(ss[[kk]] == "running")
          } else {
            ## Most commonly, we get 'created' here, but it might already be
            ## 'running' (observed once on win-builder on 2020-10-30)
            stopifnot(ss[[kk]] %in% c("created", "running"))
          }
          stopifnot(!rs[[kk]])
        }
      } ## for (kk ...)
    
      message(sprintf("Waiting for future #%d to finish ... ", ff), appendLF = FALSE)
      vs[[ff]] <- value(fs[[ff]])
      message("done")
    
      rs[[ff]] <- resolved(fs[[ff]])
      stopifnot(rs[ff])
    
      ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
      stopifnot(ss[ff] == "finished")
      nbrOfFinished <- sum(ss == "finished")
      if (inherits(fs[[kk]], "UniprocessFuture")) {
        stopifnot(nbrOfFinished == length(fs))
      } else {
        stopifnot(nbrOfFinished == ff)
      }
    } ## for (ff ...)
    
    ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
    print(ss)
    stopifnot(all(ss == "finished"))
    
    message("Collecting values:")
    vs <- value(fs)
    str(vs)
    stopifnot(identical(vs, xs))
  
    message(sprintf("- plan('%s') with cores=%d ... DONE", strategy, cores))
  } ## for (strategy ...)

  message(sprintf("Testing with %d cores ... DONE", cores))
} ## for (cores ...)

message("*** resolved() - assert non-blocking while launching lazy futures ... DONE")

Try the future package in your browser

Any scripts or data that you put into this service are public.

future documentation built on April 12, 2025, 1:25 a.m.