tests/testthat/test-RushWorker.R

test_that("constructing a rush worker works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  expect_class(rush, "Rush")
  expect_equal(rush$network_id, "test-rush")
  expect_string(rush$worker_id)
  expect_false(rush$remote)
  expect_equal(rush$worker_ids, rush$worker_id)
  expect_equal(rush$running_worker_ids, rush$worker_id)

  expect_rush_reset(rush, type = "terminate")

  # pass worker id
  config = start_flush_redis()
  worker_id = uuid::UUIDgenerate()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, worker_id = worker_id)
  expect_equal(rush$worker_id, worker_id)

  expect_rush_reset(rush, type = "terminate")
})

test_that("active bindings work after construction", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  expect_equal(rush$n_workers, 1)

  # check task count
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_queued_priority_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  expect_rush_reset(rush, type = "terminate")
})

test_that("a worker is registered", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  # check meta data from redis
  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 1)
  expect_names(names(worker_info), permutation.of = c("worker_id", "pid", "remote", "hostname", "heartbeat"))
  expect_string(worker_info$heartbeat, na.ok = TRUE)
  expect_equal(worker_info$worker_id, rush$worker_id)
  expect_false(worker_info$remote)
  expect_equal(worker_info$pid, Sys.getpid())
  expect_equal(rush$worker_ids, rush$worker_id)
  expect_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush, type = "terminate")
})

test_that("a worker is terminated", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  expect_equal(rush$running_worker_ids, rush$worker_id)

  rush$set_terminated()
  expect_null(rush$running_worker_ids)
  expect_equal(rush$terminated_worker_ids, rush$worker_id)

  expect_rush_reset(rush, type = "terminate")
})

test_that("a heartbeat is started", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, heartbeat_period = 3)

  expect_class(rush$heartbeat, "r_process")
  expect_true(rush$heartbeat$is_alive())
  expect_string(rush$worker_info$heartbeat)

  expect_rush_reset(rush, type = "terminate")
})

test_that("pushing a task to the queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_set_equal(rush$queued_tasks, keys)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_queued_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_false(rush$is_running_task(keys))
  expect_false(rush$is_failed_task(keys))

  expect_rush_reset(rush, type = "terminate")
})

test_that("pushing a task with extras to the queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  xss = list(list(x1 = 1, x2 = 2))
  timestamp = Sys.time()
  extra = list(list(timestamp = timestamp))
  keys = rush$push_tasks(xss, extra)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_set_equal(rush$queued_tasks, keys)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_queued_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "timestamp", "keys"))
  expect_data_table(data, nrows = 1)
  expect_equal(data$timestamp, timestamp)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_false(rush$is_running_task(keys))
  expect_false(rush$is_failed_task(keys))

  expect_rush_reset(rush, type = "terminate")
})

test_that("pushing tasks to the queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
  keys = rush$push_tasks(xss)

  # check task count
  expect_equal(rush$n_tasks, 2)
  expect_equal(rush$n_queued_tasks, 2)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_character(rush$tasks, len = 2)
  expect_set_equal(rush$queued_tasks, keys)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_queued_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "keys"))
  expect_data_table(data, nrows = 2)
  expect_character(data$keys, unique = TRUE, len = 2)
  expect_data_table(rush$fetch_tasks(), nrows = 2)

  # status checks
  expect_false(any(rush$is_running_task(keys)))
  expect_false(any(rush$is_failed_task(keys)))

  expect_rush_reset(rush, type = "terminate")
})

test_that("pushing tasks with extras to the queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
  timestamp = Sys.time()
  extra = list(list(timestamp = timestamp), list(timestamp = timestamp))
  keys = rush$push_tasks(xss, extra)

  # check task count
  expect_equal(rush$n_tasks, 2)
  expect_equal(rush$n_queued_tasks, 2)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_character(rush$tasks, len = 2)
  expect_set_equal(rush$queued_tasks, keys)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_queued_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "timestamp", "keys"))
  expect_data_table(data, nrows = 2)
  expect_character(data$keys, unique = TRUE, len = 2)
  expect_equal(data$timestamp, c(timestamp, timestamp))
  expect_data_table(rush$fetch_tasks(), nrows = 2)

  # status checks
  expect_false(any(rush$is_running_task(keys)))
  expect_false(any(rush$is_failed_task(keys)))

  expect_rush_reset(rush, type = "terminate")
})

test_that("popping a task from the queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)

  # check task
  task = rush$pop_task()
  expect_rush_task(task)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 1)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_string(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_running_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_true(rush$is_running_task(task$key))
  expect_false(rush$is_failed_task(task$key))

  expect_rush_reset(rush, type = "terminate")
})

test_that("popping a task with seed, max_retries and timeout works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  seed = 123456
  max_retries = 2
  timeout = 1
  rush$push_tasks(xss, seeds = list(seed), max_retries = max_retries, timeouts = timeout)

  # check task
  task = rush$pop_task(fields = c("xs", "seed", "max_retries", "timeout"))
  expect_equal(task$seed, seed)
  expect_equal(task$max_retries, max_retries)
  expect_equal(task$timeout, timeout)
  expect_rush_task(task)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 1)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_string(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_running_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_true(rush$is_running_task(task$key))
  expect_false(rush$is_failed_task(task$key))

  expect_rush_reset(rush, type = "terminate")
})

test_that("pushing a finished task works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)
  task = rush$pop_task()

  rush$push_results(task$key, list(list(y = 3)))

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_false(rush$is_running_task(task$key))
  expect_false(rush$is_failed_task(task$key))

  expect_rush_reset(rush, type = "terminate")
})

test_that("pushing a failed tasks works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)
  task = rush$pop_task()

  rush$push_failed(task$key, conditions = list(list(message = "error")))

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  data = rush$fetch_failed_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_false(rush$is_running_task(task$key))
  expect_true(rush$is_failed_task(task$key))

  expect_rush_reset(rush, type = "terminate")
})

test_that("retry a failed task works", {
  skip_on_cran()

  lg_rush = lgr::get_logger("rush")
  old_threshold_rush = lg_rush$threshold
  on.exit(lg_rush$set_threshold(old_threshold_rush))
  lg_rush$set_threshold("info")

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  task = rush$pop_task()

  expect_output(rush$retry_tasks(keys), "Not all task")

  rush$push_failed(task$key, conditions = list(list(message = "error")))

  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)
  expect_true(rush$is_failed_task(task$key))

  rush$retry_tasks(keys)

  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)
  expect_false(rush$is_failed_task(task$key))

  expect_rush_reset(rush, type = "terminate")
})

test_that("retry a failed task works and setting a new seed works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  seed = c(10407L, 1280795612L, -169270483L, -442010614L, -603558397L, -222347416L, 1489374793L)
  keys = rush$push_tasks(xss, seeds = list(seed))
  task = rush$pop_task(fields = c("xs", "seed"))
  expect_equal(task$seed, seed)

  rush$push_failed(task$key, conditions = list(list(message = "error")))

  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)
  expect_true(rush$is_failed_task(task$key))

  rush$retry_tasks(keys, next_seed = TRUE)
  task_info = rush$read_hash(keys, "seed")
  expect_true(is_lecyer_cmrg_seed(task_info$seed))
  expect_true(task_info$seed[2] != seed[2])

  expect_rush_reset(rush, type = "terminate")
})

test_that("retry a failed task works with a maximum of retries", {
  skip_on_cran()

  lg_rush = lgr::get_logger("rush")
  old_threshold_rush = lg_rush$threshold
  on.exit(lg_rush$set_threshold(old_threshold_rush))
  lg_rush$set_threshold("info")

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss, max_retries = 1)
  task = rush$pop_task(fields = c("max_retries", "n_retries"))

  expect_equal(task$max_retries, 1)
  expect_null(task$n_retries)
  expect_output(rush$retry_tasks(keys), "Not all task")

  rush$push_failed(task$key, conditions = list(list(message = "error")))

  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)
  expect_true(rush$is_failed_task(task$key))

  rush$retry_tasks(keys)

  task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries"))
  expect_equal(task_info$max_retries, 1)
  expect_equal(task_info$n_retries, 1)
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)
  expect_false(rush$is_failed_task(task$key))
  task = rush$pop_task()

  rush$push_failed(task$key, conditions = list(list(message = "error")))
  expect_output(rush$retry_tasks(keys), "reached the maximum number of retries")

  rush$retry_tasks(keys, ignore_max_retries = TRUE)
  task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries"))
  expect_equal(task_info$max_retries, 1)
  expect_equal(task_info$n_retries, 2)
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)
  expect_false(rush$is_failed_task(task$key))

  expect_rush_reset(rush, type = "terminate")
})

test_that("retry failed tasks works", {
  skip_on_cran()

  lg_rush = lgr::get_logger("rush")
  old_threshold_rush = lg_rush$threshold
  on.exit(lg_rush$set_threshold(old_threshold_rush))
  lg_rush$set_threshold("info")

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
  rush$push_tasks(xss)
  task_1 = rush$pop_task()
  task_2 = rush$pop_task()
  keys = c(task_1$key, task_2$key)

  expect_output(rush$retry_tasks(keys), "Not all task")

  rush$push_failed(keys, conditions = list(list(message = "error")))

  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_failed_tasks, 2)
  expect_true(all(rush$is_failed_task(keys)))

  rush$retry_tasks(keys)

  expect_equal(rush$n_queued_tasks, 2)
  expect_equal(rush$n_failed_tasks, 0)
  expect_false(any(rush$is_failed_task(keys)))

  expect_rush_reset(rush, type = "terminate")
})

test_that("moving and fetching tasks works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  # queue tasks
  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3), list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
  rush$push_tasks(xss)
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 4)
  expect_character(queued_tasks$keys, unique = TRUE)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  # pop task
  task = rush$pop_task()
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 3)
  expect_character(queued_tasks$keys, unique = TRUE)
  running_tasks = rush$fetch_running_tasks()
  expect_data_table(running_tasks, nrows = 1)
  expect_character(running_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, running_tasks$keys)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  # push result
  rush$pop_task()
  rush$push_results(task$key, list(list(y = 3)))
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 2)
  expect_character(queued_tasks$keys, unique = TRUE)
  running_tasks = rush$fetch_running_tasks()
  expect_data_table(running_tasks, nrows = 1)
  expect_character(running_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, running_tasks$keys)
  finished_tasks = rush$fetch_finished_tasks()
  expect_data_table(finished_tasks, nrows = 1)
  expect_character(finished_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, finished_tasks$keys)
  expect_disjunct(running_tasks$keys, finished_tasks$keys)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  # push failed task
  task = rush$pop_task()
  rush$push_failed(task$key, conditions = list(list(message = "error")))
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 1)
  expect_character(queued_tasks$keys, unique = TRUE)
  running_tasks = rush$fetch_running_tasks()
  expect_data_table(running_tasks, nrows = 1)
  expect_character(running_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, running_tasks$keys)
  finished_tasks = rush$fetch_finished_tasks()
  expect_data_table(finished_tasks, nrows = 1)
  expect_character(finished_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, finished_tasks$keys)
  expect_disjunct(running_tasks$keys, finished_tasks$keys)
  failed_tasks = rush$fetch_failed_tasks()
  expect_data_table(failed_tasks, nrows = 1)
  expect_character(failed_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, failed_tasks$keys)
  expect_disjunct(running_tasks$keys, failed_tasks$keys)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  expect_rush_reset(rush, type = "terminate")
})

test_that("fetching as list works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  # queue tasks
  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3), list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
  keys = rush$push_tasks(xss)
  queued_tasks = rush$fetch_queued_tasks(data_format = "list")
  expect_list(queued_tasks, len = 4)
  expect_names(names(queued_tasks), permutation.of = keys)

  # pop task
  task_1 = rush$pop_task()
  running_tasks = rush$fetch_running_tasks(data_format = "list")
  expect_list(running_tasks, len = 1)
  expect_names(names(running_tasks), identical.to = task_1$key)
  task_2 = rush$pop_task()
  running_tasks = rush$fetch_running_tasks(data_format = "list")
  expect_list(running_tasks, len = 2)
  expect_names(names(running_tasks), permutation.of = c(task_1$key, task_2$key))

  # push result
  rush$push_results(task_1$key, list(list(y = 3)))
  finished_tasks = rush$fetch_finished_tasks(data_format = "list")
  expect_list(finished_tasks, len = 1)
  expect_names(names(finished_tasks), permutation.of = task_1$key)

  rush$push_results(task_2$key, list(list(y = 3)))
  finished_tasks = rush$fetch_finished_tasks(data_format = "list")
  expect_list(finished_tasks, len = 2)
  expect_names(names(finished_tasks), permutation.of = c(task_1$key, task_2$key))

  expect_null(rush$wait_for_finished_tasks(timeout = 0.1, data_format = "list"))

  latest_results = rush$fetch_new_tasks(data_format = "list")
  expect_list(latest_results, len = 2)
  expect_names(names(latest_results), permutation.of = c(task_1$key, task_2$key))

  task_3 = rush$pop_task()
  rush$push_results(task_3$key, list(list(y = 3)))
  latest_results = rush$wait_for_new_tasks(data_format = "list")
  expect_list(latest_results, len = 1)
  expect_names(names(latest_results), permutation.of = task_3$key)

  # push failed task
  task = rush$pop_task()
  rush$push_failed(task$key, conditions = list(list(message = "error")))
  failed_tasks = rush$fetch_failed_tasks(data_format = "list")
  expect_list(failed_tasks, len = 1)
  expect_names(names(failed_tasks), identical.to = task$key)

  expect_rush_reset(rush, type = "terminate")
})

test_that("fetch task with states works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, seed = 123)
  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)

  # queued
  expect_equal(rush$n_queued_tasks, 1)
  expect_data_table(rush$fetch_tasks_with_state(states = "finished"), nrows = 0)
  expect_data_table(rush$fetch_tasks_with_state(states = c("running", "finished")), nrows = 0)
  tab = rush$fetch_tasks_with_state(states = "queued")
  expect_data_table(tab, nrows = 1)
  expect_names(names(tab), must.include = "state")

  expect_list(rush$fetch_tasks_with_state(states = "finished", data_format = "list"), len = 1)
  expect_list(rush$fetch_tasks_with_state(states = c("running", "finished"), data_format = "list"), len = 2)
  l = rush$fetch_tasks_with_state(states = "queued", data_format = "list")
  expect_list(l, len = 1)
  expect_list(l$queued, len = 1)

  # running
  task = rush$pop_task(fields = c("xs", "seed"))
  tab = rush$fetch_tasks_with_state()
  expect_data_table(tab, nrows = 1)
  expect_equal(tab$state, "running")

  l = rush$fetch_tasks_with_state(data_format = "list")
  expect_list(l, len = 4)
  expect_list(l$running, len = 1)

  # finished
  rush$push_results(task$key, list(list(y = 3)))
  tab = rush$fetch_tasks_with_state()
  expect_data_table(tab, nrows = 1)
  expect_equal(tab$state, "finished")

  l = rush$fetch_tasks_with_state(data_format = "list")
  expect_list(l, len = 4)
  expect_list(l$finished, len = 1)

  # failed
  xss = list(list(x1 = 2, x2 = 2))
  rush$push_tasks(xss)
  task_2 = rush$pop_task()
  rush$push_failed(task_2$key, conditions = list(list(message = "error")))
  tab = rush$fetch_tasks_with_state()
  expect_data_table(tab, nrows = 2)
  expect_equal(tab$state, c("finished", "failed"))

  l = rush$fetch_tasks_with_state(data_format = "list")
  expect_list(l, len = 4)
  expect_list(l$finished, len = 1)
  expect_list(l$failed, len = 1)
})

test_that("latest results are fetched", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  # add 1 task
  rush$push_tasks(list(list(x1 = 1, x2 = 2)))
  task = rush$pop_task()
  rush$push_results(task$key, list(list(y = 3)))

  latest_results = rush$fetch_new_tasks()
  expect_data_table(latest_results, nrows = 1)
  expect_set_equal(latest_results$y, 3)
  expect_data_table(rush$fetch_new_tasks(), nrows = 0)

  # add 1 task
  keys = rush$push_tasks(list(list(x1 = 1, x2 = 3)))
  task = rush$pop_task()
  rush$push_results(task$key, list(list(y = 4)))

  latest_results = rush$fetch_new_tasks()
  expect_data_table(latest_results, nrows = 1)
  expect_set_equal(latest_results$y, 4)
  expect_data_table(rush$fetch_new_tasks(), nrows = 0)

  # add 2 tasks
  keys = rush$push_tasks(list(list(x1 = 1, x2 = 4)))
  task = rush$pop_task()
  rush$push_results(task$key, list(list(y = 5)))
  keys = rush$push_tasks(list(list(x1 = 1, x2 = 5)))
  task = rush$pop_task()
  rush$push_results(task$key, list(list(y = 6)))

  latest_results = rush$fetch_new_tasks()
  expect_data_table(latest_results, nrows = 2)
  expect_set_equal(latest_results$y, c(5, 6))
  expect_data_table(rush$fetch_new_tasks(), nrows = 0)

  expect_rush_reset(rush, type = "terminate")
})

test_that("priority queues work", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)

  expect_equal(rush$n_queued_priority_tasks, 0)
  expect_data_table(rush$fetch_priority_tasks(), nrows = 0)
  expect_data_table(rush$priority_info, nrows = 0)

  rush_1 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  rush_2 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  expect_equal(rush$n_queued_priority_tasks, 0)
  expect_data_table(rush$fetch_priority_tasks(), nrows = 0)
  priority_info = rush$priority_info
  expect_data_table(rush$priority_info, nrows = 2)
  expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 0)
  expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 0)

  keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 5)), priority = c(rep(rush_1$worker_id, 2), rush_2$worker_id))

  expect_equal(rush$n_queued_priority_tasks, 3)
  expect_data_table(rush$fetch_priority_tasks(), nrows = 3)
  priority_info = rush$priority_info
  expect_data_table(priority_info, nrows = 2)
  expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 2)
  expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 1)

  expect_rush_task(rush_2$pop_task())

  expect_equal(rush$n_queued_priority_tasks, 2)
  expect_data_table(rush$fetch_priority_tasks(), nrows = 2)
  priority_info = rush$priority_info
  expect_data_table(priority_info, nrows = 2)
  expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 2)
  expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 0)

  expect_null(rush_2$pop_task())
  expect_rush_task(rush_1$pop_task())
  expect_equal(rush$n_queued_priority_tasks, 1)
  priority_info = rush$priority_info
  expect_data_table(priority_info, nrows = 2)
  expect_equal(priority_info[list(rush_1$worker_id), n_tasks, on = "worker_id"], 1)
  expect_equal(priority_info[list(rush_2$worker_id), n_tasks, on = "worker_id"], 0)

  expect_rush_task(rush_1$pop_task())
  expect_equal(rush$n_queued_priority_tasks, 0)
  expect_set_equal(rush$priority_info$n_tasks, 0)

  expect_rush_reset(rush, type = "terminate")
})

test_that("redirecting to shared queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)

  rush_1 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  rush_2 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2)), priority = rush_1$worker_id)

  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_queued_priority_tasks, 1)
  expect_data_table(rush$fetch_priority_tasks(), nrows = 1)
  expect_null(rush_2$pop_task())
  expect_rush_task(rush_1$pop_task())

  keys = rush$push_priority_tasks(list(list(x1 = 2, x2 = 2)), priority = uuid::UUIDgenerate())
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_queued_priority_tasks, 0)
  expect_rush_task(rush_1$pop_task())

  rush_1$set_terminated()
  keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2)), priority = rush_1$worker_id)
  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_queued_priority_tasks, 0)

  expect_rush_reset(rush, type = "terminate")
})

test_that("mixing priority queue and shared queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = Rush$new(network_id = "test-rush", config = config)

  rush_1 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  rush_2 = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  keys = rush$push_priority_tasks(list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 2)), priority = c(rush_1$worker_id, NA_character_))

  expect_equal(rush$n_queued_tasks, 1)
  expect_equal(rush$n_queued_priority_tasks, 1)
  expect_data_table(rush$fetch_priority_tasks(), nrows = 1)
  expect_rush_task(rush_2$pop_task())
  expect_null(rush_2$pop_task())
  expect_rush_task(rush_1$pop_task())

  expect_rush_reset(rush, type = "terminate")
})

test_that("saving logs with redis appender works", {
  skip_on_cran()

  appenders = lgr::get_logger("root")$appenders

  on.exit({
    lgr::get_logger("root")$set_appenders(appenders)
  })

  config = start_flush_redis()
  rush = RushWorker$new(
    network_id = "test-rush",
    config = config,
    remote = FALSE,
    lgr_thresholds = c(rush = "info"),
    lgr_buffer_size = 0)
  lg = lgr::get_logger("rush")

  lg$info("test-1")

  log = rush$read_log()
  expect_data_table(log, nrows = 1)
  expect_names(colnames(log), identical.to =  c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
  expect_equal(log$msg, "[rush] test-1")

  lg$info("test-2")

  log = rush$read_log()
  expect_data_table(log, nrows = 2)
  expect_names(colnames(log), identical.to =  c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
  expect_equal(log$msg, c("[rush] test-1", "[rush] test-2"))

  expect_rush_reset(rush, type = "terminate")
})

test_that("settings the buffer size in redis appender works", {
  skip_on_cran()

  appenders = lgr::get_logger("root")$appenders

  on.exit({
    lgr::get_logger("root")$set_appenders(appenders)
  })

  config = start_flush_redis()
  rush = RushWorker$new(
    network_id = "test-rush",
    config = config,
    remote = FALSE,
    lgr_thresholds = c(rush = "info"),
    lgr_buffer_size = 2)
  lg = lgr::get_logger("rush")

  lg$info("test-1")
  expect_data_table(rush$read_log(), nrows = 0)

  lg$info("test-2")
  expect_data_table(rush$read_log(), nrows = 0)

  lg$info("test-3")

  log = rush$read_log()
  expect_data_table(log, nrows = 3)
  expect_names(colnames(log), identical.to =  c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
  expect_equal(log$msg, c("[rush] test-1", "[rush] test-2", "[rush] test-3"))
})

test_that("pushing tasks and terminating worker works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)
  expect_false(rush$terminated)
  expect_false(rush$terminated_on_idle)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss, terminate_workers = TRUE)
  expect_false(rush$terminated)
  expect_false(rush$terminated_on_idle)

  rush$pop_task()
  expect_false(rush$terminated)
  expect_true(rush$terminated_on_idle)

  expect_rush_reset(rush, type = "terminate")
})

test_that("terminate on idle works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss, terminate_workers = TRUE)
  expect_false(rush$terminated_on_idle)

  rush$pop_task()
  expect_true(rush$terminated_on_idle)

  expect_rush_reset(rush, type = "terminate")
})


# seed -------------------------------------------------------------------------

test_that("popping a task with seed from the queue works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, seed = 123)
  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)

  # check task seed
  task = rush$pop_task(fields = c("xs", "seed"))
  expect_true(is_lecyer_cmrg_seed(task$seed))

  expect_rush_reset(rush, type = "terminate")
})

# atomic operations -----------------------------------------------------------

test_that("task in states works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = RushWorker$new(network_id = "test-rush", config = config, remote = FALSE, seed = 123)
  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)

  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_equal(keys_list$queued, keys)
  expect_null(keys_list$running)
  expect_null(keys_list$finished)
  expect_null(keys_list$failed)
  # switch order
  keys_list = rush$tasks_with_state(c("running", "queued", "finished", "failed"))
  expect_equal(keys_list$queued, keys)


  task = rush$pop_task()
  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_equal(keys_list$running, keys)
  expect_null(keys_list$queued)
  expect_null(keys_list$finished)
  expect_null(keys_list$failed)

  rush$push_results(task$key, list(list(y = 3)))
  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)
  expect_equal(keys_list$finished, task$key)
  expect_null(keys_list$failed)

  xss = list(list(x1 = 2, x2 = 2))
  keys = rush$push_tasks(xss)
  task_2 = rush$pop_task()
  rush$push_failed(task_2$key, conditions = list(list(message = "error")))
  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)
  expect_equal(keys_list$finished, task$key)
  expect_equal(keys_list$failed, task_2$key)

  keys_list = rush$tasks_with_state(c("queued"))
  expect_list(keys_list, len = 1)
  expect_names(names(keys_list), identical.to = c("queued"))
  expect_null(keys_list$queued)

  keys_list = rush$tasks_with_state(c("queued", "running"))
  expect_list(keys_list, len = 2)
  expect_names(names(keys_list), identical.to = c("queued", "running"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)

  keys_list = rush$tasks_with_state(c("queued", "running", "finished"))
  expect_list(keys_list, len = 3)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)
  expect_equal(keys_list$finished, task$key)
})

Try the rush package in your browser

Any scripts or data that you put into this service are public.

rush documentation built on June 22, 2024, 9:38 a.m.