tests/testthat/test-tasks.R

context("tasks")

## This cycles through all the core task query functions in a totally
## empty context database and checks that everything behaves sensibly.
## These are mostly edge cases.
test_that("tasks in empty context", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))
  ctx <- context_save(path)

  expect_equal(task_list(ctx), character(0))
  expect_equal(task_list(path), character(0))
  expect_equal(task_list(context_root(path)), character(0))

  ids <- ids::random_id(2)
  id <- ids[[1]]

  expect_false(task_delete(id, ctx))
  expect_equal(task_delete(ids, ctx), c(FALSE, FALSE))
  expect_equal(task_delete(character(0), ctx), logical(0))

  expect_is(task_result(id, ctx, TRUE), "UnfetchableTask")
  expect_error(task_result(id, ctx), "unfetchable: MISSING")

  expect_equal(task_status(id, ctx), TASK_MISSING)
  expect_equal(task_status(ids, ctx), rep(TASK_MISSING, length(ids)))
  expect_equal(task_status(character(0), ctx, named = FALSE), character(0))
  expect_equal(task_status(character(0), ctx, named = TRUE),
               setNames(character(0), character(0)))

  expect_equal(task_context_id(character(0), ctx), character(0))
  expect_equal(task_context_id(id, ctx), NA_character_)
  expect_equal(task_context_id(ids, ctx), rep(NA_character_, length(ids)))

  expect_error(task_read(id, ctx), "not found")
  expect_error(task_log(id, ctx), "Logging not enabled")

  expect_equal(task_context(id, ctx), NA_character_)
  expect_equal(task_context(ids, ctx), rep(NA_character_, 2L))
  expect_equal(task_context(character(0), ctx), character(0))

  res <- task_times(ids, ctx)
  expect_is(res, "data.frame")
  expect_equal(res$task_id, ids)
  expect_equal(res$submitted, missing_time(length(ids)))
  expect_equal(res$started, missing_time(length(ids)))
  expect_equal(res$finished, missing_time(length(ids)))
  expect_equal(res$waiting, rep(NA_real_, 2))
  expect_equal(res$running, rep(NA_real_, 2))
  expect_equal(res$idle, rep(NA_real_, 2))

  res0 <- task_times(character(0), ctx)
  expect_equal(res0, res[integer(0), ])
})

test_that("dependencies must exist", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))
  ctx <- context_save(path)

  expr <- quote(sin(1))
  expect_error(task_save(expr, ctx, depends_on = "123"),
               "Failed to save as dependency 123 does not exist")
  expect_error(task_save(expr, ctx, depends_on = c("123", "456")),
               "Failed to save as dependencies 123, 456 do not exist")

  t <- task_save(expr, ctx)
  t2 <- task_save(expr, ctx, depends_on = t)
  t3 <- task_save(expr, ctx, depends_on = c(t, t2))

  expect_equal(task_deps(t2, ctx), list(t))
  expect_length(task_deps(t, ctx), 0)
  expect_equal(task_deps(c(t, t2), ctx), list(t))
  expect_equal(task_deps(t3, ctx), list(c(t, t2)))

  expected_named <- list(t, c(t, t2))
  names(expected_named) <- c(t2, t3)
  expect_equal(task_deps(c(t, t2, t3), ctx, named = TRUE), expected_named)
})

test_that("single task", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))
  ctx <- context_save(path)

  expr <- quote(sin(1))
  t <- task_save(expr, ctx)

  expect_true(is_id(t))
  expect_equal(task_list(ctx), t)
  expect_equal(task_status(t, ctx, TRUE), setNames(TASK_PENDING, t))
  expect_equal(task_status(t, ctx, FALSE), TASK_PENDING)
  expect_equal(task_context(t, ctx), ctx$id)

  expect_is(task_result(t, ctx, TRUE), "UnfetchableTask")
  expect_error(task_result(t, ctx), "unfetchable: PENDING")

  res <- task_times(t, ctx)
  expect_is(res, "data.frame")
  expect_equal(res$task_id, t)
  expect_is(res$submitted, "POSIXt")
  expect_false(is.na(res$submitted))
  expect_equal(res$started, missing_time())
  expect_equal(res$finished, missing_time())
  expect_false(is.na(res$waiting))
  expect_equal(res$running, NA_real_)
  expect_equal(res$idle, NA_real_)

  dat <- task_read(t, path)
  expect_is(dat$db, "storr")
  expect_equal(dat$context_id, ctx$id)
  expect_equal(dat$expr, expr)
  expect_null(dat$objects)
  expect_equal(task_context_id(t, path), ctx$id)

  expect_equal(task_expr(t, path), expr)
  expect_equal(task_function_name(t, path), "sin")

  e <- new.env()
  ctx_run <- context_load(ctx, e)
  dat <- task_load(t, ctx_run)

  expect_is(dat$db, "storr")
  expect_equal(dat$expr, expr)
  expect_equal(eval(dat$expr, dat$envir), eval(expr))

  ## OK, this is nasty.  If we have a local environment, like in this
  ## situation, then unserialising that environment is going to create
  ## a situation where our *globals* aren't in the right place.  Such
  ## is life; I don't see what else we can do about that.
  expect_equal(ls(dat$envir), character(0))
  expect_identical(parent.env(dat$envir), ctx_run$envir)
  if (!identical(environment(), .GlobalEnv)) {
    expect_false(identical(dat$parent, e))
    expect_equal(ls(e), ls(.GlobalEnv))
  }

  expect_equal(task_run(t, ctx_run), eval(expr))
})

test_that("task_delete (single)", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  ctx <- context_save(path, storage_type = "environment")

  expr <- quote(sin(1))
  t <- task_save(expr, ctx)

  expect_equal(task_list(ctx), t)
  expect_true(ctx$db$exists(t, "tasks"))
  expect_true(task_delete(t, ctx))
  expect_equal(task_list(ctx), character(0))

  expect_false(ctx$db$exists(t, "tasks"))
  expect_false(task_delete(t, ctx))
})

test_that("task_delete (multiple)", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  ctx <- context_save(path, storage_type = "environment")

  expr <- quote(sin(1))
  t1 <- task_save(expr, ctx)
  t2 <- task_save(expr, ctx)
  t3 <- task_save(expr, ctx)
  tt <- c(t1, t2, t3)

  expect_true(all(tt %in% task_list(ctx)))
  expect_equal(ctx$db$exists(tt, "tasks"), rep(TRUE, length(tt)))
  i <- 1:2
  expect_equal(task_delete(tt[i], ctx), rep(TRUE, length(tt[i])))
  expect_equal(sort(tt[-i]), sort(task_list(ctx)))

  expect_equal(task_delete(tt, ctx), !(seq_along(tt) %in% i))
  expect_equal(task_delete(tt, ctx), rep(FALSE, length(tt)))

  expect_equal(task_list(ctx), character(0))
})

test_that("local variables", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  x <- 1
  expr <- quote(sin(x))
  ctx <- context_save(path)
  t <- task_save(expr, ctx)

  dat <- task_read(t, ctx)
  expect_equal(names(dat$objects), "x")
  expect_equal(unname(dat$objects), ctx$db$hash_object(x))

  e <- new.env(parent = .GlobalEnv)
  ctx_run <- context_load(ctx, e)
  dat <- task_load(t, ctx_run)

  expect_identical(ls(dat$envir), "x")
  expect_identical(dat$envir$x, x)
  expect_identical(parent.env(dat$envir), ctx_run$envir)

  expect_equal(task_expr(t, ctx), expr)
  expect_equal(task_expr(t, ctx, TRUE),
               structure(expr, locals = dat$objects))

  res <- task_run(t, ctx_run)
  expect_equal(res, sin(1))
})

test_that("task_run & times", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  x <- 1
  y <- 2
  expr <- quote(list(x, y))
  ctx <- context_save(path)

  expect_equal(ctx$db$list("task_time_sub"), character(0))
  expect_equal(ctx$db$list("task_time_beg"), character(0))
  expect_equal(ctx$db$list("task_time_end"), character(0))

  t <- task_save(expr, ctx)

  expect_equal(ctx$db$list("task_time_sub"), t)
  expect_equal(ctx$db$list("task_time_beg"), character(0))
  expect_equal(ctx$db$list("task_time_end"), character(0))

  e <- new.env(parent = environment())
  ctx_run <- context_load(ctx, e)
  res <- task_run(t, ctx_run)
  expect_identical(res, list(x, y))

  expect_equal(ctx$db$list("task_time_sub"), t)
  expect_equal(ctx$db$list("task_time_beg"), t)
  expect_equal(ctx$db$list("task_time_end"), t)

  t_sub <- ctx$db$get(t, "task_time_sub")
  t_beg <- ctx$db$get(t, "task_time_beg")
  t_end <- ctx$db$get(t, "task_time_end")

  expect_is(t_sub, "POSIXt")
  expect_true(t_beg >= t_sub)
  expect_true(t_end >= t_beg)

  expect_equal(task_status(t, ctx), TASK_COMPLETE)
})

test_that("complex expressions", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  x <- 1
  y <- 10
  n <- 2
  expr <- quote(rep(x:y, n))
  ctx <- context_save(path)

  t <- task_save(expr, ctx)

  tmp <- task_read(t, ctx)
  expect_equal(tmp$expr, expr)
  expect_equal(sort(names(tmp$objects)), c("n", "x", "y"))

  res <- task_run(t, context_load(ctx, new.env(parent = .GlobalEnv)))
  expect_equal(res, eval(expr))
})

test_that("stack trace", {
  ctx <- context_save(tempfile(), storage_type = "environment")
  ctx_run <- context_load(ctx, new.env(parent = .GlobalEnv))
  t <- task_save(quote(readLines("asdfa.txt")), ctx)
  ## Warning is not suppressed:
  expect_warning(res <- task_run(t, ctx_run), "No such file")
  expect_is(res, "context_task_error")
  expect_is(res$trace, "character")
  expect_match(tail(res$trace, 2)[[1]], "^readLines")
})

test_that("stack trace, no warning", {
  ctx <- context_save(tempfile(), storage_type = "environment",
                      sources = "myfuns.R")
  t <- task_save(quote(f(-10)), ctx)

  ctx_run <- context_load(ctx, new.env(parent = .GlobalEnv))
  expect_message(res <- task_run(t, ctx_run),
                 "Need positive x")
  expect_null(res$warnings)
  expect_match(tail(res$trace, 1), "stop")
})

test_that("long expr", {
  ctx <- context_save(tempfile(), storage_type = "environment")
  task <- task_save(quote(list(a_label = "a value",
                               another_label = pi,
                               one_more = c(exp(1), pi, 123.12312),
                               last_one = "a very long string here to wrap")),
                    ctx)

  ctx_run <- context_load(ctx, new.env(parent = .GlobalEnv))
  msg <- capture_messages(res <- task_run(task, ctx_run))
  msg <- strsplit(paste(msg, collapse = ""), "\n")[[1]]
  dat <- parse_context_log(sub("\n$", "", msg))
  expect_true("..." %in% dat$title)
  expect_equal(dat$title[[which(dat$title == "expr") + 1L]], "...")
})

test_that("print", {
  ctx <- context_save(tempfile(), storage_type = "environment")
  t <- task_save(quote(sin(2)), ctx)
  dat <- task_read(t, ctx)
  expect_output(print(dat), "<task>", fixed = TRUE)
})

test_that("capture output", {
  ctx <- context_save(tempfile(),
                      storage_type = "environment")
  ctx_run <- context_load(ctx, new.env(parent = .GlobalEnv))
  t <- task_save(quote(sin(2)), ctx)
  logfile <- tempfile()

  ## Problem: This one is not working with testthat, unfortunately;
  ## getting it working probably will mean either working out how to
  ## win the sink fight with testthat, or testing in a subprocess.
  ##
  ## Solution via:
  ## https://github.com/hadley/testthat/issues/460
  res <- withCallingHandlers(
    task_run(t, ctx_run, filename = logfile),
    message = function(e) cat(conditionMessage(e), file = stderr(), sep = ""))
  expect_equal(res, sin(2))
  expect_true(file.exists(logfile))
  dat <- parse_context_log(readLines(logfile))
  expect_is(dat, "context_log")
  str <- capture.output(print(dat))
  expect_match(str[[1]], "[ root", fixed = TRUE)

  ctx$db$set(t, logfile, "log_path")
  expect_equal(task_log(t, ctx), dat)

  txt <- task_log(t, ctx, FALSE)
  expect_equal(parse_context_log(txt), task_log(t, ctx))

  logpath <- file.path(ctx$root$path, "logs")
  dir.create(logpath, FALSE, TRUE)
  file.copy(logfile, file.path(logpath, t))

  ctx$db$set(t, "logs", "log_path")
  expect_equal(task_log(t, ctx), dat)

  ctx$db$set(t, file.path("logs", t), "log_path")
  expect_equal(task_log(t, ctx), dat)

  ctx$db$set(t, tempfile(), "log_path")
  expect_error(task_log(t, ctx), "Logfile does not exist at")
})

test_that("can't run without loading", {
  ctx <- context_save(tempfile(),
                      storage_type = "environment")
  t <- task_save(quote(sin(1)), ctx)
  expect_error(task_run(t, ctx), "context is not loaded")
})

test_that("fetch task result", {
  ctx <- context_save(tempfile(),
                      storage_type = "environment")
  ctx_run <- context_load(ctx, new.env(parent = .GlobalEnv))
  on.exit(unlink(ctx$root$path, recursive = TRUE))
  t <- task_save(quote(sin(1)), ctx)
  expect_equal(task_run(t, ctx_run), sin(1))

  expect_equal(task_result(t, ctx$root), sin(1))
  expect_equal(task_status(t, ctx$root), TASK_COMPLETE)
})

test_that("task_function_name", {
  ctx <- context_save(tempfile(),
                      storage_type = "environment")
  on.exit(unlink(ctx$root$path, recursive = TRUE))
  t1 <- task_save(quote(sin(1)), ctx)
  t2 <- task_save(quote(cos(1)), ctx)

  expect_equal(task_function_name(character(0), ctx),
               setNames(character(0), character(0)))
  expect_equal(task_function_name(c(t1, t2), ctx),
               setNames(c("sin", "cos"), c(t1, t2)))
  expect_equal(task_function_name(c(t1, t1), ctx),
               setNames(c("sin", "sin"), c(t1, t1)))
  expect_error(task_function_name(ids::random_id(), ctx),
               "not found")

  ## Names pass through:
  expect_equal(task_function_name(c(a = t1, b = t2), ctx),
               setNames(c("sin", "cos"), c("a", "b")))
})

test_that("task_exists", {
  ctx <- context_save(tempfile(),
                      storage_type = "environment")
  on.exit(unlink(ctx$root$path, recursive = TRUE))
  t1 <- task_save(quote(sin(1)), ctx)
  t2 <- task_save(quote(cos(1)), ctx)
  t3 <- ids::random_id()

  expect_equal(task_exists(character(0), ctx), logical(0))
  expect_equal(task_exists(t1, ctx), TRUE)
  expect_equal(task_exists(c(t1, t2), ctx), rep(TRUE, 2))
  expect_equal(task_exists(c(t1, t3, t2), ctx), c(TRUE, FALSE, TRUE))
})

test_that("invalid task", {
  ctx <- context_save(tempfile(),
                      storage_type = "environment")
  on.exit(unlink(ctx$root$path, recursive = TRUE))
  ## Not a great message:
  expect_error(task_save(sin(1), ctx),
               "expr must inherit from call")
  expect_error(task_save(pi, ctx),
               "expr must inherit from call")
  expect_error(task_save(quote(sin), ctx),
               "expr must inherit from call")
})



test_that("task run in external process", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))
  expr <- quote(list(x, y))
  ctx <- context_save(path)
  t <- task_save(quote(sin(1)), ctx)
  logfile <- tempfile()
  expect_null(task_run_external(path, ctx$id, t, logfile))
  expect_true(file.exists(logfile))
  expect_match(readLines(logfile), "[ start", fixed = TRUE, all = FALSE)
  expect_equal(task_result(t, ctx), sin(1))
})

test_that("task_reset (single)", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  ctx <- context_save(path, storage_type = "environment")

  now <- Sys.time()
  expr <- quote(sin(1))
  t <- task_save(expr, ctx)
  t2 <- task_save(expr, ctx)

  e <- new.env(parent = environment())
  ctx_run <- context_load(ctx, e)
  task_run(t, ctx_run)
  task_run(t2, ctx_run)
  expect_equal(ctx$db$get(t, "task_status"), "COMPLETE")
  expect_gt(ctx$db$get(t, "task_time_sub"), now)
  expect_equal(ctx$db$get(t, "task_results"), sin(1))

  later <- Sys.time()
  task_reset(t, ctx)

  expect_equal(ctx$db$get(t, "task_status"), "PENDING")
  expect_gt(ctx$db$get(t, "task_time_sub"), later)
  expect_equal(ctx$db$get(t, "task_time_beg"), NA)
  expect_equal(ctx$db$get(t, "task_time_end"), NA)
  expect_false(ctx$db$exists(t, "task_results"))

  expect_equal(ctx$db$get(t2, "task_status"), "COMPLETE")
  expect_lt(ctx$db$get(t2, "task_time_sub"), later)
  expect_equal(ctx$db$get(t2, "task_results"), sin(1))
})

test_that("task_reset (multiple)", {
  path <- tempfile("cluster_")
  on.exit(cleanup(path))

  ctx <- context_save(path, storage_type = "environment")

  now <- Sys.time()
  expr <- quote(sin(1))
  t1 <- task_save(expr, ctx)
  t2 <- task_save(expr, ctx)

  e <- new.env(parent = environment())
  ctx_run <- context_load(ctx, e)
  task_run(t1, ctx_run)
  task_run(t2, ctx_run)

  expect_equal(ctx$db$get(t1, "task_status"), "COMPLETE")
  expect_gt(ctx$db$get(t1, "task_time_sub"), now)
  expect_equal(ctx$db$get(t2, "task_status"), "COMPLETE")
  expect_gt(ctx$db$get(t2, "task_time_sub"), now)
  expect_equal(ctx$db$get(t1, "task_results"), sin(1))
  expect_equal(ctx$db$get(t2, "task_results"), sin(1))

  later <- Sys.time()
  task_reset(c(t1, t2), ctx)

  expect_equal(ctx$db$get(t1, "task_status"), "PENDING")
  expect_gt(ctx$db$get(t1, "task_time_sub"), later)
  expect_equal(ctx$db$get(t2, "task_status"), "PENDING")
  expect_gt(ctx$db$get(t2, "task_time_sub"), later)
  expect_false(ctx$db$exists(t1, "task_results"))
  expect_false(ctx$db$exists(t2, "task_results"))
})
dide-tools/context documentation built on June 4, 2023, 4:46 a.m.