Nothing
test_that("constructing a rush worker works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
expect_class(rush, "Rush")
expect_equal(rush$network_id, "test-rush")
expect_string(rush$worker_id)
expect_false(rush$remote)
expect_equal(rush$worker_ids, rush$worker_id)
expect_equal(rush$running_worker_ids, rush$worker_id)
expect_rush_reset(rush, type = "terminate")
# pass worker id
config = start_flush_redis()
worker_id = uuid::UUIDgenerate()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, worker_id = worker_id)
expect_equal(rush$worker_id, worker_id)
expect_rush_reset(rush, type = "terminate")
})
test_that("active bindings work after construction", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
expect_equal(rush$n_workers, 1)
# check task count
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_queued_priority_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
expect_rush_reset(rush, type = "terminate")
})
test_that("a worker is registered", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
# check meta data from redis
worker_info = rush$worker_info
expect_data_table(worker_info, nrows = 1)
expect_names(names(worker_info), permutation.of = c("worker_id", "pid", "remote", "hostname", "heartbeat"))
expect_string(worker_info$heartbeat, na.ok = TRUE)
expect_equal(worker_info$worker_id, rush$worker_id)
expect_false(worker_info$remote)
expect_equal(worker_info$pid, Sys.getpid())
expect_equal(rush$worker_ids, rush$worker_id)
expect_equal(rush$worker_states$state, "running")
expect_rush_reset(rush, type = "terminate")
})
test_that("a worker is terminated", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
expect_equal(rush$running_worker_ids, rush$worker_id)
rush$set_terminated()
expect_null(rush$running_worker_ids)
expect_equal(rush$terminated_worker_ids, rush$worker_id)
expect_rush_reset(rush, type = "terminate")
})
test_that("a heartbeat is started", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, heartbeat_period = 3)
expect_class(rush$heartbeat, "r_process")
expect_true(rush$heartbeat$is_alive())
expect_string(rush$worker_info$heartbeat)
expect_rush_reset(rush, type = "terminate")
})
test_that("pushing a task to the queue works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_set_equal(rush$queued_tasks, keys)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_queued_tasks()
expect_names(names(data), must.include = c("x1", "x2", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_false(rush$is_running_task(keys))
expect_false(rush$is_failed_task(keys))
expect_rush_reset(rush, type = "terminate")
})
test_that("pushing a task with extras to the queue works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
timestamp = Sys.time()
extra = list(list(timestamp = timestamp))
keys = rush$push_tasks(xss, extra)
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_set_equal(rush$queued_tasks, keys)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_queued_tasks()
expect_names(names(data), must.include = c("x1", "x2", "timestamp", "keys"))
expect_data_table(data, nrows = 1)
expect_equal(data$timestamp, timestamp)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_false(rush$is_running_task(keys))
expect_false(rush$is_failed_task(keys))
expect_rush_reset(rush, type = "terminate")
})
test_that("pushing tasks to the queue works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
keys = rush$push_tasks(xss)
# check task count
expect_equal(rush$n_tasks, 2)
expect_equal(rush$n_queued_tasks, 2)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_character(rush$tasks, len = 2)
expect_set_equal(rush$queued_tasks, keys)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_queued_tasks()
expect_names(names(data), must.include = c("x1", "x2", "keys"))
expect_data_table(data, nrows = 2)
expect_character(data$keys, unique = TRUE, len = 2)
expect_data_table(rush$fetch_tasks(), nrows = 2)
# status checks
expect_false(any(rush$is_running_task(keys)))
expect_false(any(rush$is_failed_task(keys)))
expect_rush_reset(rush, type = "terminate")
})
test_that("pushing tasks with extras to the queue works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
timestamp = Sys.time()
extra = list(list(timestamp = timestamp), list(timestamp = timestamp))
keys = rush$push_tasks(xss, extra)
# check task count
expect_equal(rush$n_tasks, 2)
expect_equal(rush$n_queued_tasks, 2)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_character(rush$tasks, len = 2)
expect_set_equal(rush$queued_tasks, keys)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_queued_tasks()
expect_names(names(data), must.include = c("x1", "x2", "timestamp", "keys"))
expect_data_table(data, nrows = 2)
expect_character(data$keys, unique = TRUE, len = 2)
expect_equal(data$timestamp, c(timestamp, timestamp))
expect_data_table(rush$fetch_tasks(), nrows = 2)
# status checks
expect_false(any(rush$is_running_task(keys)))
expect_false(any(rush$is_failed_task(keys)))
expect_rush_reset(rush, type = "terminate")
})
test_that("popping a task from the queue works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
# check task
task = rush$pop_task()
expect_rush_task(task)
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 1)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_string(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_running_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_true(rush$is_running_task(task$key))
expect_false(rush$is_failed_task(task$key))
expect_rush_reset(rush, type = "terminate")
})
test_that("popping a task with seed, max_retries and timeout works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
seed = 123456
max_retries = 2
timeout = 1
rush$push_tasks(xss, seeds = list(seed), max_retries = max_retries, timeouts = timeout)
# check task
task = rush$pop_task(fields = c("xs", "seed", "max_retries", "timeout"))
expect_equal(task$seed, seed)
expect_equal(task$max_retries, max_retries)
expect_equal(task$timeout, timeout)
expect_rush_task(task)
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 1)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_string(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_running_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_true(rush$is_running_task(task$key))
expect_false(rush$is_failed_task(task$key))
expect_rush_reset(rush, type = "terminate")
})
test_that("pushing a finished task works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
task = rush$pop_task()
rush$push_results(task$key, list(list(y = 3)))
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_string(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_finished_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_false(rush$is_running_task(task$key))
expect_false(rush$is_failed_task(task$key))
expect_rush_reset(rush, type = "terminate")
})
test_that("pushing a failed tasks works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
task = rush$pop_task()
rush$push_failed(task$key, conditions = list(list(message = "error")))
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_string(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
data = rush$fetch_failed_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_false(rush$is_running_task(task$key))
expect_true(rush$is_failed_task(task$key))
expect_rush_reset(rush, type = "terminate")
})
test_that("retry a failed task works", {
skip_on_cran()
lg_rush = lgr::get_logger("rush")
old_threshold_rush = lg_rush$threshold
on.exit(lg_rush$set_threshold(old_threshold_rush))
lg_rush$set_threshold("info")
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
task = rush$pop_task()
expect_output(rush$retry_tasks(keys), "Not all task")
rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
expect_true(rush$is_failed_task(task$key))
rush$retry_tasks(keys)
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
expect_false(rush$is_failed_task(task$key))
expect_rush_reset(rush, type = "terminate")
})
test_that("retry a failed task works and setting a new seed works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
seed = c(10407L, 1280795612L, -169270483L, -442010614L, -603558397L, -222347416L, 1489374793L)
keys = rush$push_tasks(xss, seeds = list(seed))
task = rush$pop_task(fields = c("xs", "seed"))
expect_equal(task$seed, seed)
rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
expect_true(rush$is_failed_task(task$key))
rush$retry_tasks(keys, next_seed = TRUE)
task_info = rush$read_hash(keys, "seed")
expect_true(is_lecyer_cmrg_seed(task_info$seed))
expect_true(task_info$seed[2] != seed[2])
expect_rush_reset(rush, type = "terminate")
})
test_that("retry a failed task works with a maximum of retries", {
skip_on_cran()
lg_rush = lgr::get_logger("rush")
old_threshold_rush = lg_rush$threshold
on.exit(lg_rush$set_threshold(old_threshold_rush))
lg_rush$set_threshold("info")
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss, max_retries = 1)
task = rush$pop_task(fields = c("max_retries", "n_retries"))
expect_equal(task$max_retries, 1)
expect_null(task$n_retries)
expect_output(rush$retry_tasks(keys), "Not all task")
rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
expect_true(rush$is_failed_task(task$key))
rush$retry_tasks(keys)
task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries"))
expect_equal(task_info$max_retries, 1)
expect_equal(task_info$n_retries, 1)
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
expect_false(rush$is_failed_task(task$key))
task = rush$pop_task()
rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_output(rush$retry_tasks(keys), "reached the maximum number of retries")
rush$retry_tasks(keys, ignore_max_retries = TRUE)
task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries"))
expect_equal(task_info$max_retries, 1)
expect_equal(task_info$n_retries, 2)
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
expect_false(rush$is_failed_task(task$key))
expect_rush_reset(rush, type = "terminate")
})
test_that("retry failed tasks works", {
skip_on_cran()
lg_rush = lgr::get_logger("rush")
old_threshold_rush = lg_rush$threshold
on.exit(lg_rush$set_threshold(old_threshold_rush))
lg_rush$set_threshold("info")
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
rush$push_tasks(xss)
task_1 = rush$pop_task()
task_2 = rush$pop_task()
keys = c(task_1$key, task_2$key)
expect_output(rush$retry_tasks(keys), "Not all task")
rush$push_failed(keys, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 2)
expect_true(all(rush$is_failed_task(keys)))
rush$retry_tasks(keys)
expect_equal(rush$n_queued_tasks, 2)
expect_equal(rush$n_failed_tasks, 0)
expect_false(any(rush$is_failed_task(keys)))
expect_rush_reset(rush, type = "terminate")
})
test_that("moving and fetching tasks works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
# queue tasks
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3), list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
rush$push_tasks(xss)
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 4)
expect_character(queued_tasks$keys, unique = TRUE)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
# pop task
task = rush$pop_task()
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 3)
expect_character(queued_tasks$keys, unique = TRUE)
running_tasks = rush$fetch_running_tasks()
expect_data_table(running_tasks, nrows = 1)
expect_character(running_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, running_tasks$keys)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
# push result
rush$pop_task()
rush$push_results(task$key, list(list(y = 3)))
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 2)
expect_character(queued_tasks$keys, unique = TRUE)
running_tasks = rush$fetch_running_tasks()
expect_data_table(running_tasks, nrows = 1)
expect_character(running_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, running_tasks$keys)
finished_tasks = rush$fetch_finished_tasks()
expect_data_table(finished_tasks, nrows = 1)
expect_character(finished_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, finished_tasks$keys)
expect_disjunct(running_tasks$keys, finished_tasks$keys)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
# push failed task
task = rush$pop_task()
rush$push_failed(task$key, conditions = list(list(message = "error")))
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 1)
expect_character(queued_tasks$keys, unique = TRUE)
running_tasks = rush$fetch_running_tasks()
expect_data_table(running_tasks, nrows = 1)
expect_character(running_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, running_tasks$keys)
finished_tasks = rush$fetch_finished_tasks()
expect_data_table(finished_tasks, nrows = 1)
expect_character(finished_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, finished_tasks$keys)
expect_disjunct(running_tasks$keys, finished_tasks$keys)
failed_tasks = rush$fetch_failed_tasks()
expect_data_table(failed_tasks, nrows = 1)
expect_character(failed_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, failed_tasks$keys)
expect_disjunct(running_tasks$keys, failed_tasks$keys)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
expect_rush_reset(rush, type = "terminate")
})
test_that("fetching as list works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
# queue tasks
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3), list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
keys = rush$push_tasks(xss)
queued_tasks = rush$fetch_queued_tasks(data_format = "list")
expect_list(queued_tasks, len = 4)
expect_names(names(queued_tasks), permutation.of = keys)
# pop task
task_1 = rush$pop_task()
running_tasks = rush$fetch_running_tasks(data_format = "list")
expect_list(running_tasks, len = 1)
expect_names(names(running_tasks), identical.to = task_1$key)
task_2 = rush$pop_task()
running_tasks = rush$fetch_running_tasks(data_format = "list")
expect_list(running_tasks, len = 2)
expect_names(names(running_tasks), permutation.of = c(task_1$key, task_2$key))
# push result
rush$push_results(task_1$key, list(list(y = 3)))
finished_tasks = rush$fetch_finished_tasks(data_format = "list")
expect_list(finished_tasks, len = 1)
expect_names(names(finished_tasks), permutation.of = task_1$key)
rush$push_results(task_2$key, list(list(y = 3)))
finished_tasks = rush$fetch_finished_tasks(data_format = "list")
expect_list(finished_tasks, len = 2)
expect_names(names(finished_tasks), permutation.of = c(task_1$key, task_2$key))
expect_null(rush$wait_for_finished_tasks(timeout = 0.1, data_format = "list"))
latest_results = rush$fetch_new_tasks(data_format = "list")
expect_list(latest_results, len = 2)
expect_names(names(latest_results), permutation.of = c(task_1$key, task_2$key))
task_3 = rush$pop_task()
rush$push_results(task_3$key, list(list(y = 3)))
latest_results = rush$wait_for_new_tasks(data_format = "list")
expect_list(latest_results, len = 1)
expect_names(names(latest_results), permutation.of = task_3$key)
# push failed task
task = rush$pop_task()
rush$push_failed(task$key, conditions = list(list(message = "error")))
failed_tasks = rush$fetch_failed_tasks(data_format = "list")
expect_list(failed_tasks, len = 1)
expect_names(names(failed_tasks), identical.to = task$key)
expect_rush_reset(rush, type = "terminate")
})
test_that("fetch task with states works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, seed = 123)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
# queued
expect_equal(rush$n_queued_tasks, 1)
expect_data_table(rush$fetch_tasks_with_state(states = "finished"), nrows = 0)
expect_data_table(rush$fetch_tasks_with_state(states = c("running", "finished")), nrows = 0)
tab = rush$fetch_tasks_with_state(states = "queued")
expect_data_table(tab, nrows = 1)
expect_names(names(tab), must.include = "state")
expect_list(rush$fetch_tasks_with_state(states = "finished", data_format = "list"), len = 1)
expect_list(rush$fetch_tasks_with_state(states = c("running", "finished"), data_format = "list"), len = 2)
l = rush$fetch_tasks_with_state(states = "queued", data_format = "list")
expect_list(l, len = 1)
expect_list(l$queued, len = 1)
# running
task = rush$pop_task(fields = c("xs", "seed"))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 1)
expect_equal(tab$state, "running")
l = rush$fetch_tasks_with_state(data_format = "list")
expect_list(l, len = 4)
expect_list(l$running, len = 1)
# finished
rush$push_results(task$key, list(list(y = 3)))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 1)
expect_equal(tab$state, "finished")
l = rush$fetch_tasks_with_state(data_format = "list")
expect_list(l, len = 4)
expect_list(l$finished, len = 1)
# failed
xss = list(list(x1 = 2, x2 = 2))
rush$push_tasks(xss)
task_2 = rush$pop_task()
rush$push_failed(task_2$key, conditions = list(list(message = "error")))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 2)
expect_equal(tab$state, c("finished", "failed"))
l = rush$fetch_tasks_with_state(data_format = "list")
expect_list(l, len = 4)
expect_list(l$finished, len = 1)
expect_list(l$failed, len = 1)
})
test_that("latest results are fetched", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
# add 1 task
rush$push_tasks(list(list(x1 = 1, x2 = 2)))
task = rush$pop_task()
rush$push_results(task$key, list(list(y = 3)))
latest_results = rush$fetch_new_tasks()
expect_data_table(latest_results, nrows = 1)
expect_set_equal(latest_results$y, 3)
expect_data_table(rush$fetch_new_tasks(), nrows = 0)
# add 1 task
keys = rush$push_tasks(list(list(x1 = 1, x2 = 3)))
task = rush$pop_task()
rush$push_results(task$key, list(list(y = 4)))
latest_results = rush$fetch_new_tasks()
expect_data_table(latest_results, nrows = 1)
expect_set_equal(latest_results$y, 4)
expect_data_table(rush$fetch_new_tasks(), nrows = 0)
# add 2 tasks
keys = rush$push_tasks(list(list(x1 = 1, x2 = 4)))
task = rush$pop_task()
rush$push_results(task$key, list(list(y = 5)))
keys = rush$push_tasks(list(list(x1 = 1, x2 = 5)))
task = rush$pop_task()
rush$push_results(task$key, list(list(y = 6)))
latest_results = rush$fetch_new_tasks()
expect_data_table(latest_results, nrows = 2)
expect_set_equal(latest_results$y, c(5, 6))
expect_data_table(rush$fetch_new_tasks(), nrows = 0)
expect_rush_reset(rush, type = "terminate")
})
test_that("priority queues work", {
skip_on_cran()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
expect_equal(rush$n_queued_priority_tasks, 0)
expect_data_table(rush$fetch_priority_tasks(), nrows = 0)
expect_data_table(rush$priority_info, nrows = 0)
rush_1 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
rush_2 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
expect_equal(rush$n_queued_priority_tasks, 0)
expect_data_table(rush$fetch_priority_tasks(), nrows = 0)
priority_info = rush$priority_info
expect_data_table(rush$priority_info, nrows = 2)
expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 0)
expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 0)
keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 5)), priority = c(rep(rush_1$worker_id, 2), rush_2$worker_id))
expect_equal(rush$n_queued_priority_tasks, 3)
expect_data_table(rush$fetch_priority_tasks(), nrows = 3)
priority_info = rush$priority_info
expect_data_table(priority_info, nrows = 2)
expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 2)
expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 1)
expect_rush_task(rush_2$pop_task())
expect_equal(rush$n_queued_priority_tasks, 2)
expect_data_table(rush$fetch_priority_tasks(), nrows = 2)
priority_info = rush$priority_info
expect_data_table(priority_info, nrows = 2)
expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 2)
expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 0)
expect_null(rush_2$pop_task())
expect_rush_task(rush_1$pop_task())
expect_equal(rush$n_queued_priority_tasks, 1)
priority_info = rush$priority_info
expect_data_table(priority_info, nrows = 2)
expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 1)
expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 0)
expect_rush_task(rush_1$pop_task())
expect_equal(rush$n_queued_priority_tasks, 0)
expect_set_equal(rush$priority_info$n_tasks, 0)
expect_rush_reset(rush, type = "terminate")
})
test_that("redirecting to shared queue works", {
skip_on_cran()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
rush_1 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
rush_2 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2)), priority = rush_1$worker_id)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_queued_priority_tasks, 1)
expect_data_table(rush$fetch_priority_tasks(), nrows = 1)
expect_null(rush_2$pop_task())
expect_rush_task(rush_1$pop_task())
keys = rush$push_priority_tasks(list(list(x1 = 2, x2 = 2)), priority = uuid::UUIDgenerate())
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_queued_priority_tasks, 0)
expect_rush_task(rush_1$pop_task())
rush_1$set_terminated()
keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2)), priority = rush_1$worker_id)
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_queued_priority_tasks, 0)
expect_rush_reset(rush, type = "terminate")
})
test_that("mixing priority queue and shared queue works", {
skip_on_cran()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
rush_1 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
rush_2 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 2)), priority = c(rush_1$worker_id, NA_character_))
expect_equal(rush$n_queued_tasks, 1)
expect_equal(rush$n_queued_priority_tasks, 1)
expect_data_table(rush$fetch_priority_tasks(), nrows = 1)
expect_rush_task(rush_2$pop_task())
expect_null(rush_2$pop_task())
expect_rush_task(rush_1$pop_task())
expect_rush_reset(rush, type = "terminate")
})
test_that("saving logs with redis appender works", {
skip_on_cran()
appenders = lgr::get_logger("root")$appenders
on.exit({
lgr::get_logger("root")$set_appenders(appenders)
})
config = start_flush_redis()
rush = RushWorker$new(
network_id = "test-rush",
config = config,
remote = FALSE,
lgr_thresholds = c(rush = "info"),
lgr_buffer_size = 0)
lg = lgr::get_logger("rush")
lg$info("test-1")
log = rush$read_log()
expect_data_table(log, nrows = 1)
expect_names(colnames(log), identical.to = c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
expect_equal(log$msg, "[rush] test-1")
lg$info("test-2")
log = rush$read_log()
expect_data_table(log, nrows = 2)
expect_names(colnames(log), identical.to = c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
expect_equal(log$msg, c("[rush] test-1", "[rush] test-2"))
expect_rush_reset(rush, type = "terminate")
})
test_that("settings the buffer size in redis appender works", {
skip_on_cran()
appenders = lgr::get_logger("root")$appenders
on.exit({
lgr::get_logger("root")$set_appenders(appenders)
})
config = start_flush_redis()
rush = RushWorker$new(
network_id = "test-rush",
config = config,
remote = FALSE,
lgr_thresholds = c(rush = "info"),
lgr_buffer_size = 2)
lg = lgr::get_logger("rush")
lg$info("test-1")
expect_data_table(rush$read_log(), nrows = 0)
lg$info("test-2")
expect_data_table(rush$read_log(), nrows = 0)
lg$info("test-3")
log = rush$read_log()
expect_data_table(log, nrows = 3)
expect_names(colnames(log), identical.to = c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
expect_equal(log$msg, c("[rush] test-1", "[rush] test-2", "[rush] test-3"))
})
test_that("pushing tasks and terminating worker works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
expect_false(rush$terminated)
expect_false(rush$terminated_on_idle)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss, terminate_workers = TRUE)
expect_false(rush$terminated)
expect_false(rush$terminated_on_idle)
rush$pop_task()
expect_false(rush$terminated)
expect_true(rush$terminated_on_idle)
expect_rush_reset(rush, type = "terminate")
})
test_that("terminate on idle works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss, terminate_workers = TRUE)
expect_false(rush$terminated_on_idle)
rush$pop_task()
expect_true(rush$terminated_on_idle)
expect_rush_reset(rush, type = "terminate")
})
# seed -------------------------------------------------------------------------
test_that("popping a task with seed from the queue works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, seed = 123)
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
# check task seed
task = rush$pop_task(fields = c("xs", "seed"))
expect_true(is_lecyer_cmrg_seed(task$seed))
expect_rush_reset(rush, type = "terminate")
})
# atomic operations -----------------------------------------------------------
test_that("task in states works", {
skip_on_cran()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, seed = 123)
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_equal(keys_list$queued, keys)
expect_null(keys_list$running)
expect_null(keys_list$finished)
expect_null(keys_list$failed)
# switch order
keys_list = rush$tasks_with_state(c("running", "queued", "finished", "failed"))
expect_equal(keys_list$queued, keys)
task = rush$pop_task()
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_equal(keys_list$running, keys)
expect_null(keys_list$queued)
expect_null(keys_list$finished)
expect_null(keys_list$failed)
rush$push_results(task$key, list(list(y = 3)))
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
expect_equal(keys_list$finished, task$key)
expect_null(keys_list$failed)
xss = list(list(x1 = 2, x2 = 2))
keys = rush$push_tasks(xss)
task_2 = rush$pop_task()
rush$push_failed(task_2$key, conditions = list(list(message = "error")))
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
expect_equal(keys_list$finished, task$key)
expect_equal(keys_list$failed, task_2$key)
keys_list = rush$tasks_with_state(c("queued"))
expect_list(keys_list, len = 1)
expect_names(names(keys_list), identical.to = c("queued"))
expect_null(keys_list$queued)
keys_list = rush$tasks_with_state(c("queued", "running"))
expect_list(keys_list, len = 2)
expect_names(names(keys_list), identical.to = c("queued", "running"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
keys_list = rush$tasks_with_state(c("queued", "running", "finished"))
expect_list(keys_list, len = 3)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
expect_equal(keys_list$finished, task$key)
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.