tests/testthat/test-rrq-progress.R

test_that("basic use", {
  obj <- test_rrq()
  w <- test_worker_blocking(obj)
  t <- rrq_task_create_expr(sin(1), controller = obj)
  w$poll(TRUE)
  worker_run_task_start(w, r6_private(w), t)

  ## Status is NULL by default (actually missing in Redis)
  expect_null(rrq_task_progress(t, controller = obj))

  ## Status can be set
  expect_null(w$progress("status1", TRUE))
  expect_equal(rrq_task_progress(t, controller = obj), "status1")

  ## Status can be updated
  expect_null(w$progress("status2", TRUE))
  expect_equal(rrq_task_progress(t, controller = obj), "status2")

  ## Completing a task retains its status
  worker_run_task(w, r6_private(w), t)
  expect_equal(rrq_task_progress(t, controller = obj), "status2")

  ## Deleting a task removes its status
  rrq_task_delete(t, controller = obj)
  expect_null(rrq_task_progress(t, controller = obj))
})


test_that("update progress from interface function", {
  obj <- test_rrq("myfuns.R")
  w <- test_worker_spawn(obj, 1)

  t <- rrq_task_create_expr(run_with_progress(5, 0), controller = obj)
  rrq_task_wait(t, controller = obj)
  expect_equal(rrq_task_result(t, controller = obj), 5)
  expect_equal(rrq_task_progress(t, controller = obj), "iteration 5")
})


test_that("update progress multiple times in task", {
  obj <- test_rrq("myfuns.R")
  w <- test_worker_spawn(obj, 1)
  p <- tempfile()

  t <- rrq_task_create_expr(run_with_progress_interactive(p), controller = obj)
  wait_status(t, obj)

  testthat::try_again(5, {
    Sys.sleep(0.5)
    expect_equal(rrq_task_progress(t, controller = obj),
                 "Waiting for file")
  })

  writeLines("something", p)
  Sys.sleep(0.2)
  expect_equal(rrq_task_progress(t, controller = obj),
               "Got contents 'something'")

  writeLines("another thing", p)
  Sys.sleep(0.2)
  expect_equal(rrq_task_progress(t, controller = obj),
               "Got contents 'another thing'")

  writeLines("STOP", p)
  wait_status(t, obj, status = TASK_RUNNING)
  expect_equal(rrq_task_result(t, controller = obj), "OK")
  expect_equal(rrq_task_status(t, controller = obj), TASK_COMPLETE)
  expect_equal(rrq_task_progress(t, controller = obj),
               "Finishing")
})


test_that("can't register progress with no active worker", {
  cache$active_worker <- NULL
  expect_error(
    rrq_task_progress_update("value", TRUE),
    "rrq_task_progress_update called with no active worker")
  expect_silent(
    rrq_task_progress_update("value", FALSE))
})


test_that("can't register progress with no active task", {
  obj <- test_rrq()
  w <- test_worker_blocking(obj)
  expect_error(
    w$progress("value", TRUE),
    "rrq_task_progress_update called with no active task")
  expect_silent(
    w$progress("value", FALSE))
})


test_that("collect progress from signal", {
  obj <- test_rrq("myfuns.R")
  w <- test_worker_blocking(obj)
  t <- rrq_task_create_expr(run_with_progress_signal(5, 0), controller = obj)
  w$step(TRUE)
  expect_equal(rrq_task_result(t, controller = obj), 5)
  expect_equal(rrq_task_progress(t, controller = obj),
               list(message = "iteration 5"))
})


## TODO: this test is slighly flakey, and I don't see why.
test_that("collect progress from separate process", {
  obj <- test_rrq("myfuns.R")
  w <- test_worker_spawn(obj, 1)
  p <- tempfile()

  t <- rrq_task_create_expr(run_with_progress_interactive(p),
                            separate_process = TRUE,
                            controller = obj)
  wait_status(t, obj)

  testthat::try_again(5, {
    Sys.sleep(0.5)
    expect_equal(rrq_task_progress(t, controller = obj),
                 "Waiting for file")
  })

  writeLines("something", p)
  Sys.sleep(0.2)
  expect_equal(rrq_task_progress(t, controller = obj),
               "Got contents 'something'")

  writeLines("another thing", p)
  Sys.sleep(0.2)
  expect_equal(rrq_task_progress(t, controller = obj),
               "Got contents 'another thing'")

  writeLines("STOP", p)
  wait_status(t, obj, status = TASK_RUNNING)
  expect_equal(rrq_task_status(t, controller = obj), TASK_COMPLETE)
  expect_equal(rrq_task_progress(t, controller = obj),
               "Finishing")
})


test_that("collect progress from signal", {
  obj <- test_rrq("myfuns.R")
  w <- test_worker_blocking(obj)
  t <- rrq_task_create_expr(run_with_progress_signal(5, 0),
                            separate_process = TRUE,
                            controller = obj)
  w$step(TRUE)
  expect_equal(rrq_task_result(t, controller = obj), 5)
  expect_equal(rrq_task_progress(t, controller = obj),
               list(message = "iteration 5"))
})


## This one actually belongs elsewhere, really.
test_that("Separate process leaves global env clean", {
  obj <- test_rrq("myfuns.R")
  w <- test_worker_blocking(obj)

  t1 <- rrq_task_create_expr(dirty_double(1), controller = obj)
  t2 <- rrq_task_create_expr(dirty_double(2), controller = obj)
  w$step(TRUE)
  w$step(TRUE)

  ## Running locally pollutes the global environment and is not
  ## isolated:
  expect_equal(rrq_task_result(t1, controller = obj), list(NULL, 2))
  expect_equal(rrq_task_result(t2, controller = obj), list(1, 4))
  expect_equal(.GlobalEnv$.rrq_dirty_double, 2)

  t3 <- rrq_task_create_expr(dirty_double(3),
                             separate_process = TRUE,
                             controller = obj)
  t4 <- rrq_task_create_expr(dirty_double(4),
                             separate_process = TRUE,
                             controller = obj)
  w$step(TRUE)
  w$step(TRUE)

  ## Running in separate process is unaffected by global environment
  ## and does not affect it:
  expect_equal(rrq_task_status(t3, controller = obj), TASK_COMPLETE)
  expect_equal(rrq_task_result(t3, controller = obj), list(NULL, 6))
  expect_equal(rrq_task_result(t4, controller = obj), list(NULL, 8))
  expect_equal(.GlobalEnv$.rrq_dirty_double, 2)

  rm(list = ".rrq_dirty_double", envir = .GlobalEnv)
})
mrc-ide/rrq documentation built on April 25, 2024, 11:59 p.m.