tests/testthat/test-Rush.R

# start workers with processx --------------------------------------------------

test_that("constructing a rush controller works", {
  skip_on_cran()

  # build a controller from the config returned by the redis test helper
  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # the object has the expected class and reports the network id it was given
  expect_class(controller, "Rush")
  expect_equal(controller$network_id, "test-rush")

  expect_rush_reset(controller)
})

test_that("local workers are started", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # nothing is registered before workers are started
  expect_data_table(controller$worker_info, nrows = 0)

  log_levels = c("mlr3/rush" = "debug")
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = log_levels)
  controller$wait_for_workers(2, timeout = 5)

  # every stored handle is a processx process
  for (handle in controller$processes_processx) {
    expect_class(handle, "process")
  }

  # meta data read back from redis
  info = controller$worker_info
  expect_data_table(info, nrows = 2)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(started_ids, info$worker_id)
  expect_set_equal(controller$worker_ids, started_ids)
  expect_set_equal(controller$worker_states$state, "running")

  expect_rush_reset(controller)
})

test_that("local workers are started with Redis on unix socket", {
  skip_on_cran()
  skip_on_ci() # does not work on github actions runner

  # launch a daemonized redis-server bound to a unix socket and wait a fixed
  # time before connecting
  server_cmd = sprintf(
    "redis-server --port 0 --unixsocket /tmp/redis.sock --daemonize yes --pidfile /tmp/redis.pid --dir %s",
    tempdir())
  system(server_cmd)
  Sys.sleep(5)

  socket_config = redux::redis_config(path = "/tmp/redis.sock")
  redis = redux::hiredis(socket_config)

  # best-effort shutdown of the server when the test finishes
  on.exit(try(redis$SHUTDOWN(), silent = TRUE))

  redis$FLUSHDB()

  controller = rsh(network_id = "test-rush", config = socket_config)
  log_levels = c("mlr3/rush" = "debug")
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = log_levels)
  controller$wait_for_workers(2, timeout = 5)

  # every stored handle is a processx process
  for (handle in controller$processes_processx) {
    expect_class(handle, "process")
  }

  # meta data read back from redis
  info = controller$worker_info
  expect_data_table(info, nrows = 2)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(started_ids, info$worker_id)
  expect_set_equal(controller$worker_ids, started_ids)
  expect_set_equal(controller$worker_states$state, "running")
})

test_that("additional workers are started", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  log_levels = c("mlr3/rush" = "debug")

  # first batch of two workers
  first_batch = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = log_levels)
  controller$wait_for_workers(2, timeout = 5)

  expect_equal(controller$n_workers, 2)

  # second batch on the same controller; four workers are expected in total
  second_batch = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = log_levels)
  controller$wait_for_workers(4, timeout = 5)

  expect_length(controller$processes_processx, 4)
  for (handle in controller$processes_processx) {
    expect_class(handle, "process")
  }

  info = controller$worker_info
  expect_data_table(info, nrows = 4)
  expect_set_equal(c(first_batch, second_batch), info$worker_id)
  expect_integer(info$pid, unique = TRUE)
  expect_false(any(info$remote))
  expect_set_equal(controller$worker_states$state, "running")

  expect_rush_reset(controller)
})

test_that("packages are available on the worker", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # start the workers with the "uuid" package requested via `packages`
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    packages = "uuid",
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(2, timeout = 5)
  expect_equal(controller$n_workers, 2)

  # push a single task and wait until it is done
  single_task = list(list(x1 = 1, x2 = 2))
  task_keys = controller$push_tasks(single_task)
  controller$wait_for_tasks(task_keys, detect_lost_workers = TRUE)

  expect_equal(controller$n_finished_tasks, 1)

  expect_rush_reset(controller)
})

# test_that("globals are available on the worker", {
#   skip_on_cran()

#   config = start_flush_redis()
#   rush = rsh(network_id = "test-rush", config = config)

#   worker_loop = function(rush) {
#     while(!rush$terminated && !rush$terminated_on_idle) {
#       task = rush$pop_task(fields = c("xs", "seed"))
#       if (!is.null(task)) {
#         tryCatch({
#           # evaluate task with seed
#           fun = function(x1, x2, ...) list(y = x)
#           ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
#           rush$push_results(task$key, yss = list(ys))
#         }, error = function(e) {
#           condition = list(message = e$message)
#           rush$push_failed(task$key, conditions = list(condition))
#         })
#       }
#     }

#     return(NULL)
#   }
#   x <<- 33

#   worker_ids = rush$start_local_workers(
#     worker_loop = worker_loop,
#     n_workers = 2,
#     globals = "x",
#     lgr_thresholds = c("mlr3/rush" = "debug"))
#   rush$wait_for_workers(2, timeout = 5)
#   expect_equal(rush$n_workers, 2)

#   xss = list(list(x1 = 1, x2 = 2))
#   keys = rush$push_tasks(xss)
#   rush$wait_for_tasks(keys, detect_lost_workers = TRUE)

#   expect_equal(rush$n_finished_tasks, 1)
#   expect_equal(rush$fetch_finished_tasks()$y, 33)

#   expect_rush_reset(rush)
# })

# test_that("named globals are available on the worker", {
#   skip_on_cran()

#   config = start_flush_redis()
#   rush = rsh(network_id = "test-rush", config = config)
#   worker_loop = function(rush) {
#       while(!rush$terminated && !rush$terminated_on_idle) {
#         task = rush$pop_task(fields = c("xs", "seed"))
#         if (!is.null(task)) {
#           tryCatch({
#             # evaluate task with seed
#             fun = function(x1, x2, ...) list(y = z)
#             ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
#             rush$push_results(task$key, yss = list(ys))
#           }, error = function(e) {
#             condition = list(message = e$message)
#             rush$push_failed(task$key, conditions = list(condition))
#           })
#         }
#       }

#       return(NULL)
#     }
#   x <<- 33

#   worker_ids = rush$start_local_workers(
#     worker_loop = worker_loop,
#     n_workers = 2,
#     globals = c(z = "x"),
#     lgr_thresholds = c("mlr3/rush" = "debug"))
#   rush$wait_for_workers(2, timeout = 5)

#   xss = list(list(x1 = 1, x2 = 2))
#   keys = rush$push_tasks(xss)
#   rush$wait_for_tasks(keys, detect_lost_workers = TRUE)

#   expect_equal(rush$n_finished_tasks, 1)
#   expect_equal(rush$fetch_finished_tasks()$y, 33)

#   expect_rush_reset(rush)
# })

# start workers with mirai -----------------------------------------------------

test_that("mirai workers are started", {
  skip_on_cran()

  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  # always shut the daemons down again, even if the test errors part-way;
  # the handler runs after the final expect_rush_reset() below
  on.exit({
    mirai::daemons(0)
  }, add = TRUE)

  mirai::daemons(2)

  worker_ids = rush$start_remote_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  # check fields
  walk(rush$processes_mirai, function(process) expect_class(process, "mirai"))

  # check meta data from redis
  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 2)
  expect_integer(worker_info$pid, unique = TRUE)
  expect_true(all(worker_info$remote))
  expect_set_equal(worker_ids, worker_info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_states$state, "running")

  expect_rush_reset(rush)
})

test_that("new mirai workers can be started on used daemons", {
  skip_on_cran()

  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  expect_data_table(rush$worker_info, nrows = 0)

  # always shut the daemons down again, even if the test errors part-way;
  # the handler runs after the final expect_rush_reset() below
  on.exit({
    mirai::daemons(0)
  }, add = TRUE)

  mirai::daemons(2)

  # worker loop that registers one running task, pushes its result and returns
  worker_loop = function(rush) {
    xs = list(x1 = 1, x2 = 2)
    key = rush$push_running_tasks(list(xs))
    ys = list(y = xs$x1 + xs$x2)
    rush$push_results(key, yss = list(ys))
  }

  worker_ids = rush$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  # give the short-lived worker loops time to return
  Sys.sleep(1)

  # check fields
  walk(rush$processes_mirai, function(process) expect_class(process, "mirai"))

  expect_equal(mirai::status()$mirai["completed"], c(completed = 2))

  # second network on the same, already used daemons
  rush = rsh(network_id = "test-rush-2", config = config)
  worker_ids = rush$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 2)

  expect_equal(mirai::status()$mirai["completed"], c(completed = 4))

  # flush the database the test actually used, not the default connection
  r = redux::hiredis(config)
  r$FLUSHDB()
  expect_rush_reset(rush)
})

# start workers with script -----------------------------------------------------

test_that("heartbeat process is started", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  expect_data_table(controller$worker_info, nrows = 0)

  controller$worker_script(
    worker_loop = test_worker_loop,
    heartbeat_period = 3,
    heartbeat_expire = 9,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  # start one worker in a separate Rscript process with the same heartbeat
  # settings
  start_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 3, heartbeat_expire = 9)"
  worker_process = processx::process$new("Rscript",
    args = c("-e", start_call),
    supervise = TRUE,
    stderr = "|", stdout = "|")

  on.exit(worker_process$kill(), add = TRUE)

  # fixed wait before reading the worker info
  Sys.sleep(5)

  info = controller$worker_info
  expect_logical(info$heartbeat)
})

# terminate workers ------------------------------------------------------------

test_that("a worker is terminated", {
  skip_on_cran()

  redis_config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = redis_config)

  # worker loop that keeps popping tasks until one of the termination flags
  # is set; a task is evaluated as x1 + x2
  worker_loop = function(rush) {
    while (!(rush$terminated || rush$terminated_on_idle)) {
      task = rush$pop_task(fields = c("xs", "seed"))
      if (is.null(task)) next

      tryCatch({
        # evaluate task with seed
        add_inputs = function(x1, x2) list(y = x1 + x2)
        result = with_rng_state(add_inputs, args = c(task$xs), seed = task$seed)
        rush$push_results(task$key, yss = list(result))
      }, error = function(e) {
        failure = list(message = e$message)
        rush$push_failed(task$key, conditions = list(failure))
      })
    }

    NULL
  }

  worker_ids = rush$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  first_id = rush$running_worker_ids[1]
  second_id = rush$running_worker_ids[2]

  # worker 1
  rush$stop_workers(worker_ids = first_id, type = "terminate")
  Sys.sleep(3)
  expect_false(rush$processes_processx[[first_id]]$is_alive())
  expect_equal(rush$running_worker_ids, second_id)
  expect_equal(first_id, rush$terminated_worker_ids)

  # worker 2
  rush$stop_workers(worker_ids = second_id, type = "terminate")
  Sys.sleep(3)
  expect_false(rush$processes_processx[[second_id]]$is_alive())
  expect_set_equal(c(first_id, second_id), rush$terminated_worker_ids)
  expect_null(rush$running_worker_ids)

  expect_rush_reset(rush, type = "terminate")
})

# kill workers -----------------------------------------------------------------

test_that("a local worker is killed", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(2, timeout = 5)

  first_id = controller$running_worker_ids[1]
  second_id = controller$running_worker_ids[2]

  controller$worker_info

  # worker 1: only the first process may be gone afterwards
  controller$stop_workers(worker_ids = first_id, type = "kill")
  Sys.sleep(1)
  expect_equal(first_id, controller$killed_worker_ids)
  expect_false(controller$processes_processx[[first_id]]$is_alive())
  expect_true(controller$processes_processx[[second_id]]$is_alive())

  # worker 2: now both processes are gone
  controller$stop_workers(worker_ids = second_id, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(first_id, second_id), controller$killed_worker_ids)
  expect_false(controller$processes_processx[[first_id]]$is_alive())
  expect_false(controller$processes_processx[[second_id]]$is_alive())

  expect_rush_reset(controller)
})

test_that("a mirai worker is killed", {
  skip_on_cran()

  redis_config = start_flush_redis()

  # tear the daemons down when the test exits
  on.exit(mirai::daemons(0), add = TRUE)

  mirai::daemons(2)
  controller = rsh(network_id = "test-rush", config = redis_config)
  started_ids = controller$start_remote_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(2, timeout = 5)

  first_id = controller$running_worker_ids[1]
  second_id = controller$running_worker_ids[2]
  info = controller$worker_info

  # signal 0 probes the pids without terminating them
  probe = tools::pskill(info$pid, signal = 0L)
  expect_true(all(probe))

  # worker 1
  controller$stop_workers(worker_ids = first_id, type = "kill")
  Sys.sleep(1)
  expect_equal(first_id, controller$killed_worker_ids)
  expect_equal(controller$running_worker_ids, second_id)

  expect_true(mirai::is_error_value(controller$processes_mirai[[first_id]]$data))
  expect_false(mirai::is_error_value(controller$processes_mirai[[second_id]]$data))

  # worker 2
  controller$stop_workers(worker_ids = second_id, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(first_id, second_id), controller$killed_worker_ids)

  expect_true(mirai::is_error_value(controller$processes_mirai[[first_id]]$data))
  expect_true(mirai::is_error_value(controller$processes_mirai[[second_id]]$data))

  expect_rush_reset(controller)
})

test_that("worker is killed with a heartbeat process", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  expect_data_table(controller$worker_info, nrows = 0)

  controller$worker_script(
    worker_loop = test_worker_loop,
    heartbeat_period = 3,
    heartbeat_expire = 9,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  # start one worker in a separate Rscript process with the same heartbeat
  # settings
  start_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 3, heartbeat_expire = 9)"
  worker_process = processx::process$new("Rscript",
    args = c("-e", start_call),
    supervise = TRUE,
    stderr = "|", stdout = "|")

  on.exit(worker_process$kill(), add = TRUE)

  Sys.sleep(5)

  info = controller$worker_info
  expect_logical(info$heartbeat)

  # signal 0L returns TRUE if the process is still alive
  expect_true(tools::pskill(info$pid, signal = 0L))

  # kill all workers of the network, then probe the same pid again
  controller$stop_workers(type = "kill")

  Sys.sleep(1)

  expect_false(tools::pskill(info$pid, signal = 0L))
  expect_true(controller$worker_states$state == "killed")
  expect_equal(controller$killed_worker_ids, info$worker_id)

  expect_rush_reset(controller)
})

# low level read and write -----------------------------------------------------

test_that("reading and writing a hash works with flatten", {
  skip_on_cran()

  redis_config = start_flush_redis()
  worker = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  # shared fixture: one point with two inputs
  point = list(x1 = 1, x2 = 2)

  # one field with list
  key = worker$write_hashes(xs = list(point))
  expect_equal(worker$read_hashes(key, "xs"), list(point))

  # one field with atomic
  key = worker$write_hashes(timeout = 1)
  expect_equal(worker$read_hashes(key, "timeout"), list(list(timeout = 1)))

  # two fields with lists
  key = worker$write_hashes(xs = list(point), ys = list(list(y = 3)))
  expect_equal(worker$read_hashes(key, c("xs", "ys")), list(c(point, list(y = 3))))

  # two fields with list and empty list
  key = worker$write_hashes(xs = list(point), ys = list())
  expect_equal(worker$read_hashes(key, c("xs", "ys")), list(point))

  # two fields with list and atomic
  key = worker$write_hashes(xs = list(point), timeout = 1)
  expect_equal(worker$read_hashes(key, c("xs", "timeout")), list(c(point, list(timeout = 1))))
})

test_that("reading and writing a hash works without flatten", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # shared fixture: one point with two inputs
  point = list(x1 = 1, x2 = 2)

  # one field with list
  key = controller$write_hashes(xs = list(point))
  stored = controller$read_hashes(key, "xs", flatten = FALSE)
  expect_equal(stored, list(list(xs = point)))

  # one field with atomic
  key = controller$write_hashes(timeout = 1)
  stored = controller$read_hashes(key, "timeout", flatten = FALSE)
  expect_equal(stored, list(list(timeout = 1)))

  # two fields with lists
  key = controller$write_hashes(xs = list(point), ys = list(list(y = 3)))
  stored = controller$read_hashes(key, c("xs", "ys"), flatten = FALSE)
  expect_equal(stored, list(list(xs = point, ys = list(y = 3))))

  # two fields with list and empty list
  key = controller$write_hashes(xs = list(point), ys = list())
  stored = controller$read_hashes(key, c("xs", "ys"), flatten = FALSE)
  expect_equal(stored, list(list(xs = point, ys = NULL)))

  # two fields with list and atomic
  key = controller$write_hashes(xs = list(point), timeout = 1)
  stored = controller$read_hashes(key, c("xs", "timeout"), flatten = FALSE)
  expect_equal(stored, list(list(xs = point, timeout = 1)))
})

test_that("reading and writing hashes works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  worker = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  # shared fixtures: two points and the same points with a timeout attached
  first = list(x1 = 1, x2 = 2)
  second = list(x1 = 1, x2 = 3)
  points = list(first, second)
  points_with_timeout = list(c(first, list(timeout = 1)), c(second, list(timeout = 1)))

  # one field with list
  keys = worker$write_hashes(xs = points)
  expect_equal(worker$read_hashes(keys, "xs"), points)

  # one field atomic
  keys = worker$write_hashes(timeout = c(1, 1))
  expect_equal(worker$read_hashes(keys, "timeout"), list(list(timeout = 1), list(timeout = 1)))

  # two fields with list and recycled atomic
  keys = worker$write_hashes(xs = points, timeout = 1)
  expect_equal(worker$read_hashes(keys, c("xs", "timeout")), points_with_timeout)

  # two fields
  keys = worker$write_hashes(xs = points, ys = list(list(y = 3), list(y = 4)))
  expect_equal(
    worker$read_hashes(keys, c("xs", "ys")),
    list(c(first, list(y = 3)), c(second, list(y = 4))))

  # two fields with list and atomic
  keys = worker$write_hashes(xs = points, timeout = c(1, 1))
  expect_equal(worker$read_hashes(keys, c("xs", "timeout")), points_with_timeout)

  # two fields with list and recycled atomic
  keys = worker$write_hashes(xs = points, timeout = 1)
  expect_equal(worker$read_hashes(keys, c("xs", "timeout")), points_with_timeout)

  # two fields, one empty
  keys = worker$write_hashes(xs = points, ys = list())
  expect_equal(worker$read_hashes(keys, c("xs", "ys")), points)

  # recycle
  keys = worker$write_hashes(xs = points, ys = list(list(y = 3)))
  expect_equal(
    worker$read_hashes(keys, c("xs", "ys")),
    list(c(first, list(y = 3)), c(second, list(y = 3))))
})

test_that("writing hashes to specific keys works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  worker = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  first = list(x1 = 1, x2 = 2)
  second = list(x1 = 1, x2 = 3)

  # one element
  single_key = uuid::UUIDgenerate()
  worker$write_hashes(xs = list(first), keys = single_key)
  expect_equal(worker$read_hashes(single_key, "xs"), list(first))

  # two elements
  two_keys = uuid::UUIDgenerate(n = 2)
  worker$write_hashes(xs = list(first, second), keys = two_keys)
  expect_equal(worker$read_hashes(two_keys, "xs"), list(first, second))

  # wrong number of keys
  lone_key = uuid::UUIDgenerate()
  expect_error(
    worker$write_hashes(xs = list(first, second), keys = lone_key),
    "Assertion on 'keys' failed")
})


test_that("writing list columns works", {
  skip_on_cran()

  finished_list = "test-rush:finished_tasks"

  # scenario 1: extra field holding a list with a single string
  redis_config = start_flush_redis()
  worker = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  keys = worker$write_hashes(
    xs = list(list(x1 = 1, x2 = 2)),
    xs_extra = list(list(extra = list("A"))))
  worker$connector$command(c("LPUSH", finished_list, keys))

  expect_list(worker$fetch_finished_tasks()$extra, len = 1)

  # scenario 2: extra field holding a list with a character vector
  redis_config = start_flush_redis()
  worker = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  keys = worker$write_hashes(
    xs = list(list(x1 = 1, x2 = 2)),
    xs_extra = list(list(extra = list(letters[1:3]))))
  worker$connector$command(c("LPUSH", finished_list, keys))

  expect_list(worker$fetch_finished_tasks()$extra, len = 1)

  # scenario 3: two tasks, each with its own extra list
  redis_config = start_flush_redis()
  worker = RushWorker$new(network_id = "test-rush", config = redis_config, remote = FALSE)

  keys = worker$write_hashes(
    xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)),
    xs_extra = list(list(extra = list("A")), list(extra = list("B"))))
  worker$connector$command(c("LPUSH", finished_list, keys))
  worker$read_hashes(keys, c("xs", "xs_extra"))

  expect_list(worker$fetch_finished_tasks()$extra, len = 2)
})

# task evaluation --------------------------------------------------------------

test_that("evaluating a task works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(2, timeout = 5)

  # push one task and block until it is done
  single_task = list(list(x1 = 1, x2 = 2))
  task_keys = controller$push_tasks(single_task)
  controller$wait_for_tasks(task_keys)

  # check task count
  expect_equal(controller$n_tasks, 1)
  expect_equal(controller$n_queued_tasks, 0)
  expect_equal(controller$n_running_tasks, 0)
  expect_equal(controller$n_finished_tasks, 1)
  expect_equal(controller$n_failed_tasks, 0)

  # check keys in sets
  expect_string(controller$tasks)
  expect_null(controller$queued_tasks)
  expect_null(controller$running_tasks)
  expect_string(controller$finished_tasks)
  expect_null(controller$failed_tasks)

  # check fetching
  expect_data_table(controller$fetch_queued_tasks(), nrows = 0)
  expect_data_table(controller$fetch_running_tasks(), nrows = 0)
  expect_data_table(controller$fetch_failed_tasks(), nrows = 0)
  finished = controller$fetch_finished_tasks()
  required_cols = c("x1", "x2", "worker_id", "y", "keys")
  expect_names(names(finished), must.include = required_cols)
  expect_data_table(finished, nrows = 1)
  expect_data_table(controller$fetch_tasks(), nrows = 1)

  expect_rush_reset(controller)
})

test_that("evaluating tasks works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(2, timeout = 5)

  # push ten identical tasks and block until all are done
  ten_tasks = replicate(10, list(list(x1 = 1, x2 = 2)))
  task_keys = controller$push_tasks(ten_tasks)
  controller$wait_for_tasks(task_keys)

  # check task count
  expect_equal(controller$n_tasks, 10)
  expect_equal(controller$n_queued_tasks, 0)
  expect_equal(controller$n_running_tasks, 0)
  expect_equal(controller$n_finished_tasks, 10)
  expect_equal(controller$n_failed_tasks, 0)

  # check keys in sets
  expect_character(controller$tasks, len = 10)
  expect_null(controller$queued_tasks)
  expect_null(controller$running_tasks)
  expect_character(controller$finished_tasks, len = 10)
  expect_null(controller$failed_tasks)

  # check fetching
  expect_data_table(controller$fetch_queued_tasks(), nrows = 0)
  expect_data_table(controller$fetch_running_tasks(), nrows = 0)
  expect_data_table(controller$fetch_failed_tasks(), nrows = 0)
  finished = controller$fetch_finished_tasks()
  required_cols = c("x1", "x2", "worker_id", "y", "keys")
  expect_names(names(finished), must.include = required_cols)
  expect_data_table(finished, nrows = 10)
  expect_data_table(controller$fetch_tasks(), nrows = 10)

  expect_rush_reset(controller)
})

test_that("caching results works", {
  skip_on_cran()

  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)
  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(2, timeout = 5)

  # first batch of ten tasks
  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  # the private cache holds ten entries after the first fetch and keeps that
  # size when fetching again in either data format
  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 10)
  expect_list(get_private(rush)$.cached_tasks, len = 10)

  # second batch of ten tasks; the cache is expected to grow to twenty
  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 20)
  expect_list(get_private(rush)$.cached_tasks, len = 20)

  expect_list(rush$fetch_finished_tasks(data_format = "list"), len = 20)
  expect_list(get_private(rush)$.cached_tasks, len = 20)

  # reset the network like the other tests that start workers, so nothing
  # started here carries over into later tests
  expect_rush_reset(rush)
})

# segfault detection -----------------------------------------------------------

test_that("a segfault on a local worker is detected", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # worker loop that registers a running task and then makes the attach()
  # call this test relies on to bring the worker process down
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      inputs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(inputs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  started_ids = controller$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(1, timeout = 5)

  Sys.sleep(3)

  # the worker is only reported as lost after detection has run
  expect_null(controller$lost_worker_ids)
  controller$detect_lost_workers()
  expect_equal(controller$lost_worker_ids, started_ids)
  controller$fetch_failed_tasks()

  expect_rush_reset(controller)
})

test_that("a segfault on a mirai worker", {
  skip_on_cran()

  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # worker loop that registers a running task and then makes the attach()
  # call this test relies on to bring the worker process down
  worker_loop = function(rush) {
    while(TRUE) {
      Sys.sleep(1)
      xs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(xs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  # shut the daemon down again when the test exits, so it does not leak into
  # the following tests
  on.exit({
    mirai::daemons(0)
  }, add = TRUE)

  mirai::daemons(1)
  worker_ids = rush$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  rush$wait_for_workers(1, timeout = 5)

  Sys.sleep(3)

  # the worker is only reported as lost after detection has run
  expect_null(rush$lost_worker_ids)
  rush$detect_lost_workers()
  expect_equal(rush$lost_worker_ids, worker_ids)
  rush$fetch_failed_tasks()

  expect_rush_reset(rush)
})

test_that("a segfault on a worker is detected via the heartbeat", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  expect_data_table(controller$worker_info, nrows = 0)

  # worker loop that registers a running task and then makes the attach()
  # call this test relies on to bring the worker process down
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      inputs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(inputs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  controller$worker_script(
    worker_loop = worker_loop,
    heartbeat_period = 1,
    heartbeat_expire = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  # start one worker in a separate Rscript process with the same heartbeat
  # settings
  start_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 1, heartbeat_expire = 2)"
  worker_process = processx::process$new("Rscript",
    args = c("-e", start_call),
    supervise = TRUE,
    stderr = "|", stdout = "|")

  on.exit(worker_process$kill(), add = TRUE)

  Sys.sleep(10)

  # the worker is only reported as lost after detection has run
  expect_null(controller$lost_worker_ids)
  controller$detect_lost_workers()
  expect_string(controller$lost_worker_ids)

  expect_rush_reset(controller)
})

test_that("segfaults on workers are detected via the heartbeat", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  expect_data_table(controller$worker_info, nrows = 0)

  # worker loop that registers a running task and then makes the attach()
  # call this test relies on to bring the worker process down
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      inputs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(inputs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  controller$worker_script(
    worker_loop = worker_loop,
    heartbeat_period = 1,
    heartbeat_expire = 2,
    lgr_thresholds = c("mlr3/rush" = "debug"))

  # start two workers in separate Rscript processes, both with the same call
  start_call = "rush::start_worker(network_id = 'test-rush', config = list(url = 'redis://127.0.0.1:6379', scheme = 'redis', host = '127.0.0.1', port = '6379'), remote = TRUE, lgr_thresholds = c('mlr3/rush' = 'debug'), lgr_buffer_size = 0, heartbeat_period = 1, heartbeat_expire = 2)"

  first_process = processx::process$new("Rscript",
    args = c("-e", start_call),
    supervise = TRUE,
    stderr = "|", stdout = "|")

  second_process = processx::process$new("Rscript",
    args = c("-e", start_call),
    supervise = TRUE,
    stderr = "|", stdout = "|")

  on.exit({
    first_process$kill()
    second_process$kill()
  }, add = TRUE)

  Sys.sleep(10)

  # both workers are only reported as lost after detection has run
  expect_null(controller$lost_worker_ids)
  controller$detect_lost_workers()
  expect_character(controller$lost_worker_ids, len = 2)

  expect_rush_reset(controller)
})


# fault detection --------------------------------------------------------------

test_that("a lost task is detected", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # worker loop that registers a running task and then makes the attach()
  # call this test relies on to bring the worker process down
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      inputs = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(inputs))
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  started_ids = controller$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(1, timeout = 5)

  Sys.sleep(5)

  controller$detect_lost_workers()

  # check task count: the single task is expected to be counted as failed
  expect_equal(controller$n_tasks, 1)
  expect_equal(controller$n_queued_tasks, 0)
  expect_equal(controller$n_running_tasks, 0)
  expect_equal(controller$n_finished_tasks, 0)
  expect_equal(controller$n_failed_tasks, 1)

  # check keys in sets
  expect_character(controller$tasks, len = 1)
  expect_null(controller$queued_tasks)
  expect_null(controller$running_tasks)
  expect_null(controller$finished_tasks)
  expect_string(controller$failed_tasks)

  # check fetching
  expect_data_table(controller$fetch_queued_tasks(), nrows = 0)
  expect_data_table(controller$fetch_running_tasks(), nrows = 0)
  expect_data_table(controller$fetch_tasks(), nrows = 1)

  failed = controller$fetch_failed_tasks()
  required_cols = c("x1", "x2", "worker_id", "message", "keys")
  expect_names(names(failed), must.include = required_cols)
  expect_data_table(failed, nrows = 1)
  expect_equal(failed$message, "Worker has crashed or was killed")

  # a second detection run returns the controller itself
  expect_class(controller$detect_lost_workers(), "Rush")

  expect_rush_reset(controller)
})

# restart workers --------------------------------------------------------------

test_that("restarting a local worker works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  started_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 4,
    lgr_thresholds = c("mlr3/rush" = "debug"))
  controller$wait_for_workers(2, timeout = 5)

  worker_id_1 = controller$running_worker_ids[1]
  worker_id_2 = controller$running_worker_ids[2]

  # kill the first worker from outside via its pid
  victim_pid = controller$worker_info[worker_id == worker_id_1, pid]
  tools::pskill(victim_pid)
  Sys.sleep(1)
  expect_false(controller$processes_processx[[worker_id_1]]$is_alive())

  # detection with restart enabled; the handle stored under the same worker
  # id is expected to be alive again
  controller$detect_lost_workers(restart_local_workers = TRUE)
  expect_true(controller$processes_processx[[worker_id_1]]$is_alive())

  expect_rush_reset(controller)
})

test_that("restarting a worker kills the local worker", {
  skip_on_cran()
  skip_on_os("windows")

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  worker_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(1, timeout = 5)

  # remember the process id of the worker before the restart
  old_pid = controller$worker_info$pid
  target_id = controller$running_worker_ids
  # signal 0 is used as a probe here: the call must succeed for a live process
  expect_true(tools::pskill(old_pid, signal = 0))

  controller$restart_workers(worker_ids = target_id)

  Sys.sleep(1)

  # the worker now runs under a new pid and the old process is gone
  expect_false(old_pid == controller$worker_info$pid)
  expect_false(tools::pskill(old_pid, signal = 0))

  expect_rush_reset(controller)
})


test_that("restarting a remote worker works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # Worker loop that registers a running task and then takes the worker down.
  worker_loop = function(rush) {
    repeat {
      Sys.sleep(1)
      point = list(x1 = 1, x2 = 2)
      key = rush$push_running_tasks(list(point))
      # NOTE(review): attaching this bogus database object is meant to crash
      # the R process -- the expectations below rely on the worker being lost
      get("attach")(structure(list(), class = "UserDefinedDatabase"))
    }
  }

  # always shut the mirai daemons down when the test exits
  on.exit(mirai::daemons(0), add = TRUE)

  mirai::daemons(1)
  worker_ids = controller$start_remote_workers(
    worker_loop = worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(1, timeout = 5)

  Sys.sleep(3)

  # the loss is only recorded once detection has run
  expect_null(controller$lost_worker_ids)
  controller$detect_lost_workers()
  expect_equal(controller$lost_worker_ids, worker_ids)
  expect_data_table(controller$fetch_failed_tasks(), nrows = 1)

  # replace the daemon pool before restarting the worker
  mirai::daemons(0)
  mirai::daemons(1)

  controller$restart_workers(worker_ids = worker_ids)

  Sys.sleep(3)

  # the restarted worker is lost again and leaves a second failed task
  controller$detect_lost_workers()
  expect_equal(controller$lost_worker_ids, worker_ids)
  expect_data_table(controller$fetch_failed_tasks(), nrows = 2)

  expect_rush_reset(controller)
})

# receiving results ------------------------------------------------------------

# test_that("blocking on new results works", {
#   skip_on_cran()

#   config = start_flush_redis()
#   rush = rsh(network_id = "test-rush", config = config)
#   worker_loop = function(rush) {
#     while(!rush$terminated && !rush$terminated_on_idle) {
#       task = rush$pop_task(fields = c("xs", "seed"))
#       if (!is.null(task)) {
#         tryCatch({
#           # evaluate task with seed
#           fun = function(x1, x2, ...) {
#             Sys.sleep(5)
#             list(y = x1 + x2)
#           }
#           ys = with_rng_state(fun, args = c(task$xs), seed = task$seed)
#           rush$push_results(task$key, yss = list(ys))
#         }, error = function(e) {
#           condition = list(message = e$message)
#           rush$push_failed(task$key, conditions = list(condition))
#         })
#       }
#     }

#     return(NULL)
#   }

#   worker_ids = rush$start_local_workers(
#     worker_loop = worker_loop,
#     n_workers = 1,
#     lgr_thresholds = c("mlr3/rush" = "debug"))
#   rush$wait_for_workers(1, timeout = 5)

#   xss = list(list(x1 = 1, x2 = 2))
#   keys = rush$push_tasks(xss)

#   expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0)
#   expect_data_table(rush$wait_for_new_tasks(timeout = 10), nrows = 1)
#   expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0)

#   expect_rush_reset(rush)
# })

test_that("wait for tasks works when a task gets lost", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  worker_ids = controller$start_local_workers(
    worker_loop = segfault_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(2, timeout = 5)

  # two tasks for two workers running the segfault loop
  points = list(
    list(x1 = 1, x2 = 2),
    list(x1 = 0, x2 = 2)
  )
  task_keys = controller$push_tasks(points)

  # waiting must return the controller when lost-worker detection is enabled
  waited = controller$wait_for_tasks(task_keys, detect_lost_workers = TRUE)
  expect_class(waited, "Rush")

  expect_rush_reset(controller)
})

# misc -------------------------------------------------------------------------

test_that("saving lgr logs works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  worker_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(1, timeout = 5)

  Sys.sleep(5)

  # first batch: a single task
  first_keys = controller$push_tasks(list(list(x1 = 2, x2 = 2)))
  controller$wait_for_tasks(first_keys)
  Sys.sleep(5)

  # plain log read exposes the standard columns
  entries = controller$read_log()
  expect_data_table(entries, min.rows = 1L)
  expect_names(names(entries), must.include = c("worker_id", "timestamp", "logger", "caller", "msg"))

  # with time_difference = TRUE an extra difftime column is present
  timed_entries = controller$read_log(time_difference = TRUE)
  expect_data_table(timed_entries, min.rows = 1L)
  expect_names(names(timed_entries), must.include = c("time_difference"))
  expect_class(timed_entries$time_difference, "difftime")

  # second batch: three more tasks
  second_keys = controller$push_tasks(list(
    list(x1 = 1, x2 = 2),
    list(x1 = 0, x2 = 2),
    list(x1 = 1, x2 = 2)
  ))
  controller$wait_for_tasks(second_keys)
  Sys.sleep(5)

  entries = controller$read_log()
  expect_data_table(entries, min.rows = 2L)
  expect_names(names(entries), must.include = c("worker_id", "timestamp", "logger", "caller", "msg"))

  expect_rush_reset(controller)
})

test_that("snapshot option works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  worker_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(1, timeout = 5)

  # setting a schedule is reflected in the Redis "save" config entry
  controller$snapshot_schedule = c(1, 1)
  save_entry = controller$connector$CONFIG_GET("save")
  expect_equal(save_entry[[2]], "1 1")
  expect_equal(controller$snapshot_schedule, c(1, 1))

  # clearing the schedule leaves an empty "save" entry
  controller$snapshot_schedule = NULL
  save_entry = controller$connector$CONFIG_GET("save")
  expect_equal(save_entry[[2]], "")
  expect_equal(controller$snapshot_schedule, "")

  expect_rush_reset(controller)
})

test_that("terminating workers on idle works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)
  worker_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(2, timeout = 5)

  # push one task together with the terminate_workers flag
  points = list(list(x1 = 1, x2 = 2))
  task_keys = controller$push_tasks(points, terminate_workers = TRUE)
  controller$wait_for_tasks(task_keys)
  Sys.sleep(5)

  # afterwards every worker is expected to report the state "terminated"
  states = controller$worker_states$state
  expect_set_equal(states, "terminated")

  expect_rush_reset(controller)
})

test_that("reconnecting rush instance works", {
  skip_on_cran()

  # Serialize into a session-temporary file instead of the working directory.
  # unlink() is silent when the file was never created (e.g. the test is
  # skipped or fails before saveRDS()), and add = TRUE keeps other handlers.
  path = tempfile(fileext = ".rds")
  on.exit(unlink(path), add = TRUE)

  config = start_flush_redis()
  rush = rsh(network_id = "test-rush", config = config)

  # round-trip the controller through disk
  saveRDS(rush, file = path)
  rush = readRDS(path)

  # the restored object has no live connection until reconnect() is called
  expect_error(rush$print(), "Context is not connected")

  rush$reconnect()
  expect_r6(rush, "Rush")

  expect_rush_reset(rush)
})

# seed -------------------------------------------------------------------------

test_that("seeds are generated from regular rng seed", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config, seed = 123)

  # a single task receives a seed accepted by is_lecyer_cmrg_seed()
  controller$push_tasks(list(list(x1 = 1, x2 = 2)))
  tasks = controller$fetch_tasks(fields = c("xs", "seed"))
  first_seed = tasks$seed[[1]]
  expect_true(is_lecyer_cmrg_seed(first_seed))

  # further tasks: the second seed element differs between consecutive tasks
  controller$push_tasks(list(
    list(x1 = 2, x2 = 2),
    list(x1 = 3, x2 = 2)
  ))
  tasks = controller$fetch_tasks(fields = c("xs", "seed"))
  expect_true(tasks$seed[[1]][2] != tasks$seed[[2]][2])
  expect_true(tasks$seed[[2]][2] != tasks$seed[[3]][2])
})

test_that("seed are generated from L'Ecuyer-CMRG seed", {
  skip_on_cran()

  redis_config = start_flush_redis()
  # seven-element integer seed vector passed directly to the controller
  initial_seed = c(10407L, 1801422725L, -2057975723L, 1156894209L, 1595475487L, 210384600L, -1655729657L)
  controller = rsh(network_id = "test-rush", config = redis_config, seed = initial_seed)

  # a single task receives a seed accepted by is_lecyer_cmrg_seed()
  controller$push_tasks(list(list(x1 = 1, x2 = 2)))
  tasks = controller$fetch_tasks(fields = c("xs", "seed"))
  first_seed = tasks$seed[[1]]
  expect_true(is_lecyer_cmrg_seed(first_seed))

  # further tasks: the second seed element differs between consecutive tasks
  controller$push_tasks(list(
    list(x1 = 2, x2 = 2),
    list(x1 = 3, x2 = 2)
  ))
  tasks = controller$fetch_tasks(fields = c("xs", "seed"))
  expect_true(tasks$seed[[1]][2] != tasks$seed[[2]][2])
  expect_true(tasks$seed[[2]][2] != tasks$seed[[3]][2])
})

test_that("seed is set correctly on two workers", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config, seed = 123)

  # Worker loop: pop tasks until termination is flagged, evaluate each one
  # under the task's own seed and push either the result or the error message.
  worker_loop = function(rush) {
    while (!(rush$terminated || rush$terminated_on_idle)) {
      task = rush$pop_task(fields = c("xs", "seed"))
      if (is.null(task)) next

      tryCatch({
        # the drawn number depends only on the RNG state, not on x1/x2
        draw = function(x1, x2, ...) list(y = sample(10000, 1))
        ys = with_rng_state(draw, args = c(task$xs), seed = task$seed)
        rush$push_results(task$key, yss = list(ys))
      }, error = function(e) {
        rush$push_failed(task$key, conditions = list(list(message = e$message)))
      })
    }

    NULL
  }

  worker_ids = controller$start_local_workers(
    worker_loop = worker_loop,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  controller$wait_for_workers(2, timeout = 5)

  # first batch of four tasks
  first_keys = controller$push_tasks(list(
    list(x1 = 1, x2 = 2),
    list(x1 = 2, x2 = 2),
    list(x1 = 2, x2 = 3),
    list(x1 = 2, x2 = 4)
  ))
  controller$wait_for_tasks(first_keys)

  finished = controller$fetch_finished_tasks()
  expect_set_equal(finished$y, c(5971L, 4090L, 1754L, 9794L))

  # second batch adds two more draws to the set of results
  second_keys = controller$push_tasks(list(
    list(x1 = 5, x2 = 3),
    list(x1 = 5, x2 = 4)
  ))
  controller$wait_for_tasks(second_keys)

  finished = controller$fetch_finished_tasks()
  expect_set_equal(finished$y, c(1754L, 9794L, 4090L, 5971L, 8213L, 3865L))

  expect_rush_reset(controller, type = "terminate")
})

test_that("redis info works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config)

  # the redis_info field must be a list
  info = controller$redis_info
  expect_list(info)
})

test_that("saving logs with redis appender works", {
  skip_on_cran()

  redis_config = start_flush_redis()
  controller = rsh(network_id = "test-rush", config = redis_config, remote = FALSE)

  # buffer size 1 is requested for the worker's log appender
  worker_ids = controller$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"),
    lgr_buffer_size = 1
  )
  controller$wait_for_workers(1, timeout = 5)

  # the log table must have exactly these columns, in this order
  entries = controller$read_log()
  expect_data_table(entries, min.rows = 1)
  expect_names(colnames(entries), identical.to = c("worker_id", "level", "timestamp", "logger", "caller", "msg"))

  expect_rush_reset(controller)
})

test_that("error and output logs work", {
  skip_on_cran()

  config = start_flush_redis()
  rush = rsh(
    network_id = "test-rush",
    config = config,
    remote = FALSE)

  # both log kinds are written into the session's temporary directory
  message_log = tempdir()
  output_log = tempdir()

  worker_ids = rush$start_local_workers(
    worker_loop = test_worker_loop,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"),
    lgr_buffer_size = 1,
    message_log = message_log,
    output_log = output_log)
  # bounded wait, consistent with the other tests in this file, so a worker
  # that never registers fails the test instead of relying on the default
  rush$wait_for_workers(1, timeout = 5)

  # one file per worker and stream; the first line is the worker's debug notice
  expect_match(readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))[1], "Debug message logging on worker")
  expect_match(readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))[1], "Debug output logging on worker")

  expect_rush_reset(rush)
})

# Try the rush package in your browser
#
# Any scripts or data that you put into this service are public.
#
# rush documentation built on Aug. 8, 2025, 7:41 p.m.