Nothing
tar_test("packages are actually loaded", {
skip_on_cran()
skip_if_not_installed("clustermq")
tar_runtime$fun <- "tar_make_clustermq"
on.exit(tar_runtime$fun <- NULL)
options(clustermq.scheduler = "multiprocess")
tar_option_set(envir = environment())
x <- tar_target_raw(
"x",
quote(tibble(x = "x")),
packages = "tibble"
)
pipeline <- pipeline_init(list(x))
out <- clustermq_init(pipeline)
suppressWarnings(out$run())
exp <- tibble::tibble(x = "x")
expect_equal(tar_read(x), exp)
})
tar_test("clustermq iteration loop can wait for and shut down workers", {
skip_on_os("windows")
skip_if_not_installed("clustermq")
tar_runtime$fun <- "tar_make_clustermq"
on.exit(tar_runtime$fun <- NULL)
on.exit(options(clustermq.scheduler = old), add = TRUE)
old <- getOption("clustermq.scheduler")
options(clustermq.scheduler = "multiprocess")
x <- tar_target_raw("x", quote(Sys.sleep(2)), garbage_collection = TRUE)
y <- tar_target_raw("y", quote(list(x, a = "x")), garbage_collection = TRUE)
pipeline <- pipeline_init(list(x, y))
out <- clustermq_init(pipeline, reporter = "silent")
suppressWarnings(out$run())
target <- pipeline_get_target(pipeline, "y")
expect_equal(target_read_value(target)$object$a, "x")
})
tar_test("nontrivial common data with multiprocess", {
skip_on_cran()
skip_if_not_installed("clustermq")
tar_script({
options(clustermq.scheduler = "multiprocess")
envir <- new.env(parent = globalenv())
evalq({
f <- function(x) {
g(x) + 1L
}
g <- function(x) {
x + 1L
}
}, envir = envir)
tar_option_set(envir = envir)
list(
tar_target(x, 1),
tar_target(y, f(x))
)
})
tar_make_clustermq()
expect_equal(tar_read(y), 3L)
})
tar_test("nontrivial globals with global environment", {
skip_on_cran()
skip_if_not_installed("clustermq")
tar_script({
options(clustermq.scheduler = "multiprocess")
f <- function(x) {
g(x) + 1L
}
g <- function(x) {
x + 1L
}
list(
tar_target(x, 1),
tar_target(y, f(x))
)
})
tar_make_clustermq()
expect_equal(tar_read(y), 3L)
})
tar_test("prevent high-memory data via target objects", {
# Run this test once inside tar_test() (test environment)
# and once outside tar_test() global environment.
skip_on_cran()
skip_if_not_installed("clustermq")
tar_runtime$fun <- "tar_make_clustermq"
on.exit(tar_runtime$fun <- NULL)
options(clustermq.scheduler = "multiprocess")
t <- list(tar_target(x, runif(1e7), deployment = "main", format = "qs"))
pipeline <- pipeline_init(list(t[[1]], tar_target(y, x)))
algo <- clustermq_init(pipeline)
debug(algo$start_workers)
tar_option_set(envir = environment())
# should enter a debugger:
algo$run()
# In the debugger verify that the exported data is much smaller than
# the value of x because we cloned the target objects in pipeline_init().
o <- self$produce_exports(envir, path_store_default())
# Exported data should be small:
pryr::object_size(o)
# The target object should not be in the environment.
expect_true(inherits(tar_option_get("envir")$t[[1]], "tar_target"))
pryr::object_size(tar_option_get("envir")$t[[1]])
# The pipeline's copy of the target object should be much larger:
pryr::object_size(pipeline_get_target(self$pipeline, "x")$value$object)
# The algorithm object itself should be large too, and it is not exported.
pryr::object_size(self)
})
tar_test("heavily parallel workload should run fast", {
skip_on_cran()
skip_if_not_installed("clustermq")
tar_script({
library(targets)
options(clustermq.scheduler = "multicore")
list(
tar_target(
index_batch,
seq_len(100),
),
tar_target(
data_continuous,
index_batch,
pattern = map(index_batch)
),
tar_target(
data_discrete,
index_batch,
pattern = map(index_batch)
),
tar_target(
fit_continuous,
data_continuous,
pattern = map(data_continuous)
),
tar_target(
fit_discrete,
data_discrete,
pattern = map(data_discrete)
)
)
})
tar_make_clustermq(workers = 4)
expect_equal(tar_outdated(callr_function = NULL), character(0))
})
tar_test("profile heavily parallel workload", {
skip_on_cran()
skip_if_not_installed("clustermq")
tar_script({
library(targets)
options(clustermq.scheduler = "multicore")
list(
tar_target(
index_batch,
seq_len(100),
),
tar_target(
data_continuous,
index_batch,
pattern = map(index_batch)
),
tar_target(
data_discrete,
index_batch,
pattern = map(index_batch)
),
tar_target(
fit_continuous,
data_continuous,
pattern = map(data_continuous)
),
tar_target(
fit_discrete,
data_discrete,
pattern = map(data_discrete)
)
)
})
# Should deploy targets in a timely manner.
proffer::pprof(tar_make_clustermq(workers = 4, callr_function = NULL))
expect_equal(tar_outdated(callr_function = NULL), character(0))
expect_equal(unname(tar_read(fit_continuous)), seq_len(100))
expect_equal(unname(tar_read(fit_discrete)), seq_len(100))
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.