Nothing
# start workers with processx --------------------------------------------------
test_that("constructing a rush controller works", {
  skip_on_cran()
  network = "test-rush"
  config = start_flush_redis()

  # the controller reports the network id it was created with
  rush = rsh(network_id = network, config = config)
  expect_class(rush, "Rush")
  expect_equal(rush$network_id, network)

  expect_rush_reset(rush)
})
test_that("local workers are started", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  thresholds = c("mlr3/rush" = "debug")
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = thresholds)
  rush$wait_for_workers(2, timeout = 5)

  # every local worker is backed by a processx handle
  for (px in rush$processes_processx) expect_class(px, "process")

  # metadata the workers registered in redis
  info = rush$worker_info
  expect_data_table(info, nrows = 2)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(worker_ids, info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush)
})
test_that("local workers are started with Redis on unix socket", {
  skip_on_cran()
  skip_on_ci() # does not work on github actions runner
  # unix domain sockets are not available on Windows, and the test spawns its
  # own redis-server binary; skip instead of failing when either is missing
  skip_on_os("windows")
  skip_if(!nzchar(Sys.which("redis-server")), "redis-server is not installed")

  # start a dedicated, daemonized redis server that only listens on a unix socket;
  # shQuote() keeps a tempdir() path containing spaces intact for the shell
  system(sprintf("redis-server --port 0 --unixsocket /tmp/redis.sock --daemonize yes --pidfile /tmp/redis.pid --dir %s", shQuote(tempdir())))
  Sys.sleep(5)

  config = redux::redis_config(path = "/tmp/redis.sock")
  r = redux::hiredis(config)
  # always shut the dedicated server down; any error from SHUTDOWN is ignored
  on.exit({
    try({r$SHUTDOWN()}, silent = TRUE)
  }, add = TRUE)
  r$FLUSHDB()

  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  # check fields
  walk(rush$processes_processx, function(process) expect_class(process, "process"))

  # check meta data from redis
  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 2)
  expect_integer(worker_info$pid, unique = TRUE)
  expect_false(any(worker_info$remote))
  expect_set_equal(worker_ids, worker_info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_states$state, "running")

  # stop the workers while the server is still up; on.exit() shuts it down afterwards
  expect_rush_reset(rush)
})
test_that("additional workers are started", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  thresholds = c("mlr3/rush" = "debug")

  # first batch of two local workers
  first_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = thresholds)
  rush$wait_for_workers(2, timeout = 5)
  expect_equal(rush$n_workers, 2)

  # a second batch is added on top of the running workers
  second_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = thresholds)
  rush$wait_for_workers(4, timeout = 5)

  processes = rush$processes_processx
  expect_length(processes, 4)
  for (px in processes) expect_class(px, "process")

  info = rush$worker_info
  expect_data_table(info, nrows = 4)
  expect_set_equal(c(first_ids, second_ids), info$worker_id)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush)
})
test_that("packages are available on the worker", {
  skip_on_cran()
  # the workers are asked to load uuid; skip rather than fail when it is missing
  skip_if_not_installed("uuid")
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # NOTE(review): test_worker_loop presumably does not call uuid itself, so this
  # only checks that workers start and evaluate with the package loaded -- confirm
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    packages = "uuid",
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)
  expect_equal(rush$n_workers, 2)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
  expect_equal(rush$n_finished_tasks, 1)

  expect_rush_reset(rush)
})
# test_that("globals are available on the worker", {
# skip_on_cran()
# config = start_flush_redis()
# rush = rsh(network_id = "test-rush", config = config)
# worker_loop = function(rush) {
# while(!rush$terminated && !rush$terminated_on_idle) {
# task = rush$pop_task(fields = c("xs", "seed"))
# if (!is.null(task)) {
# tryCatch({
# # evaluate task with seed
# fun = function(x1, x2, ...) list(y = x)
# ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
# rush$push_results(task$key, yss = list(ys))
# }, error = function(e) {
# condition = list(message = e$message)
# rush$push_failed(task$key, conditions = list(condition))
# })
# }
# }
# return(NULL)
# }
# x <<- 33
# worker_ids = rush$start_local_workers(
# worker_loop = worker_loop,
# n_workers = 2,
# globals = "x",
# lgr_thresholds = c("mlr3/rush" = "debug"))
# rush$wait_for_workers(2, timeout = 5)
# expect_equal(rush$n_workers, 2)
# xss = list(list(x1 = 1, x2 = 2))
# keys = rush$push_tasks(xss)
# rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
# expect_equal(rush$n_finished_tasks, 1)
# expect_equal(rush$fetch_finished_tasks()$y, 33)
# expect_rush_reset(rush)
# })
# test_that("named globals are available on the worker", {
# skip_on_cran()
# config = start_flush_redis()
# rush = rsh(network_id = "test-rush", config = config)
# worker_loop = function(rush) {
# while(!rush$terminated && !rush$terminated_on_idle) {
# task = rush$pop_task(fields = c("xs", "seed"))
# if (!is.null(task)) {
# tryCatch({
# # evaluate task with seed
# fun = function(x1, x2, ...) list(y = z)
# ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
# rush$push_results(task$key, yss = list(ys))
# }, error = function(e) {
# condition = list(message = e$message)
# rush$push_failed(task$key, conditions = list(condition))
# })
# }
# }
# return(NULL)
# }
# x <<- 33
# worker_ids = rush$start_local_workers(
# worker_loop = worker_loop,
# n_workers = 2,
# globals = c(z = "x"),
# lgr_thresholds = c("mlr3/rush" = "debug"))
# rush$wait_for_workers(2, timeout = 5)
# xss = list(list(x1 = 1, x2 = 2))
# keys = rush$push_tasks(xss)
# rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
# expect_equal(rush$n_finished_tasks, 1)
# expect_equal(rush$fetch_finished_tasks()$y, 33)
# expect_rush_reset(rush)
# })
# start workers with mirai -----------------------------------------------------
test_that("mirai workers are started", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  # reset the daemons even if the test aborts with an error; the namespace
  # prefix avoids depending on mirai being attached
  on.exit(mirai::daemons(0), add = TRUE)
  mirai::daemons(2)

  worker_ids = rush$start_remote_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  # check fields
  walk(rush$processes_mirai, function(process) expect_class(process, "mirai"))

  # check meta data from redis
  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 2)
  expect_integer(worker_info$pid, unique = TRUE)
  expect_true(all(worker_info$remote))
  expect_set_equal(worker_ids, worker_info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush)
})
test_that("new mirai workers can be started on used daemons", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  # reset the daemons even if the test aborts with an error
  on.exit(mirai::daemons(0), add = TRUE)
  mirai::daemons(2)

  # a worker loop that finishes a single task and returns, freeing its daemon
  worker_loop = function(rush) {
    xs = list(x1 = 1, x2 = 2)
    key = rush$push_running_tasks(list(xs))
    ys = list(y = xs$x1 + xs$x2)
    rush$push_results(key, yss = list(ys))
  }

  rush$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)
  Sys.sleep(1)

  # check fields
  walk(rush$processes_mirai, function(process) expect_class(process, "mirai"))
  expect_equal(mirai::status()$mirai["completed"], c(completed = 2))

  # a second network reuses the same daemons
  rush = rsh(network_id = "test-rush-2", config = config)
  rush$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 2)
  expect_equal(mirai::status()$mirai["completed"], c(completed = 4))

  # flush the database the test actually uses (the first network's keys are left there)
  r = redux::hiredis(config)
  r$FLUSHDB()
  expect_rush_reset(rush)
})
# start workers with script -----------------------------------------------------
test_that("heartbeat process is started", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  rush$worker_script(
    worker_loop = test_worker_loop,
    heartbeat_period = 3,
    heartbeat_expire = 9,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  # the worker runs in a separate Rscript process; its redis settings are
  # hard-coded and presumably mirror start_flush_redis() -- TODO confirm
  worker_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 3, heartbeat_expire = 9)"
  px = processx::process$new("Rscript",
    args = c("-e", worker_call),
    supervise = TRUE,
    stdout = "|", stderr = "|")
  on.exit(px$kill(), add = TRUE)

  Sys.sleep(5)
  expect_logical(rush$worker_info$heartbeat)
})
# terminate workers ------------------------------------------------------------
test_that("a worker is terminated", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # pop-evaluate-push loop that exits once the worker is told to terminate
  worker_loop = function(rush) {
    while (!(rush$terminated || rush$terminated_on_idle)) {
      task = rush$pop_task(fields = c("xs", "seed"))
      if (is.null(task)) next
      tryCatch({
        # evaluate task with seed
        fun = function(x1, x2) list(y = x1 + x2)
        ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
        rush$push_results(task$key, yss = list(ys))
      }, error = function(e) {
        condition = list(message = e$message)
        rush$push_failed(task$key, conditions = list(condition))
      })
    }
    return(NULL)
  }

  rush$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)
  first = rush$running_worker_ids[1]
  second = rush$running_worker_ids[2]

  # terminate the first worker; the second keeps running
  rush$stop_workers(worker_ids = first, type = "terminate")
  Sys.sleep(3)
  expect_false(rush$processes_processx[[first]]$is_alive())
  expect_equal(rush$running_worker_ids, second)
  expect_equal(rush$terminated_worker_ids, first)

  # terminate the second worker; none is running anymore
  rush$stop_workers(worker_ids = second, type = "terminate")
  Sys.sleep(3)
  expect_false(rush$processes_processx[[second]]$is_alive())
  expect_set_equal(rush$terminated_worker_ids, c(first, second))
  expect_null(rush$running_worker_ids)

  expect_rush_reset(rush, type = "terminate")
})
# kill workers -----------------------------------------------------------------
test_that("a local worker is killed", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)
  worker_id_1 = rush$running_worker_ids[1]
  worker_id_2 = rush$running_worker_ids[2]
  # both workers are registered before either of them is killed
  expect_data_table(rush$worker_info, nrows = 2)

  # worker 1
  rush$stop_workers(worker_ids = worker_id_1, type = "kill")
  Sys.sleep(1)
  expect_equal(worker_id_1, rush$killed_worker_ids)
  expect_false(rush$processes_processx[[worker_id_1]]$is_alive())
  expect_true(rush$processes_processx[[worker_id_2]]$is_alive())

  # worker 2
  rush$stop_workers(worker_ids = worker_id_2, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(worker_id_1, worker_id_2), rush$killed_worker_ids)
  expect_false(rush$processes_processx[[worker_id_1]]$is_alive())
  expect_false(rush$processes_processx[[worker_id_2]]$is_alive())

  expect_rush_reset(rush)
})
test_that("a mirai worker is killed", {
  skip_on_cran()
  # tools::pskill(signal = 0L) is used as a liveness probe below; on Windows
  # pskill() always calls TerminateProcess, so the probe would kill the workers
  skip_on_os("windows")
  config = start_flush_redis()
  on.exit({
    mirai::daemons(0)
  }, add = TRUE)
  mirai::daemons(2)

  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_remote_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)
  worker_id_1 = rush$running_worker_ids[1]
  worker_id_2 = rush$running_worker_ids[2]
  worker_info = rush$worker_info
  # signal 0L only checks that the processes exist
  expect_true(all(tools::pskill(worker_info$pid, signal = 0L)))

  # worker 1
  rush$stop_workers(worker_ids = worker_id_1, type = "kill")
  Sys.sleep(1)
  expect_equal(worker_id_1, rush$killed_worker_ids)
  expect_equal(rush$running_worker_ids, worker_id_2)
  expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_1]]$data))
  expect_false(mirai::is_error_value(rush$processes_mirai[[worker_id_2]]$data))

  # worker 2
  rush$stop_workers(worker_ids = worker_id_2, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(worker_id_1, worker_id_2), rush$killed_worker_ids)
  expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_1]]$data))
  expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_2]]$data))

  expect_rush_reset(rush)
})
test_that("worker is killed with a heartbeat process", {
  skip_on_cran()
  # tools::pskill(signal = 0L) is used as a liveness probe below; on Windows
  # pskill() always calls TerminateProcess, so the probe would kill the worker
  skip_on_os("windows")
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  rush$worker_script(
    worker_loop = test_worker_loop,
    heartbeat_period = 3,
    heartbeat_expire = 9,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  px = processx::process$new("Rscript",
    args = c("-e", "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 3, heartbeat_expire = 9)"),
    supervise = TRUE,
    stderr = "|", stdout = "|")
  on.exit({
    px$kill()
  }, add = TRUE)

  Sys.sleep(5)
  worker_info = rush$worker_info
  expect_logical(worker_info$heartbeat)

  # signal 0L returns TRUE if the process is still alive
  expect_true(tools::pskill(worker_info$pid, signal = 0L))
  rush$stop_workers(type = "kill")
  Sys.sleep(1)
  expect_false(tools::pskill(worker_info$pid, signal = 0L))
  # same state check style as the other tests; does not require a single row
  expect_set_equal(rush$worker_states$state, "killed")
  expect_equal(rush$killed_worker_ids, worker_info$worker_id)

  expect_rush_reset(rush)
})
# low level read and write -----------------------------------------------------
test_that("reading and writing a hash works with flatten", {
  skip_on_cran()
  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  # write the supplied fields into a new hash, then read `fields` back (flattened)
  roundtrip = function(fields, ...) {
    key = rush$write_hashes(...)
    rush$read_hashes(key, fields)
  }

  # one field with list
  expect_equal(roundtrip("xs", xs = list(list(x1 = 1, x2 = 2))),
    list(list(x1 = 1, x2 = 2)))

  # one field with atomic
  expect_equal(roundtrip("timeout", timeout = 1),
    list(list(timeout = 1)))

  # two fields with lists
  expect_equal(roundtrip(c("xs", "ys"), xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3))),
    list(list(x1 = 1, x2 = 2, y = 3)))

  # two fields with list and empty list
  expect_equal(roundtrip(c("xs", "ys"), xs = list(list(x1 = 1, x2 = 2)), ys = list()),
    list(list(x1 = 1, x2 = 2)))

  # two fields with list and atomic
  expect_equal(roundtrip(c("xs", "timeout"), xs = list(list(x1 = 1, x2 = 2)), timeout = 1),
    list(list(x1 = 1, x2 = 2, timeout = 1)))
})
test_that("reading and writing a hash works without flatten", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # write the supplied fields into a new hash, then read `fields` back unflattened
  roundtrip = function(fields, ...) {
    key = rush$write_hashes(...)
    rush$read_hashes(key, fields, flatten = FALSE)
  }

  # one field with list
  expect_equal(roundtrip("xs", xs = list(list(x1 = 1, x2 = 2))),
    list(list(xs = list(x1 = 1, x2 = 2))))

  # one field with atomic
  expect_equal(roundtrip("timeout", timeout = 1),
    list(list(timeout = 1)))

  # two fields with lists
  expect_equal(roundtrip(c("xs", "ys"), xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3))),
    list(list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3))))

  # two fields with list and empty list
  expect_equal(roundtrip(c("xs", "ys"), xs = list(list(x1 = 1, x2 = 2)), ys = list()),
    list(list(xs = list(x1 = 1, x2 = 2), ys = NULL)))

  # two fields with list and atomic
  expect_equal(roundtrip(c("xs", "timeout"), xs = list(list(x1 = 1, x2 = 2)), timeout = 1),
    list(list(xs = list(x1 = 1, x2 = 2), timeout = 1)))
})
test_that("reading and writing hashes works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  # one field with list
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))
  expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))

  # one field atomic
  keys = rush$write_hashes(timeout = c(1, 1))
  expect_equal(rush$read_hashes(keys, "timeout"), list(list(timeout = 1), list(timeout = 1)))

  # two fields with list and recycled atomic (length-one atomic reused for every hash)
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1)
  expect_equal(rush$read_hashes(keys, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1)))

  # two fields
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3), list(y = 4)))
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 4)))

  # two fields with list and atomic
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = c(1, 1))
  expect_equal(rush$read_hashes(keys, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1)))

  # two fields, one empty
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list())
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))

  # recycle a length-one list field
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3)))
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 3)))
})
test_that("writing hashes to specific keys works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  one_xs = list(list(x1 = 1, x2 = 2))
  two_xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))

  # one element, stored under a caller-chosen key
  key = uuid::UUIDgenerate()
  rush$write_hashes(xs = one_xs, keys = key)
  expect_equal(rush$read_hashes(key, "xs"), one_xs)

  # two elements, one key each
  pair = uuid::UUIDgenerate(n = 2)
  rush$write_hashes(xs = two_xs, keys = pair)
  expect_equal(rush$read_hashes(pair, "xs"), two_xs)

  # a single key for two hashes is rejected
  single = uuid::UUIDgenerate()
  expect_error(rush$write_hashes(xs = two_xs, keys = single), "Assertion on 'keys' failed")
})
test_that("writing list columns works", {
  skip_on_cran()

  # one task whose extra field holds a length-one list
  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A"))))
  rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
  expect_list(rush$fetch_finished_tasks()$extra, len = 1)

  # one task whose extra field holds a character vector
  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3]))))
  rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
  expect_list(rush$fetch_finished_tasks()$extra, len = 1)

  # two tasks, each with its own list entry
  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)), xs_extra = list(list(extra = list("A")), list(extra = list("B"))))
  rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
  # reading the raw hashes returns one entry per key
  expect_list(rush$read_hashes(keys, c("xs", "xs_extra")), len = 2)
  expect_list(rush$fetch_finished_tasks()$extra, len = 2)
})
# task evaluation --------------------------------------------------------------
test_that("evaluating a task works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  # wait for all started workers so none is still booting when the test resets
  rush$wait_for_workers(4, timeout = 5)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  expect_rush_reset(rush)
})
test_that("evaluating tasks works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  # wait for all started workers so none is still booting when the test resets
  rush$wait_for_workers(4, timeout = 5)

  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  # check task count
  expect_equal(rush$n_tasks, 10)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 10)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_character(rush$tasks, len = 10)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_character(rush$finished_tasks, len = 10)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 10)
  expect_data_table(rush$fetch_tasks(), nrows = 10)

  expect_rush_reset(rush)
})
test_that("caching results works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  # wait for all started workers so none is still booting when the test resets
  rush$wait_for_workers(4, timeout = 5)

  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  # repeated fetches in either format reuse the cache instead of growing it
  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)
  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)
  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  # newly finished tasks are appended to the cache
  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 20)
  expect_list(get_private(rush)$.cached_tasks, len = 20)
  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 20)
  expect_list(get_private(rush)$.cached_tasks, len = 20)

  # stop the four workers like every other worker test does
  expect_rush_reset(rush)
})
# segfault detection -----------------------------------------------------------
test_that("a segfault on a local worker is detected", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # registers one running task, then crashes the R session
  worker_loop = function(rush) {
    while(TRUE) {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  worker_ids = rush$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(3)

  # the crash is only recorded once detection runs
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)
  # the task that was running on the crashed worker is reported as failed
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)

  expect_rush_reset(rush)
})
test_that("a segfault on a mirai worker", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # registers one running task, then crashes the R session
  worker_loop = function(rush) {
    while(TRUE) {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  # reset the daemons when the test ends, even after an error
  on.exit({
    mirai::daemons(0)
  }, add = TRUE)
  mirai::daemons(1)

  worker_ids = rush$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(3)

  # the crash is only recorded once detection runs
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)
  # the task that was running on the crashed worker is reported as failed
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)

  expect_rush_reset(rush)
})
test_that("a segfault on a worker is detected via the heartbeat", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  # each iteration registers a running task and then crashes the R session
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  rush$worker_script(
    worker_loop = worker_loop,
    heartbeat_period = 1,
    heartbeat_expire = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  worker_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 1, heartbeat_expire = 2)"
  px = processx::process$new("Rscript",
    args = c("-e", worker_call),
    supervise = TRUE,
    stdout = "|", stderr = "|")
  on.exit(px$kill(), add = TRUE)

  # wait well beyond heartbeat_expire before looking for the lost worker
  Sys.sleep(10)
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_string(rush$lost_worker_ids)

  expect_rush_reset(rush)
})
test_that("segfaults on workers are detected via the heartbeat", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  # each iteration registers a running task and then crashes the R session
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  rush$worker_script(
    worker_loop = worker_loop,
    heartbeat_period = 1,
    heartbeat_expire = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  # both workers are launched with the same Rscript command
  worker_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 1, heartbeat_expire = 2)"
  spawn_worker = function() {
    processx::process$new("Rscript",
      args = c("-e", worker_call),
      supervise = TRUE,
      stdout = "|", stderr = "|")
  }
  px_1 = spawn_worker()
  px_2 = spawn_worker()
  on.exit({
    px_1$kill()
    px_2$kill()
  }, add = TRUE)

  # wait well beyond heartbeat_expire before looking for the lost workers
  Sys.sleep(10)
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_character(rush$lost_worker_ids, len = 2)

  expect_rush_reset(rush)
})
# fault detection --------------------------------------------------------------
test_that("a lost task is detected", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # registers one running task, then crashes the R session
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  rush$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(5)
  rush$detect_lost_workers()

  # the single task moved from running to failed
  expected_counts = c(n_tasks = 1, n_queued_tasks = 0, n_running_tasks = 0,
    n_finished_tasks = 0, n_failed_tasks = 1)
  for (field in names(expected_counts)) {
    expect_equal(rush[[field]], expected_counts[[field]])
  }

  # check keys in sets
  expect_character(rush$tasks, len = 1)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_tasks(), nrows = 1)
  failed = rush$fetch_failed_tasks()
  expect_names(names(failed), must.include = c("x1", "x2", "worker_id", "message", "keys"))
  expect_data_table(failed, nrows = 1)
  expect_equal(failed$message, "Worker has crashed or was killed")

  # detection returns the controller itself
  expect_class(rush$detect_lost_workers(), "Rush")
  expect_rush_reset(rush)
})
# restart workers --------------------------------------------------------------
test_that("restarting a local worker works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  # wait for all started workers so none is still booting when the test resets
  rush$wait_for_workers(4, timeout = 5)
  worker_id_1 = rush$running_worker_ids[1]

  # kill the first worker's process from outside rush (default signal)
  tools::pskill(rush$worker_info[worker_id == worker_id_1, pid])
  Sys.sleep(1)
  expect_false(rush$processes_processx[[worker_id_1]]$is_alive())

  # the lost local worker is started again under the same id
  rush$detect_lost_workers(restart_local_workers = TRUE)
  expect_true(rush$processes_processx[[worker_id_1]]$is_alive())

  expect_rush_reset(rush)
})
test_that("restarting a worker kills the local worker", {
  skip_on_cran()
  skip_on_os("windows")
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)

  old_pid = rush$worker_info$pid
  worker_id = rush$running_worker_ids
  # signal 0 only probes whether the process exists
  expect_true(tools::pskill(old_pid, signal = 0))

  # the restarted worker runs in a new process and the old one is gone
  rush$restart_workers(worker_ids = worker_id)
  Sys.sleep(1)
  expect_true(old_pid != rush$worker_info$pid)
  expect_false(tools::pskill(old_pid, signal = 0))

  expect_rush_reset(rush)
})
test_that("restarting a remote worker works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # registers one running task, then crashes the R session
  crashing_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  on.exit(mirai::daemons(0), add = TRUE)
  mirai::daemons(1)

  worker_ids = rush$start_remote_workers(
    worker_loop = crashing_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(3)

  # first crash
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)

  # recycle the daemon pool, then restart the lost worker on it
  mirai::daemons(0)
  mirai::daemons(1)
  rush$restart_workers(worker_ids = worker_ids)
  Sys.sleep(3)

  # the restarted worker crashes again and adds a second failed task
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 2)

  expect_rush_reset(rush)
})
# receiving results ------------------------------------------------------------
# test_that("blocking on new results works", {
# skip_on_cran()
# config = start_flush_redis()
# rush = rsh(network_id = "test-rush", config = config)
# worker_loop = function(rush) {
# while(!rush$terminated && !rush$terminated_on_idle) {
# task = rush$pop_task(fields = c("xs", "seed"))
# if (!is.null(task)) {
# tryCatch({
# # evaluate task with seed
# fun = function(x1, x2, ...) {
# Sys.sleep(5)
# list(y = x1 + x2)
# }
# ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
# rush$push_results(task$key, yss = list(ys))
# }, error = function(e) {
# condition = list(message = e$message)
# rush$push_failed(task$key, conditions = list(condition))
# })
# }
# }
# return(NULL)
# }
# worker_ids = rush$start_local_workers(
# worker_loop = worker_loop,
# n_workers = 1,
# lgr_thresholds = c("mlr3/rush" = "debug"))
# rush$wait_for_workers(1, timeout = 5)
# xss = list(list(x1 = 1, x2 = 2))
# keys = rush$push_tasks(xss)
# expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0)
# expect_data_table(rush$wait_for_new_tasks(timeout = 10), nrows = 1)
# expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0)
# expect_rush_reset(rush)
# })
test_that("wait for tasks works when a task gets lost", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  rush$start_local_workers(
    worker_loop = segfault_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  task_keys = rush$push_tasks(list(
    list(x1 = 1, x2 = 2),
    list(x1 = 0, x2 = 2)))

  # with lost-worker detection enabled the call is expected to return the
  # controller itself; presumably it stops waiting on tasks whose worker died
  waited = rush$wait_for_tasks(task_keys, detect_lost_workers = TRUE)
  expect_class(waited, "Rush")
  expect_rush_reset(rush)
})
# misc--------------------------------------------------------------------------
test_that("saving lgr logs works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(5)

  # columns every log table returned by read_log() must carry
  log_columns = c("worker_id", "timestamp", "logger", "caller", "msg")

  # a single evaluated task yields at least one log entry
  first_keys = rush$push_tasks(list(list(x1 = 2, x2 = 2)))
  rush$wait_for_tasks(first_keys)
  Sys.sleep(5)
  entries = rush$read_log()
  expect_data_table(entries, min.rows = 1L)
  expect_names(names(entries), must.include = log_columns)

  # time_difference = TRUE adds a difftime column
  timed_entries = rush$read_log(time_difference = TRUE)
  expect_data_table(timed_entries, min.rows = 1L)
  expect_names(names(timed_entries), must.include = "time_difference")
  expect_class(timed_entries$time_difference, "difftime")

  # after three more tasks the log holds at least two entries
  more_keys = rush$push_tasks(list(
    list(x1 = 1, x2 = 2),
    list(x1 = 0, x2 = 2),
    list(x1 = 1, x2 = 2)))
  rush$wait_for_tasks(more_keys)
  Sys.sleep(5)
  entries = rush$read_log()
  expect_data_table(entries, min.rows = 2L)
  expect_names(names(entries), must.include = log_columns)
  expect_rush_reset(rush)
})
test_that("snapshot option works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)

  # reads the current value of the Redis "save" configuration parameter
  redis_save_setting = function() rush$connector$CONFIG_GET("save")[[2]]

  # a schedule set on the controller is written through to Redis
  rush$snapshot_schedule = c(1, 1)
  expect_equal(redis_save_setting(), "1 1")
  expect_equal(rush$snapshot_schedule, c(1, 1))

  # assigning NULL clears the setting again
  rush$snapshot_schedule = NULL
  expect_equal(redis_save_setting(), "")
  expect_equal(rush$snapshot_schedule, "")
  expect_rush_reset(rush)
})
test_that("terminating workers on idle works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  # terminate_workers = TRUE is expected to make the workers stop once the
  # queue is empty (presumably via rush$terminated_on_idle in the loop)
  task_keys = rush$push_tasks(
    list(list(x1 = 1, x2 = 2)),
    terminate_workers = TRUE)
  rush$wait_for_tasks(task_keys)
  Sys.sleep(5)

  final_states = rush$worker_states$state
  expect_set_equal(final_states, "terminated")
  expect_rush_reset(rush)
})
test_that("reconnecting rush instance works", {
  skip_on_cran()
  # Serialize into a session temp file instead of the working directory, so
  # the test leaves no artifact in tests/testthat and cannot clobber an
  # existing "rush.rds". unlink() is silent if the file was never written,
  # whereas file.remove() warns.
  rds_path = tempfile(fileext = ".rds")
  on.exit(unlink(rds_path), add = TRUE)

  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  saveRDS(rush, file = rds_path)

  # a deserialized controller is not connected until reconnect() is called
  rush = readRDS(rds_path)
  expect_error(rush$print(), "Context is not connected")
  rush$reconnect()
  expect_r6(rush, "Rush")
  expect_rush_reset(rush)
})
# seed -------------------------------------------------------------------------
test_that("seeds are generated from regular rng seed", {
  skip_on_cran()
  config = start_flush_redis()
  # an ordinary integer seed on the controller
  rush = rsh(network_id = "test-rush", config = config, seed = 123)

  # each pushed task gets an L'Ecuyer-CMRG seed attached
  rush$push_tasks(list(list(x1 = 1, x2 = 2)))
  tab = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(is_lecyer_cmrg_seed(tab$seed[[1]]))

  # successive tasks get distinct seeds (compared on the second element)
  rush$push_tasks(list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)))
  tab = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(tab$seed[[1]][2] != tab$seed[[2]][2])
  expect_true(tab$seed[[2]][2] != tab$seed[[3]][2])

  # clean up the pushed tasks like every other test in this file
  expect_rush_reset(rush)
})
test_that("seed are generated from L'Ecuyer-CMRG seed", {
  skip_on_cran()
  config = start_flush_redis()
  # the controller is given a full 7-integer L'Ecuyer-CMRG seed vector
  rush = rsh(
    network_id = "test-rush",
    config = config,
    seed = c(10407L, 1801422725L, -2057975723L, 1156894209L, 1595475487L, 210384600L, -1655729657L))

  # each pushed task gets an L'Ecuyer-CMRG seed attached
  rush$push_tasks(list(list(x1 = 1, x2 = 2)))
  tab = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(is_lecyer_cmrg_seed(tab$seed[[1]]))

  # successive tasks get distinct seeds (compared on the second element)
  rush$push_tasks(list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)))
  tab = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(tab$seed[[1]][2] != tab$seed[[2]][2])
  expect_true(tab$seed[[2]][2] != tab$seed[[3]][2])

  # clean up the pushed tasks like every other test in this file
  expect_rush_reset(rush)
})
test_that("seed is set correctly on two workers", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config, seed = 123)

  # Pops tasks and evaluates each one under the seed stored with the task.
  # The sampled y is compared against fixed values below, even though two
  # workers share the queue.
  seeded_loop = function(rush) {
    while (!rush$terminated && !rush$terminated_on_idle) {
      task = rush$pop_task(fields = c("xs", "seed"))
      if (is.null(task)) next
      tryCatch({
        # evaluate task with seed
        draw = function(x1, x2, ...) list(y = sample(10000, 1))
        ys = with_rng_state(draw, args = c(task$xs), seed = task$seed)
        rush$push_results(task$key, yss = list(ys))
      }, error = function(e) {
        rush$push_failed(task$key, conditions = list(list(message = e$message)))
      })
    }
    NULL
  }

  rush$start_local_workers(
    worker_loop = seeded_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  first_batch = rush$push_tasks(list(
    list(x1 = 1, x2 = 2),
    list(x1 = 2, x2 = 2),
    list(x1 = 2, x2 = 3),
    list(x1 = 2, x2 = 4)))
  rush$wait_for_tasks(first_batch)
  expect_set_equal(rush$fetch_finished_tasks()$y, c(5971L, 4090L, 1754L, 9794L))

  # later batches keep drawing from the controller's seed stream
  second_batch = rush$push_tasks(list(
    list(x1 = 5, x2 = 3),
    list(x1 = 5, x2 = 4)))
  rush$wait_for_tasks(second_batch)
  expect_set_equal(
    rush$fetch_finished_tasks()$y,
    c(1754L, 9794L, 4090L, 5971L, 8213L, 3865L))
  expect_rush_reset(rush, type = "terminate")
})
test_that("redis info works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  # server information is exposed as a list
  expect_list(rush$redis_info)
  # reset the network like every other test in this file
  expect_rush_reset(rush)
})
test_that("saving logs with redis appender works", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config, remote = FALSE)

  # NOTE(review): lgr_buffer_size = 1 presumably makes the worker flush each
  # log event to Redis right away - confirm
  rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"),
    lgr_buffer_size = 1)
  rush$wait_for_workers(1, timeout = 5)

  entries = rush$read_log()
  expected_columns = c("worker_id", "level", "timestamp", "logger", "caller", "msg")
  expect_data_table(entries, min.rows = 1)
  expect_names(colnames(entries), identical.to = expected_columns)
  expect_rush_reset(rush)
})
test_that("error and output logs work", {
  skip_on_cran()
  config = start_flush_redis()
  rush = rsh(
    network_id = "test-rush",
    config = config,
    remote = FALSE)

  # Both logs go to the session temp directory. The files are told apart by
  # their "message_"/"output_" prefix plus the worker id.
  message_log = tempdir()
  output_log = tempdir()
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"),
    lgr_buffer_size = 1,
    message_log = message_log,
    output_log = output_log)

  # Bounded wait, consistent with every other test in this file.
  # NOTE(review): presumably wait_for_workers() errors after the timeout
  # instead of blocking indefinitely when the worker never registers - confirm.
  rush$wait_for_workers(1, timeout = 5)

  # the first line of each file is the worker's debug banner
  expect_match(readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))[1], "Debug message logging on worker")
  expect_match(readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))[1], "Debug output logging on worker")
  expect_rush_reset(rush)
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.