# Nothing
# start workers with processx --------------------------------------------------
test_that("constructing a rush controller works", {
  skip_on_cran()

  # connection config comes from the test helper
  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # the object has the expected class and keeps the network id it was given
  expect_class(rush, "Rush")
  expect_equal(rush$network_id, "test-rush")

  expect_rush_reset(rush)
})
test_that("local workers are started", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)

  # the worker table is empty before any worker is started
  expect_data_table(rush$worker_info, nrows = 0)

  worker_ids = rush$start_local_workers(
    fun = task_fun,
    n_workers = 2,
    lgr_thresholds = c(rush = "debug"),
    wait_for_workers = TRUE
  )
  expect_equal(rush$n_workers, 2)

  # check fields
  for (process in rush$processes) {
    expect_class(process, "process")
  }

  # check meta data from redis
  info = rush$worker_info
  expect_data_table(info, nrows = 2)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(worker_ids, info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush)
})
test_that("local workers are started with Redis on unix socket", {
  skip_if(TRUE)

  # start a dedicated Redis server that listens on a unix socket only (--port 0);
  # the working directory is shell-quoted so a tempdir() containing spaces does not break the command
  system(sprintf("redis-server --port 0 --unixsocket /tmp/redis.sock --daemonize yes --pidfile /tmp/redis.pid --dir %s", shQuote(tempdir())))
  Sys.sleep(5)

  config = redux::redis_config(path = "/tmp/redis.sock")
  r = redux::hiredis(config)

  # shut the server down when the test ends; add = TRUE keeps any other exit handlers intact
  on.exit({
    try({r$SHUTDOWN()}, silent = TRUE)
  }, add = TRUE)

  r$FLUSHDB()
  rush = Rush$new(network_id = "test-rush", config = config)
  fun = function(x1, x2, ...) list(y = x1 + x2)

  worker_ids = rush$start_local_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)
  expect_equal(rush$n_workers, 2)

  # check fields
  walk(rush$processes, function(process) expect_class(process, "process"))

  # check meta data from redis
  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 2)
  expect_integer(worker_info$pid, unique = TRUE)
  expect_false(any(worker_info$remote))
  expect_set_equal(worker_ids, worker_info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_states$state, "running")
})
test_that("workers are started with a heartbeat", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)

  rush$start_local_workers(
    fun = task_fun,
    n_workers = 2,
    heartbeat_period = 3,
    heartbeat_expire = 9,
    wait_for_workers = TRUE
  )

  # check meta data from redis
  info = rush$worker_info
  expect_character(info$heartbeat, unique = TRUE)

  expect_rush_reset(rush)
})
test_that("additional workers are started", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)

  # first batch of two workers
  first_batch = rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)
  expect_equal(rush$n_workers, 2)

  # second batch on the same network
  second_batch = rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)
  rush$wait_for_workers(4)
  expect_equal(rush$n_workers, 4)
  expect_length(rush$processes, 4)
  for (process in rush$processes) {
    expect_class(process, "process")
  }

  # the worker table lists the ids returned by both calls
  info = rush$worker_info
  expect_data_table(info, nrows = 4)
  expect_set_equal(c(first_batch, second_batch), info$worker_id)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush)
})
test_that("packages are available on the worker", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # UUIDgenerate() is called unqualified, so the task only works if "uuid" is available on the worker
  task_fun = function(x1, x2, ...) list(y = UUIDgenerate(n = 1))
  rush$start_local_workers(
    fun = task_fun,
    n_workers = 2,
    packages = "uuid",
    wait_for_workers = TRUE
  )

  tasks = list(list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys, detect_lost_workers = TRUE)

  expect_equal(rush$n_finished_tasks, 1)
  expect_rush_reset(rush)
})
test_that("globals are available on the worker", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)

  # the worker function refers to `x`, which is not one of its arguments
  fun = function(x1, x2, ...) list(y = x)

  # `<<-` creates `x` outside the test environment (normally in the global environment).
  # Remember whether a global `x` already existed and restore or remove it on exit,
  # so this test does not leak state into later tests or the user's session.
  had_x = exists("x", envir = globalenv(), inherits = FALSE)
  old_x = if (had_x) get("x", envir = globalenv(), inherits = FALSE)
  on.exit({
    if (had_x) {
      assign("x", old_x, envir = globalenv())
    } else if (exists("x", envir = globalenv(), inherits = FALSE)) {
      rm("x", envir = globalenv())
    }
  }, add = TRUE)
  x <<- 33

  rush$start_local_workers(fun = fun, n_workers = 2, globals = "x", wait_for_workers = TRUE)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys, detect_lost_workers = TRUE)

  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$fetch_finished_tasks()$y, 33)
  expect_rush_reset(rush)
})
test_that("named globals are available on the worker", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)

  # the worker function refers to `z`; the controller-side variable is `x` (see `globals = c(z = "x")`)
  fun = function(x1, x2, ...) list(y = z)

  # `<<-` creates `x` outside the test environment (normally in the global environment).
  # Remember whether a global `x` already existed and restore or remove it on exit,
  # so this test does not leak state into later tests or the user's session.
  had_x = exists("x", envir = globalenv(), inherits = FALSE)
  old_x = if (had_x) get("x", envir = globalenv(), inherits = FALSE)
  on.exit({
    if (had_x) {
      assign("x", old_x, envir = globalenv())
    } else if (exists("x", envir = globalenv(), inherits = FALSE)) {
      rm("x", envir = globalenv())
    }
  }, add = TRUE)
  x <<- 33

  rush$start_local_workers(fun = fun, n_workers = 2, globals = c(z = "x"), wait_for_workers = TRUE)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys, detect_lost_workers = TRUE)

  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$fetch_finished_tasks()$y, 33)
  expect_rush_reset(rush)
})
# start workers with script ----------------------------------------------------
test_that("worker can be started with script", {
  skip_on_cran()
  set.seed(1) # make log messages reproducible

  # temporarily change the console layout format of the root logger; restored on exit
  root_logger = lgr::get_logger("root")
  old_fmt = root_logger$appenders$console$layout$fmt
  root_logger$appenders$console$layout$set_fmt("%L (%n): %m")
  on.exit({
    root_logger$appenders$console$layout$set_fmt(old_fmt)
  }, add = TRUE)

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)
  expect_snapshot(rush$create_worker_script())

  # start the worker in a separate Rscript process
  px = processx::process$new("Rscript",
    args = c("-e", "rush::start_worker(network_id = 'test-rush', remote = TRUE, url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379')"),
    supervise = TRUE,
    stderr = "|", stdout = "|")

  # register the cleanup as soon as the process exists, so that an error in any
  # later step cannot leave an orphaned Rscript process behind
  on.exit({
    px$kill()
  }, add = TRUE)

  Sys.sleep(5)
  expect_string(rush$pre_worker_ids)

  fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_remote_workers(fun = fun, lgr_thresholds = c(rush = "debug"))
  Sys.sleep(5)

  expect_true(px$is_alive())
  expect_equal(rush$n_running_workers, 1)
  expect_true(all(rush$worker_info$remote))
  expect_null(rush$pre_worker_ids)

  expect_rush_reset(rush, type = "terminate")
  px$kill()
})
# stop workers -----------------------------------------------------------------
test_that("a worker is terminated", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)

  # stopping before any worker was started still returns the controller
  expect_class(rush$stop_workers(), "Rush")

  rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)
  first_id = rush$running_worker_ids[1]
  second_id = rush$running_worker_ids[2]

  # worker 1
  rush$stop_workers(worker_ids = first_id, type = "terminate")
  Sys.sleep(3)
  expect_false(rush$processes[[first_id]]$is_alive())
  expect_equal(rush$running_worker_ids, second_id)
  expect_equal(first_id, rush$terminated_worker_ids)

  # worker 2
  rush$stop_workers(worker_ids = second_id, type = "terminate")
  Sys.sleep(3)
  expect_false(rush$processes[[second_id]]$is_alive())
  expect_set_equal(c(first_id, second_id), rush$terminated_worker_ids)
  expect_null(rush$running_worker_ids)

  expect_rush_reset(rush, type = "terminate")
})
test_that("a local worker is killed", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)

  # killing before any worker was started still returns the controller
  expect_class(rush$stop_workers(type = "kill"), "Rush")

  rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)
  first_id = rush$running_worker_ids[1]
  second_id = rush$running_worker_ids[2]

  # worker 1
  rush$stop_workers(worker_ids = first_id, type = "kill")
  Sys.sleep(1)
  expect_equal(first_id, rush$killed_worker_ids)
  expect_false(rush$processes[[first_id]]$is_alive())
  expect_true(rush$processes[[second_id]]$is_alive())

  # worker 2
  rush$stop_workers(worker_ids = second_id, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(first_id, second_id), rush$killed_worker_ids)
  expect_false(rush$processes[[first_id]]$is_alive())
  expect_false(rush$processes[[second_id]]$is_alive())

  expect_rush_reset(rush)
})
test_that("a remote worker is killed via the heartbeat", {
  skip_on_cran()
  skip_on_os("windows")

  redis_config = start_flush_redis()
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  rush$start_local_workers(
    fun = task_fun,
    n_workers = 2,
    heartbeat_period = 1,
    heartbeat_expire = 2,
    wait_for_workers = TRUE
  )

  first_id = rush$running_worker_ids[1]
  second_id = rush$running_worker_ids[2]
  info = rush$worker_info

  # signal 0 only probes the processes; both must exist at this point
  expect_true(all(tools::pskill(info$pid, signal = 0L)))

  # worker 1
  rush$stop_workers(worker_ids = first_id, type = "kill")
  Sys.sleep(1)
  expect_equal(first_id, rush$killed_worker_ids)
  expect_equal(rush$running_worker_ids, second_id)
  expect_false(tools::pskill(info[worker_id == first_id, pid], signal = 0L))

  # worker 2
  rush$stop_workers(worker_ids = second_id, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(first_id, second_id), rush$killed_worker_ids)
  expect_false(tools::pskill(info[worker_id == second_id, pid], signal = 0L))

  expect_rush_reset(rush)
})
# low level read and write -----------------------------------------------------
test_that("reading and writing a hash works with flatten", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  # the single point written in every case below
  point = list(x1 = 1, x2 = 2)

  # one field with list
  key = rush$write_hashes(xs = list(point))
  expect_equal(rush$read_hashes(key, "xs"), list(point))

  # one field with atomic
  key = rush$write_hashes(timeout = 1)
  expect_equal(rush$read_hashes(key, "timeout"), list(list(timeout = 1)))

  # two fields with lists
  key = rush$write_hashes(xs = list(point), ys = list(list(y = 3)))
  expect_equal(rush$read_hashes(key, c("xs", "ys")), list(c(point, list(y = 3))))

  # two fields with list and empty list
  key = rush$write_hashes(xs = list(point), ys = list())
  expect_equal(rush$read_hashes(key, c("xs", "ys")), list(point))

  # two fields with list and atomic
  key = rush$write_hashes(xs = list(point), timeout = 1)
  expect_equal(rush$read_hashes(key, c("xs", "timeout")), list(c(point, list(timeout = 1))))
})
test_that("reading and writing a hash works without flatten", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # the single point written in every case below
  point = list(x1 = 1, x2 = 2)

  # one field with list
  key = rush$write_hashes(xs = list(point))
  expect_equal(
    rush$read_hashes(key, "xs", flatten = FALSE),
    list(list(xs = point))
  )

  # one field with atomic
  key = rush$write_hashes(timeout = 1)
  expect_equal(
    rush$read_hashes(key, "timeout", flatten = FALSE),
    list(list(timeout = 1))
  )

  # two fields with lists
  key = rush$write_hashes(xs = list(point), ys = list(list(y = 3)))
  expect_equal(
    rush$read_hashes(key, c("xs", "ys"), flatten = FALSE),
    list(list(xs = point, ys = list(y = 3)))
  )

  # two fields with list and empty list
  key = rush$write_hashes(xs = list(point), ys = list())
  expect_equal(
    rush$read_hashes(key, c("xs", "ys"), flatten = FALSE),
    list(list(xs = point, ys = NULL))
  )

  # two fields with list and atomic
  key = rush$write_hashes(xs = list(point), timeout = 1)
  expect_equal(
    rush$read_hashes(key, c("xs", "timeout"), flatten = FALSE),
    list(list(xs = point, timeout = 1))
  )
})
test_that("reading and writing hashes works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  # the two points written in the cases below, plus the expected result when a timeout of 1 is attached
  first = list(x1 = 1, x2 = 2)
  second = list(x1 = 1, x2 = 3)
  both = list(first, second)
  both_with_timeout = list(c(first, list(timeout = 1)), c(second, list(timeout = 1)))

  # one field with list
  keys = rush$write_hashes(xs = both)
  expect_equal(rush$read_hashes(keys, "xs"), both)

  # one field atomic
  keys = rush$write_hashes(timeout = c(1, 1))
  expect_equal(rush$read_hashes(keys, "timeout"), list(list(timeout = 1), list(timeout = 1)))

  # two fields with list and recycled atomic
  keys = rush$write_hashes(xs = both, timeout = 1)
  expect_equal(rush$read_hashes(keys, c("xs", "timeout")), both_with_timeout)

  # two fields
  keys = rush$write_hashes(xs = both, ys = list(list(y = 3), list(y = 4)))
  expect_equal(
    rush$read_hashes(keys, c("xs", "ys")),
    list(c(first, list(y = 3)), c(second, list(y = 4)))
  )

  # two fields with list and atomic
  keys = rush$write_hashes(xs = both, timeout = c(1, 1))
  expect_equal(rush$read_hashes(keys, c("xs", "timeout")), both_with_timeout)

  # two fields with list and recycled atomic
  keys = rush$write_hashes(xs = both, timeout = 1)
  expect_equal(rush$read_hashes(keys, c("xs", "timeout")), both_with_timeout)

  # two fields, one empty
  keys = rush$write_hashes(xs = both, ys = list())
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), both)

  # recycle
  keys = rush$write_hashes(xs = both, ys = list(list(y = 3)))
  expect_equal(
    rush$read_hashes(keys, c("xs", "ys")),
    list(c(first, list(y = 3)), c(second, list(y = 3)))
  )
})
test_that("writing hashes to specific keys works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  first = list(x1 = 1, x2 = 2)
  second = list(x1 = 1, x2 = 3)

  # one element
  keys = uuid::UUIDgenerate()
  rush$write_hashes(xs = list(first), keys = keys)
  expect_equal(rush$read_hashes(keys, "xs"), list(first))

  # two elements
  keys = uuid::UUIDgenerate(n = 2)
  rush$write_hashes(xs = list(first, second), keys = keys)
  expect_equal(rush$read_hashes(keys, "xs"), list(first, second))

  # wrong number of keys
  keys = uuid::UUIDgenerate()
  expect_error(
    rush$write_hashes(xs = list(first, second), keys = keys),
    "Assertion on 'keys' failed"
  )
})
test_that("writing list columns works", {
  skip_on_cran()

  # case 1: one task, the extra field holds a list with a single string
  redis_config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)
  keys = rush$write_hashes(
    xs = list(list(x1 = 1, x2 = 2)),
    xs_extra = list(list(extra = list("A")))
  )
  # push the keys onto the finished list by hand so that fetch_finished_tasks() picks them up
  rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
  expect_list(rush$fetch_finished_tasks()$extra, len = 1)

  # case 2: one task, the extra field holds a list with a character vector
  redis_config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)
  keys = rush$write_hashes(
    xs = list(list(x1 = 1, x2 = 2)),
    xs_extra = list(list(extra = list(letters[1:3])))
  )
  rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
  expect_list(rush$fetch_finished_tasks()$extra, len = 1)

  # case 3: two tasks, each with its own extra list
  redis_config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)
  keys = rush$write_hashes(
    xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)),
    xs_extra = list(list(extra = list("A")), list(extra = list("B")))
  )
  rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
  rush$read_hashes(keys, c("xs", "xs_extra"))
  expect_list(rush$fetch_finished_tasks()$extra, len = 2)
})
# task evaluation --------------------------------------------------------------
test_that("evaluating a task works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(fun = task_fun, n_workers = 4, wait_for_workers = TRUE)

  # push a single task and block until it is done
  tasks = list(list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  finished = rush$fetch_finished_tasks()
  expect_names(names(finished), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(finished, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  expect_rush_reset(rush)
})
test_that("evaluating tasks works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(fun = task_fun, n_workers = 4, wait_for_workers = TRUE)

  # push ten identical tasks and block until all are done
  tasks = replicate(10, list(list(x1 = 1, x2 = 2)))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys)

  # check task count
  expect_equal(rush$n_tasks, 10)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 10)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_character(rush$tasks, len = 10)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_character(rush$finished_tasks, len = 10)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  finished = rush$fetch_finished_tasks()
  expect_names(names(finished), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(finished, nrows = 10)
  expect_data_table(rush$fetch_tasks(), nrows = 10)

  expect_rush_reset(rush)
})
test_that("caching results works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)
  fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)

  # first batch of ten tasks
  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  # the private cache keeps ten entries no matter how often or in which format the results are fetched
  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  # second batch of ten tasks: the cache grows to twenty entries
  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 20)
  expect_list(get_private(rush)$.cached_tasks, len = 20)

  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 20)
  expect_list(get_private(rush)$.cached_tasks, len = 20)

  # reset the network like the other worker-starting tests do, so the two local workers are not left behind
  expect_rush_reset(rush)
})
# segfault detection -----------------------------------------------------------
test_that("a segfault on a local worker is detected", {
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # worker function intended to crash the worker process (see test title)
  crash_fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase"))
  worker_ids = rush$start_local_workers(fun = crash_fun, n_workers = 1, wait_for_workers = TRUE)

  tasks = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(tasks)
  Sys.sleep(3)

  # no worker is reported as lost until detection is run explicitly
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)
  rush$fetch_failed_tasks()

  expect_rush_reset(rush)
})
test_that("a segfault on a worker is detected via the heartbeat", {
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # worker function intended to crash the worker process (see test title)
  crash_fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase"))
  worker_ids = rush$start_local_workers(
    fun = crash_fun,
    n_workers = 1,
    heartbeat_period = 1,
    heartbeat_expire = 2,
    wait_for_workers = TRUE
  )

  tasks = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(tasks)
  Sys.sleep(15)

  # no worker is reported as lost until detection is run explicitly
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)

  expect_rush_reset(rush)
})
# fault detection --------------------------------------------------------------
test_that("a simple error is catched", {
  skip_on_cran()
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # fails for x1 < 1, succeeds otherwise
  task_fun = function(x1, x2, ...) {
    if (x1 < 1) stop("Test error")
    list(y = x1 + x2)
  }
  rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)

  # one task that succeeds and one that raises the error
  tasks = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys, detect_lost_workers = TRUE)
  Sys.sleep(2)

  # check task count
  expect_equal(rush$n_tasks, 2)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 1)

  # check keys in sets
  expect_character(rush$tasks, len = 2)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_tasks(), nrows = 2)

  finished = rush$fetch_finished_tasks()
  expect_names(names(finished), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(finished, nrows = 1)

  failed = rush$fetch_failed_tasks()
  expect_names(names(failed), must.include = c("x1", "x2", "worker_id", "message", "keys"))
  expect_data_table(failed, nrows = 1)

  expect_rush_reset(rush)
})
test_that("a lost task is detected", {
  skip_on_cran()
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # no task is running
  expect_class(rush$detect_lost_workers(), "Rush")

  # worker function intended to crash the worker process
  crash_fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase"))
  rush$start_local_workers(fun = crash_fun, n_workers = 1, wait_for_workers = TRUE)

  tasks = list(list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  Sys.sleep(2)
  rush$detect_lost_workers()

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)

  # check keys in sets
  expect_character(rush$tasks, len = 1)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  failed = rush$fetch_failed_tasks()
  expect_names(names(failed), must.include = c("x1", "x2", "worker_id", "message", "keys"))
  expect_data_table(failed, nrows = 1)
  expect_equal(failed$message, "Worker has crashed or was killed")

  # running the detection a second time still returns the controller
  expect_class(rush$detect_lost_workers(), "Rush")

  expect_rush_reset(rush)
})
test_that("a lost task is detected when waiting", {
  skip_on_cran()
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # no task is running
  expect_class(rush$detect_lost_workers(), "Rush")

  # worker function intended to crash the worker process
  crash_fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase"))
  rush$start_local_workers(fun = crash_fun, n_workers = 1, wait_for_workers = TRUE)

  # two tasks for a single worker
  tasks = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  Sys.sleep(5)
  rush$wait_for_tasks(task_keys, detect_lost_workers = TRUE)

  # check task count
  expect_equal(rush$n_tasks, 2)
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)

  # check keys in sets
  expect_character(rush$tasks, len = 2)
  expect_string(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 1)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_tasks(), nrows = 2)

  failed = rush$fetch_failed_tasks()
  expect_names(names(failed), must.include = c("x1", "x2", "worker_id", "keys"))
  expect_data_table(failed, nrows = 1)

  expect_rush_reset(rush)
})
# restart workers --------------------------------------------------------------
test_that("restarting a worker works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)
  fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)

  # only the first worker is needed; the second one just keeps running
  worker_id_1 = rush$running_worker_ids[1]

  # kill the worker process from outside, using the pid recorded in the worker table
  tools::pskill(rush$worker_info[worker_id == worker_id_1, pid])
  Sys.sleep(1)
  expect_false(rush$processes[[worker_id_1]]$is_alive())

  # the detection is asked to restart lost local workers
  rush$detect_lost_workers(restart_local_workers = TRUE)
  expect_true(rush$processes[[worker_id_1]]$is_alive())

  expect_rush_reset(rush)
})
test_that("restarting a worker kills the worker", {
  skip_on_cran()
  skip_on_os("windows")

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(fun = task_fun, n_workers = 1, wait_for_workers = TRUE)

  old_pid = rush$worker_info$pid
  id = rush$running_worker_ids

  # signal 0 only probes the process; it must exist before the restart
  expect_true(tools::pskill(old_pid, signal = 0))

  rush$restart_local_workers(worker_ids = id)
  Sys.sleep(1)

  # the worker now has a different pid and the old process is gone
  expect_false(old_pid == rush$worker_info$pid)
  expect_false(tools::pskill(old_pid, signal = 0))

  expect_rush_reset(rush)
})
# receiving results ------------------------------------------------------------
test_that("blocking on new results works", {
  skip_on_cran()
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # each task takes five seconds
  slow_fun = function(x1, x2, ...) {
    Sys.sleep(5)
    list(y = x1 + x2)
  }
  rush$start_local_workers(fun = slow_fun, n_workers = 2, wait_for_workers = TRUE)

  tasks = list(list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks)

  # a short wait returns nothing, a long wait returns the result, and a further wait returns nothing new
  expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0)
  expect_data_table(rush$wait_for_new_tasks(timeout = 10), nrows = 1)
  expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0)

  expect_rush_reset(rush)
})
test_that("wait for tasks works when a task gets lost", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # for x1 < 1 the function runs the call intended to crash the worker; otherwise it returns normally
  task_fun = function(x1, x2, ...) {
    if (x1 < 1) get("attach")(structure(list(), class = "UserDefinedDatabase"))
    list(y = x1 + x2)
  }
  rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)
  rush$wait_for_workers(2)

  tasks = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
  task_keys = rush$push_tasks(tasks)

  # waiting must return the controller even though one task is lost
  expect_class(rush$wait_for_tasks(task_keys, detect_lost_workers = TRUE), "Rush")

  expect_rush_reset(rush)
})
# misc--------------------------------------------------------------------------
test_that("saving lgr logs works", {
  skip_on_cran()
  skip_if(TRUE) # does not work in testthat on environment

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(
    fun = task_fun,
    n_workers = 2,
    lgr_thresholds = c(rush = "debug"),
    wait_for_workers = TRUE
  )
  Sys.sleep(5)

  # columns every log table must contain
  log_columns = c("worker_id", "timestamp", "logger", "caller", "msg")

  # one task
  tasks = list(list(x1 = 2, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys)
  Sys.sleep(5)

  log = rush$read_log()
  expect_data_table(log, nrows = 6)
  expect_names(names(log), must.include = log_columns)

  # three more tasks
  tasks = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2), list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys)
  Sys.sleep(5)

  log = rush$read_log()
  expect_data_table(log, nrows = 18)
  expect_names(names(log), must.include = log_columns)

  expect_rush_reset(rush)
})
test_that("snapshot option works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  rush$start_local_workers(fun = task_fun, n_workers = 2, lgr_thresholds = c(rush = "debug"))

  # setting a schedule shows up in the Redis "save" configuration
  rush$snapshot_schedule = c(1, 1)
  expect_equal(rush$connector$CONFIG_GET("save")[[2]], "1 1")
  expect_equal(rush$snapshot_schedule, c(1, 1))

  # assigning NULL leaves "save" as an empty string
  rush$snapshot_schedule = NULL
  expect_equal(rush$connector$CONFIG_GET("save")[[2]], "")
  expect_equal(rush$snapshot_schedule, "")

  expect_rush_reset(rush)
})
test_that("terminating workers on idle works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)
  task_fun = function(x1, x2, ...) list(y = x1 + x2)
  worker_ids = rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)

  # push the task together with the terminate flag
  tasks = list(list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks, terminate_workers = TRUE)
  rush$wait_for_tasks(task_keys)
  Sys.sleep(5)

  # every worker must report the "terminated" state
  expect_set_equal(rush$worker_states$state, "terminated")

  expect_rush_reset(rush)
})
test_that("constants works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # x3 is not part of the task; it is supplied through `constants`
  task_fun = function(x1, x2, x3, ...) list(y = x1 + x2 + x3)
  rush$start_local_workers(
    fun = task_fun,
    n_workers = 4,
    constants = list(x3 = 5),
    wait_for_workers = TRUE
  )

  tasks = list(list(x1 = 1, x2 = 2))
  task_keys = rush$push_tasks(tasks)
  rush$wait_for_tasks(task_keys)

  # 1 + 2 + 5
  expect_equal(rush$fetch_finished_tasks()$y, 8)

  expect_rush_reset(rush)
})
# rush network without controller ----------------------------------------------
test_that("network without controller works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # custom worker loop: each worker creates, evaluates and reports its own tasks
  # until the network holds at least 100 finished tasks
  worker_loop = function(rush) {
    while (rush$n_finished_tasks < 100) {
      # ask
      point = list(
        x1 = sample(seq(1000), 1),
        x2 = sample(seq(1000), 1)
      )
      task_keys = rush$push_running_tasks(list(point))

      # evaluate
      result = list(y = point$x1 + point$x2)

      # tell
      rush$push_results(task_keys, list(result))
    }

    NULL
  }

  rush$start_local_workers(worker_loop = worker_loop, n_workers = 2, wait_for_workers = TRUE)
  Sys.sleep(10)

  expect_gte(rush$n_finished_tasks, 100)
  expect_rush_reset(rush)
})
# seed -------------------------------------------------------------------------
test_that("seeds are generated from regular rng seed", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config, seed = 123)

  # first task: the stored seed passes the package's L'Ecuyer-CMRG check
  rush$push_tasks(list(list(x1 = 1, x2 = 2)))
  task_table = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(is_lecyer_cmrg_seed(task_table$seed[[1]]))

  # two more tasks: neighbouring seeds differ in their second element
  rush$push_tasks(list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)))
  task_table = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(task_table$seed[[1]][2] != task_table$seed[[2]][2])
  expect_true(task_table$seed[[2]][2] != task_table$seed[[3]][2])
})
test_that("seed are generated from L'Ecuyer-CMRG seed", {
  skip_on_cran()

  redis_config = start_flush_redis()

  # seven-integer seed vector passed to the controller directly
  initial_seed = c(10407L, 1801422725L, -2057975723L, 1156894209L, 1595475487L, 210384600L, -1655729657L)
  rush = Rush$new(network_id = "test-rush", config = redis_config, seed = initial_seed)

  # first task: the stored seed passes the package's L'Ecuyer-CMRG check
  rush$push_tasks(list(list(x1 = 1, x2 = 2)))
  task_table = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(is_lecyer_cmrg_seed(task_table$seed[[1]]))

  # two more tasks: neighbouring seeds differ in their second element
  rush$push_tasks(list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)))
  task_table = rush$fetch_tasks(fields = c("xs", "seed"))
  expect_true(task_table$seed[[1]][2] != task_table$seed[[2]][2])
  expect_true(task_table$seed[[2]][2] != task_table$seed[[3]][2])
})
test_that("seed is set correctly on two workers", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config, seed = 123)

  # the result depends only on the RNG state, not on the task arguments
  task_fun = function(x1, x2, ...) list(y = sample(10000, 1))
  worker_ids = rush$start_local_workers(fun = task_fun, n_workers = 2, wait_for_workers = TRUE)

  # first four tasks
  .keys = rush$push_tasks(list(
    list(x1 = 1, x2 = 2),
    list(x1 = 2, x2 = 2),
    list(x1 = 2, x2 = 3),
    list(x1 = 2, x2 = 4)
  ))
  rush$wait_for_tasks(.keys)
  results = rush$fetch_finished_tasks()
  expect_set_equal(results$y, c(5971L, 4090L, 1754L, 9794L))

  # two more tasks: the earlier values are still present and two new ones are added
  .keys = rush$push_tasks(list(
    list(x1 = 5, x2 = 3),
    list(x1 = 5, x2 = 4)
  ))
  rush$wait_for_tasks(.keys)
  results = rush$fetch_finished_tasks()
  expect_set_equal(results$y, c(1754L, 9794L, 4090L, 5971L, 8213L, 3865L))

  expect_rush_reset(rush, type = "terminate")
})
# log --------------------------------------------------------------------------
test_that("printing logs with redis appender works", {
  skip_on_cran()
  skip_if(TRUE) # does not work in testthat on environment

  # temporarily set the threshold of the "rush" logger; restored on exit.
  # add = TRUE keeps any other exit handlers intact, matching the other on.exit() calls in this file.
  lg_rush = lgr::get_logger("rush")
  old_threshold_rush = lg_rush$threshold
  on.exit(lg_rush$set_threshold(old_threshold_rush), add = TRUE)
  lg_rush$set_threshold("info")

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config, seed = 123)

  # the worker function emits one message per level before returning its result
  fun = function(x1, x2, ...) {
    lg = lgr::get_logger("rush")
    lg$info("test-1-info")
    lg$warn("test-1-warn")
    lg$error("test-1-error")
    list(y = x1 + x2)
  }
  worker_ids = rush$start_local_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE, lgr_thresholds = c(rush = "info"))

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2))
  keys = rush$push_tasks(xss)
  Sys.sleep(1)

  # the first print shows the messages; an immediate second print is silent
  expect_output(rush$print_log(), ".*test-1-info.*test-1-warn.*test-1-error")
  expect_silent(rush$print_log())

  xss = list(list(x1 = 3, x2 = 2))
  keys = rush$push_tasks(xss)
  expect_output(rush$print_log(), ".*test-1-info.*test-1-warn.*test-1-error")

  expect_rush_reset(rush, type = "terminate")
})
test_that("redis info works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = redis_config)

  # the info field must come back as a list
  expect_list(rush$redis_info)
})
# large objects ----------------------------------------------------------------
# The description is distinct from the "evaluating a task works" test in the
# task evaluation section so the two can be told apart in test reports.
test_that("evaluating a task with large objects works", {
  skip_on_cran()
  skip_if(TRUE) # takes too long

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)
  fun = function(x1, x2, large_vector, ...) list(y = length(large_vector))

  # without a large-objects path, the oversized constant is rejected
  expect_error(
    rush$start_local_workers(fun = fun, n_workers = 4, constants = list(large_vector = runif(1e8)), wait_for_workers = TRUE),
    "Worker configuration is larger than 512 MiB.")

  # with a large-objects path configured through rush_plan(), the same start succeeds
  rush_plan(n_workers = 4, large_objects_path = tempdir())
  rush$start_local_workers(fun = fun, constants = list(large_vector = runif(1e8)), wait_for_workers = TRUE)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  # the worker saw the complete vector
  expect_equal(rush$fetch_finished_tasks()$y, 1e8)

  expect_rush_reset(rush)
})
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.