tests/testthat/test-class_crew.R

tar_test("crew$validate()", {
  skip_if_not_installed("crew", minimum_version = "0.6.0")
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  out <- crew_init(pipeline_init(), controller = controller)
  expect_silent(out$validate())
})

tar_test("crew database subkey", {
  out <- database_crew(path_store_default())
  expect_equal(out$key, file.path(path_store_default(), "meta", "crew"))
})

tar_test("workerless deployment works", {
  skip_on_os("solaris")
  skip_if_not_installed("crew", minimum_version = "0.6.0")
  skip_if_not_installed("R.utils")
  tar_runtime$fun <- "tar_make"
  tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
  x <- tar_target_raw(
    "x",
    quote(1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  z <- tar_target_raw(
    "z",
    quote(x + 1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y, z))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  R.utils::withTimeout(
    crew_init(pipeline, controller = controller)$run(),
    timeout = 360
  )
  controller$terminate()
  gc()
  crew_test_sleep()
  expect_equal(target_read_value(x)$object, 1L)
  expect_equal(target_read_value(y)$object, 1L)
  expect_equal(target_read_value(z)$object, 2L)
  x <- tar_target_raw(
    "x",
    quote(1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  z <- tar_target_raw(
    "z",
    quote(x + 1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y, z))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  out <- crew_init(pipeline, controller = controller)
  on.exit({
    tar_runtime$fun <- NULL
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  R.utils::withTimeout(out$run(), timeout = 360)
  built <- names(out$scheduler$progress$built$envir)
  expect_equal(built, character(0))
})

tar_test("semi-workerless deployment works", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  skip_if_not_installed("crew", minimum_version = "0.6.0")
  skip_if_not_installed("R.utils")
  crew_test_sleep()
  tar_runtime$fun <- "tar_make"
  tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
  x <- tar_target_raw(
    "x",
    quote(1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    deployment = "worker",
    memory = "transient",
    garbage_collection = TRUE
  )
  z <- tar_target_raw(
    "z",
    quote(x + 1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y, z))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  R.utils::withTimeout(
    crew_init(pipeline, controller = controller)$run(),
    timeout = 360
  )
  controller$terminate()
  rm(controller)
  gc()
  crew_test_sleep()
  expect_equal(target_read_value(x)$object, 1L)
  expect_equal(tar_read(y), 1L)
  expect_equal(target_read_value(z)$object, 2L)
  x <- tar_target_raw(
    "x",
    quote(1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    deployment = "worker",
    memory = "transient",
    garbage_collection = TRUE
  )
  z <- tar_target_raw(
    "z",
    quote(x + 1L),
    deployment = "main",
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y, z))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  on.exit({
    tar_runtime$fun <- NULL
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  out <- crew_init(pipeline, controller = controller)
  R.utils::withTimeout(out$run(), timeout = 360)
  built <- names(out$scheduler$progress$built$envir)
  expect_equal(built, character(0))
})

tar_test("some targets up to date, some not", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  skip_if_not_installed("crew", minimum_version = "0.6.0")
  skip_if_not_installed("R.utils")
  tar_runtime$fun <- "tar_make"
  tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
  x <- tar_target_raw(
    "x",
    quote(1L),
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y))
  local <- local_init(pipeline)
  R.utils::withTimeout(local$run(), timeout = 360)
  x <- tar_target_raw(
    "x",
    quote(1L),
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x + 1L),
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  on.exit({
    tar_runtime$fun <- NULL
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  algo <- crew_init(pipeline, controller = controller)
  R.utils::withTimeout(algo$run(), timeout = 360)
  out <- names(algo$scheduler$progress$built$envir)
  expect_equal(out, "y")
  value <- target_read_value(pipeline_get_target(pipeline, "y"))
  expect_equal(value$object, 2L)
})

tar_test("crew algo can skip targets", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  skip_if_not_installed("crew", minimum_version = "0.6.0")
  skip_if_not_installed("R.utils")
  tar_runtime$fun <- "tar_make"
  tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
  x <- tar_target_raw(
    "x",
    quote(1L),
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y))
  local <- local_init(pipeline)
  R.utils::withTimeout(local$run(), timeout = 360)
  unlink(file.path("_targets", "objects", "x"))
  x <- tar_target_raw(
    "x",
    quote(1L),
    memory = "transient",
    garbage_collection = TRUE
  )
  y <- tar_target_raw(
    "y",
    quote(x),
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x, y))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  on.exit({
    tar_runtime$fun <- NULL
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  algo <- crew_init(pipeline, controller = controller)
  R.utils::withTimeout(algo$run(), timeout = 360)
  out <- names(algo$scheduler$progress$built$envir)
  expect_equal(out, "x")
  expect_equal(tar_read(x), 1L)
})

tar_test("nontrivial common data", {
  skip_cran()
  skip_on_os("windows")
  skip_on_os("solaris")
  skip_if_not_installed("crew", minimum_version = "0.6.0")
  skip_if_not_installed("R.utils")
  tar_runtime$fun <- "tar_make"
  tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5))
  old_envir <- tar_option_get("envir")
  envir <- new.env(parent = globalenv())
  tar_option_set(envir = envir)
  evalq({
    f <- function(x) {
      g(x) + 1L
    }
    g <- function(x) {
      x + 1L
    }
  }, envir = envir)
  x <- tar_target_raw(
    "x",
    quote(f(1L)),
    memory = "transient",
    garbage_collection = TRUE
  )
  pipeline <- pipeline_init(list(x))
  controller <- crew::crew_controller_local(
    host = "127.0.0.1",
    seconds_interval = 0.5
  )
  on.exit({
    tar_option_set(envir = old_envir)
    tar_runtime$fun <- NULL
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  algo <- crew_init(pipeline, controller = controller)
  R.utils::withTimeout(algo$run(), timeout = 360)
  value <- target_read_value(pipeline_get_target(pipeline, "x"))
  expect_equal(value$object, 3L)
})

Try the targets package in your browser

Any scripts or data that you put into this service are public.

targets documentation built on Oct. 12, 2023, 5:07 p.m.