# tests/testthat/test-zzz-future_promise.R

# Everything below measures wall-clock time (Sys.time() / Sys.sleep()) against
# real `future` workers, so the file is skipped on CRAN.
skip_on_cran()
skip_on_os("windows") # timing is not consistent on Windows GHA

# Skip unless these packages are installed at (or above) the given versions.
skip_if_not_installed("future", "1.21.0")
skip_if_not_installed("fastmap", "1.1.0")

# Shared test helpers.
# NOTE(review): `wait_for_it()` and `on_ci`, used further down, are not defined
# in this file -- presumably they come from common.R; confirm.
source(test_path("common.R"))

local({
  ## Setup ##

  # Worker count used by the timing budgets and expectations further down.
  n_workers <- 2

  # Evaluate `code` while a two-worker multisession plan is active, then put
  # the previously active plan back when this function exits.
  with_test_workers <- function(code) {
    # The worker count is spelled out as a literal on purpose: a variable can
    # not be used for `workers` when running inside a local({}).
    previous_plan <- future::plan(future::multisession(workers = 2))
    on.exit(future::plan(previous_plan), add = TRUE)

    force(code)
  }


  # Mutable timing state shared by the helpers below:
  #   start      - the instant that elapsed times are measured from
  #   time_diffs - elapsed-time samples appended by `run_every()`
  start <- Sys.time()
  time_diffs <- NULL

  # Begin a fresh measurement: restart the clock and discard recorded samples.
  reset_baselines <- function() {
    start <<- Sys.time()
    time_diffs <<- NULL
  }

  # Time elapsed since `start`, as a difftime expressed in seconds.
  time_diff <- function() {
    now <- Sys.time()
    difftime(now, start, units = "secs")
  }

  # Records the elapsed time roughly every `delay` seconds, stopping once the
  # counter `i` exceeds `max`.
  # The thing to note is that each tick only runs when the main R session is
  # free, so the gaps between recorded samples show how long it was blocked:
  # * with `future_promise()` the samples should stay interleaved with the jobs
  # * while `future::future()` is blocking the main R session no sample is taken
  run_every <- function(i = 0, max = 2 / delay, delay = 0.1) {
    if (i > max) {
      return()
    }

    # Append this tick's elapsed time to the shared sample vector
    time_diffs <<- c(time_diffs, time_diff())

    # Queue the next tick
    next_tick <- function() {
      run_every(i + 1, max = max, delay = delay)
    }
    later::later(next_tick, delay = delay)
  }


  # Number of jobs submitted in each scenario
  worker_jobs <- 8
  # Seconds each job sleeps; CI gets a longer job (4s) than a local run (1s)
  worker_job_time <- if (on_ci) {
    4
  } else {
    1
  }
  # Time needed to run every job when all `n_workers` workers stay busy
  expected_total_time <- worker_jobs * worker_job_time / n_workers

  # Shared scenario driver: submits `worker_jobs` sleeping jobs through
  # `prom_fn` under the two-worker test plan while `run_every()` samples how
  # responsive the main R session is, then checks four timing properties.
  #
  # Each `expect_*` argument is the logical outcome that the corresponding
  # timing check is expected to produce (TRUE = the property should hold,
  # FALSE = the property should be violated).
  do_future_test <- function(
    # Function that receives the job expression (the tests below pass
    # `future_promise` or `future::future`)
    prom_fn = future_promise,
    # When TRUE, also submit `worker_jobs` plain `future::future()` jobs
    # 1 second into the run, competing for the same workers
    block_mid_session = FALSE,
    # expected result of: every gap between consecutive job finish times is
    # less than 2 * worker_job_time
    expect_reasonable_exec_lag_time = TRUE,
    # expected result of: the submitting lapply() returns before
    # worker_job_time * (worker_jobs - n_workers) / n_workers seconds elapsed
    expect_immediate_lapply = TRUE,
    # expected result of: every gap between `run_every()` samples is less than
    # worker_job_time (the scheduled gap is 0.1s)
    expect_no_main_blocking = TRUE
  ) {

    with_test_workers({
      # prep future sessions: run one trivial job per worker and wait for both
      # values before any timing starts
      f1 <- future::future({1})
      f2 <- future::future({2})
      c(future::value(future::resolve(f1)), future::value(future::resolve(f2)))

      # Sanity check: both workers exist and are idle
      expect_true(future_worker_is_free())
      expect_equal(future::nbrOfWorkers(), 2)
      expect_equal(future::nbrOfFreeWorkers(), 2)

      # Restart the clock, then start sampling main-session responsiveness
      reset_baselines()
      run_every()

      # Finish times of the competing `future::future()` jobs.
      # NOTE(review): collected below but never asserted on in this function.
      future_exec_times <- c()
      if (block_mid_session) {
        # Have `future` block the main R session 1 second into execution
        lapply(seq_len(worker_jobs), function(i) {
          later::later(
            function() {
              future::future({
                Sys.sleep(worker_job_time)
                time_diff()
              }) %...>% {
                future_exec_times <<- c(future_exec_times, .)
              }
            },
            delay = 1
          )
        })
      }

      # Stays NA unless the combined promise below resolves and overwrites it
      exec_times <- NA
      # Submit every job through `prom_fn`; each job sleeps and then reports the
      # elapsed time at which it finished. Once all jobs have resolved, the
      # finish times are stored in `exec_times`.
      lapply(seq_len(worker_jobs), function(i) {
        prom_fn({
          Sys.sleep(worker_job_time)
          time_diff()
        })
      }) %>%
        promise_all(.list = .) %...>% {
          exec_times <<- unlist(.)
        }
      # Elapsed time at the moment the submission statement above returned
      post_lapply_time_diff <- time_diff()

      # NOTE(review): helper defined outside this file; presumably runs pending
      # callbacks until the promises above have settled -- confirm.
      wait_for_it()

      # the median finish time is below the expected total time, except when
      # competing jobs were added mid-session
      expect_equal(median(exec_times) < expected_total_time, !block_mid_session)

      # gaps between consecutive finish times from prom_fn stay below
      # 2 * worker_job_time
      exec_times_lag <- exec_times[-1] - exec_times[-length(exec_times)]
      expect_equal(all(exec_times_lag < (2 * worker_job_time)), expect_reasonable_exec_lag_time)

      # post_lapply_time_diff should be ~ 0s when submission does not block;
      # the threshold used is worker_job_time * (worker_jobs - n_workers) / n_workers
      expect_equal(post_lapply_time_diff < (worker_job_time * ((worker_jobs - n_workers) / n_workers)), expect_immediate_lapply)

      # gaps between `run_every()` samples should never reach worker_job_time
      # (scheduled gap is 0.1s)
      time_diffs_lag <- time_diffs[-1] - time_diffs[-length(time_diffs)]
      expect_equal(all(time_diffs_lag < worker_job_time), expect_no_main_blocking)
    })

  }


  test_that("future_promise() allows the main thread to keep the main R process open", {
    # Baseline scenario: all three timing properties are expected to hold.
    #  * consecutive jobs finish less than 2 * worker_job_time apart
    #  * the submitting lapply() returns without waiting for a free worker
    #  * gaps between `run_every()` samples stay short (scheduled gap is 0.1s)
    do_future_test(
      prom_fn = future_promise,
      expect_reasonable_exec_lag_time = TRUE,
      expect_immediate_lapply = TRUE,
      expect_no_main_blocking = TRUE
    )
  })


  test_that("future::future() does not keep the main process open when all workers are busy", {
    # Contrast scenario using plain `future::future()`:
    #  * consecutive jobs still finish less than 2 * worker_job_time apart
    #  * the submitting lapply() does NOT return immediately
    #  * at least one gap between `run_every()` samples is long (scheduled gap is 0.1s)
    do_future_test(
      prom_fn = future::future,
      expect_reasonable_exec_lag_time = TRUE,
      expect_immediate_lapply = FALSE,
      expect_no_main_blocking = FALSE
    )
  })


  test_that("future_promise() recovers from losing all future workers", {
    # Competing `future::future()` jobs are submitted mid-session and block the
    # main R session, so:
    #  * at least one gap between consecutive finish times is unreasonable
    #  * the submitting lapply() itself still returns immediately
    #  * at least one gap between `run_every()` samples is long (scheduled gap is 0.1s)
    do_future_test(
      prom_fn = future_promise,
      block_mid_session = TRUE,
      expect_reasonable_exec_lag_time = FALSE,
      expect_immediate_lapply = TRUE,
      expect_no_main_blocking = FALSE
    )
  })

  test_that("future_promise reports unhandled errors", {
    with_test_workers({
      # Capture the message stream while a failing promise with no rejection
      # handler is created and waited on
      captured <- capture.output(
        {
          future_promise(stop("boom1"))
          wait_for_it()
        },
        type = "message"
      )
      expect_match(captured, "boom1")
    })
  })

  test_that("future_promise doesn't report errors that have been handled", {
    with_test_workers({
      # Same failing promise, but with a rejection handler attached: nothing
      # should be written to the message stream
      captured <- capture.output(
        {
          future_promise(stop("boom1")) %>% then(onRejected = ~{})
          wait_for_it()
        },
        type = "message"
      )
      expect_equal(captured, character(0))
    })
  })
})

# Try the promises package in your browser
#
# Any scripts or data that you put into this service are public.
#
# promises documentation built on Aug. 10, 2023, 5:08 p.m.