# NOTE(review): stray extraction artifact ("Nothing") commented out so the file parses.
# skip the whole test file when no Redis server is reachable
# (skip_if_no_redis() is a test helper defined elsewhere in the package)
skip_if_no_redis()
# start workers ----------------------------------------------------------------
test_that("constructing a rush manager works", {
  config = redux::redis_config()
  r = redux::hiredis(config)
  r$FLUSHDB()
  rush = rsh(network_id = "test-rush", config = config)
  # register cleanup only after `rush` exists; registering it beforehand means
  # a failure in rsh() triggers a second error about the missing `rush` object
  on.exit({
    rush$reset()
  })
  # basic construction invariants of the manager object
  expect_class(rush, "Rush")
  expect_equal(rush$network_id, "test-rush")
  expect_class(rush$config, "redis_config")
  expect_class(rush$connector, "redis_api")
})
test_that("workers are started", {
# start_rush() is a test helper (defined elsewhere); presumably it creates a
# Rush instance and configures mirai daemons -- confirm in the helper file
rush = start_rush()
on.exit({
rush$reset()
mirai::daemons(0)
})
# no workers registered before start_workers()
expect_data_table(rush$worker_info, nrows = 0)
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
# every started worker is backed by a mirai handle
walk(rush$processes_mirai, function(process) expect_class(process, "mirai"))
worker_info = rush$worker_info
expect_data_table(worker_info, nrows = 2)
# unique pids: each worker runs in its own process
expect_integer(worker_info$pid, unique = TRUE)
expect_set_equal(worker_ids, worker_info$worker_id)
expect_set_equal(rush$worker_ids, worker_ids)
expect_set_equal(rush$worker_info$state, "running")
})
test_that("packages are available on the worker", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# the worker is asked to load the uuid package before entering its loop
rush$start_workers(
worker_loop = wl_queue,
n_workers = 1,
packages = "uuid"
)
rush$wait_for_workers(1, timeout = 5)
expect_equal(rush$n_workers, 1)
# the task only finishes if the worker started successfully with the package
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
expect_equal(rush$n_finished_tasks, 1)
})
test_that("wait for workers works with worker ids", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 1
)
# calling wait_for_workers() without `n` or `worker_ids` is a config error
expect_error(rush$wait_for_workers(timeout = 1), class = "Mlr3ErrorConfig", regexp = "Either")
rush$wait_for_workers(worker_ids = worker_ids, timeout = 5)
expect_equal(rush$n_running_workers, 1)
# worker id does not exist so we expect a timeout
expect_error(rush$wait_for_workers(worker_ids = "x", timeout = 1), class = "Mlr3ErrorTimeout")
})
test_that("wait for workers works with n", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 1
)
rush$wait_for_workers(n = 1, timeout = 5)
expect_equal(rush$n_running_workers, 1)
# only one worker was started, so waiting for two must time out
expect_error(rush$wait_for_workers(n = 2, timeout = 1), class = "Mlr3ErrorTimeout")
})
test_that("wait for workers works with both n and worker ids", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 1
)
rush$wait_for_workers(n = 1, worker_ids = worker_ids, timeout = 5)
expect_equal(rush$n_running_workers, 1)
# asking for more workers than ids supplied is a configuration error
expect_error(
rush$wait_for_workers(n = 2, worker_ids = worker_ids, timeout = 1),
class = "Mlr3ErrorConfig",
regexp = "Number of workers to wait for"
)
# an unknown id alone times out ...
expect_error(rush$wait_for_workers(n = 1, worker_ids = "x", timeout = 1), class = "Mlr3ErrorTimeout")
# ... but succeeds when n = 1 is already satisfied by a known running worker
rush$wait_for_workers(n = 1, worker_ids = c(worker_ids, "x"), timeout = 1)
expect_equal(rush$n_running_workers, 1)
})
# local workers ----------------------------------------------------------------
test_that("local workers are started", {
# redis_configuration() is a test helper -- presumably returns a redux config;
# confirm against the helper file
config = redis_configuration()
rush = rsh(config = config)
on.exit({
rush$reset()
# local workers are processx processes and must be killed explicitly
walk(rush$processes_processx, function(process) process$kill())
})
expect_data_table(rush$worker_info, nrows = 0)
worker_ids = rush$start_local_workers(
worker_loop = wl_queue,
n_workers = 1
)
rush$wait_for_workers(1, timeout = 5)
walk(rush$processes_processx, function(process) expect_class(process, "process"))
worker_info = rush$worker_info
expect_data_table(worker_info, nrows = 1)
expect_names(names(worker_info), must.include = c("worker_id", "pid", "hostname", "heartbeat", "state"))
expect_integer(worker_info$pid, unique = TRUE)
expect_set_equal(worker_ids, worker_info$worker_id)
expect_set_equal(rush$worker_ids, worker_ids)
expect_set_equal(rush$worker_info$state, "running")
})
test_that("additional local workers are started", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
walk(rush$processes_processx, function(process) process$kill())
})
# first batch of one local worker
worker_ids = rush$start_local_workers(
worker_loop = wl_queue,
n_workers = 1
)
rush$wait_for_workers(1, timeout = 5)
expect_equal(rush$n_workers, 1)
# second batch must be additive, not replace the first
worker_ids_2 = rush$start_local_workers(
worker_loop = wl_queue,
n_workers = 1
)
rush$wait_for_workers(2, timeout = 5)
expect_length(rush$processes_processx, 2)
walk(rush$processes_processx, function(process) expect_class(process, "process"))
worker_info = rush$worker_info
expect_data_table(worker_info, nrows = 2)
expect_set_equal(c(worker_ids, worker_ids_2), worker_info$worker_id)
expect_integer(worker_info$pid, unique = TRUE)
expect_set_equal(rush$worker_info$state, "running")
})
# start workers with script ----------------------------------------------------
test_that("heartbeat process is started", {
skip_if_not_installed("callr")
config = redis_configuration()
rush = rsh(config = config)
on.exit({
rush$reset()
})
expect_data_table(rush$worker_info, nrows = 0)
# worker_script() renders a standalone startup script; heartbeat_* configure
# the callr heartbeat side process
script = rush$worker_script(
worker_loop = wl_queue,
heartbeat_period = 3,
heartbeat_expire = 9
)
px = start_script_worker(script)
# second on.exit needs add = TRUE so the reset handler above is kept
on.exit(
{
px$kill()
},
add = TRUE
)
rush$wait_for_workers(1, timeout = 5)
worker_info = rush$worker_info
# heartbeat column flags workers that run a heartbeat process
expect_logical(worker_info$heartbeat)
})
# terminate workers ------------------------------------------------------------
test_that("a worker is terminated", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
worker_id_1 = rush$running_worker_ids[1]
worker_id_2 = rush$running_worker_ids[2]
# worker 1: graceful termination resolves its mirai, the other keeps running
rush$stop_workers(worker_ids = worker_id_1, type = "terminate")
Sys.sleep(3)
expect_false(unresolved(rush$processes_mirai[[worker_id_1]]))
expect_true(unresolved(rush$processes_mirai[[worker_id_2]]))
expect_equal(rush$running_worker_ids, worker_id_2)
expect_equal(worker_id_1, rush$terminated_worker_ids)
# worker 2: after terminating the last worker, none are left running
rush$stop_workers(worker_ids = worker_id_2, type = "terminate")
Sys.sleep(3)
expect_false(unresolved(rush$processes_mirai[[worker_id_2]]))
expect_false(unresolved(rush$processes_mirai[[worker_id_1]]))
expect_set_equal(c(worker_id_1, worker_id_2), rush$terminated_worker_ids)
expect_null(rush$running_worker_ids)
})
test_that("reset workers works", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_default,
n_workers = 1
)
rush$wait_for_workers(1, timeout = 5)
Sys.sleep(1)
# reset(workers = TRUE) must wipe worker registrations and all task sets
rush$reset(workers = TRUE)
expect_data_table(rush$worker_info, nrows = 0)
expect_null(rush$running_worker_ids)
expect_null(rush$terminated_worker_ids)
expect_null(rush$tasks)
expect_null(rush$finished_tasks)
})
test_that("reset data works", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_default,
n_workers = 1
)
rush$wait_for_workers(1, timeout = 5)
Sys.sleep(1)
# wl_default presumably produces tasks on its own -- some should be finished
expect_character(rush$finished_tasks)
keys = rush$finished_tasks
# reset(workers = FALSE) clears task data but keeps the workers running
rush$reset(workers = FALSE)
expect_true(all(keys %nin% rush$finished_tasks))
expect_equal(rush$n_running_workers, 1)
expect_data_table(rush$worker_info, nrows = 1)
})
# kill workers -----------------------------------------------------------------
test_that("a worker is killed", {
rush = start_rush()
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_default,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
worker_id_1 = rush$running_worker_ids[1]
worker_id_2 = rush$running_worker_ids[2]
worker_info = rush$worker_info
# pskill with signal 0 only probes: TRUE means the process is alive
expect_true(all(tools::pskill(worker_info$pid, signal = 0L)))
# worker 1: a hard kill surfaces as an error value in the mirai result
rush$stop_workers(worker_ids = worker_id_1, type = "kill")
Sys.sleep(1)
expect_equal(worker_id_1, rush$terminated_worker_ids)
expect_equal(rush$running_worker_ids, worker_id_2)
expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_1]]$data))
expect_false(mirai::is_error_value(rush$processes_mirai[[worker_id_2]]$data))
# worker 2
rush$stop_workers(worker_ids = worker_id_2, type = "kill")
Sys.sleep(1)
expect_set_equal(c(worker_id_1, worker_id_2), rush$terminated_worker_ids)
expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_1]]$data))
expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_2]]$data))
})
test_that("a local worker is killed", {
config = redis_configuration()
rush = rsh(config = config)
on.exit({
rush$reset()
walk(rush$processes_processx, function(process) process$kill())
})
worker_ids = rush$start_local_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
worker_id_1 = rush$running_worker_ids[1]
worker_id_2 = rush$running_worker_ids[2]
rush$worker_info
# worker 1: killing one processx worker must not touch the other
rush$stop_workers(worker_ids = worker_id_1, type = "kill")
Sys.sleep(1)
expect_equal(worker_id_1, rush$terminated_worker_ids)
expect_false(rush$processes_processx[[worker_id_1]]$is_alive())
expect_true(rush$processes_processx[[worker_id_2]]$is_alive())
# worker 2
rush$stop_workers(worker_ids = worker_id_2, type = "kill")
Sys.sleep(1)
expect_set_equal(c(worker_id_1, worker_id_2), rush$terminated_worker_ids)
expect_false(rush$processes_processx[[worker_id_1]]$is_alive())
expect_false(rush$processes_processx[[worker_id_2]]$is_alive())
})
test_that("worker is killed with a heartbeat process", {
  skip_if_not_installed("callr")
  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })
  expect_data_table(rush$worker_info, nrows = 0)
  script = rush$worker_script(
    worker_loop = wl_queue,
    heartbeat_period = 3,
    heartbeat_expire = 9
  )
  px = start_script_worker(script)
  # register the kill handler immediately after spawning the process; if
  # wait_for_workers() below times out, the external worker must still be
  # cleaned up (add = TRUE keeps the reset handler above)
  on.exit(
    {
      px$kill()
    },
    add = TRUE
  )
  rush$wait_for_workers(1, timeout = 5)
  worker_info = rush$worker_info
  expect_logical(worker_info$heartbeat)
  # signal 0L returns TRUE if the process is still alive
  expect_true(tools::pskill(worker_info$pid, signal = 0L))
  rush$stop_workers(type = "kill")
  Sys.sleep(1)
  expect_false(tools::pskill(worker_info$pid, signal = 0L))
  expect_true(rush$worker_info$state == "terminated")
  expect_equal(rush$terminated_worker_ids, worker_info$worker_id)
})
# pushing tasks to the queue ---------------------------------------------------
test_that("pushing a task to the queue works", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
# push before the workers exist: the task waits in the queue
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
expect_equal(rush$n_queued_tasks, 1)
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
rush$wait_for_tasks(keys)
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_string(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_finished_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
})
test_that("pushing multiple tasks to the queue works", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
# ten identical tasks queued before any worker exists
xss = replicate(10, list(list(x1 = 1, x2 = 2)))
keys = rush$push_tasks(xss)
expect_equal(rush$n_queued_tasks, 10)
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
rush$wait_for_tasks(keys)
# check task count
expect_equal(rush$n_tasks, 10)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 10)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_character(rush$tasks, len = 10)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_character(rush$finished_tasks, len = 10)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_finished_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
expect_data_table(data, nrows = 10)
expect_data_table(rush$fetch_tasks(), nrows = 10)
})
test_that("pushing a task with extras to the queue works", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
xss = list(list(x1 = 1, x2 = 2))
# `extra` attaches arbitrary metadata (here a timestamp) to the task
timestamp = Sys.time()
extra = list(list(timestamp = timestamp))
keys = rush$push_tasks(xss, extra)
expect_equal(rush$n_queued_tasks, 1)
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
rush$wait_for_tasks(keys)
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_set_equal(rush$finished_tasks, keys)
expect_null(rush$failed_tasks)
# check task status
expect_false(rush$is_failed_task(keys))
})
test_that("pushing multiple tasks with extras to the queue works", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
xss = replicate(10, list(list(x1 = 1, x2 = 2)))
# a single extras entry shared by all tasks
timestamp = Sys.time()
extra = list(list(timestamp = timestamp))
keys = rush$push_tasks(xss, extra)
expect_equal(rush$n_queued_tasks, 10)
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
rush$wait_for_tasks(keys)
# check task count
expect_equal(rush$n_tasks, 10)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 10)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_character(rush$tasks, len = 10)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_set_equal(rush$finished_tasks, keys)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_finished_tasks()
# the extras column must round-trip through Redis unchanged
expect_names(names(data), must.include = c("x1", "x2", "timestamp", "keys"))
expect_data_table(data, nrows = 10)
expect_equal(data$timestamp[[1]], timestamp[[1]])
expect_data_table(rush$fetch_tasks(), nrows = 10)
# status checks
expect_false(any(rush$is_running_task(keys)))
expect_false(any(rush$is_failed_task(keys)))
})
test_that("fetching tasks with vector parameters works", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# each task parameter is itself a length-3 vector; fetch must keep them as
# list columns instead of flattening
xss = list(
list(x1 = c(1, 2, 3), x2 = c(4, 5, 6)),
list(x1 = c(7, 8, 9), x2 = c(10, 11, 12))
)
yss = list(list(y = 1), list(y = 2))
keys = rush$push_finished_tasks(xss, yss)
data = rush$fetch_finished_tasks()
expect_data_table(data, nrows = 2)
expect_equal(data$x1[[1]], c(1, 2, 3))
expect_equal(data$x1[[2]], c(7, 8, 9))
expect_data_table(rush$fetch_tasks(), nrows = 2)
})
test_that("empty queue works", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# a worker loop that never pops tasks, so pushed tasks stay queued
worker_loop_sleep = function(rush) {
while (TRUE) {
Sys.sleep(1)
}
}
worker_ids = rush$start_workers(
worker_loop = worker_loop_sleep,
n_workers = 1
)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
Sys.sleep(1)
# emptying the queue moves queued tasks to the failed set
rush$empty_queue()
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 1)
xss = list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))
keys = rush$push_tasks(xss)
Sys.sleep(1)
rush$empty_queue()
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
# failed tasks accumulate: 1 from the first empty_queue() + 2 new ones
expect_data_table(rush$fetch_failed_tasks(), nrows = 3)
})
# segfault detection -----------------------------------------------------------
test_that("segfaults on mirai workers are detected", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
# wl_segfault presumably crashes the worker process on purpose
worker_ids = rush$start_workers(
worker_loop = wl_segfault,
n_workers = 2
)
rush$wait_for_workers(1, timeout = 5)
# wait until a lost worker is detected but timeout after 10 seconds
start_time = Sys.time()
lost_workers = character()
while (start_time + 10 > Sys.time()) {
lost_workers = c(lost_workers, rush$detect_lost_workers())
if (length(lost_workers) == 2) break
}
expect_character(rush$terminated_worker_ids, len = 2)
expect_set_equal(rush$terminated_worker_ids, lost_workers)
# crashed workers' running tasks are recorded as failed
expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
data = rush$fetch_failed_tasks()
expect_set_equal(data$message, "Worker has crashed or was killed")
})
test_that("segfaults on processx workers are detected", {
config = redis_configuration()
rush = rsh(config = config)
on.exit({
rush$reset()
walk(rush$processes_processx, function(process) process$kill())
})
worker_ids = rush$start_local_workers(
worker_loop = wl_segfault,
n_workers = 2
)
rush$wait_for_workers(2, timeout = 5)
# wait until a lost worker is detected but timeout after 10 seconds
start_time = Sys.time()
lost_workers = character()
while (start_time + 10 > Sys.time()) {
lost_workers = c(lost_workers, rush$detect_lost_workers())
if (length(lost_workers) == 2) break
}
expect_character(rush$terminated_worker_ids, len = 2)
expect_set_equal(rush$terminated_worker_ids, lost_workers)
# crashed workers' running tasks are recorded as failed
expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
data = rush$fetch_failed_tasks()
expect_set_equal(data$message, "Worker has crashed or was killed")
})
test_that("a segfault on a single worker is detected via heartbeat", {
  skip_if_not_installed("callr")
  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })
  expect_data_table(rush$worker_info, nrows = 0)
  script = rush$worker_script(
    worker_loop = wl_segfault,
    heartbeat_period = 1,
    heartbeat_expire = 2
  )
  px = start_script_worker(script)
  # register the kill handler before waiting; a timeout in wait_for_workers()
  # would otherwise leak the external worker process
  on.exit(
    {
      px$kill()
    },
    add = TRUE
  )
  rush$wait_for_workers(1, timeout = 10)
  expect_null(rush$terminated_worker_ids)
  # wait until a lost worker is detected but timeout after 10 seconds
  start_time = Sys.time()
  lost_workers = character()
  while (start_time + 10 > Sys.time()) {
    lost_workers = rush$detect_lost_workers()
    if (length(lost_workers)) break
  }
  expect_character(rush$terminated_worker_ids, len = 1)
  expect_set_equal(rush$terminated_worker_ids, lost_workers)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)
  data = rush$fetch_failed_tasks()
  expect_equal(data$message, "Worker has crashed or was killed")
})
test_that("segfaults on multiple workers are detected via the heartbeat", {
  skip_if_not_installed("callr")
  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })
  expect_data_table(rush$worker_info, nrows = 0)
  script = rush$worker_script(
    worker_loop = wl_segfault,
    heartbeat_period = 1,
    heartbeat_expire = 2
  )
  px_1 = start_script_worker(script)
  px_2 = start_script_worker(script)
  # register the kill handlers before waiting; a timeout in wait_for_workers()
  # would otherwise leak both external worker processes
  on.exit(
    {
      px_1$kill()
      px_2$kill()
    },
    add = TRUE
  )
  rush$wait_for_workers(2, timeout = 10)
  expect_null(rush$terminated_worker_ids)
  # wait until all lost workers are detected but timeout after 10 seconds
  start_time = Sys.time()
  lost_workers = character()
  while (start_time + 10 > Sys.time()) {
    lost_workers = c(lost_workers, rush$detect_lost_workers())
    if (length(lost_workers) == 2) break
  }
  expect_character(rush$terminated_worker_ids, len = 2)
  expect_set_equal(rush$terminated_worker_ids, lost_workers)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
  data = rush$fetch_failed_tasks()
  expect_set_equal(data$message, "Worker has crashed or was killed")
})
test_that("wait for tasks works when a task gets lost", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_segfault,
n_workers = 1
)
rush$wait_for_workers(1, timeout = 5)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
keys = rush$push_tasks(xss)
# with detect_lost_workers = TRUE, wait_for_tasks() must return (invisibly,
# returning the Rush object) instead of hanging on the crashed worker's task
expect_class(rush$wait_for_tasks(keys, detect_lost_workers = TRUE), "Rush")
})
# logging ----------------------------------------------------------------------
test_that("saving lgr logs works", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# enable debug-level logging for the mlr3/rush logger on the worker
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 1,
lgr_thresholds = c("mlr3/rush" = "debug")
)
rush$wait_for_workers(1, timeout = 5)
# sleeps give the worker time to flush log entries to Redis
Sys.sleep(5)
xss = list(list(x1 = 2, x2 = 2))
keys = rush$push_tasks(xss)
rush$wait_for_tasks(keys)
Sys.sleep(5)
log = rush$read_log()
expect_data_table(log, min.rows = 1L)
expect_names(names(log), must.include = c("worker_id", "timestamp", "logger", "caller", "msg"))
# time_difference = TRUE adds a difftime column relative to some reference
log = rush$read_log(time_difference = TRUE)
expect_data_table(log, min.rows = 1L)
expect_names(names(log), must.include = c("time_difference"))
expect_class(log$time_difference, "difftime")
xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2), list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
rush$wait_for_tasks(keys)
Sys.sleep(5)
log = rush$read_log()
expect_data_table(log, min.rows = 2L)
expect_names(names(log), must.include = c("worker_id", "timestamp", "logger", "caller", "msg"))
})
test_that("saving logs with redis appender works", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# lgr_buffer_size = 1 presumably flushes every log entry immediately
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 1,
lgr_thresholds = c("mlr3/rush" = "debug"),
lgr_buffer_size = 1
)
rush$wait_for_workers(1, timeout = 5)
log = rush$read_log()
expect_data_table(log, min.rows = 1)
expect_names(colnames(log), identical.to = c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
})
test_that("error and output logs work", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# message (stderr) and output (stdout) streams are redirected to per-worker
# files named message_<id>.log / output_<id>.log in these directories
message_log = tempdir()
output_log = tempdir()
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 1,
lgr_thresholds = c("mlr3/rush" = "debug"),
lgr_buffer_size = 1,
message_log = message_log,
output_log = output_log
)
rush$wait_for_workers(1)
expect_match(
readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))[1],
"Debug message logging on worker"
)
expect_match(
readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))[1],
"Debug output logging on worker"
)
})
# misc--------------------------------------------------------------------------
test_that("caching results works", {
rush = start_rush(n_workers = 2)
on.exit({
rush$reset()
mirai::daemons(0)
})
worker_ids = rush$start_workers(
worker_loop = wl_queue,
n_workers = 2,
lgr_thresholds = c("mlr3/rush" = "debug")
)
rush$wait_for_workers(2, timeout = 5)
xss = replicate(10, list(list(x1 = 1, x2 = 2)))
keys = rush$push_tasks(xss)
rush$wait_for_tasks(keys)
# first fetch populates the private cache; get_private() is a test helper
expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
expect_data_table(get_private(rush)$.cached_tasks, nrows = 10)
# a second fetch must not duplicate cached rows
expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
expect_data_table(get_private(rush)$.cached_tasks, nrows = 10)
xss = replicate(10, list(list(x1 = 1, x2 = 2)))
keys = rush$push_tasks(xss)
rush$wait_for_tasks(keys)
# new results are appended to the cache
expect_data_table(rush$fetch_finished_tasks(), nrows = 20)
expect_data_table(get_private(rush)$.cached_tasks, nrows = 20)
})
test_that("reconnecting rush instance works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })
  # add = TRUE is required: a second on.exit() without it would silently
  # replace the cleanup handler above, leaking the network and the daemons
  on.exit(
    {
      file.remove("rush.rds")
    },
    add = TRUE
  )
  saveRDS(rush, file = "rush.rds")
  rush = readRDS("rush.rds")
  # the Redis connection cannot be serialized, so the restored object is
  # disconnected until reconnect() is called
  expect_error(rush$print(), "Context is not connected")
  rush$reconnect()
  expect_r6(rush, "Rush")
})
test_that("large objects limit works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })
  old_max_object_size = getOption("rush.max_object_size")
  # add = TRUE keeps the cleanup handler above alive; without it this second
  # on.exit() would replace it and leak the network and the daemons
  on.exit(options(rush.max_object_size = old_max_object_size), add = TRUE)
  options(rush.max_object_size = 1)
  large_vector = runif(1e6)
  worker_loop = function(rush, large_vector) {
    rush$push_running_tasks(list(list(x1 = 1, x2 = length(large_vector))))
  }
  # with the limit set to 1, passing the ~8 MB vector must be rejected
  expect_error(
    rush$start_workers(
      worker_loop = worker_loop,
      large_vector = large_vector,
      n_workers = 1
    ),
    class = "Mlr3ErrorConfig"
  )
  # with a large_objects_path configured, the vector is handed over via disk
  # instead of being serialized into Redis
  rush_plan(n_workers = 1, large_objects_path = tempdir())
  rush$start_workers(
    worker_loop = worker_loop,
    large_vector = large_vector,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(1)
  expect_equal(rush$fetch_tasks()$x2, 1e6)
})
test_that("worker_loop environment is stripped before serialization", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# large_data (~8 MB) lives in the same local environment as worker_loop.
# without crate(), R's closure mechanism would serialize it along with the function.
local({
large_data = runif(1e6)
worker_loop = function(rush) {
rush$push_running_tasks(list(list(x1 = 1, x2 = 2)))
}
# push the worker config via the private method, then inspect the raw
# serialized payload directly in Redis
rush$.__enclos_env__$private$.push_worker_config(worker_loop = worker_loop)
r = redux::hiredis(rush$config)
bin = r$GET(sprintf("%s:start_args", rush$network_id))
# payload must stay well below the 8 MB the captured vector would add
expect_lt(as.numeric(object.size(bin)), 1e6)
})
})
test_that("simple errors are pushed as failed tasks", {
rush = start_rush(n_workers = 1)
on.exit({
rush$reset()
mirai::daemons(0)
})
# wl_fail presumably errors on one of the two inputs (x1 = 0) -- confirm in
# the helper file
worker_ids = rush$start_workers(
worker_loop = wl_fail,
n_workers = 1
)
rush$wait_for_workers(1, timeout = 5)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
keys = rush$push_tasks(xss)
rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
Sys.sleep(2)
# check task count: one finished, one failed
expect_equal(rush$n_tasks, 2)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 1)
expect_equal(rush$n_failed_tasks, 1)
# check keys in sets
expect_character(rush$tasks, len = 2)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_string(rush$finished_tasks)
expect_string(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_tasks(), nrows = 2)
data = rush$fetch_finished_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
expect_data_table(data, nrows = 1)
# failed tasks carry a `message` column instead of a result
data = rush$fetch_failed_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys"))
expect_data_table(data, nrows = 1)
})
# NOTE(review): the following lines are website-extraction boilerplate, not R
# code; commented out so the file parses.
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.