# tests/testthat/test-cluster.R

test_that("test parallel collisions", {
  # Purpose: have several worker processes call `Cache()` against the same
  # cache repository at the same time, then check that every call returned and
  # that exactly `numNew` of them created a new cache entry.
  skip_on_cran() # multi-process tests are not run on CRAN
  # skip_on_os("mac")
  skip_if_not_installed("parallel")

  startTime <- Sys.time()

  # NOTE(review): `tmpdir`, used below, is not defined in this block; it is
  # presumably created by `testInit()` -- confirm against the test helpers.
  testInit("parallel", tmpFileExt = c(".tif", ".grd", ".txt"))

  # make cluster -- note this works if cluster is FORK also, but for simplicity,
  #   using the default type, which works on Linux, Mac, Windows.
  # `detectCores()` returns NA when the core count is unknown; `na.rm = TRUE`
  # keeps N at 2 in that case instead of passing NA on to `makeCluster()`.
  N <- min(2L, parallel::detectCores(), na.rm = TRUE)

  if (useDBI()) {
    if (!file.exists(CacheDBFile(tmpdir))) {
      createCache(tmpdir)
    }
  }

  # Function that writes to the cache repository from within the cluster
  # workers.
  #   x         task index; `x %% numNew + 1` is used as `sd`, so only `numNew`
  #             distinct calls exist and the remaining tasks repeat them.
  #   cachePath cache repository shared by all workers.
  #   numNew    number of distinct `Cache()` calls to generate.
  #   keepLog   if TRUE, append every message emitted by `Cache()` to a
  #             per-process log file (developer-machine path, see below).
  numNew <- 20
  fun <- function(x, cachePath, numNew, keepLog) {
    library(reproducible)
    if (keepLog) {
      # One log file per worker process, named by its PID.
      fn <- file.path("c:/Eliot/tmpCache/log", Sys.getpid())
      checkPath(dirname(fn), create = TRUE)
    }
    withCallingHandlers(
      Cache(rnorm, 10,
        sd = x %% numNew + 1, cachePath = cachePath,
        verbose = 4
      ),
      message = function(m) {
        # Calling handler: the message is recorded and then continues to
        # propagate as usual. `fn` only exists when `keepLog` is TRUE.
        if (keepLog) {
          cat(conditionMessage(m), file = fn, append = TRUE)
        }
      }
    )
  }
  cl <- parallel::makeCluster(N)
  on.exit(parallel::stopCluster(cl), add = TRUE)

  # Logging is switched on only for one specific developer account on Windows.
  keepLog <- FALSE
  if (isWindows() && identical(Sys.info()[["user"]], "emcintir")) {
    # tmpdir <- 'c:/Eliot/tmpCache/'
    keepLog <- TRUE
  }

  parallel::clusterSetRNGStream(cl)
  parallel::clusterEvalQ(cl, {
    library(reproducible)
  })
  numToRun <- 140
  if (interactive()) {
    print(tmpdir)
  }

  # There is a 'creating Cache at the same time' problem -- haven't resolved
  #  Just make cache first and it seems fine
  clearCache(tmpdir) # If in c:/Eliot, then it may be reused
  Cache(rnorm, 1, cachePath = tmpdir)
  a <- try(
    parallel::clusterMap(
      cl = cl, fun, seq(numToRun), numNew = numNew, keepLog = keepLog,
      cachePath = tmpdir, .scheduling = "dynamic"
    ),
    silent = FALSE
  )
  if (inherits(a, "try-error")) {
    # A failure of the cluster run is tolerated (best effort), but is reported
    # as an explicit skip rather than ending the test with no expectations.
    skip(paste("clusterMap failed:", conditionMessage(attr(a, "condition"))))
  }

  expect_true(is.list(a))
  expect_length(a, numToRun)
  # Each result carries a ".Cache" attribute whose `newCache` element flags
  # whether that call created a new entry; only `numNew` distinct calls exist.
  news <- vapply(a, function(x) attr(x, ".Cache")$newCache, FUN.VALUE = logical(1))
  expect_equal(sum(news), numNew)

  endTime <- Sys.time()
  if (interactive()) {
    print(endTime - startTime)
  }
})
# PredictiveEcology/reproducible documentation built on June 3, 2024, 10:05 p.m.