# tests/testthat/test-class_clustermq.R

tar_test("clustermq$workers", {
  skip_cran()
  # The worker count passed to the constructor should be readable back
  # from the `workers` field of the returned object.
  algorithm <- clustermq_init(list(), workers = 3L)
  expect_equal(algorithm$workers, 3L)
})

tar_test("workerless deployment works", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  require_clustermq()
  skip_on_covr()
  tar_runtime$fun <- "tar_make_clustermq"
  # add = TRUE appends this cleanup instead of replacing any exit handler
  # that may already be registered on this frame.
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  # First run: all three targets use deployment = "main".
  x <- tar_target_raw("x", quote(1L), deployment = "main")
  y <- tar_target_raw("y", quote(x), deployment = "main")
  z <- tar_target_raw("z", quote(x + 1L), deployment = "main")
  pipeline <- pipeline_init(list(x, y, z))
  clustermq_init(pipeline)$run()
  expect_equal(target_read_value(x)$object, 1L)
  expect_equal(target_read_value(y)$object, 1L)
  expect_equal(target_read_value(z)$object, 2L)
  # Second run over freshly defined, identical targets: the progress
  # record of built targets is expected to be empty.
  x <- tar_target_raw("x", quote(1L), deployment = "main")
  y <- tar_target_raw("y", quote(x), deployment = "main")
  z <- tar_target_raw("z", quote(x + 1L), deployment = "main")
  pipeline <- pipeline_init(list(x, y, z))
  rerun <- clustermq_init(pipeline)
  rerun$run()
  built <- names(rerun$scheduler$progress$built$envir)
  expect_equal(built, character(0))
})

tar_test("semi-workerless deployment works", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  require_clustermq()
  skip_on_covr()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  # options() returns the previous values, so the restore handler can be
  # registered immediately after the change.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  # First run: only y uses deployment = "worker"; x and z use "main".
  x <- tar_target_raw("x", quote(1L), deployment = "main")
  y <- tar_target_raw("y", quote(x), deployment = "worker")
  z <- tar_target_raw("z", quote(x + 1L), deployment = "main")
  pipeline <- pipeline_init(list(x, y, z))
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(clustermq_init(pipeline)$run())
  expect_equal(target_read_value(x)$object, 1L)
  expect_equal(tar_read(y), 1L)
  expect_equal(target_read_value(z)$object, 2L)
  # Second run over freshly defined, identical targets: the progress
  # record of built targets is expected to be empty.
  x <- tar_target_raw("x", quote(1L), deployment = "main")
  y <- tar_target_raw("y", quote(x), deployment = "worker")
  z <- tar_target_raw("z", quote(x + 1L), deployment = "main")
  pipeline <- pipeline_init(list(x, y, z))
  rerun <- clustermq_init(pipeline)
  rerun$run()
  built <- names(rerun$scheduler$progress$built$envir)
  expect_equal(built, character(0))
})

tar_test("some targets up to date, some not", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  require_clustermq()
  skip_on_covr()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  # Initial run through local_init() with y defined as x.
  x <- tar_target_raw("x", quote(1L))
  y <- tar_target_raw("y", quote(x))
  pipeline <- pipeline_init(list(x, y))
  # Named first_run rather than `local` to avoid masking base::local().
  first_run <- local_init(pipeline)
  first_run$run()
  # Redefine y with a different command (x + 1L) while x is unchanged.
  x <- tar_target_raw("x", quote(1L))
  y <- tar_target_raw("y", quote(x + 1L))
  pipeline <- pipeline_init(list(x, y))
  cmq <- clustermq_init(pipeline)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(cmq$run())
  # Only y is expected in the record of built targets.
  built <- names(cmq$scheduler$progress$built$envir)
  expect_equal(built, "y")
  value <- target_read_value(pipeline_get_target(pipeline, "y"))
  expect_equal(value$object, 2L)
})

tar_test("clustermq algo can skip targets", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  require_clustermq()
  skip_on_covr()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  # Initial run through local_init().
  x <- tar_target_raw("x", quote(1L))
  y <- tar_target_raw("y", quote(x))
  pipeline <- pipeline_init(list(x, y))
  # Named first_run rather than `local` to avoid masking base::local().
  first_run <- local_init(pipeline)
  first_run$run()
  # Delete the file _targets/objects/x before the second run.
  unlink(file.path("_targets", "objects", "x"))
  x <- tar_target_raw("x", quote(1L))
  y <- tar_target_raw("y", quote(x))
  pipeline <- pipeline_init(list(x, y))
  cmq <- clustermq_init(pipeline)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(cmq$run())
  # Only x is expected in the record of built targets; y is not.
  built <- names(cmq$scheduler$progress$built$envir)
  expect_equal(built, "x")
  expect_equal(tar_read(x), 1L)
})

tar_test("nontrivial common data", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  skip_on_covr()
  require_clustermq()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  old_envir <- tar_option_get("envir")
  envir <- new.env(parent = globalenv())
  tar_option_set(envir = envir)
  on.exit(tar_option_set(envir = old_envir), add = TRUE)
  # Define f() and its helper g() inside the custom environment; the
  # target command below calls f(), which in turn calls g().
  evalq({
    f <- function(x) {
      g(x) + 1L
    }
    g <- function(x) {
      x + 1L
    }
  }, envir = envir)
  x <- tar_target_raw("x", quote(f(1L)))
  pipeline <- pipeline_init(list(x))
  cmq <- clustermq_init(pipeline)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(cmq$run())
  # f(1L) = g(1L) + 1L = (1L + 1L) + 1L = 3L
  value <- target_read_value(pipeline_get_target(pipeline, "x"))
  expect_equal(value$object, 3L)
})

tar_test("clustermq with a dynamic file", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  skip_on_covr()
  require_clustermq()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  old_envir <- tar_option_get("envir")
  envir <- new.env(parent = globalenv())
  tar_option_set(envir = envir)
  on.exit(tar_option_set(envir = old_envir), add = TRUE)
  # save1() writes 1L to "saved.out" and returns that file name, which
  # is what a format = "file" target command is given here.
  evalq({
    save1 <- function() {
      file <- "saved.out"
      saveRDS(1L, file)
      file
    }
  }, envir = envir)
  x <- tar_target_raw("x", quote(save1()), format = "file")
  pipeline <- pipeline_init(list(x))
  cmq <- clustermq_init(pipeline)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(cmq$run())
  built <- names(cmq$scheduler$progress$built$envir)
  expect_equal(built, "x")
  # Overwrite the file at the path recorded in the target's store with
  # different contents; the next run is then expected to build x again.
  saveRDS(2L, pipeline_get_target(pipeline, "x")$store$file$path)
  x <- tar_target_raw("x", quote(save1()), format = "file")
  pipeline <- pipeline_init(list(x))
  cmq <- clustermq_init(pipeline)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(cmq$run())
  built <- names(cmq$scheduler$progress$built$envir)
  expect_equal(built, "x")
})

tar_test("branching plan", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  require_clustermq()
  skip_on_covr()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  # First run: no target is expected in the record of skipped targets.
  pipeline <- pipeline_map()
  out <- clustermq_init(pipeline, workers = 2L)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(out$run())
  skipped <- names(out$scheduler$progress$skipped$envir)
  expect_equal(skipped, character(0))
  # Second run on a fresh pipeline_map(): no target is expected in the
  # record of built targets.
  out2 <- clustermq_init(pipeline_map(), workers = 2L)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(out2$run())
  built <- names(out2$scheduler$progress$built$envir)
  expect_equal(built, character(0))
  # Read the stored object of the named target from the first pipeline.
  value <- function(name) {
    target_read_value(pipeline_get_target(pipeline, name))$object
  }
  # Check every child of `parent`: the child at position `index` must
  # hold the value `expected(index)`.
  expect_branches <- function(parent, expected) {
    branches <- target_get_children(pipeline_get_target(pipeline, parent))
    for (index in seq_along(branches)) {
      expect_equal(value(branches[index]), expected(index))
    }
  }
  expect_equal(value("data0"), 2L)
  expect_equal(value("data1"), seq_len(3L))
  expect_equal(value("data2"), seq_len(3L) + 3L)
  expect_branches("map1", function(index) index + 2L)
  expect_branches("map2", function(index) 2L * index + 3L)
  expect_branches("map3", function(index) index + 3L)
  expect_branches("map4", function(index) 3L * index + 5L)
  expect_branches("map5", function(index) 2L * index + 5L)
  expect_branches("map6", function(index) index + 15L)
})

tar_test("cover the worker shutdown step in clustermq$iterate() event loop", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  require_clustermq()
  skip_on_covr()
  # Each cleanup is registered right after the state change it undoes,
  # always with add = TRUE so that no other exit handler is overwritten.
  old_options <- options(clustermq.scheduler = "multicore")
  on.exit(options(old_options), add = TRUE)
  tar_runtime$fun <- "tar_make_clustermq"
  on.exit(tar_runtime$fun <- NULL, add = TRUE)
  # A linear chain x1 -> x2 -> x3 -> x4 where each target's command is
  # the previous target, run with two workers.
  targets <- list(
    target_init("x1", quote(1)),
    target_init("x2", quote(x1)),
    target_init("x3", quote(x2)),
    target_init("x4", quote(x3))
  )
  pipeline <- pipeline_init(targets)
  out <- clustermq_init(pipeline, workers = 2L)
  # https://github.com/mschubert/clustermq/issues/269
  suppressWarnings(out$run())
  # The value defined for x1 should reach the end of the chain.
  expect_equal(tar_read(x4), 1L)
})

tar_test("clustermq$validate()", {
  skip_cran()
  # Build the object from a pipeline created with no arguments, then
  # check that validate() runs silently.
  algorithm <- clustermq_init(pipeline_init())
  expect_silent(algorithm$validate())
})

# Try the targets package in your browser

# Any scripts or data that you put into this service are public.

# targets documentation built on Oct. 12, 2023, 5:07 p.m.