R/parallel.R

Defines functions queue_teardown queue_task queue_process_setup queue_setup replay_events parallel_event_loop_chunky parallel_event_loop_smooth default_num_cpus test_files_reporter_parallel test_files_parallel

# +-----------------------------+     +-------------------------------+
# | Main R process              |     | Subprocess 1                  |
# | +------------------------+  |     | +---------------------------+ |
# | | test_dir_parallel()    |  |     | | test_file()               | |
# | | +-------------------+  |  |     | | +-----------------------+ | |
# | | | Event loop        |< ------+  | | | SubprocessReporter    | | |
# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
# | |    |                   |  |  |  | | | | test_that()       | | | |
# | |    v                   |  |  |  | | | +-------------------+ | | |
# | | +-------------------+  |  |  |  | | |    |                  | | |
# | | | Progress2Reporter |  |  |  |  | | |    v                  | | |
# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
# | +------------------------+  |  |--------| signalCondition() | | | |
# +-----------------------------+  |  | | | +-------------------+ | | |
#                                  |  | | +-----------------------+ | |
#                                  |  | +---------------------------+ |
#                                  |  +-------------------------------+
#                                  |  +-------------------------------+
#                                  |--| Subprocess 2                  |
#                                  |  +-------------------------------+
#                                  |  +-------------------------------+
#                                  +--| Subprocess 3                  |
#                                     +-------------------------------+
#                                       ...
#
# ## Notes
#
# * Subprocesses run `callr::r_session` R sessions. They are re-used,
#   one R session can be used for several test_file() calls.
# * Helper and setup files are loaded in the subprocesses after this.
# * The main process puts all test files in the task queue, and then
#   runs an event loop.

test_files_parallel <- function(
                       test_dir,
                       test_package,
                       test_paths,
                       load_helpers = TRUE,
                       reporter = default_parallel_reporter(),
                       env = NULL,
                       stop_on_failure = FALSE,
                       stop_on_warning = FALSE,
                       wrap = TRUE,  # unused, to match test_files signature
                       load_package = c("none", "installed", "source")
                       ) {

  # TODO: support timeouts. 20-30s for each file by default?

  num_workers <- min(default_num_cpus(), length(test_paths))
  inform(paste0(
    "Starting ", num_workers, " test process",
    if (num_workers != 1) "es"
  ))

  # Set up work queue ------------------------------------------
  queue <- NULL
  withr::defer(queue_teardown(queue))

  # Start workers in parallel and add test tasks to queue.
  queue <- queue_setup(
    test_paths = test_paths,
    test_package = test_package,
    test_dir = test_dir,
    load_helpers = load_helpers,
    num_workers = num_workers,
    load_package = load_package
  )

  withr::with_dir(test_dir, {
    reporters <- test_files_reporter_parallel(reporter)
    with_reporter(reporters$multi, {
      parallel_updates <- reporter$capabilities$parallel_updates
      if (parallel_updates) {
        parallel_event_loop_smooth(queue, reporters, ".")
      } else {
        parallel_event_loop_chunky(queue, reporters, ".")
      }
    })

    test_files_check(reporters$list$get_results(),
      stop_on_failure = stop_on_failure,
      stop_on_warning = stop_on_warning
    )
  })
}

test_files_reporter_parallel <- function(reporter, .env = parent.frame()) {
  lister <- ListReporter$new()
  snapshotter <- MainprocessSnapshotReporter$new("_snaps", fail_on_new = FALSE)
  reporters <- list(
    find_reporter(reporter),
    lister, # track data
    snapshotter
  )
  withr::local_options(
    "testthat.snapshotter" = snapshotter,
    .local_envir = .env
  )
  list(
    multi = MultiReporter$new(reporters = compact(reporters)),
    list = lister
  )
}

default_num_cpus <- function() {
  # Use common option, if set
  ncpus <- getOption("Ncpus", NULL)
  if (!is.null(ncpus)) {
    ncpus <- suppressWarnings(as.integer(ncpus))
    if (is.na(ncpus)) abort("`getOption(Ncpus)` must be an integer")
    return(ncpus)
  }

  # Otherwise use env var if set
  ncpus <- Sys.getenv("TESTTHAT_CPUS", "")
  if (ncpus != "") {
    ncpus <- suppressWarnings(as.integer(ncpus))
    if (is.na(ncpus)) abort("TESTTHAT_CPUS must be an integer")
    return(ncpus)
  }

  # Otherwise 2
  2L
}

parallel_event_loop_smooth <- function(queue, reporters, test_dir) {
  update_interval <- 0.1
  next_update <- proc.time()[[3]] + update_interval

  while (!queue$is_idle()) {
    # How much time do we have to poll before the next UI update?
    now <- proc.time()[[3]]
    poll_time <- max(next_update - now, 0)
    next_update <- now + update_interval

    msgs <- queue$poll(poll_time)

    updated <- FALSE
    for (x in msgs) {
      if (x$code != PROCESS_MSG) {
        next
      }

      m <- x$message
      if (!inherits(m, "testthat_message")) {
        message(m)
        next
      }

      if (m$cmd != "DONE") {
        reporters$multi$start_file(m$filename)
        reporters$multi$start_test(m$context, m$test)
        if (m$type == "snapshotter") {
          snapshotter <- getOption("testthat.snapshotter")
          do.call(snapshotter[[m$cmd]], m$args)
        } else {
          do.call(reporters$multi[[m$cmd]], m$args)
          updated <- TRUE
        }
      }
    }

    # We need to spin, even if there were no events
    if (!updated) {
      reporters$multi$update()
    }
  }
}

parallel_event_loop_chunky <- function(queue, reporters, test_dir) {
  files <- list()
  while (!queue$is_idle()) {
    msgs <- queue$poll(Inf)
    for (x in msgs) {
      if (x$code != PROCESS_MSG) {
        next
      }

      m <- x$message
      if (!inherits(m, "testthat_message")) {
        message(m)
        next
      }

      # Record all events until we get end of file, then we replay them all
      # with the local reporters. This prevents out of order reporting.
      if (m$cmd != "DONE") {
        files[[m$filename]] <- append(files[[m$filename]], list(m))
      } else {
        replay_events(reporters$multi, files[[m$filename]])
        reporters$multi$end_context_if_started()
        files[[m$filename]] <- NULL
      }
    }
  }
}

replay_events <- function(reporter, events) {
  snapshotter <- getOption("testthat.snapshotter")
  for (m in events) {
    if (m$type == "snapshotter") {
      do.call(snapshotter[[m$cmd]], m$args)
    } else {
      do.call(reporter[[m$cmd]], m$args)
    }
  }
}

queue_setup <- function(test_paths,
                        test_package,
                        test_dir,
                        num_workers,
                        load_helpers,
                        load_package) {

  # TODO: observe `load_package`, but the "none" default is not
  # OK for the subprocess, because it'll not have the tested package
  if (load_package == "none") load_package <- "source"

  # TODO: similarly, load_helpers = FALSE, coming from devtools,
  # is not appropriate in the subprocess
  load_helpers <- TRUE

  test_package <- test_package %||% Sys.getenv("TESTTHAT_PKG")

  hyperlinks <- cli::ansi_has_hyperlink_support()

  # First we load the package "manually", in case it is testthat itself
  load_hook <- expr({
    switch(!!load_package,
      installed = library(!!test_package, character.only = TRUE),
      source = pkgload::load_all(!!test_dir, helpers = FALSE, quiet = TRUE)
    )

    # Ensure snapshot can generate hyperlinks for snapshot_accept()
    options(
      cli.hyperlink = !!hyperlinks,
      cli.hyperlink_run = !!hyperlinks
    )

    asNamespace("testthat")$queue_process_setup(
      test_package = !!test_package,
      test_dir = !!test_dir,
      load_helpers = !!load_helpers,
      load_package = "none"
    )
  })
  queue <- task_q$new(concurrency = num_workers, load_hook = load_hook)

  fun <- transport_fun(function(path) asNamespace("testthat")$queue_task(path))
  for (path in test_paths) {
    queue$push(fun, list(path))
  }

  queue
}

queue_process_setup <- function(test_package, test_dir, load_helpers, load_package) {
  env <- asNamespace("testthat")$test_files_setup_env(
    test_package,
    test_dir,
    load_package
  )

  asNamespace("testthat")$test_files_setup_state(
    test_dir = test_dir,
    test_package = test_package,
    load_helpers = load_helpers,
    env = env,
    frame = .GlobalEnv
  )

  # record testing env for mocks & queue_task
  # manual implementation of local_testing_env()
  the$testing_env <- env
}

queue_task <- function(path) {
  withr::local_envvar("TESTTHAT_IS_PARALLEL" = "true")
  snapshotter <- SubprocessSnapshotReporter$new(
    snap_dir = "_snaps",
    fail_on_new = FALSE
  )
  withr::local_options(testthat.snapshotter = snapshotter)
  reporters <- list(
    SubprocessReporter$new(),
    snapshotter
  )
  multi <- MultiReporter$new(reporters = reporters)
  with_reporter(multi, test_one_file(path, env = the$testing_env))
  NULL
}

# Clean up subprocesses: we call teardown methods, but we only give them a
# second, before killing the whole process tree using ps's env var marker
# method.
queue_teardown <- function(queue) {
  if (is.null(queue)) {
    return()
  }

  tasks <- queue$list_tasks()
  num <- nrow(tasks)

  clean_fn <- function() {
    withr::deferred_run(.GlobalEnv)
    quit(save = "no", status = 1L, runLast = TRUE)
  }

  topoll <- list()
  for (i in seq_len(num)) {
    if (!is.null(tasks$worker[[i]])) {
      # The worker might have crashed or exited, so this might fail.
      # If it does then we'll just ignore that worker
      tryCatch({
        tasks$worker[[i]]$call(clean_fn)
        topoll <- c(topoll, tasks$worker[[i]]$get_poll_connection())
      }, error = function(e) tasks$worker[i] <- list(NULL))
    }
  }

  # Give covr time to write out the coverage files
  if (in_covr()) grace <- 30L else grace <- 3L
  limit <- Sys.time() + grace
  while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
    timeout <- as.double(timeout, units = "secs") * 1000
    pr <- processx::poll(topoll, as.integer(timeout))
    topoll <- topoll[pr != "ready"]
  }

  for (i in seq_len(num)) {
    if (!is.null(tasks$worker[[i]])) {
      tryCatch(
        close(tasks$worker[[i]]$get_input_connection()),
        error = function(e) NULL
      )
      if (ps::ps_is_supported()) {
        tasks$worker[[i]]$kill_tree()
      } else {
        tasks$worker[[i]]$kill()
      }
    }
  }
}

# Reporter that just forwards events in the subprocess back to the main process
#
# Ideally, these messages would be throttled, i.e. if the test code
# emits many expectation conditions fast, SubprocessReporter should
# collect several of them and only emit a condition a couple of times
# a second. End-of-test and end-of-file events would be transmitted
# immediately.
SubprocessReporter <- R6::R6Class("SubprocessReporter",
  inherit = Reporter,
  public = list(
    start_file = function(filename) {
      private$filename <- filename
      private$event("start_file", filename)
    },
    start_test = function(context, test) {
      private$context <- context
      private$test <- test
      private$event("start_test", context, test)
    },
    start_context = function(context) {
      private$context <- context
      private$event("start_context", context)
    },
    add_result = function(context, test, result) {
      if (inherits(result, "expectation_success")) {
        # Strip bulky components to reduce data transfer cost
        result[["srcref"]] <- NULL
        result[["trace"]] <- NULL
      }
      private$event("add_result", context, test, result)
    },
    end_test = function(context, test) {
      private$event("end_test", context, test)
    },
    end_context = function(context) {
      private$event("end_context", context)
    },
    end_file = function() {
      private$event("end_file")
    },
    end_reporter = function() {
      private$event("DONE")
    }
  ),

  private = list(
    filename = NULL,
    context = NULL,
    test = NULL,
    event = function(cmd, ...) {
      msg <- list(
        code = PROCESS_MSG,
        type = "reporter",
        cmd = cmd,
        filename = private$filename,
        context = private$context,
        test = private$test,
        time = proc.time()[[3]],
        args = list(...)
      )
      class(msg) <- c("testthat_message", "callr_message", "condition")
      signalCondition(msg)
    }
  )
)

Try the testthat package in your browser

Any scripts or data that you put into this service are public.

testthat documentation built on Oct. 6, 2023, 5:10 p.m.