Nothing
skip_if_no_redis()
# starting worker and terminating ----------------------------------------------
test_that("constructing a rush worker works", {
config = redux::redis_config()
r = redux::hiredis(config)
r$FLUSHDB()
rush = RushWorker$new(network_id = "test-rush", config = config)
expect_class(rush, "RushWorker")
expect_equal(rush$network_id, "test-rush")
expect_string(rush$worker_id)
expect_equal(rush$worker_ids, rush$worker_id)
expect_equal(rush$running_worker_ids, rush$worker_id)
})
test_that("active bindings work after construction", {
rush = start_rush_worker()
expect_equal(rush$n_workers, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
})
test_that("a worker is registered", {
rush = start_rush_worker()
worker_info = rush$worker_info
expect_data_table(worker_info, nrows = 1)
expect_names(names(worker_info), permutation.of = c("worker_id", "pid", "hostname", "heartbeat", "state"))
expect_equal(worker_info$worker_id, rush$worker_id)
expect_equal(worker_info$pid, Sys.getpid())
expect_equal(rush$worker_ids, rush$worker_id)
expect_equal(rush$worker_info$state, "running")
})
test_that("a worker is terminated", {
rush = start_rush_worker()
expect_equal(rush$running_worker_ids, rush$worker_id)
rush$set_terminated()
expect_null(rush$running_worker_ids)
expect_equal(rush$terminated_worker_ids, rush$worker_id)
})
# low level read and write -----------------------------------------------------
test_that("reading and writing a hash works with flatten", {
rush = start_rush_worker()
# one field with list
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)))
expect_equal(rush$read_hashes(key, "xs"), list(list(x1 = 1, x2 = 2)))
# one field with atomic
key = rush$write_hashes(timeout = 1)
expect_equal(rush$read_hashes(key, "timeout"), list(list(timeout = 1)))
# two fields with lists
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3)))
expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3)))
# two fields with list and empty list
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list())
expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2)))
# two fields with list and atomic
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), timeout = 1)
expect_equal(rush$read_hashes(key, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1)))
})
test_that("reading and writing a hash works without flatten", {
rush = start_rush_worker()
# one field with list
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)))
expect_equal(rush$read_hashes(key, "xs", flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2))))
# one field with atomic
key = rush$write_hashes(timeout = 1)
expect_equal(rush$read_hashes(key, "timeout", flatten = FALSE), list(list(timeout = 1)))
# two fields with lists
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3)))
expect_equal(
rush$read_hashes(key, c("xs", "ys"), flatten = FALSE),
list(list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)))
)
# two fields with list and empty list
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list())
expect_equal(rush$read_hashes(key, c("xs", "ys"), flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2), ys = NULL)))
# two fields with list and atomic
key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), timeout = 1)
expect_equal(
rush$read_hashes(key, c("xs", "timeout"), flatten = FALSE),
list(list(xs = list(x1 = 1, x2 = 2), timeout = 1))
)
})
test_that("reading and writing hashes works", {
rush = start_rush_worker()
# one field with list
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))
expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))
# one field atomic
keys = rush$write_hashes(timeout = c(1, 1))
expect_equal(rush$read_hashes(keys, "timeout"), list(list(timeout = 1), list(timeout = 1)))
# two fields with list and recycled atomic
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1)
expect_equal(
rush$read_hashes(keys, c("xs", "timeout")),
list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))
)
# two fields
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3), list(y = 4)))
expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 4)))
# two fields with list and atomic
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = c(1, 1))
expect_equal(
rush$read_hashes(keys, c("xs", "timeout")),
list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))
)
# two fields with list and recycled atomic
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1)
expect_equal(
rush$read_hashes(keys, c("xs", "timeout")),
list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))
)
# two fields, one empty
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list())
expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))
# recycle
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3)))
expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 3)))
})
test_that("writing hashes to specific keys works", {
rush = start_rush_worker()
# one element
keys = uuid::UUIDgenerate()
rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), keys = keys)
expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2)))
# two elements
keys = uuid::UUIDgenerate(n = 2)
rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys)
expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))
# wrong number of keys
keys = uuid::UUIDgenerate()
expect_error(
rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys),
"Assertion on 'keys' failed"
)
})
test_that("writing list columns works", {
rush = start_rush_worker()
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A"))))
rush$finish_tasks(keys, yss = list(list(y = 3)))
expect_list(rush$fetch_finished_tasks()$extra, len = 1)
rush$reset(workers = FALSE)
keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3]))))
rush$finish_tasks(keys, yss = list(list(y = 3)))
expect_list(rush$fetch_finished_tasks()$extra, len = 1)
rush$reset(workers = FALSE)
keys = rush$write_hashes(
xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)),
xs_extra = list(list(extra = list("A")), list(extra = list("B")))
)
rush$finish_tasks(keys, yss = list(list(y = 3), list(y = 4)))
expect_list(rush$fetch_finished_tasks()$extra, len = 2)
})
# moving tasks between states --------------------------------------------------
test_that("popping a task works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
# check task
task = rush$pop_task()
expect_list(task)
expect_names(names(task), must.include = c("key", "xs"))
expect_list(task, names = "unique")
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 1)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_string(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_running_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_true(rush$is_running_task(task$key))
expect_false(rush$is_failed_task(task$key))
})
test_that("finishing a task works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
task = rush$pop_task()
rush$finish_tasks(task$key, list(list(y = 3)))
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 1)
expect_equal(rush$n_failed_tasks, 0)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_string(rush$finished_tasks)
expect_null(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
data = rush$fetch_finished_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_false(rush$is_running_task(task$key))
expect_false(rush$is_failed_task(task$key))
})
test_that("failing a tasks works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
task = rush$pop_task()
rush$fail_tasks(task$key, conditions = list(list(message = "error")))
# check task count
expect_equal(rush$n_tasks, 1)
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_running_tasks, 0)
expect_equal(rush$n_finished_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
# check keys in sets
expect_string(rush$tasks)
expect_null(rush$queued_tasks)
expect_null(rush$running_tasks)
expect_null(rush$finished_tasks)
expect_string(rush$failed_tasks)
# check fetching
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_running_tasks(), nrows = 0)
expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
data = rush$fetch_failed_tasks()
expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys"))
expect_data_table(data, nrows = 1)
expect_data_table(rush$fetch_tasks(), nrows = 1)
# status checks
expect_false(rush$is_running_task(task$key))
expect_true(rush$is_failed_task(task$key))
})
test_that("moving and fetching tasks works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3), list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
rush$push_tasks(xss)
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 4)
expect_character(queued_tasks$keys, unique = TRUE)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
# pop task
task = rush$pop_task()
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 3)
expect_character(queued_tasks$keys, unique = TRUE)
running_tasks = rush$fetch_running_tasks()
expect_data_table(running_tasks, nrows = 1)
expect_character(running_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, running_tasks$keys)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
# finish task
rush$pop_task()
rush$finish_tasks(task$key, list(list(y = 3)))
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 2)
expect_character(queued_tasks$keys, unique = TRUE)
running_tasks = rush$fetch_running_tasks()
expect_data_table(running_tasks, nrows = 1)
expect_character(running_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, running_tasks$keys)
finished_tasks = rush$fetch_finished_tasks()
expect_data_table(finished_tasks, nrows = 1)
expect_character(finished_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, finished_tasks$keys)
expect_disjunct(running_tasks$keys, finished_tasks$keys)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
# fail task
task = rush$pop_task()
rush$fail_tasks(task$key, conditions = list(list(message = "error")))
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 1)
expect_character(queued_tasks$keys, unique = TRUE)
running_tasks = rush$fetch_running_tasks()
expect_data_table(running_tasks, nrows = 1)
expect_character(running_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, running_tasks$keys)
finished_tasks = rush$fetch_finished_tasks()
expect_data_table(finished_tasks, nrows = 1)
expect_character(finished_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, finished_tasks$keys)
expect_disjunct(running_tasks$keys, finished_tasks$keys)
failed_tasks = rush$fetch_failed_tasks()
expect_data_table(failed_tasks, nrows = 1)
expect_character(failed_tasks$keys, unique = TRUE)
expect_disjunct(queued_tasks$keys, failed_tasks$keys)
expect_disjunct(running_tasks$keys, failed_tasks$keys)
all_tasks = rush$fetch_tasks()
expect_data_table(all_tasks, nrows = 4)
expect_character(all_tasks$keys, unique = TRUE)
})
test_that("moving a queued task to failed works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2))
rush$push_tasks(xss)
queued_tasks = rush$queued_tasks
rush$fail_tasks(queued_tasks, conditions = list(list(message = "error")))
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 1)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
rush$push_tasks(xss)
task = rush$pop_task()
rush$fail_tasks(task$key, conditions = list(list(message = "error")))
expect_data_table(rush$fetch_queued_tasks(), nrows = 1)
expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
expect_set_equal(rush$failed_tasks, c(task$key, queued_tasks))
queued_tasks = rush$queued_tasks
rush$fail_tasks(queued_tasks, conditions = list(list(message = "error")))
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 3)
xss = list(list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
rush$push_tasks(xss)
queued_tasks = rush$queued_tasks
rush$fail_tasks(queued_tasks, conditions = replicate(2, list(message = "error"), simplify = FALSE))
expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
expect_data_table(rush$fetch_failed_tasks(), nrows = 5)
})
test_that("fetch task with states works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
# queued
expect_equal(rush$n_queued_tasks, 1)
expect_data_table(rush$fetch_tasks_with_state(states = "finished"), nrows = 0)
expect_data_table(rush$fetch_tasks_with_state(states = c("running", "finished")), nrows = 0)
tab = rush$fetch_tasks_with_state(states = "queued")
expect_data_table(tab, nrows = 1)
expect_names(names(tab), must.include = "state")
# running
task = rush$pop_task(fields = c("xs"))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 1)
expect_equal(tab$state, "running")
# finished
rush$finish_tasks(task$key, list(list(y = 3)))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 1)
expect_equal(tab$state, "finished")
# failed
xss = list(list(x1 = 2, x2 = 2))
rush$push_tasks(xss)
task_2 = rush$pop_task()
rush$fail_tasks(task_2$key, conditions = list(list(message = "error")))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 2)
expect_equal(tab$state, c("finished", "failed"))
})
test_that("latest results are fetched", {
rush = start_rush_worker()
# add 1 task
rush$push_tasks(list(list(x1 = 1, x2 = 2)))
task = rush$pop_task()
rush$finish_tasks(task$key, list(list(y = 3)))
latest_results = rush$fetch_new_tasks()
expect_data_table(latest_results, nrows = 1)
expect_set_equal(latest_results$y, 3)
expect_data_table(rush$fetch_new_tasks(), nrows = 0)
# add 1 task
keys = rush$push_tasks(list(list(x1 = 1, x2 = 3)))
task = rush$pop_task()
rush$finish_tasks(task$key, list(list(y = 4)))
latest_results = rush$fetch_new_tasks()
expect_data_table(latest_results, nrows = 1)
expect_set_equal(latest_results$y, 4)
expect_data_table(rush$fetch_new_tasks(), nrows = 0)
# add 2 tasks
keys = rush$push_tasks(list(list(x1 = 1, x2 = 4)))
task = rush$pop_task()
rush$finish_tasks(task$key, list(list(y = 5)))
keys = rush$push_tasks(list(list(x1 = 1, x2 = 5)))
task = rush$pop_task()
rush$finish_tasks(task$key, list(list(y = 6)))
latest_results = rush$fetch_new_tasks()
expect_data_table(latest_results, nrows = 2)
expect_set_equal(latest_results$y, c(5, 6))
expect_data_table(rush$fetch_new_tasks(), nrows = 0)
})
test_that("pushing finished tasks works", {
rush = start_rush_worker()
rush$push_finished_tasks(
list(list(x1 = 1, x2 = 2)),
list(list(y = 3)),
xss_extra = list(list(extra_input = "A")),
yss_extra = list(list(extra_output = "B"))
)
expect_equal(rush$n_finished_tasks, 1)
expect_equal(rush$n_tasks, 1)
expect_equal(rush$fetch_finished_tasks()$extra_input, "A")
expect_equal(rush$fetch_finished_tasks()$extra_output, "B")
})
test_that("pushing failed tasks works", {
rush = start_rush_worker()
rush$push_failed_tasks(list(list(x1 = 1, x2 = 2)), conditions = list(list(message = "error")))
expect_equal(rush$n_failed_tasks, 1)
expect_equal(rush$n_tasks, 1)
})
# atomic operations -----------------------------------------------------------
test_that("task in states works", {
rush = start_rush_worker()
xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_equal(keys_list$queued, keys)
expect_null(keys_list$running)
expect_null(keys_list$finished)
expect_null(keys_list$failed)
# switch order
keys_list = rush$tasks_with_state(c("running", "queued", "finished", "failed"))
expect_equal(keys_list$queued, keys)
task = rush$pop_task()
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_equal(keys_list$running, keys)
expect_null(keys_list$queued)
expect_null(keys_list$finished)
expect_null(keys_list$failed)
rush$finish_tasks(task$key, list(list(y = 3)))
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
expect_equal(keys_list$finished, task$key)
expect_null(keys_list$failed)
xss = list(list(x1 = 2, x2 = 2))
keys = rush$push_tasks(xss)
task_2 = rush$pop_task()
rush$fail_tasks(task_2$key, conditions = list(list(message = "error")))
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
expect_equal(keys_list$finished, task$key)
expect_equal(keys_list$failed, task_2$key)
keys_list = rush$tasks_with_state(c("queued"))
expect_list(keys_list, len = 1)
expect_names(names(keys_list), identical.to = c("queued"))
expect_null(keys_list$queued)
keys_list = rush$tasks_with_state(c("queued", "running"))
expect_list(keys_list, len = 2)
expect_names(names(keys_list), identical.to = c("queued", "running"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
keys_list = rush$tasks_with_state(c("queued", "running", "finished"))
expect_list(keys_list, len = 3)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished"))
expect_null(keys_list$queued)
expect_null(keys_list$running)
expect_equal(keys_list$finished, task$key)
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.