tests/testthat/test-RushWorker.R

skip_if_no_redis()

# starting worker and terminating ----------------------------------------------

test_that("constructing a rush worker works", {
  config = redux::redis_config()
  r = redux::hiredis(config)
  r$FLUSHDB()

  rush = RushWorker$new(network_id = "test-rush", config = config)

  expect_class(rush, "RushWorker")
  expect_equal(rush$network_id, "test-rush")
  expect_string(rush$worker_id)
  expect_equal(rush$worker_ids, rush$worker_id)
  expect_equal(rush$running_worker_ids, rush$worker_id)
})

test_that("active bindings work after construction", {
  rush = start_rush_worker()

  expect_equal(rush$n_workers, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)
})

test_that("a worker is registered", {
  rush = start_rush_worker()

  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 1)
  expect_names(names(worker_info), permutation.of = c("worker_id", "pid", "hostname", "heartbeat", "state"))
  expect_equal(worker_info$worker_id, rush$worker_id)
  expect_equal(worker_info$pid, Sys.getpid())
  expect_equal(rush$worker_ids, rush$worker_id)
  expect_equal(rush$worker_info$state, "running")
})

test_that("a worker is terminated", {
  rush = start_rush_worker()

  expect_equal(rush$running_worker_ids, rush$worker_id)
  rush$set_terminated()
  expect_null(rush$running_worker_ids)
  expect_equal(rush$terminated_worker_ids, rush$worker_id)
})

# low level read and write -----------------------------------------------------

test_that("reading and writing a hash works with flatten", {
  rush = start_rush_worker()

  # one field with list
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)))
  expect_equal(rush$read_hashes(key, "xs"), list(list(x1 = 1, x2 = 2)))

  # one field with atomic
  key = rush$write_hashes(timeout = 1)
  expect_equal(rush$read_hashes(key, "timeout"), list(list(timeout = 1)))

  # two fields with lists
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3)))
  expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3)))

  # two fields with list and empty list
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list())
  expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2)))

  # two fields with list and atomic
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), timeout = 1)
  expect_equal(rush$read_hashes(key, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1)))
})

test_that("reading and writing a hash works without flatten", {
  rush = start_rush_worker()

  # one field with list
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)))
  expect_equal(rush$read_hashes(key, "xs", flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2))))

  # one field with atomic
  key = rush$write_hashes(timeout = 1)
  expect_equal(rush$read_hashes(key, "timeout", flatten = FALSE), list(list(timeout = 1)))

  # two fields with lists
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3)))
  expect_equal(
    rush$read_hashes(key, c("xs", "ys"), flatten = FALSE),
    list(list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)))
  )

  # two fields with list and empty list
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list())
  expect_equal(rush$read_hashes(key, c("xs", "ys"), flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2), ys = NULL)))

  # two fields with list and atomic
  key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), timeout = 1)
  expect_equal(
    rush$read_hashes(key, c("xs", "timeout"), flatten = FALSE),
    list(list(xs = list(x1 = 1, x2 = 2), timeout = 1))
  )
})

test_that("reading and writing hashes works", {
  rush = start_rush_worker()

  # one field with list
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))
  expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))

  # one field atomic
  keys = rush$write_hashes(timeout = c(1, 1))
  expect_equal(rush$read_hashes(keys, "timeout"), list(list(timeout = 1), list(timeout = 1)))

  # two fields with list and recycled atomic
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1)
  expect_equal(
    rush$read_hashes(keys, c("xs", "timeout")),
    list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))
  )

  # two fields
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3), list(y = 4)))
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 4)))

  # two fields with list and atomic
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = c(1, 1))
  expect_equal(
    rush$read_hashes(keys, c("xs", "timeout")),
    list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))
  )

  # two fields with list and recycled atomic
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1)
  expect_equal(
    rush$read_hashes(keys, c("xs", "timeout")),
    list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))
  )

  # two fields, one empty
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list())
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))

  # recycle
  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3)))
  expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 3)))
})

test_that("writing hashes to specific keys works", {
  rush = start_rush_worker()

  # one element
  keys = uuid::UUIDgenerate()
  rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), keys = keys)
  expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2)))

  # two elements
  keys = uuid::UUIDgenerate(n = 2)
  rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys)
  expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)))

  # wrong number of keys
  keys = uuid::UUIDgenerate()
  expect_error(
    rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys),
    "Assertion on 'keys' failed"
  )
})

test_that("writing list columns works", {
  rush = start_rush_worker()

  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A"))))
  rush$finish_tasks(keys, yss = list(list(y = 3)))

  expect_list(rush$fetch_finished_tasks()$extra, len = 1)
  rush$reset(workers = FALSE)

  keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3]))))
  rush$finish_tasks(keys, yss = list(list(y = 3)))

  expect_list(rush$fetch_finished_tasks()$extra, len = 1)
  rush$reset(workers = FALSE)

  keys = rush$write_hashes(
    xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)),
    xs_extra = list(list(extra = list("A")), list(extra = list("B")))
  )
  rush$finish_tasks(keys, yss = list(list(y = 3), list(y = 4)))

  expect_list(rush$fetch_finished_tasks()$extra, len = 2)
})


# moving tasks between states --------------------------------------------------

test_that("popping a task works", {
  rush = start_rush_worker()

  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)

  # check task
  task = rush$pop_task()
  expect_list(task)
  expect_names(names(task), must.include = c("key", "xs"))
  expect_list(task, names = "unique")

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 1)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_string(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_running_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_true(rush$is_running_task(task$key))
  expect_false(rush$is_failed_task(task$key))
})

test_that("finishing a task works", {
  rush = start_rush_worker()

  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)
  task = rush$pop_task()

  rush$finish_tasks(task$key, list(list(y = 3)))

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_false(rush$is_running_task(task$key))
  expect_false(rush$is_failed_task(task$key))
})

test_that("failing a tasks works", {
  rush = start_rush_worker()
  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)
  task = rush$pop_task()

  rush$fail_tasks(task$key, conditions = list(list(message = "error")))

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 0)
  expect_equal(rush$n_failed_tasks, 1)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_null(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_finished_tasks(), nrows = 0)
  data = rush$fetch_failed_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)

  # status checks
  expect_false(rush$is_running_task(task$key))
  expect_true(rush$is_failed_task(task$key))
})

test_that("moving and fetching tasks works", {
  rush = start_rush_worker()

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3), list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
  rush$push_tasks(xss)
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 4)
  expect_character(queued_tasks$keys, unique = TRUE)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  # pop task
  task = rush$pop_task()
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 3)
  expect_character(queued_tasks$keys, unique = TRUE)
  running_tasks = rush$fetch_running_tasks()
  expect_data_table(running_tasks, nrows = 1)
  expect_character(running_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, running_tasks$keys)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  # finish task
  rush$pop_task()
  rush$finish_tasks(task$key, list(list(y = 3)))
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 2)
  expect_character(queued_tasks$keys, unique = TRUE)
  running_tasks = rush$fetch_running_tasks()
  expect_data_table(running_tasks, nrows = 1)
  expect_character(running_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, running_tasks$keys)
  finished_tasks = rush$fetch_finished_tasks()
  expect_data_table(finished_tasks, nrows = 1)
  expect_character(finished_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, finished_tasks$keys)
  expect_disjunct(running_tasks$keys, finished_tasks$keys)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)

  # fail task
  task = rush$pop_task()
  rush$fail_tasks(task$key, conditions = list(list(message = "error")))
  queued_tasks = rush$fetch_queued_tasks()
  expect_data_table(queued_tasks, nrows = 1)
  expect_character(queued_tasks$keys, unique = TRUE)
  running_tasks = rush$fetch_running_tasks()
  expect_data_table(running_tasks, nrows = 1)
  expect_character(running_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, running_tasks$keys)
  finished_tasks = rush$fetch_finished_tasks()
  expect_data_table(finished_tasks, nrows = 1)
  expect_character(finished_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, finished_tasks$keys)
  expect_disjunct(running_tasks$keys, finished_tasks$keys)
  failed_tasks = rush$fetch_failed_tasks()
  expect_data_table(failed_tasks, nrows = 1)
  expect_character(failed_tasks$keys, unique = TRUE)
  expect_disjunct(queued_tasks$keys, failed_tasks$keys)
  expect_disjunct(running_tasks$keys, failed_tasks$keys)
  all_tasks = rush$fetch_tasks()
  expect_data_table(all_tasks, nrows = 4)
  expect_character(all_tasks$keys, unique = TRUE)
})

test_that("moving a queued task to failed works", {
  rush = start_rush_worker()

  xss = list(list(x1 = 1, x2 = 2))
  rush$push_tasks(xss)
  queued_tasks = rush$queued_tasks
  rush$fail_tasks(queued_tasks, conditions = list(list(message = "error")))
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))
  rush$push_tasks(xss)
  task = rush$pop_task()

  rush$fail_tasks(task$key, conditions = list(list(message = "error")))

  expect_data_table(rush$fetch_queued_tasks(), nrows = 1)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
  expect_set_equal(rush$failed_tasks, c(task$key, queued_tasks))

  queued_tasks = rush$queued_tasks
  rush$fail_tasks(queued_tasks, conditions = list(list(message = "error")))
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 3)

  xss = list(list(x1 = 1, x2 = 4), list(x1 = 1, x2 = 5))
  rush$push_tasks(xss)
  queued_tasks = rush$queued_tasks
  rush$fail_tasks(queued_tasks, conditions = replicate(2, list(message = "error"), simplify = FALSE))

  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 5)
})

test_that("fetch task with states works", {
  rush = start_rush_worker()

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)

  # queued
  expect_equal(rush$n_queued_tasks, 1)
  expect_data_table(rush$fetch_tasks_with_state(states = "finished"), nrows = 0)
  expect_data_table(rush$fetch_tasks_with_state(states = c("running", "finished")), nrows = 0)
  tab = rush$fetch_tasks_with_state(states = "queued")
  expect_data_table(tab, nrows = 1)
  expect_names(names(tab), must.include = "state")

  # running
  task = rush$pop_task(fields = c("xs"))
  tab = rush$fetch_tasks_with_state()
  expect_data_table(tab, nrows = 1)
  expect_equal(tab$state, "running")

  # finished
  rush$finish_tasks(task$key, list(list(y = 3)))
  tab = rush$fetch_tasks_with_state()
  expect_data_table(tab, nrows = 1)
  expect_equal(tab$state, "finished")

  # failed
  xss = list(list(x1 = 2, x2 = 2))
  rush$push_tasks(xss)
  task_2 = rush$pop_task()
  rush$fail_tasks(task_2$key, conditions = list(list(message = "error")))
  tab = rush$fetch_tasks_with_state()
  expect_data_table(tab, nrows = 2)
  expect_equal(tab$state, c("finished", "failed"))
})

test_that("latest results are fetched", {
  rush = start_rush_worker()

  # add 1 task
  rush$push_tasks(list(list(x1 = 1, x2 = 2)))
  task = rush$pop_task()
  rush$finish_tasks(task$key, list(list(y = 3)))

  latest_results = rush$fetch_new_tasks()
  expect_data_table(latest_results, nrows = 1)
  expect_set_equal(latest_results$y, 3)
  expect_data_table(rush$fetch_new_tasks(), nrows = 0)

  # add 1 task
  keys = rush$push_tasks(list(list(x1 = 1, x2 = 3)))
  task = rush$pop_task()
  rush$finish_tasks(task$key, list(list(y = 4)))

  latest_results = rush$fetch_new_tasks()
  expect_data_table(latest_results, nrows = 1)
  expect_set_equal(latest_results$y, 4)
  expect_data_table(rush$fetch_new_tasks(), nrows = 0)

  # add 2 tasks
  keys = rush$push_tasks(list(list(x1 = 1, x2 = 4)))
  task = rush$pop_task()
  rush$finish_tasks(task$key, list(list(y = 5)))
  keys = rush$push_tasks(list(list(x1 = 1, x2 = 5)))
  task = rush$pop_task()
  rush$finish_tasks(task$key, list(list(y = 6)))

  latest_results = rush$fetch_new_tasks()
  expect_data_table(latest_results, nrows = 2)
  expect_set_equal(latest_results$y, c(5, 6))
  expect_data_table(rush$fetch_new_tasks(), nrows = 0)
})

test_that("pushing finished tasks works", {
  rush = start_rush_worker()

  rush$push_finished_tasks(
    list(list(x1 = 1, x2 = 2)),
    list(list(y = 3)),
    xss_extra = list(list(extra_input = "A")),
    yss_extra = list(list(extra_output = "B"))
  )
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$fetch_finished_tasks()$extra_input, "A")
  expect_equal(rush$fetch_finished_tasks()$extra_output, "B")
})

test_that("pushing failed tasks works", {
  rush = start_rush_worker()

  rush$push_failed_tasks(list(list(x1 = 1, x2 = 2)), conditions = list(list(message = "error")))
  expect_equal(rush$n_failed_tasks, 1)
  expect_equal(rush$n_tasks, 1)
})

# atomic operations -----------------------------------------------------------

test_that("task in states works", {
  rush = start_rush_worker()

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)

  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_equal(keys_list$queued, keys)
  expect_null(keys_list$running)
  expect_null(keys_list$finished)
  expect_null(keys_list$failed)
  # switch order
  keys_list = rush$tasks_with_state(c("running", "queued", "finished", "failed"))
  expect_equal(keys_list$queued, keys)

  task = rush$pop_task()
  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_equal(keys_list$running, keys)
  expect_null(keys_list$queued)
  expect_null(keys_list$finished)
  expect_null(keys_list$failed)

  rush$finish_tasks(task$key, list(list(y = 3)))
  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)
  expect_equal(keys_list$finished, task$key)
  expect_null(keys_list$failed)

  xss = list(list(x1 = 2, x2 = 2))
  keys = rush$push_tasks(xss)
  task_2 = rush$pop_task()
  rush$fail_tasks(task_2$key, conditions = list(list(message = "error")))
  keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
  expect_list(keys_list, len = 4)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)
  expect_equal(keys_list$finished, task$key)
  expect_equal(keys_list$failed, task_2$key)

  keys_list = rush$tasks_with_state(c("queued"))
  expect_list(keys_list, len = 1)
  expect_names(names(keys_list), identical.to = c("queued"))
  expect_null(keys_list$queued)

  keys_list = rush$tasks_with_state(c("queued", "running"))
  expect_list(keys_list, len = 2)
  expect_names(names(keys_list), identical.to = c("queued", "running"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)

  keys_list = rush$tasks_with_state(c("queued", "running", "finished"))
  expect_list(keys_list, len = 3)
  expect_names(names(keys_list), identical.to = c("queued", "running", "finished"))
  expect_null(keys_list$queued)
  expect_null(keys_list$running)
  expect_equal(keys_list$finished, task$key)
})

Try the rush package in your browser

Any scripts or data that you put into this service are public.

rush documentation built on April 4, 2026, 5:07 p.m.