test_that("empty", {
obj <- test_rrq()
expect_s3_class(obj, "rrq_controller")
expect_equal(rrq_worker_list(controller = obj), character(0))
expect_equal(rrq_task_list(controller = obj), character(0))
expect_equal(rrq_worker_len(controller = obj), 0)
expect_equal(rrq_queue_length(controller = obj), 0)
expect_equal(rrq_queue_list(controller = obj), character(0))
id <- rrq_task_create_expr(sin(1), controller = obj)
expect_equal(rrq_task_list(controller = obj), id)
expect_equal(rrq_queue_list(controller = obj), id)
expect_equal(rrq_task_status(id, controller = obj), TASK_PENDING)
expect_equal(
rrq_worker_log_tail(controller = obj),
data_frame(worker_id = character(0),
child = integer(0),
time = numeric(0),
command = character(0),
message = character(0)))
test_queue_clean(obj$queue_id)
})
test_that("basic use", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(slowdouble(0.1), controller = obj)
expect_type(t, "character")
w$step(TRUE)
expect_true(rrq_task_wait(t, 2, controller = obj))
expect_equal(rrq_task_result(t, controller = obj), 0.2)
})
test_that("task errors are returned", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(only_positive(1), controller = obj)
w$step(TRUE)
expect_equal(rrq_task_result(t1, controller = obj), 1)
t2 <- rrq_task_create_expr(only_positive(-1), controller = obj)
w$step(TRUE)
res <- rrq_task_result(t2, controller = obj)
expect_s3_class(res, "rrq_task_error")
expect_null(res$warnings)
expect_equal(res$task_id, t2)
expect_equal(res$queue_id, obj$queue_id)
expect_equal(res$status, TASK_ERROR)
t3 <- rrq_task_create_expr(nonexistant_function(-1), controller = obj)
w$step(TRUE)
res <- rrq_task_result(t3, controller = obj)
expect_s3_class(res, "rrq_task_error")
expect_null(res$warnings)
expect_equal(res$task_id, t3)
expect_equal(res$queue_id, obj$queue_id)
expect_equal(res$status, TASK_ERROR)
})
test_that("task warnings are returned", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(warning_then_error(2), controller = obj)
w$step(TRUE)
r1 <- rrq_task_result(t1, controller = obj)
expect_s3_class(r1, "rrq_task_error")
expect_type(r1$warnings, "list")
expect_equal(length(r1$warnings), 2)
expect_equal(conditionMessage(r1$warnings[[1]]),
"This is warning number 1")
expect_equal(conditionMessage(r1$warnings[[2]]),
"This is warning number 2")
expect_s3_class(r1$trace, "rlang_trace")
expect_match(format(r1$trace), "warning_then_error", all = FALSE)
})
test_that("task_position", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(sin(1), controller = obj)
t2 <- rrq_task_create_expr(sin(1), controller = obj)
t3 <- rrq_task_create_expr(sin(1), controller = obj)
expect_equal(rrq_task_position(t1, controller = obj), 1L)
expect_equal(rrq_task_position(c(t1, t2, t3), controller = obj),
c(1L, 2L, 3L))
expect_equal(rrq_task_position("not a real task", controller = obj), 0L)
expect_equal(
rrq_task_position("not a real task", NA_integer_, controller = obj),
NA_integer_)
expect_equal(
rrq_task_position(c(t1, "not a real task"), NA_integer_, controller = obj),
c(1L, NA_integer_))
})
test_that("task_preceeding", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(sin(1), controller = obj)
t2 <- rrq_task_create_expr(sin(1), controller = obj)
t3 <- rrq_task_create_expr(sin(1), controller = obj)
expect_equal(rrq_task_preceeding(t1, controller = obj), character(0))
expect_equal(rrq_task_preceeding(t2, controller = obj), t1)
expect_equal(rrq_task_preceeding(t3, controller = obj), c(t1, t2))
expect_null(rrq_task_preceeding("not a real task", controller = obj))
w <- test_worker_spawn(obj)
rrq_task_wait(t3, controller = obj)
expect_null(rrq_task_preceeding(t3, controller = obj))
})
test_that("task_position and task_preceeding are consistent", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(sin(1), controller = obj)
t2 <- rrq_task_create_expr(sin(1), controller = obj)
t3 <- rrq_task_create_expr(sin(1), controller = obj)
expect_equal(length(rrq_task_preceeding(t1, controller = obj)),
rrq_task_position(t1, controller = obj) - 1)
expect_equal(length(rrq_task_preceeding(t2, controller = obj)),
rrq_task_position(t2, controller = obj) - 1)
expect_equal(length(rrq_task_preceeding(t3, controller = obj)),
rrq_task_position(t3, controller = obj) - 1)
})
test_that("task_overview", {
obj <- test_rrq("myfuns.R")
expect_equal(
rrq_task_overview(controller = obj),
list(PENDING = 0, RUNNING = 0, COMPLETE = 0, ERROR = 0, CANCELLED = 0,
DIED = 0, TIMEOUT = 0, IMPOSSIBLE = 0, DEFERRED = 0, MOVED = 0))
t1 <- rrq_task_create_expr(sin(1), controller = obj)
t2 <- rrq_task_create_expr(sin(1), controller = obj)
t3 <- rrq_task_create_expr(sin(1), controller = obj)
expect_equal(
rrq_task_overview(controller = obj),
list(PENDING = 3, RUNNING = 0, COMPLETE = 0, ERROR = 0, CANCELLED = 0,
DIED = 0, TIMEOUT = 0, IMPOSSIBLE = 0, DEFERRED = 0, MOVED = 0))
})
test_that("wait for tasks", {
obj <- test_rrq("myfuns.R")
w <- test_worker_spawn(obj)
t1 <- rrq_task_create_expr(1 + 1, controller = obj)
t2 <- rrq_task_create_expr(2 + 2, controller = obj)
expect_true(rrq_task_wait(c(t1, t2), controller = obj))
expect_equal(rrq_task_results(c(t1, t2), controller = obj), list(2, 4))
## Slightly slower jobs:
t3 <- rrq_task_create_expr(slowdouble(0.1), controller = obj)
t4 <- rrq_task_create_expr(slowdouble(0.1), controller = obj)
expect_true(rrq_task_wait(c(t3, t4), controller = obj))
expect_equal(rrq_task_results(c(t3, t4), controller = obj), list(0.2, 0.2))
expect_error(
rrq_task_wait(c(t1, t2), time_poll = -1, controller = obj),
"'time_poll' cannot be less than 0")
})
test_that("task delete", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(1 + 1, controller = obj)
t2 <- rrq_task_create_expr(2 + 2, controller = obj)
t3 <- rrq_task_create_expr(3 + 3, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t2, t3))
rrq_task_delete(t1, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t2, t3))
rrq_task_delete(c(t2, t3), controller = obj)
expect_setequal(rrq_task_list(controller = obj), character(0))
})
test_that("stop worker", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
rrq_worker_stop(controller = obj)
expect_error(
w$step(), "BYE", class = "rrq_worker_stop")
})
test_that("Can't read logs unless enabled", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
expect_error(
rrq_worker_process_log(w$id, controller = obj),
"Process log not enabled for this worker")
})
test_that("Can't delete running tasks", {
obj <- test_rrq()
w <- test_worker_blocking(obj)
id <- rrq_task_create_expr(sin(1), controller = obj)
w$poll(TRUE)
worker_run_task_start(w, r6_private(w), id)
expect_error(
rrq_task_delete(id, controller = obj),
"Can't delete running tasks")
})
test_that("Error if results are not ready", {
obj <- test_rrq()
id1 <- rrq_task_create_expr(sin(1), controller = obj)
id2 <- rrq_task_create_expr(sin(1), controller = obj)
expect_error(rrq_task_result(id1, controller = obj),
sprintf("Missing result for task: '%s'", id1))
expect_error(rrq_task_results(id1, controller = obj),
sprintf("Missing result for task:\n - %s", id1))
expect_error(rrq_task_results(c(id1, id2), controller = obj),
sprintf("Missing result for task:\n - %s\n - %s", id1, id2))
})
test_that("worker load", {
obj <- test_rrq()
w1 <- test_worker_blocking(obj)
w2 <- test_worker_blocking(obj)
load <- rrq_worker_load(controller = obj)
expect_s3_class(load, "worker_load")
expect_setequal(load$worker_id, c(w1$id, w2$id))
avg <- mean(load)
expect_true(all(avg["used", ] == 0))
expect_true(all(avg["available", ] == 1))
})
test_that("change environment", {
create <- function(value) {
force(value)
function(envir) {
envir$x <- value
}
}
environment(create) <- .GlobalEnv # avoid serialisation warning
obj <- test_rrq()
rrq_worker_envir_set(create(1), controller = obj)
w <- test_worker_blocking(obj)
expect_equal(r6_private(w)$envir$x, 1)
rrq_worker_envir_set(NULL, controller = obj)
w$step(TRUE)
expect_equal(ls(r6_private(w)$envir), character(0))
})
test_that("queue remove", {
obj <- test_rrq()
t1 <- rrq_task_create_expr(sin(1), controller = obj)
t2 <- rrq_task_create_expr(sin(1), controller = obj)
t3 <- rrq_task_create_expr(sin(1), controller = obj)
expect_equal(rrq_queue_length(controller = obj), 3)
expect_equal(rrq_queue_remove(t2, controller = obj), TRUE)
expect_equal(rrq_queue_length(controller = obj), 2)
expect_equal(rrq_queue_list(controller = obj), c(t1, t3))
expect_equal(rrq_queue_remove(c(t1, t2, t3), controller = obj),
c(TRUE, FALSE, TRUE))
expect_equal(rrq_queue_remove(character(0), controller = obj), logical(0))
})
test_that("cancel queued task", {
obj <- test_rrq()
t <- rrq_task_create_expr(sqrt(1), controller = obj)
expect_equal(rrq_task_status(t, controller = obj), TASK_PENDING)
rrq_task_cancel(t, controller = obj)
expect_equal(rrq_task_status(t, controller = obj), TASK_CANCELLED)
expect_equal(rrq_queue_list(controller = obj), character(0))
})
test_that("cancel queued task from alternative queue", {
obj <- test_rrq()
t1 <- rrq_task_create_expr(sqrt(1), queue = "other", controller = obj)
t2 <- rrq_task_create_expr(sqrt(1), queue = "other", controller = obj)
t3 <- rrq_task_create_expr(sqrt(1), queue = "other", controller = obj)
expect_equal(rrq_queue_list("other", controller = obj), c(t1, t2, t3))
rrq_task_cancel(t2, controller = obj)
expect_equal(rrq_queue_list("other", controller = obj), c(t1, t3))
expect_equal(rrq_task_status(t2, controller = obj), TASK_CANCELLED)
})
test_that("can't cancel completed task", {
obj <- test_rrq()
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sqrt(1), controller = obj)
w$step(TRUE)
expect_error(
rrq_task_cancel(t, controller = obj),
"Task [[:xdigit:]]{32} is not cancelable \\(COMPLETE\\)")
expect_equal(rrq_task_result(t, controller = obj), 1)
})
test_that("can't cancel nonexistant task", {
obj <- test_rrq()
id <- ids::random_id()
expect_error(
rrq_task_cancel(id, controller = obj),
"Task [[:xdigit:]]{32} is not cancelable \\(MISSING\\)")
})
test_that("can't cancel running in-process task", {
obj <- test_rrq(timeout_worker_stop = 0)
w <- test_worker_spawn(obj)
t <- rrq_task_create_expr(Sys.sleep(20), controller = obj)
wait_status(t, obj)
expect_error(
rrq_task_cancel(t, wait = TRUE, controller = obj),
"Can't cancel running task '[[:xdigit:]]{32}' as not in separate process")
w$stop(type = "kill_local")
})
test_that("get task info", {
obj <- test_rrq()
a <- 1
b <- 2
t <- rrq_task_create_expr(log(a, b), controller = obj)
res <- rrq_task_data(t, controller = obj)
expect_equal(res$expr, quote(log(a, b)))
expect_mapequal(res$variables, list(a = 1, b = 2))
})
test_that("get task data errors appropriately if task is missing", {
obj <- test_rrq()
id <- ids::random_id()
expect_error(
rrq_task_data(id, controller = obj),
"Task '[[:xdigit:]]+' not found")
})
test_that("a worker will pick up tasks from the priority queue", {
obj <- test_rrq("myfuns.R")
cfg <- rrq_worker_config(queue = c("a", "b"), verbose = FALSE)
rrq_worker_config_save(WORKER_CONFIG_DEFAULT, cfg, controller = obj)
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(sin(1), controller = obj)
t2 <- rrq_task_create_expr(sin(2), queue = "b", controller = obj)
t3 <- rrq_task_create_expr(sin(3), queue = "a", controller = obj)
expect_equal(rrq_task_status(c(t1, t2, t3), controller = obj),
rep("PENDING", 3))
expect_equal(rrq_queue_list(controller = obj), t1)
expect_equal(rrq_queue_list("b", controller = obj), t2)
expect_equal(rrq_queue_list("a", controller = obj), t3)
w$step(TRUE)
expect_equal(rrq_task_status(c(t1, t2, t3), controller = obj),
c("PENDING", "PENDING", "COMPLETE"))
w$step(TRUE)
expect_equal(rrq_task_status(c(t1, t2, t3), controller = obj),
c("PENDING", "COMPLETE", "COMPLETE"))
w$step(TRUE)
expect_equal(rrq_task_status(c(t1, t2, t3), controller = obj),
c("COMPLETE", "COMPLETE", "COMPLETE"))
})
test_that("Query jobs in different queues", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(sin(1), queue = "a", controller = obj)
t2 <- rrq_task_create_expr(sin(2), queue = "a", controller = obj)
t3 <- rrq_task_create_expr(sin(3), queue = "a", controller = obj)
expect_equal(rrq_task_status(c(t1, t2, t3), controller = obj),
rep("PENDING", 3))
expect_equal(rrq_queue_list(controller = obj), character(0))
expect_equal(rrq_queue_list("a", controller = obj), c(t1, t2, t3))
expect_equal(rrq_task_position(t1, queue = "a", controller = obj), 1)
expect_equal(rrq_task_position(t2, queue = "a", controller = obj), 2)
expect_true(rrq_queue_remove(t1, queue = "a", controller = obj))
expect_false(rrq_queue_remove(t1, queue = "a", controller = obj))
expect_equal(rrq_task_position(t2, queue = "a", controller = obj), 1)
expect_equal(rrq_queue_length("a", controller = obj), 2)
expect_equal(rrq_queue_list("a", controller = obj), c(t2, t3))
})
test_that("Send job to new process", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
rrq_worker_log_tail(w$id, Inf, controller = obj)
t <- rrq_task_create_expr(slowdouble(0.1), separate_process = TRUE,
controller = obj)
expect_type(t, "character")
w$step(TRUE)
rrq_task_wait(t, timeout = 2, controller = obj)
expect_equal(rrq_task_result(t, controller = obj), 0.2)
})
## Doing this version does require that we spawn a worker because we
## need to interrupt the secondary worker loop. However, we could do
## an in-process version by being a bit sneaky and writing a job that
## will cancel itself.
test_that("Cancel job sent to new process", {
obj <- test_rrq("myfuns.R")
w <- test_worker_spawn(obj)
t <- rrq_task_create_expr(slowdouble(50), separate_process = TRUE,
controller = obj)
wait_status(t, obj)
rrq_task_cancel(t, wait = TRUE, controller = obj)
expect_equal(rrq_task_status(t, controller = obj), TASK_CANCELLED)
expect_equal(rrq_task_result(t, controller = obj),
worker_task_failed(TASK_CANCELLED, obj$queue_id, t))
## Flakey on covr, probably due to the job being cancelled before
## the second process really finishes starting up.
skip_on_covr()
log <- rrq_worker_log_tail(w$id, Inf, controller = obj)
expect_equal(log$command,
c("ALIVE", "ENVIR", "ENVIR", "QUEUE",
"TASK_START", "REMOTE",
"CHILD", "ENVIR", "ENVIR", "CANCEL", "TASK_CANCELLED"))
})
test_that("can check task exists", {
obj <- test_rrq("myfuns.R")
t <- rrq_task_create_expr(sin(0), controller = obj)
t2 <- rrq_task_create_expr(sin(0), controller = obj)
expect_true(rrq_task_exists(t, controller = obj))
expect_false(rrq_task_exists("123", controller = obj))
expect_equal(rrq_task_exists(c(t, t2), controller = obj),
c(TRUE, TRUE))
expect_equal(rrq_task_exists(c(t, "123"), controller = obj),
c(TRUE, FALSE))
})
test_that("queue task with missing dependency throws an error", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
expect_error(rrq_task_create_expr(sin(0), depends_on = "123",
controller = obj),
"Failed to queue as dependency 123 does not exist.")
expect_error(rrq_task_create_expr(sin(0), depends_on = c("123", "456"),
controller = obj),
"Failed to queue as dependencies 123, 456 do not exist.")
})
test_that("task can be queued with dependency", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sin(0), controller = obj)
t2 <- rrq_task_create_expr(sin(0), controller = obj)
expect_equal(rrq_queue_list(controller = obj), c(t, t2))
t3 <- rrq_task_create_expr(sin(pi / 2),
depends_on = c(t, t2),
controller = obj)
## t3 has not been added to main queue yet
expect_equal(rrq_queue_list(controller = obj), c(t, t2))
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
## Original dependencies are stored
original_deps_keys <- rrq_key_task_depends_up_original(obj$queue_id, t3)
expect_setequal(obj$con$SMEMBERS(original_deps_keys), c(t, t2))
## Pending dependencies are stored
dependency_keys <- rrq_key_task_depends_up(obj$queue_id, t3)
expect_setequal(obj$con$SMEMBERS(dependency_keys), c(t, t2))
## Inverse depends_on relationship is stored
dependent_keys <- rrq_key_task_depends_down(obj$queue_id, c(t, t2))
for (key in dependent_keys) {
expect_equal(obj$con$SMEMBERS(key), list(t3))
}
## Function to retrieve status of t3 and see what it is waiting for
w$step(TRUE)
rrq_task_wait(t, 2, controller = obj)
expect_equal(rrq_task_status(t, controller = obj), "COMPLETE")
expect_equal(rrq_task_status(t2, controller = obj), "PENDING")
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
## Still not on queue
expect_equal(rrq_queue_list(controller = obj), t2)
## Status of it has updated
expect_setequal(obj$con$SMEMBERS(original_deps_keys), c(t, t2))
expect_equal(obj$con$SMEMBERS(dependency_keys), list(t2))
w$step(TRUE)
rrq_task_wait(t2, 2, controller = obj)
expect_equal(rrq_task_status(t2, controller = obj), "COMPLETE")
expect_equal(rrq_task_status(t3, controller = obj), "PENDING")
## Now added to queue
expect_equal(rrq_queue_list(controller = obj), t3)
## Status can be retrieved
expect_setequal(obj$con$SMEMBERS(original_deps_keys), c(t, t2))
expect_equal(obj$con$SMEMBERS(dependency_keys), list())
w$step(TRUE)
rrq_task_wait(t3, 2, controller = obj)
expect_equal(rrq_task_status(t3, controller = obj), "COMPLETE")
})
test_that("task added to queue immediately if dependencies satified", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sin(0), controller = obj)
expect_equal(rrq_queue_list(controller = obj), t)
w$step(TRUE)
rrq_task_wait(t, 2, controller = obj)
## Immediately added to queue as t has completed
t2 <- rrq_task_create_expr(sin(pi / 2), depends_on = t, controller = obj)
expect_equal(rrq_queue_list(controller = obj), t2)
expect_equal(rrq_task_status(t2, controller = obj), TASK_PENDING)
})
test_that("queueing with depends_on errored task fails", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(only_positive(-1), controller = obj)
w$step(TRUE)
res <- rrq_task_result(t1, controller = obj)
expect_s3_class(res, "rrq_task_error")
expect_error(rrq_task_create_expr(sin(0), depends_on = t1, controller = obj),
paste0("Failed to queue as dependent tasks failed:\n",
t1, ": ERROR"),
fixed = TRUE)
ids <- rrq_task_list(controller = obj)
expect_length(ids, 2)
t2 <- setdiff(ids, t1)
## Task is set to impossible
expect_equal(rrq_task_status(t1, controller = obj), TASK_ERROR)
expect_equal(rrq_task_status(t2, controller = obj), TASK_IMPOSSIBLE)
})
test_that("dependent tasks updated if dependency fails", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(only_positive(-1), controller = obj)
t2 <- rrq_task_create_expr(sin(0), depends_on = t, controller = obj)
t3 <- rrq_task_create_expr(sin(0), depends_on = t, controller = obj)
t4 <- rrq_task_create_expr(sin(0), depends_on = t2, controller = obj)
expect_equal(rrq_queue_list(controller = obj), t)
expect_equal(rrq_task_status(t, controller = obj), "PENDING")
expect_equal(rrq_task_status(t2, controller = obj), "DEFERRED")
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
expect_equal(rrq_task_status(t4, controller = obj), "DEFERRED")
w$step(TRUE)
rrq_task_wait(t, 2, controller = obj)
expect_equal(rrq_task_status(t, controller = obj), "ERROR")
## Dependent task updated and nothing queued
expect_equal(rrq_task_status(t2, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_task_status(t3, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_task_status(t4, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_queue_list(controller = obj), character(0))
})
test_that("multiple tasks can be queued with same dependency", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sin(0), controller = obj)
t2 <- rrq_task_create_expr(sin(0), depends_on = t, controller = obj)
t3 <- rrq_task_create_expr(sin(0), depends_on = t, controller = obj)
expect_equal(rrq_queue_list(controller = obj), t)
expect_equal(rrq_task_status(t2, controller = obj), "DEFERRED")
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
w$step(TRUE)
rrq_task_wait(t, 2, controller = obj)
expect_equal(rrq_task_status(t, controller = obj), "COMPLETE")
expect_equal(rrq_task_status(t2, controller = obj), "PENDING")
expect_equal(rrq_task_status(t3, controller = obj), "PENDING")
expect_setequal(rrq_queue_list(controller = obj), c(t2, t3))
})
test_that("deferred task is added to specified queue", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sin(0), controller = obj)
t2 <- rrq_task_create_expr(sin(0), depends_on = t, controller = obj)
t3 <- rrq_task_create_expr(sin(0), depends_on = t, queue = "a",
controller = obj)
expect_equal(rrq_queue_list(controller = obj), t)
expect_equal(rrq_queue_list("a", controller = obj), character(0))
w$step(TRUE)
rrq_task_wait(t, 2, controller = obj)
expect_equal(rrq_queue_list(controller = obj), t2)
expect_equal(rrq_queue_list("a", controller = obj), t3)
})
test_that("task set to impossible cannot be added to queue", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(only_positive(-1), controller = obj)
t2 <- rrq_task_create_expr(sin(0), controller = obj)
t3 <- rrq_task_create_expr(sin(pi / 2), depends_on = c(t, t2),
controller = obj)
expect_equal(rrq_task_status(t, controller = obj), "PENDING")
expect_equal(rrq_task_status(t2, controller = obj), "PENDING")
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
expect_equal(rrq_queue_list(controller = obj), c(t, t2))
w$step(TRUE)
res <- rrq_task_result(t, controller = obj)
expect_s3_class(res, "rrq_task_error")
expect_equal(rrq_task_status(t, controller = obj), "ERROR")
expect_equal(rrq_task_status(t2, controller = obj), "PENDING")
expect_equal(rrq_task_status(t3, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_queue_list(controller = obj), t2)
w$step(TRUE)
rrq_task_wait(t2, 2, controller = obj)
expect_equal(rrq_task_status(t2, controller = obj), "COMPLETE")
expect_equal(rrq_task_status(t3, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_queue_list(controller = obj), character(0))
})
test_that("deferred task delete", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(1 + 1, controller = obj)
t2 <- rrq_task_create_expr(2 + 2, depends_on = t1, controller = obj)
t3 <- rrq_task_create_expr(2 + 2, depends_on = t1, controller = obj)
t4 <- rrq_task_create_expr(2 + 2, depends_on = t3, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t2, t3, t4))
expect_equal(rrq_queue_list(controller = obj), t1)
rrq_task_delete(t2, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t3, t4))
expect_equal(rrq_queue_list(controller = obj), t1)
expect_equal(rrq_task_status(t1, controller = obj), "PENDING")
expect_equal(rrq_task_status(t2, controller = obj), "MISSING")
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
expect_equal(rrq_task_status(t4, controller = obj), "DEFERRED")
rrq_task_delete(t1, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t3, t4))
expect_setequal(rrq_queue_list(controller = obj), character(0))
expect_equal(rrq_task_status(t1, controller = obj), "MISSING")
expect_equal(rrq_task_status(t2, controller = obj), "MISSING")
expect_equal(rrq_task_status(t3, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_task_status(t4, controller = obj), "IMPOSSIBLE")
rrq_task_delete(t3, controller = obj)
expect_setequal(rrq_task_list(controller = obj), t4)
expect_setequal(rrq_queue_list(controller = obj), character(0))
expect_equal(rrq_task_status(t1, controller = obj), "MISSING")
expect_equal(rrq_task_status(t2, controller = obj), "MISSING")
expect_equal(rrq_task_status(t3, controller = obj), "MISSING")
expect_equal(rrq_task_status(t4, controller = obj), "IMPOSSIBLE")
})
test_that("delete completed task does not clear dependencies", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(1 + 1, controller = obj)
t2 <- rrq_task_create_expr(2 + 2, depends_on = t1, controller = obj)
w$step(TRUE)
rrq_task_wait(t1, 2, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t2))
expect_equal(rrq_queue_list(controller = obj), t2)
expect_equal(rrq_task_status(t1, controller = obj), "COMPLETE")
expect_equal(rrq_task_status(t2, controller = obj), "PENDING")
rrq_task_delete(t1, controller = obj)
expect_setequal(rrq_task_list(controller = obj), t2)
expect_equal(rrq_queue_list(controller = obj), t2)
expect_equal(rrq_task_status(t1, controller = obj), "MISSING")
expect_equal(rrq_task_status(t2, controller = obj), "PENDING")
})
test_that("deferred task cancel", {
obj <- test_rrq("myfuns.R")
t1 <- rrq_task_create_expr(1 + 1, controller = obj)
t2 <- rrq_task_create_expr(2 + 2, depends_on = t1, controller = obj)
t3 <- rrq_task_create_expr(2 + 2, depends_on = t1, controller = obj)
t4 <- rrq_task_create_expr(2 + 2, depends_on = t3, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t2, t3, t4))
expect_equal(rrq_queue_list(controller = obj), t1)
rrq_task_cancel(t2, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t2, t3, t4))
expect_equal(rrq_queue_list(controller = obj), t1)
expect_equal(rrq_task_status(t3, controller = obj), "DEFERRED")
expect_equal(rrq_task_status(t4, controller = obj), "DEFERRED")
rrq_task_cancel(t1, controller = obj)
expect_setequal(rrq_task_list(controller = obj), c(t1, t2, t3, t4))
expect_setequal(rrq_queue_list(controller = obj), character(0))
expect_equal(rrq_task_status(t3, controller = obj), "IMPOSSIBLE")
expect_equal(rrq_task_status(t4, controller = obj), "IMPOSSIBLE")
})
test_that("can get deferred tasks", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(1 + 1, controller = obj)
t2 <- rrq_task_create_expr(1 + 1, controller = obj)
t3 <- rrq_task_create_expr(2 + 2, depends_on = c(t1, t2), controller = obj)
t4 <- rrq_task_create_expr(2 + 2, depends_on = t1, controller = obj)
tasks <- rrq_deferred_list(controller = obj)
expect_setequal(names(tasks), c(t3, t4))
expect_setequal(names(tasks[[t3]]), c(t1, t2))
expect_equal(tasks[[t3]][[t1]], "PENDING")
expect_equal(tasks[[t3]][[t2]], "PENDING")
expect_equal(names(tasks[[t4]]), t1)
expect_equal(tasks[[t4]][[t1]], "PENDING")
w$step(TRUE)
rrq_task_wait(t1, 2, controller = obj)
tasks <- rrq_deferred_list(controller = obj)
expect_equal(names(tasks), t3)
expect_setequal(names(tasks[[t3]]), c(t1, t2))
expect_equal(tasks[[t3]][[t1]], "COMPLETE")
expect_equal(tasks[[t3]][[t2]], "PENDING")
w$step(TRUE)
w$step(TRUE)
rrq_task_wait(t2, 2, controller = obj)
rrq_task_wait(t4, 2, controller = obj)
tasks <- rrq_deferred_list(controller = obj)
expect_setequal(rrq_deferred_list(controller = obj), list())
})
test_that("can use task_wait with impossible tasks", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(only_positive(-1), controller = obj)
t2 <- rrq_task_create_expr(sin(0), depends_on = t, controller = obj)
expect_equal(rrq_queue_list(controller = obj), t)
expect_equal(rrq_task_status(t, controller = obj), "PENDING")
expect_equal(rrq_task_status(t2, controller = obj), "DEFERRED")
w$step(TRUE)
rrq_task_wait(t, 2, controller = obj)
out <- rrq_task_result(t, controller = obj)
expect_equal(rrq_task_status(t, controller = obj), "ERROR")
expect_equal(out$task_id, t)
expect_equal(out$queue_id, obj$queue_id)
## task wait returns for an impossible task
expect_equal(rrq_task_status(t2, controller = obj), "IMPOSSIBLE")
rrq_task_wait(t2, 2, controller = obj)
out <- rrq_task_result(t2, controller = obj)
## Not just a timeout error - it has returned
expect_s3_class(out, "rrq_task_error")
expect_equal(out$message, "Task not successful: IMPOSSIBLE")
expect_equal(out$task_id, t2)
expect_equal(out$queue_id, obj$queue_id)
})
test_that("submit a task with a timeout requires separate process", {
obj <- test_rrq("myfuns.R")
expect_error(
rrq_task_create_expr(slowdouble(10), timeout_task_run = 1,
controller = obj),
"Can't set timeout as 'separate_process' is FALSE")
})
test_that("submit a task with a timeout", {
obj <- test_rrq("myfuns.R")
t <- rrq_task_create_expr(slowdouble(10),
timeout_task_run = 1,
separate_process = TRUE,
controller = obj)
expect_equal(obj$con$HGET(obj$keys$task_timeout, t), "1")
w <- test_worker_blocking(obj)
w$step(TRUE)
expect_equal(rrq_task_status(t, controller = obj), TASK_TIMEOUT)
expect_equal(rrq_task_result(t, controller = obj),
worker_task_failed(TASK_TIMEOUT, obj$queue_id, t))
## Flakey on covr, probably due to the job being cancelled before
## the second process really finishes starting up.
skip_on_covr()
expect_equal(rrq_worker_log_tail(w$id, Inf, controller = obj)$command,
c("ALIVE", "ENVIR", "ENVIR", "QUEUE",
"TASK_START", "REMOTE",
"CHILD", "ENVIR", "ENVIR", "TIMEOUT", "TASK_TIMEOUT"))
})
test_that("can offload storage", {
skip_if_no_redis()
name <- sprintf("rrq:%s", ids::random_id())
path <- tempfile()
obj <- test_rrq(store_max_size = 100, offload_path = path)
a <- 10
b <- runif(20)
t <- rrq_task_create_expr(sum(b) / a, controller = obj)
w <- test_worker_blocking(obj)
w$step(TRUE)
expect_equal(rrq_task_result(t, controller = obj), sum(b) / a)
## Did successfully offload data:
store <- obj$store
h <- store$list()
expect_length(h, 3)
expect_setequal(store$location(h), c("redis", "offload"))
expect_equal(store$tags(), t)
expect_length(dir(path), 1L)
expect_true(dir(path) %in% h)
rrq_task_delete(t, controller = obj)
expect_equal(store$list(), character(0))
expect_equal(dir(path), character(0))
})
test_that("offload storage in result", {
path <- tempfile()
obj <- test_rrq(store_max_size = 100, offload_path = path)
t <- rrq_task_create_expr(rep(1, 100), controller = obj)
w <- test_worker_blocking(obj)
w$step(TRUE)
expect_equal(rrq_task_result(t, controller = obj), rep(1, 100))
## Did successfully offload data:
store <- obj$store
h <- store$list()
expect_length(h, 1)
expect_setequal(store$location(h), "offload")
expect_equal(store$tags(), t)
expect_equal(dir(path), h)
rrq_task_delete(t, controller = obj)
expect_equal(store$list(), character(0))
expect_equal(dir(path), character(0))
})
test_that("collect times", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(slowdouble(0.1), controller = obj)
t2 <- rrq_task_create_expr(slowdouble(0.1), controller = obj)
tt <- c(t1, t2)
times1 <- rrq_task_times(tt, controller = obj)
expect_true(is.matrix(times1)) # testthat 3e makes this quite hard
expect_equal(dimnames(times1),
list(tt, c("submit", "start", "complete", "moved")))
expect_type(times1, "double")
expect_equal(times1[tt, "start"], set_names(rep(NA_real_, 2), tt))
expect_equal(times1[tt, "complete"], set_names(rep(NA_real_, 2), tt))
expect_equal(times1[tt, "moved"], set_names(rep(NA_real_, 2), tt))
expect_false(any(is.na(times1[tt, "submit"])))
w$step(TRUE)
times2 <- rrq_task_times(tt, controller = obj)
expect_equal(times2[t2, , drop = FALSE], times1[t2, , drop = FALSE])
expect_equal(rrq_task_times(t2, controller = obj), times1[t2, , drop = FALSE])
expect_equal(is.na(times2[t1, ]),
c(submit = FALSE, start = FALSE, complete = FALSE, moved = TRUE))
})
test_that("task errors can be immediately thrown", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(only_positive(-1), controller = obj)
w$step(TRUE)
err <- expect_error(rrq_task_result(t, error = TRUE, controller = obj),
class = "rrq_task_error")
expect_equal(err$queue_id, obj$queue_id)
expect_equal(err$task_id, t)
err2 <- expect_error(
rrq_task_results(c(t, t), error = TRUE, controller = obj),
class = "rrq_task_error_group")
expect_equal(err2$errors, list(err, err))
expect_equal(
format(err2),
c("<rrq_task_error_group>",
paste(" 2/2 tasks failed:",
" - x must be positive",
" - x must be positive",
sep = "\n"),
" * To throw this error, use stop() with it",
" * To inspect individual errors, see $errors"))
})
test_that("Can set the task wait timeout on controller creation", {
obj <- test_rrq()
f <- function(timeout) {
r <- rrq_controller(obj$queue_id, timeout_task_wait = timeout)
r$timeout_task_wait
}
withr::with_options(list(rrq.timeout_task_wait = 30), {
expect_equal(f(10), 10)
expect_equal(f(NULL), 30)
})
withr::with_options(list(rrq.timeout_task_wait = NULL), {
expect_equal(f(10), 10)
expect_equal(f(NULL), Inf)
})
})
test_that("Can get information about a task in the same process", {
obj <- test_rrq()
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sqrt(4), controller = obj)
d1 <- rrq_task_info(t, controller = obj)
expect_setequal(
names(d1),
c("id", "status", "queue", "separate_process", "timeout", "worker", "pid",
"moved", "depends"))
expect_equal(d1$id, t)
expect_equal(d1$status, TASK_PENDING)
expect_equal(d1$queue, "default")
expect_false(d1$separate_process)
expect_null(d1$timeout)
expect_null(d1$worker)
expect_null(d1$pid)
expect_equal(d1$moved, list(up = NULL, down = NULL))
expect_equal(d1$depends, list(up = NULL, down = NULL))
w$step(TRUE)
d2 <- rrq_task_info(t, controller = obj)
expect_setequal(names(d2), names(d1))
expect_equal(d2$id, t)
expect_equal(d2$status, TASK_COMPLETE)
expect_equal(d2$queue, "default")
expect_false(d2$separate_process)
expect_null(d2$timeout, 5)
expect_equal(d2$worker, w$id)
expect_null(d2$pid, "integer")
expect_equal(d2$moved, list(up = NULL, down = NULL))
expect_equal(d2$depends, list(up = NULL, down = NULL))
})
test_that("Can get information about a task in a different process", {
obj <- test_rrq()
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sqrt(4), separate_process = TRUE,
timeout_task_run = 5, controller = obj)
d1 <- rrq_task_info(t, controller = obj)
expect_setequal(
names(d1),
c("id", "status", "queue", "separate_process", "timeout", "worker", "pid",
"moved", "depends"))
expect_equal(d1$id, t)
expect_equal(d1$status, TASK_PENDING)
expect_equal(d1$queue, "default")
expect_true(d1$separate_process)
expect_equal(d1$timeout, 5)
expect_null(d1$worker)
expect_null(d1$pid)
expect_equal(d1$moved, list(up = NULL, down = NULL))
expect_equal(d1$depends, list(up = NULL, down = NULL))
w$step(TRUE)
d2 <- rrq_task_info(t, controller = obj)
expect_setequal(names(d2), names(d1))
expect_equal(d2$id, t)
expect_equal(d2$status, TASK_COMPLETE)
expect_equal(d2$queue, "default")
expect_true(d2$separate_process)
expect_equal(d2$timeout, 5)
expect_equal(d2$worker, w$id)
expect_type(d2$pid, "integer")
expect_equal(d2$moved, list(up = NULL, down = NULL))
expect_equal(d2$depends, list(up = NULL, down = NULL))
})
test_that("Can get information about task retries", {
obj <- test_rrq()
t <- list()
w <- test_worker_blocking(obj)
t1 <- rrq_task_create_expr(identity(1), controller = obj)
w$step(TRUE)
expect_mapequal(rrq_task_info(t1, controller = obj)$moved,
list(up = NULL, down = NULL))
t2 <- rrq_task_retry(t1, controller = obj)
w$step(TRUE)
expect_mapequal(rrq_task_info(t1, controller = obj)$moved,
list(up = NULL, down = t2))
expect_mapequal(rrq_task_info(t2, controller = obj)$moved,
list(up = t1, down = NULL))
t3 <- rrq_task_retry(t2, controller = obj)
w$step(TRUE)
expect_mapequal(rrq_task_info(t1, controller = obj)$moved,
list(up = NULL, down = c(t2, t3)))
expect_mapequal(rrq_task_info(t2, controller = obj)$moved,
list(up = t1, down = t3))
expect_mapequal(rrq_task_info(t3, controller = obj)$moved,
list(up = c(t1, t2), down = NULL))
t4 <- rrq_task_retry(t3, controller = obj)
w$step(TRUE)
expect_mapequal(rrq_task_info(t1, controller = obj)$moved,
list(up = NULL, down = c(t2, t3, t4)))
expect_mapequal(rrq_task_info(t2, controller = obj)$moved,
list(up = t1, down = c(t3, t4)))
expect_mapequal(rrq_task_info(t3, controller = obj)$moved,
list(up = c(t1, t2), down = t4))
expect_mapequal(rrq_task_info(t4, controller = obj)$moved,
list(up = c(t1, t2, t3), down = NULL))
})
test_that("Can retry tasks that span multiple queues at once", {
obj <- test_rrq()
cfg <- rrq_worker_config(queue = c("a", "b"), verbose = FALSE)
rrq_worker_config_save(WORKER_CONFIG_DEFAULT, cfg, controller = obj)
t1 <- c(rrq_task_create_expr(sin(1), queue = "a", controller = obj),
rrq_task_create_expr(sin(2), queue = "a", controller = obj),
rrq_task_create_expr(sin(3), queue = "b", controller = obj))
w <- test_worker_blocking(obj)
for (i in 1:3) {
w$step(TRUE)
}
t2 <- rrq_task_retry(t1, controller = obj)
expect_equal(rrq_task_status(t2, controller = obj), rep(TASK_PENDING, 3))
expect_equal(rrq_queue_list("a", controller = obj), t2[1:2])
expect_equal(rrq_queue_list("b", controller = obj), t2[3])
expect_equal(rrq_task_position(t2, queue = "a", controller = obj), c(1, 2, 0))
expect_equal(rrq_task_position(t2, queue = "b", controller = obj), c(0, 0, 1))
## Can also get the position from the old ids:
expect_equal(rrq_task_position(t1, queue = "a", controller = obj), c(1, 2, 0))
expect_equal(rrq_task_position(t1, queue = "b", controller = obj), c(0, 0, 1))
expect_equal(rrq_task_position(t1, queue = "a", follow = FALSE,
controller = obj), c(0, 0, 0))
expect_equal(rrq_task_position(t1, queue = "b", follow = FALSE,
controller = obj), c(0, 0, 0))
})
test_that("can get task position and preceeding from retried tasks", {
obj <- test_rrq()
t1 <- rrq_task_create_expr(sin(1), controller = obj)
w <- test_worker_blocking(obj)
w$step()
ta <- c(rrq_task_create_expr(sin(2), controller = obj),
rrq_task_create_expr(sin(3), controller = obj))
t2 <- rrq_task_retry(t1, controller = obj)
tb <- c(rrq_task_create_expr(sin(4), controller = obj),
rrq_task_create_expr(sin(5), controller = obj),
rrq_task_create_expr(sin(6), controller = obj))
expect_equal(rrq_task_position(t2, controller = obj), 3)
expect_equal(rrq_task_position(t1, controller = obj), 3)
expect_equal(rrq_task_position(t1, follow = FALSE, controller = obj), 0)
expect_equal(rrq_task_preceeding(t2, controller = obj), ta)
expect_equal(rrq_task_preceeding(t1, controller = obj), ta)
expect_null(rrq_task_preceeding(t1, follow = FALSE, controller = obj))
})
test_that("Can set the follow default on controller creation", {
obj <- test_rrq()
f <- function(follow) {
rrq_controller(obj$queue_id, follow = follow)$follow
}
withr::with_options(list(rrq.follow = FALSE), {
expect_equal(f(TRUE), TRUE)
expect_equal(f(FALSE), FALSE)
expect_equal(f(NULL), FALSE)
})
withr::with_options(list(rrq.follow = NULL), {
expect_equal(f(TRUE), TRUE)
expect_equal(f(FALSE), FALSE)
expect_equal(f(NULL), TRUE)
})
})
test_that("Can avoid following on controller creation", {
obj1 <- test_rrq(follow = FALSE)
obj2 <- rrq_controller(obj1$queue_id, follow = TRUE)
w <- test_worker_blocking(obj1)
t1 <- rrq_task_create_expr(runif(1), controller = obj1)
w$step(TRUE)
t2 <- rrq_task_retry(t1, controller = obj1)
expect_equal(rrq_task_status(t1, controller = obj1), TASK_MOVED)
expect_equal(rrq_task_status(t1, controller = obj2), TASK_PENDING)
})
test_that("Running in separate process produces coherent logs", {
obj <- test_rrq()
w <- test_worker_blocking(obj)
log0 <- rrq_worker_log_tail(w$id, Inf, controller = obj)
t <- rrq_task_create_expr(runif(1), separate_process = TRUE, controller = obj)
expect_type(t, "character")
w$step(TRUE)
expect_equal(rrq_task_status(t, controller = obj), TASK_COMPLETE)
log <- rrq_worker_log_tail(w$id, Inf, controller = obj)
expect_equal(log[seq_len(nrow(log0)), ], log0)
log1 <- log[-seq_len(nrow(log0)), ]
expect_true(all(log1$worker_id == w$id))
expect_equal(
log1$command,
c("TASK_START", "REMOTE",
"CHILD", "ENVIR", "ENVIR", "STOP",
"TASK_COMPLETE"))
expect_equal(
is.na(log1$child),
c(TRUE, TRUE, FALSE, FALSE, FALSE, FALSE, TRUE))
})
test_that("Can extract dependency information", {
obj <- test_rrq()
t1 <- rrq_task_create_expr(identity(1), controller = obj)
t2 <- rrq_task_create_expr(identity(2), depends_on = t1, controller = obj)
t3 <- rrq_task_create_expr(identity(3), depends_on = t1, controller = obj)
t4 <- rrq_task_create_expr(identity(4), depends_on = c(t1, t2),
controller = obj)
t5 <- rrq_task_create_expr(identity(4), depends_on = c(t4, t3),
controller = obj)
deps1 <- rrq_task_info(t1, controller = obj)$depends
deps2 <- rrq_task_info(t2, controller = obj)$depends
deps3 <- rrq_task_info(t3, controller = obj)$depends
deps4 <- rrq_task_info(t4, controller = obj)$depends
deps5 <- rrq_task_info(t5, controller = obj)$depends
expect_null(deps1$up)
expect_setequal(names(deps1$down), c(t1, t2, t3, t4))
expect_setequal(deps1$down[[t1]], c(t2, t3, t4))
expect_equal(deps2$up, set_names(list(t1), t2))
expect_equal(deps2$down, set_names(list(t4, t5), c(t2, t4)))
expect_equal(deps3$up, set_names(list(t1), t3))
expect_equal(deps3$down, set_names(list(t5), t3))
expect_setequal(names(deps4$up), c(t4, t2))
expect_setequal(deps4$up[[t4]], c(t1, t2))
expect_equal(deps4$up[[t2]], deps2$up[[t2]])
expect_equal(deps4$down, set_names(list(t5), t4))
expect_setequal(names(deps5$up), c(t5, t4, t3, t2))
expect_setequal(deps5$up[[t5]], c(t4, t3))
expect_equal(deps5$up[[t4]], deps4$up[[t4]])
expect_equal(deps5$up[[t3]], deps3$up[[t3]])
expect_equal(deps5$up[[t2]], deps2$up[[t2]])
expect_null(deps5$depends$down)
})
test_that("can log tasks when running in a separate process", {
obj <- test_rrq("myfuns.R")
logdir <- withr::local_tempdir()
cfg <- rrq_worker_config(logdir = logdir, verbose = FALSE)
rrq_worker_config_save(WORKER_CONFIG_DEFAULT, cfg, controller = obj)
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(noisydouble(1), separate_process = TRUE,
controller = obj)
expect_null(rrq_task_log(t, controller = obj))
w$step(TRUE)
logfile <- file.path(logdir, t)
expect_true(file.exists(logfile))
expect_equal(readLines(logfile), "doubling 1")
expect_equal(rrq_task_log(t, controller = obj), "doubling 1")
})
test_that("can read empty logs", {
obj <- test_rrq("myfuns.R")
logdir <- withr::local_tempdir()
cfg <- rrq_worker_config(logdir = logdir, verbose = FALSE)
rrq_worker_config_save(WORKER_CONFIG_DEFAULT, cfg, controller = obj)
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sin(1), separate_process = TRUE,
controller = obj)
expect_null(rrq_task_log(t, controller = obj))
w$step(TRUE)
expect_equal(rrq_task_log(t, controller = obj), character())
})
test_that("can't read logs of unknown tasks", {
obj <- test_rrq("myfuns.R")
t <- ids::random_id()
expect_error(
rrq_task_log(t, controller = obj),
"Task '[[:xdigit:]]+' does not exist")
})
test_that("can't read logs of tasks run in the same process", {
obj <- test_rrq("myfuns.R")
w <- test_worker_blocking(obj)
t <- rrq_task_create_expr(sin(1), controller = obj)
msg <- "Task log not enabled for '[[:xdigit:]]+', as it was not configured"
expect_error(
rrq_task_log(t, controller = obj),
msg)
w$step(TRUE)
expect_error(
rrq_task_log(t, controller = obj),
msg)
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.