tests/testthat/test-rrq.R

context("rrq")

test_that("queue interface to worker submission sends correct args", {
  config <- example_config(use_workers = TRUE)
  ctx <- context::context_save(file.path(config$workdir, "context"))
  test_fake_rrq(ctx, config)
  obj <- queue_didehpc(ctx, config, initialise = FALSE, provision = "fake")
  mock_submit_workers <- mockery::mock()
  mockery::stub(obj$submit_workers, "rrq_submit_workers", mock_submit_workers)
  obj$submit_workers(5, 100, FALSE)
  mockery::expect_called(mock_submit_workers, 1)
  expect_equal(mockery::mock_args(mock_submit_workers)[[1]],
               list(obj, r6_private(obj)$data, 5, 100, FALSE))
})


test_that("queue interface to job submission sends correct args", {
  config <- example_config(use_workers = TRUE)
  ctx <- context::context_save(file.path(config$workdir, "context"))
  obj <- queue_didehpc(ctx, config, initialise = FALSE, provision = "fake")
  mock_submit_context_tasks <- mockery::mock()
  ids <- ids::random_id(5)
  mockery::stub(obj$submit, "rrq_submit_context_tasks",
                mock_submit_context_tasks)
  obj$submit(ids)
  mockery::expect_called(mock_submit_context_tasks, 1)
  expect_equal(mockery::mock_args(mock_submit_context_tasks)[[1]],
               list(obj$config, obj$context, ids, NULL))
})


test_that("queue interface to worker stop sends correct args", {
  config <- example_config(use_workers = TRUE)
  ctx <- context::context_save(file.path(config$workdir, "context"))
  obj <- queue_didehpc(ctx, config, initialise = FALSE, provision = "fake")
  mock_stop_workers <- mockery::mock()
  mockery::stub(obj$stop_workers, "rrq_stop_workers", mock_stop_workers)
  ids <- ids::random_id(2)
  obj$stop_workers(ids)
  mockery::expect_called(mock_stop_workers, 1)
  expect_equal(mockery::mock_args(mock_stop_workers)[[1]],
               list(obj$config, obj$context$id, ids))
})


test_that("queue interface to controller creation sends correct args", {
  config <- example_config(use_workers = TRUE)
  ctx <- context::context_save(file.path(config$workdir, "context"))
  obj <- queue_didehpc(ctx, config, initialise = FALSE, provision = "fake")
  mock_rrq_controller <- mockery::mock()
  mockery::stub(obj$rrq_controller, "didehpc_rrq_controller",
                mock_rrq_controller)
  obj$rrq_controller()
  mockery::expect_called(mock_rrq_controller, 1)
  expect_equal(mockery::mock_args(mock_rrq_controller)[[1]],
               list(obj$config, obj$context$id))
})


test_that("stopping workers interaction test", {
  config <- example_config(use_workers = TRUE)
  id <- ids::random_id()
  worker_ids <- ids::random_id(4)
  mock_rrq <- list(worker_stop = mockery::mock())
  mock_rrq_controller <- mockery::mock(mock_rrq)
  mockery::stub(rrq_stop_workers, "didehpc_rrq_controller", mock_rrq_controller)
  rrq_stop_workers(config, id, worker_ids)

  mockery::expect_called(mock_rrq_controller, 1)
  expect_equal(mockery::mock_args(mock_rrq_controller)[[1]],
               list(config, id))

  mockery::expect_called(mock_rrq$worker_stop, 1)
  expect_equal(mockery::mock_args(mock_rrq$worker_stop)[[1]],
               list(worker_ids))
})


## this tests makes the manipulation below more reasonable because it
## makes clear which parts of config are used.
test_that("rrq controller reads the cluster host", {
  mock_hiredis <- mockery::mock(redux::redis)
  mock_rrq <- mockery::mock()
  id <- ids::random_id()
  mockery::stub(didehpc_rrq_controller, "rrq::rrq_controller$new", mock_rrq)
  mockery::stub(didehpc_rrq_controller, "redux::hiredis", mock_hiredis)
  expect_error(
    didehpc_rrq_controller(list(use_rrq = FALSE, use_workers = FALSE), id),
    "workers not enabled")
  mockery::expect_called(mock_hiredis, 0)
  mockery::expect_called(mock_rrq, 0)

  config <- list(use_rrq = TRUE,
                 use_workers = FALSE,
                 redis_host = "redis.example.com")
  expect_silent(didehpc_rrq_controller(config, id))
  mockery::expect_called(mock_hiredis, 1)
  mockery::expect_called(mock_rrq, 1)
  expect_equal(mockery::mock_args(mock_hiredis)[[1]],
               list(host = "redis.example.com"))
  expect_equal(mockery::mock_args(mock_rrq)[[1]], list(id, redux::redis))
})


test_that("configure environment", {
  skip_if_no_redis()
  config <- list(use_rrq = TRUE, redis_host = NULL, worker_timeout = 100)
  id <- ids::random_id()
  rrq <- didehpc_rrq_controller(config, id)
  expect_null(rrq$con$GET(r6_private(rrq)$keys$envir))

  rrq_init(rrq, config)
  expect_setequal(rrq$worker_config_list(), c("localhost", "didehpc"))
  expect_equal(
    unclass(rrq$worker_config_read("didehpc")),
    list(queue = c("default", "context"), verbose = TRUE,
         poll_queue = 5, timeout_idle = 100, poll_process = 1,
         timeout_process_die = 2, heartbeat_period = NULL), tolerance = 1)

  create <- rrq$con$GET(r6_private(rrq)$keys$envir)
  expect_is(create, "raw")
  expect_is(unserialize(create), "function")
  expect_identical(environment(unserialize(create)), globalenv())
})


test_that("use context loader", {
  skip_if_no_redis()
  loader <- rrq_context_loader()
  expect_identical(environment(loader), globalenv())
  ctx <- context::context_save(tempfile())
  env <- new.env()

  withr::with_envvar(
    c(CONTEXT_ROOT = NA_character_, CONTEXT_ID = NA_character_),
    expect_error(
      loader(env),
      "Environment variables incorrect: CONTEXT_ROOT: '' CONTEXT_ID: ''"))
  withr::with_envvar(
    c(CONTEXT_ROOT = NA_character_, CONTEXT_ID = "x"),
    expect_error(
      loader(env),
      "Environment variables incorrect: CONTEXT_ROOT: '' CONTEXT_ID: 'x'"))

  mock_load <- mockery::mock()
  mockery::stub(loader, "context::context_load", mock_load)
  withr::with_envvar(
    c(CONTEXT_ROOT = ctx$root$path, CONTEXT_ID = ctx$id),
    loader(env))
  mockery::expect_called(mock_load, 1L)
  args <- mockery::mock_args(mock_load)[[1]]
  expect_equal(args[[1]]$root$id, ctx$root$id)
  expect_equal(args[[1]]$root$path, ctx$root$path)
  expect_equal(args[[1]]$id, ctx$id)
  expect_identical(args[[2]], env)
})


test_that("Can send context tasks to a worker", {
  skip_if_no_redis()
  config <- list(use_rrq = TRUE, redis_host = NULL, worker_timeout = 10)
  ctx <- context::context_save(tempfile())
  ids <- c(context::task_save(quote(sin(1)), ctx),
           context::task_save(quote(sin(2)), ctx))
  rrq_submit_context_tasks(config, ctx, ids, NULL)
  rrq <- didehpc_rrq_controller(config, ctx$id)
  rrq_init(rrq, config)
  rrq$envir(NULL, FALSE) # just simpler not to have this here
  id_rrq <- rrq$task_list()
  expect_length(id_rrq, 2L)

  rrq$task_data(id_rrq[[1]])

  w <- rrq::rrq_worker$new(ctx$id, "didehpc")
  w$step(TRUE)

  expect_equal(context::task_status(ids[[1]], ctx), "COMPLETE")
  expect_equal(context::task_status(ids[[2]], ctx), "PENDING")

  w$step(TRUE)

  expect_equal(context::task_status(ids[[1]], ctx), "COMPLETE")
  expect_equal(context::task_status(ids[[2]], ctx), "COMPLETE")
})


test_that("Can plausibly submit workers", {
  skip_if_no_redis()
  config <- example_config(use_workers = TRUE)
  config$redis_host <- NULL
  ctx <- context::context_save(file.path(config$workdir, "context"))
  test_fake_rrq(ctx, config)
  obj <- queue_didehpc(ctx, config, initialise = FALSE, provision = "fake")
  obj$client <- list(submit = mockery::mock())
  mock_wait <- mockery::mock()
  mockery::stub(rrq_submit_workers, "rrq::rrq_worker_wait", mock_wait)
  data <- r6_private(obj)$data

  expect_message(
    rrq_submit_workers(obj, data, 5, 100, FALSE),
    "Submitting 5 workers with base name '.+'")

  expect_true(file.exists(data$paths$local$batch))
  expect_true(file.exists(data$paths$local$worker_log))
  mockery::expect_called(obj$client$submit, 5)

  batch <- dir(data$paths$local$batch)
  expect_length(batch, 5)
  base <- sub("_[0-9]\\.bat", "", batch[[1]])

  args <- mockery::mock_args(obj$client$submit)
  expect_equal(
    args[[1]],
    list(paste0(data$paths$remote$batch, "\\", base, "_1.bat"),
         paste0(base, "_1"), config$resource$template, config$cluster,
         config$resource$type, config$resource$count))
  expect_equal(
    args[[5]],
    list(paste0(data$paths$remote$batch, "\\", base, "_5.bat"),
         paste0(base, "_5"), config$resource$template, config$cluster,
         config$resource$type, config$resource$count))

  mockery::expect_called(mock_wait, 1)
  args <- mockery::mock_args(mock_wait)[[1]]
  expect_s3_class(args[[1]], "rrq_controller")
  expect_match(args[[2]], sprintf("^%s:worker:alive:.+", ctx$id))
  expect_equal(args[3:4], list(timeout = 100, progress = FALSE))
})


test_that("Can plausibly submit workers with different configuration", {
  skip_if_no_redis()
  w <- worker_resource(template = "8Core", cores = 8, parallel = FALSE)
  config <- example_config(use_rrq = TRUE, worker_resource = w)
  config$redis_host <- NULL
  ctx <- context::context_save(file.path(config$workdir, "context"))
  test_fake_rrq(ctx, config)
  obj <- queue_didehpc(ctx, config, initialise = FALSE, provision = "fake")
  obj$client <- list(submit = mockery::mock())
  mock_wait <- mockery::mock()
  mockery::stub(rrq_submit_workers, "rrq::rrq_worker_wait", mock_wait)
  data <- r6_private(obj)$data

  expect_message(
    rrq_submit_workers(obj, data, 5, 100, FALSE),
    "Submitting 5 workers with base name '.+'")

  expect_true(file.exists(data$paths$local$batch))
  expect_true(file.exists(data$paths$local$worker_log))
  mockery::expect_called(obj$client$submit, 5)

  batch <- dir(data$paths$local$batch)
  expect_length(batch, 5)
  base <- sub("_[0-9]\\.bat", "", batch[[1]])

  args <- mockery::mock_args(obj$client$submit)
  expect_equal(
    args[[1]],
    list(paste0(data$paths$remote$batch, "\\", base, "_1.bat"),
         paste0(base, "_1"), config$worker_resource$template, config$cluster,
         config$worker_resource$type, config$worker_resource$count))
  expect_equal(
    args[[5]],
    list(paste0(data$paths$remote$batch, "\\", base, "_5.bat"),
         paste0(base, "_5"), config$worker_resource$template, config$cluster,
         config$worker_resource$type, config$worker_resource$count))

  mockery::expect_called(mock_wait, 1)
  args <- mockery::mock_args(mock_wait)[[1]]
  expect_s3_class(args[[1]], "rrq_controller")
  expect_match(args[[2]], sprintf("^%s:worker:alive:.+", ctx$id))
  expect_equal(args[3:4], list(timeout = 100, progress = FALSE))
})


test_that("error if remote rrq is too old", {
  curr <- packageVersion("rrq")
  expect_error(
    rrq_check_package_version(curr, numeric_version("0.4.4")),
    "Your remote version of rrq (0.4.4) is too old; must be at least 0.6.21",
    fixed = TRUE)
})


test_that("warn if rrq versions differ", {
  curr <- packageVersion("rrq")
  other <- numeric_version("99.99.99")
  expect_warning(
    rrq_check_package_version(curr, other),
    "rrq versions differ between local \\([0-9]+\\.[0-9]+\\.[0-9]+\\) and remote \\(99.99.99\\)")
})


test_that("silent of rrq versions agree", {
  curr <- packageVersion("rrq")
  expect_silent(
    rrq_check_package_version(curr, curr))
})
mrc-ide/didehpc documentation built on Aug. 20, 2023, 10:27 a.m.