# /-----------------------------\ /-------------------------------\
# | Main R process | | Subprocess 1 |
# | +------------------------+ | | +---------------------------+ |
# | | test_dir_parallel() | | | | test_file() | |
# | | +-------------------+ | | | | +-----------------------+ | |
# | | | Event loop |< ------+ | | | SubprocessReporter | | |
# | | +-------------------+ | | | | | | +-------------------+ | | |
# | | | | | | | | | | test_that() | | | |
# | | v | | | | | | +-------------------+ | | |
# | | +-------------------+ | | | | | | | | | |
# | | | Progress2Reporter | | | | | | | v | | |
# | | +-------------------+ | | | | | | +-------------------+ | | |
# | +------------------------+ | |--------| signalCondition() | | | |
# \-----------------------------/ | | | | +-------------------+ | | |
# | | | +-----------------------+ | |
# | | +---------------------------+ |
# | \-------------------------------/
# | /-------------------------------\
# |--| Subprocess 2 |
# | \-------------------------------/
# | /-------------------------------\
# \--| Subprocess 3 |
# \-------------------------------/
#
# ## Notes
#
# * Subprocesses run `callr::r_session` R sessions. They are reused;
#   one R session can run several test_file() calls.
# * Helper and setup files are loaded in each subprocess after it starts.
# * The main process puts all test files in the task queue, and then
# runs an event loop.
test_files_parallel <- function(
test_dir,
test_package,
test_paths,
load_helpers = TRUE,
  reporter = default_parallel_reporter(),
env = NULL,
stop_on_failure = FALSE,
stop_on_warning = FALSE,
wrap = TRUE, # unused, to match test_files signature
load_package = c("none", "installed", "source"),
shuffle = FALSE
) {
# TODO: support timeouts. 20-30s for each file by default?
num_workers <- min(default_num_cpus(), length(test_paths))
cli::cli_inform("Starting {num_workers} test process{?es}.")
# Set up work queue ------------------------------------------
queue <- NULL
withr::defer(queue_teardown(queue))
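  # defer() captures the expression lazily, so at teardown time it sees the
  # queue assigned below rather than this NULL placeholder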
# Start workers in parallel and add test tasks to queue.
queue <- queue_setup(
test_paths = test_paths,
test_package = test_package,
test_dir = test_dir,
load_helpers = load_helpers,
num_workers = num_workers,
load_package = load_package,
shuffle = shuffle
)
withr::with_dir(test_dir, {
reporters <- test_files_reporter(reporter, "parallel")
with_reporter(reporters$multi, {
parallel_updates <- reporter$capabilities$parallel_updates
if (parallel_updates) {
parallel_event_loop_smooth(queue, reporters, ".")
} else {
parallel_event_loop_chunky(queue, reporters, ".")
}
})
test_files_check(
reporters$list$get_results(),
stop_on_failure = stop_on_failure,
stop_on_warning = stop_on_warning
)
})
}
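# For orientation, a hypothetical internal call (package name and file
# names are illustrative only; wrapped in `if (FALSE)` so it never runs
# when this file is sourced):
if (FALSE) {
  test_files_parallel(
    test_dir = "tests/testthat",
    test_package = "mypkg",
    test_paths = c("test-a.R", "test-b.R")
  )
}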
default_num_cpus <- function() {
# Use common option, if set
ncpus <- getOption("Ncpus", NULL)
if (!is.null(ncpus)) {
ncpus <- suppressWarnings(as.integer(ncpus))
if (is.na(ncpus)) {
cli::cli_abort(
"{.code getOption('Ncpus')} must be an integer.",
call = NULL
)
}
return(ncpus)
}
# Otherwise use env var if set
ncpus <- Sys.getenv("TESTTHAT_CPUS", "")
if (ncpus != "") {
ncpus <- suppressWarnings(as.integer(ncpus))
if (is.na(ncpus)) {
cli::cli_abort("{.envvar TESTTHAT_CPUS} must be an integer.")
}
return(ncpus)
}
# Otherwise 2
2L
}
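# A minimal illustration of the precedence above (hypothetical values,
# wrapped in `if (FALSE)` so it never runs when this file is sourced):
if (FALSE) {
  options(Ncpus = 4) # checked first
  Sys.setenv(TESTTHAT_CPUS = "8") # only consulted when Ncpus is unset
  default_num_cpus() # 4L here; 8L without the option; 2L with neither
}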
parallel_event_loop_smooth <- function(queue, reporters, test_dir) {
update_interval <- 0.1
next_update <- proc.time()[[3]] + update_interval
while (!queue$is_idle()) {
# How much time do we have to poll before the next UI update?
now <- proc.time()[[3]]
poll_time <- max(next_update - now, 0)
next_update <- now + update_interval
msgs <- queue$poll(poll_time)
updated <- FALSE
for (x in msgs) {
if (x$code == PROCESS_OUTPUT) {
lns <- paste0("> ", x$path, ": ", x$message)
cat("\n", file = stdout())
base::writeLines(lns, stdout())
next
}
if (x$code != PROCESS_MSG) {
next
}
m <- x$message
if (!inherits(m, "testthat_message")) {
cli::cli_inform(as.character(m))
next
}
if (m$cmd != "DONE") {
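        # Events from several workers interleave, so re-point the reporter
        # at the file/test this event belongs to before dispatching it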
reporters$multi$start_file(m$filename)
reporters$multi$start_test(m$context, m$test)
if (m$type == "snapshotter") {
snapshotter <- getOption("testthat.snapshotter")
do.call(snapshotter[[m$cmd]], m$args)
} else {
do.call(reporters$multi[[m$cmd]], m$args)
updated <- TRUE
}
}
}
# We need to spin, even if there were no events
if (!updated) {
reporters$multi$update()
}
}
}
parallel_event_loop_chunky <- function(queue, reporters, test_dir) {
files <- list()
while (!queue$is_idle()) {
msgs <- queue$poll(Inf)
for (x in msgs) {
if (x$code == PROCESS_OUTPUT) {
lns <- paste0("> ", x$path, ": ", x$message)
base::writeLines(lns, stdout())
next
}
if (x$code != PROCESS_MSG) {
next
}
m <- x$message
if (!inherits(m, "testthat_message")) {
cli::cli_inform(as.character(m))
next
}
      # Record all events until we get the end-of-file marker, then replay
      # them all with the local reporters. This prevents out-of-order
      # reporting.
if (m$cmd != "DONE") {
files[[m$filename]] <- append(files[[m$filename]], list(m))
} else {
replay_events(reporters$multi, files[[m$filename]])
reporters$multi$end_context_if_started()
files[[m$filename]] <- NULL
}
}
}
}
replay_events <- function(reporter, events) {
snapshotter <- getOption("testthat.snapshotter")
for (m in events) {
if (m$type == "snapshotter") {
do.call(snapshotter[[m$cmd]], m$args)
} else {
do.call(reporter[[m$cmd]], m$args)
}
}
}
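# The events replayed here are the message lists built by
# SubprocessReporter$event() below. A sketch with a made-up event (field
# values are illustrative; `reporters` is as built in test_files_parallel()):
if (FALSE) {
  m <- list(
    type = "reporter",
    cmd = "start_file",
    filename = "test-foo.R",
    args = list("test-foo.R")
  )
  replay_events(reporters$multi, list(m))
}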
queue_setup <- function(
test_paths,
test_package,
test_dir,
num_workers,
load_helpers,
load_package,
shuffle = FALSE
) {
  # TODO: honour `load_package`; the "none" default is not OK for the
  # subprocess, because it would not have the tested package loaded
if (load_package == "none") {
load_package <- "source"
}
  # TODO: similarly, load_helpers = FALSE (as passed by devtools) is not
  # appropriate in the subprocess
load_helpers <- TRUE
test_package <- test_package %||% Sys.getenv("TESTTHAT_PKG")
hyperlinks <- cli::ansi_has_hyperlink_support()
# First we load the package "manually", in case it is testthat itself
load_hook <- expr({
switch(
!!load_package,
installed = library(!!test_package, character.only = TRUE),
source = pkgload::load_all(!!test_dir, helpers = FALSE, quiet = TRUE)
)
    # Ensure the snapshotter can generate hyperlinks for snapshot_accept()
options(
cli.hyperlink = !!hyperlinks,
cli.hyperlink_run = !!hyperlinks
)
asNamespace("testthat")$queue_process_setup(
test_package = !!test_package,
test_dir = !!test_dir,
load_helpers = !!load_helpers,
load_package = "none"
)
})
queue <- task_q$new(concurrency = num_workers, load_hook = load_hook)
fun <- transport_fun(function(path, shuffle) {
asNamespace("testthat")$queue_task(path, shuffle)
})
for (path in test_paths) {
queue$push(fun, list(path, shuffle))
}
queue
}
queue_process_setup <- function(
test_package,
test_dir,
load_helpers,
load_package
) {
env <- asNamespace("testthat")$test_files_setup_env(
test_package,
test_dir,
load_package
)
asNamespace("testthat")$test_files_setup_state(
test_dir = test_dir,
test_package = test_package,
load_helpers = load_helpers,
env = env,
frame = .GlobalEnv
)
  # Record the testing env for mocks and queue_task(); this is a manual
  # implementation of local_testing_env()
the$testing_env <- env
}
queue_task <- function(path, shuffle = FALSE) {
withr::local_envvar("TESTTHAT_IS_PARALLEL" = "true")
snapshotter <- SubprocessSnapshotReporter$new(snap_dir = "_snaps")
withr::local_options(testthat.snapshotter = snapshotter)
reporters <- list(
SubprocessReporter$new(),
snapshotter
)
multi <- MultiReporter$new(reporters = reporters)
with_reporter(
multi,
test_one_file(path, env = the$testing_env, shuffle = shuffle)
)
NULL
}
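# Note the routing contract set up above: SubprocessReporter tags its events
# with type = "reporter" and SubprocessSnapshotReporter with
# type = "snapshotter", which is how the main-process event loops decide
# whether to dispatch to reporters$multi or to the testthat.snapshotter
# option.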
# Clean up the subprocesses: we call their teardown methods, but give them
# only one second before killing the whole process tree, using ps's env-var
# marker method.
queue_teardown <- function(queue) {
if (is.null(queue)) {
return()
}
tasks <- queue$list_tasks()
num <- nrow(tasks)
  # Calling quit() here would create a race condition: the output of
  # deferred_run() might be lost. Instead we close the input connection
  # in a separate step below.
clean_fn <- function() {
withr::deferred_run(.GlobalEnv)
}
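  # First, ask each idle worker to run its deferred teardown handlers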
topoll <- integer()
for (i in seq_len(num)) {
if (
!is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
) {
      # The worker might have crashed or exited, so this call might fail.
      # If it does, we just ignore that worker.
tryCatch(
{
tasks$worker[[i]]$call(clean_fn)
topoll <- c(topoll, i)
},
error = function(e) NULL
)
}
}
# Give covr a bit more time
if (in_covr()) {
grace <- 30L
} else {
grace <- 1L
}
first_error <- NULL
limit <- Sys.time() + grace
while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
timeout <- as.double(timeout, units = "secs") * 1000
conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
pr <- unlist(processx::poll(conns, as.integer(timeout)))
for (i in which(pr == "ready")) {
msg <- tasks$worker[[topoll[i]]]$read()
first_error <- first_error %||% msg$error
}
topoll <- topoll[pr != "ready"]
}
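  # Then close the input connection of each idle worker; an R session
  # exits on its own once its stdin is closed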
topoll <- integer()
for (i in seq_len(num)) {
if (
!is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
) {
tryCatch(
{
close(tasks$worker[[i]]$get_input_connection())
topoll <- c(topoll, i)
},
error = function(e) NULL
)
}
}
limit <- Sys.time() + grace
while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
timeout <- as.double(timeout, units = "secs") * 1000
conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
pr <- unlist(processx::poll(conns, as.integer(timeout)))
topoll <- topoll[pr != "ready"]
}
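  # Finally, kill anything that is still alive, including child processes
  # where ps can track the process tree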
for (i in seq_len(num)) {
if (!is.null(tasks$worker[[i]])) {
if (ps::ps_is_supported()) {
tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL)
} else {
tryCatch(tasks$worker[[i]]$kill(), error = function(e) NULL)
}
}
}
if (!is.null(first_error)) {
cli::cli_abort(
"At least one parallel worker failed to run teardown",
parent = first_error
)
}
}
# Reporter that forwards events from the subprocess back to the main process.
#
# Ideally, these messages would be throttled: if the test code emits many
# expectation conditions in quick succession, SubprocessReporter should
# batch several of them and only signal a condition a couple of times per
# second. End-of-test and end-of-file events would still be transmitted
# immediately.
SubprocessReporter <- R6::R6Class(
"SubprocessReporter",
inherit = Reporter,
public = list(
start_file = function(filename) {
private$filename <- filename
private$event("start_file", filename)
},
start_test = function(context, test) {
private$context <- context
private$test <- test
private$event("start_test", context, test)
},
start_context = function(context) {
private$context <- context
private$event("start_context", context)
},
add_result = function(context, test, result) {
if (inherits(result, "expectation_success")) {
# Strip bulky components to reduce data transfer cost
result[["srcref"]] <- NULL
result[["trace"]] <- NULL
}
private$event("add_result", context, test, result)
},
end_test = function(context, test) {
private$event("end_test", context, test)
},
end_context = function(context) {
private$event("end_context", context)
},
end_file = function() {
private$event("end_file")
},
end_reporter = function() {
private$event("DONE")
}
),
private = list(
filename = NULL,
context = NULL,
test = NULL,
event = function(cmd, ...) {
msg <- list(
code = PROCESS_MSG,
type = "reporter",
cmd = cmd,
filename = private$filename,
context = private$context,
test = private$test,
time = proc.time()[[3]],
args = list(...)
)
class(msg) <- c("testthat_message", "callr_message", "condition")
signalCondition(msg)
}
)
)
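# A hedged, in-process illustration of the transport: in production, callr's
# r_session relays each signalled "callr_message" condition to the main
# process, but the same condition can be observed locally with a calling
# handler (the file name below is hypothetical):
if (FALSE) {
  withCallingHandlers(
    SubprocessReporter$new()$start_file("test-foo.R"),
    testthat_message = function(m) {
      cat("forwarded:", m$cmd, "for", m$filename, "\n")
    }
  )
}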