test_that("Don't wait", {
  ## Spawn with a zero timeout so the helper does not block on the
  ## workers; presumably the second argument is the number of workers.
  controller <- test_rrq()
  manager <- test_worker_spawn(controller, 4, timeout = 0)

  expect_s3_class(manager, "rrq_worker_manager")
  expect_type(manager$id, "character")
  expect_match(manager$id, regexp = "_[0-9]+$")

  ## wait_alive() should return its elapsed time invisibly.
  waited <- withVisible(manager$wait_alive(timeout = 10, time_poll = 0.1))
  expect_false(waited$visible)
  expect_s3_class(waited$value, "difftime")

  ## Waiting a second time must be harmless and give the same kind of
  ## result.
  waited_again <- manager$wait_alive(timeout = 10, time_poll = 0.1)
  expect_s3_class(waited_again, "difftime")
})
test_that("failed spawn", {
  skip_on_covr()
  skip_on_windows()

  queue_root <- tempfile()
  controller <- test_rrq("myfuns.R", queue_root, verbose = TRUE)

  ## Remove the script from the root after the queue is configured;
  ## presumably workers need it at startup, so spawning should fail.
  unlink(file.path(queue_root, "myfuns.R"))

  failure <- expect_error(
    suppressMessages(rrq_worker_spawn(timeout = 5, controller = controller)),
    regexp = "Worker died"
  )

  ## The error condition carries the logs of the single failed worker.
  expect_length(failure$logs, 1)
})
test_that("read worker process log", {
  controller <- test_rrq(verbose = TRUE)
  manager <- test_worker_spawn(controller, 1)

  ## Ask the worker to stop before reading its process log.
  rrq_message_send_and_wait("STOP", controller = controller)

  log_lines <- rrq_worker_process_log(manager$id, controller = controller)
  expect_type(log_lines, "character")
  expect_match(log_lines, regexp = "ALIVE", all = FALSE)

  ## The manager's own accessor must agree with the exported reader.
  expect_equal(log_lines, manager$logs(1))
})
test_that("wait for worker exit", {
  controller <- test_rrq("myfuns.R", timeout_worker_stop = 0)
  ## Keep a binding to the manager so it stays referenced for the
  ## duration of the test, even though it is not used directly.
  manager <- test_worker_spawn(controller)

  ## Take copies of the connection and queue id before destroying the
  ## queue, so that we can inspect the server afterwards.
  redis_con <- controller$con
  queue_id <- controller$queue_id

  rrq_destroy(timeout_worker_stop = 0.5, controller = controller)

  ## No key under this queue's prefix should remain.
  leftover_keys <- redux::scan_find(redis_con, paste0(queue_id, ":*"))
  expect_equal(leftover_keys, character(0))
})
test_that("error if we try to interact with non-managed worker", {
  controller <- test_rrq("myfuns.R")
  manager <- test_worker_spawn(controller)

  ## An index beyond the workers this manager knows about is rejected.
  expect_error(
    manager$logs(2),
    regexp = "Worker not controlled by this manager: '.+_2'"
  )

  ## An arbitrary name is rejected with the same message.
  expect_error(
    manager$logs("fred"),
    regexp = "Worker not controlled by this manager: 'fred'"
  )
})
test_that("can wait on manually spawned workers", {
  controller <- test_rrq("myfuns.R")
  queue_id <- controller$queue_id
  worker_ids <- sprintf("%s_%d", ids::adjective_animal(), 1:2)

  ## Nothing has been started under these ids yet, so a zero-timeout
  ## wait has to fail.
  expect_error(
    suppressMessages(
      rrq_worker_wait(worker_ids, timeout = 0, time_poll = 1,
                      progress = FALSE, controller = controller)
    ),
    regexp = "All 2 workers not ready in time"
  )

  ## Start one background R process per id, each running a worker loop.
  ## The processes are kept in a list so they stay referenced until the
  ## end of the test.
  run_worker <- function(queue_id, worker_id) {
    rrq::rrq_worker$new(queue_id, worker_id = worker_id)$loop()
  }
  worker_processes <- lapply(worker_ids, function(id) {
    callr::r_bg(run_worker, list(queue_id, id),
                package = TRUE, cleanup = FALSE)
  })

  elapsed <- rrq_worker_wait(worker_ids, timeout = 5, time_poll = 0.1,
                             progress = FALSE, controller = controller)
  expect_s3_class(elapsed, "difftime")

  ## Once the workers are up a further wait should be quick. A badly
  ## timed GC can still slow a single attempt, so allow a few retries;
  ## this is potentially problematic on the (very slow) coverage build.
  testthat::try_again(
    5,
    expect_lt(
      rrq_worker_wait(worker_ids, timeout = 5, time_poll = 0.1,
                      progress = FALSE, controller = controller),
      0.5
    )
  )
})
test_that("Can build fallback where no logs present", {
  ## With no logs attached, the formatter returns a single warning-style
  ## bullet rather than any log content.
  without_logs <- list(logs = NULL)
  expected <- c("!" = "Logging not enabled for these workers")
  expect_equal(worker_format_failed_logs(without_logs), expected)
})
test_that("Can format logs for missing workers", {
  ## One worker: singular header, then the worker name and its lines.
  one_worker <- list(logs = list(alice = c("a", "b")))
  expect_equal(
    worker_format_failed_logs(one_worker),
    c(i = "Log files recovered for 1 worker",
      ">" = "alice", "a", "b")
  )

  ## Two workers: plural header, with an empty string separating the
  ## per-worker sections.
  two_workers <- list(logs = list(alice = c("a", "b"), bob = "c"))
  expect_equal(
    worker_format_failed_logs(two_workers),
    c(i = "Log files recovered for 2 workers",
      ">" = "alice", "a", "b", "",
      ">" = "bob", "c")
  )
})
test_that("can provide informative error messages on worker spawn failure", {
  ## Workers that died: one, all, and some of them.
  expect_error(
    abort_workers_not_ready("died", NULL),
    regexp = "Worker died"
  )
  expect_error(
    abort_workers_not_ready(rep("died", 3), NULL),
    regexp = "All 3 workers died"
  )
  some_died <- c("died", "died", "ready", "waiting")
  expect_error(
    abort_workers_not_ready(some_died, NULL),
    regexp = "2 / 4 workers died"
  )

  ## Workers that did not come up in time: one, all, and some of them.
  expect_error(
    abort_workers_not_ready("waiting", NULL),
    regexp = "Worker not ready in time"
  )
  expect_error(
    abort_workers_not_ready(rep("waiting", 3), NULL),
    regexp = "All 3 workers not ready in time"
  )
  some_timed_out <- c("timeout", "timeout", "running")
  expect_error(
    abort_workers_not_ready(some_timed_out, NULL),
    regexp = "2 / 3 workers not ready in time"
  )
})
test_that("report back correctly", {
  skip_if_not_installed("mockery")

  queue_root <- tempfile()
  controller <- test_rrq("myfuns.R", queue_root, verbose = TRUE)
  worker_ids <- paste(ids::adjective_animal(), 1:2, sep = "_")

  ## Write one fake log file per worker id into a temporary directory
  ## that is removed when the test finishes.
  log_dir <- withr::local_tempdir()
  fake_logs <- list(c("a", "b"), c("c", "d"))
  for (i in seq_along(worker_ids)) {
    writeLines(fake_logs[[i]], file.path(log_dir, worker_ids[[i]]))
  }
  read_fake_log <- function(id) {
    readLines(file.path(log_dir, id))
  }

  ## Replace logwatch::logwatch inside worker_wait_alive with a mock
  ## that reports both workers as dead.
  mock_logwatch <- mockery::mock(list(status = c("died", "died")))
  mockery::stub(worker_wait_alive, "logwatch::logwatch", mock_logwatch)

  failure <- expect_error(
    worker_wait_alive(controller, worker_ids, is_dead = NULL,
                      fetch_logs = read_fake_log,
                      timeout = 0, time_poll = 1, progress = FALSE),
    regexp = "All 2 workers died"
  )

  ## The mock is consulted exactly once, and the error carries the logs
  ## (named by worker id) together with the log-formatting footer.
  mockery::expect_called(mock_logwatch, 1)
  expect_equal(failure$logs, set_names(fake_logs, worker_ids))
  expect_equal(failure$footer, worker_format_failed_logs)
})
## Add the following code to your website.
## For more information on customizing the embed code, read Embedding Snippets.