tests/testthat/test-9-future.R

drake_context("future")

test_with_dir("future parallelism for CRAN", {
  skip_if_not_installed("future")
  plan <- drake_plan(x = 1)
  for (caching in c("main", "worker")) {
    clean()
    make(plan, parallelism = "future", caching = caching)
    config <- drake_config(plan)
    expect_equal(justbuilt(config), "x")
  }
})

test_with_dir("future package functionality", {
  skip_on_cran()
  skip_if_not_installed("future")
  future::plan(future::sequential)
  scenario <- get_testing_scenario()
  e <- eval(parse(text = scenario$envir))
  load_mtcars_example(envir = e)
  e$my_plan$hpc <- e$my_plan$target != "regression1_large"
  backends <- rep("future", 2)
  caching <- c("main", "worker")
  for (i in seq_along(backends)) {
    clean(destroy = TRUE)
    make(
      e$my_plan,
      envir = e,
      parallelism = backends[i],
      caching = caching[i],
      jobs = 1,
      verbose = 0L,
      session_info = FALSE,
      lock_envir = TRUE
    )
    config <- drake_config(
      e$my_plan,
      envir = e,
      parallelism = backends[i],
      caching = caching[i],
      jobs = 1,
      verbose = 0L,
      session_info = FALSE,
      lock_envir = TRUE
    )
    expect_equal(
      outdated_impl(config),
      character(0)
    )
    e$my_plan$command[[2]] <- as.call(
      c(quote(identity), e$my_plan$command[[2]])
    )
    make(
      e$my_plan,
      envir = e,
      parallelism = backends[i],
      caching = caching[i],
      jobs = 1,
      verbose = 0L,
      session_info = FALSE,
      lock_envir = TRUE
    )
    expect_equal(justbuilt(config), "small")
  }

  # Stuff is already up to date.
  for (i in seq_along(backends)) {
    make(
      e$my_plan,
      envir = e,
      parallelism = backends[i],
      caching = caching[i],
      jobs = 1,
      verbose = 0L,
      session_info = FALSE,
      lock_envir = TRUE
    )
    config <- drake_config(
      e$my_plan,
      envir = e,
      parallelism = backends[i],
      caching = caching[i],
      jobs = 1,
      verbose = 0L,
      session_info = FALSE,
      lock_envir = TRUE
    )
    nobuild(config)
  }

  # Workers can wait for dependencies.
  skip_on_os("windows")
  for (i in 1:2) {
    e$my_plan$command[[2]] <- as.call(
      c(quote(identity), quote({
        Sys.sleep(2); simulate(48) # nolint
      }))
    )
    future::plan(future::multisession)
    make(
      e$my_plan,
      envir = e,
      parallelism = backends[i],
      caching = caching[i],
      jobs = 2,
      verbose = 0L,
      session_info = FALSE
    )
    clean(destroy = TRUE)
  }
})

test_with_dir("can gracefully conclude a crashed worker", {
  skip_on_cran() # CRAN gets essential tests only (check time limits).
  skip_if_not_installed("future")
  for (caching in c("main", "worker")) {
    con <- dbug()
    con$envir_graph$graph <- con$graph
    con$caching <- caching
    worker <- structure(list(), target = "myinput")
    class(worker) <- "Future"
    expect_false(is_empty_worker(worker))
    expect_error(future::value(worker))
    con$queue <- priority_queue(con)
    expect_error(
      suppressWarnings(conclude_worker(worker, con)),
      regexp = "failed"
    )
    meta <- diagnose(myinput)
    expect_true(
      grepl(
        "future worker terminated",
        meta$error$message,
        fixed = TRUE
      )
    )
    clean(destroy = TRUE)
  }
})

test_with_dir("ft_no_target()", {
  skip_on_cran()
  con <- dbug()
  con$sleeps <- new.env(parent = emptyenv())
  con$sleeps$count <- 1L
  ft_no_target(con)
  expect_equal(con$sleeps$count, 2L)
})
wlandau-lilly/drake documentation built on March 6, 2024, 8:18 a.m.