tests/resolved-non-blocking-test.R

source("incl/start.R")
options(future.debug = FALSE)

## Don't poll too frequently in order for these tests to not fail
options(future.wait.interval = 0.1)

message("*** resolved() - assert non-blocking while launching lazy futures ...")

for (cores in 1:availCores) {
  message(sprintf("Testing with %d cores ...", cores))
  options(mc.cores = cores)

  strategies <- supportedStrategies(cores)
  print(strategies)
  
  for (strategy in strategies) {
    message(sprintf("- plan('%s') with cores=%d ...", strategy, cores))
    plan(strategy)

    ## This test requires that all workers are free when tests start
    resetWorkers(plan())
  
    message("Creating lazy futures:")
    xs <- as.list(1:3)
    fs <- lapply(xs, FUN = function(kk) {
      future({
        Sys.sleep(kk)
        kk
      }, lazy = TRUE)
    })
    vs <- vector("list", length = length(fs))
    ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
    print(ss)
    stopifnot(all(ss == "created"))
    rs <- rep(NA, times = length(fs))
    
    for (ff in seq_along(fs)) {
      for (kk in ff:length(fs)) {
        message(sprintf("Checking if future #%d is resolved:", kk))
        rs[[kk]] <- resolved(fs[[kk]])
        ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
        print(ss)
        if (inherits(fs[[kk]], "UniprocessFuture")) {
          stopifnot(rs[[kk]])
          stopifnot(ss[[kk]] == "finished")
        } else if (inherits(fs[[kk]], "MultiprocessFuture")) {
          if (nbrOfWorkers() + ff - 1L >= kk) {
            stopifnot(ss[[kk]] == "running")
          } else {
            ## Most commonly, we get 'created' here, but it might already be
            ## 'running' (observed once on win-builder on 2020-10-30)
            stopifnot(ss[[kk]] %in% c("created", "running"))
          }
          stopifnot(!rs[[kk]])
        }
      } ## for (kk ...)
    
      message(sprintf("Waiting for future #%d to finish ... ", ff), appendLF = FALSE)
      vs[[ff]] <- value(fs[[ff]])
      message("done")
    
      rs[[ff]] <- resolved(fs[[ff]])
      stopifnot(rs[ff])
    
      ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
      stopifnot(ss[ff] == "finished")
      nbrOfFinished <- sum(ss == "finished")
      if (inherits(fs[[kk]], "UniprocessFuture")) {
        stopifnot(nbrOfFinished == length(fs))
      } else {
        stopifnot(nbrOfFinished == ff)
      }
    } ## for (ff ...)
    
    ss <- vapply(fs, FUN = function(f) f$state, NA_character_)
    print(ss)
    stopifnot(all(ss == "finished"))
    
    message("Collecting values:")
    vs <- value(fs)
    str(vs)
    stopifnot(identical(vs, xs))
  
    message(sprintf("- plan('%s') with cores=%d ... DONE", strategy, cores))
  } ## for (strategy ...)

  message(sprintf("Testing with %d cores ... DONE", cores))
} ## for (cores ...)

message("*** resolved() - assert non-blocking while launching lazy futures ... DONE")

source("incl/end.R")

Try the future package in your browser

Any scripts or data that you put into this service are public.

future documentation built on Sept. 11, 2024, 7:35 p.m.