#' @title Superseded. Run a pipeline with persistent `clustermq` workers.
#' @export
#' @family pipeline
#' @description Superseded. Use [tar_make()] with `crew`:
#' <https://books.ropensci.org/targets/crew.html>.
#' @details `tar_make_clustermq()` is like [tar_make()] except that targets
#' run in parallel on persistent workers. A persistent worker is an
#' R process that runs for a long time and runs multiple
#' targets during its lifecycle. Persistent
#' workers launch as soon as the pipeline reaches an outdated
#' target with `deployment = "worker"`, and they keep running
#' until the pipeline starts to wind down.
#'
#' To configure `tar_make_clustermq()`, you must configure
#' the `clustermq` package. To do this, set global options
#' `clustermq.scheduler` and `clustermq.template`
#' inside the target script file (default: `_targets.R`).
#' To read more about configuring `clustermq` for your scheduler, visit
#' <https://mschubert.github.io/clustermq/articles/userguide.html#configuration> # nolint
#' or <https://books.ropensci.org/targets/hpc.html>.
#' `clustermq` is not a strict dependency of `targets`,
#' so you must install `clustermq` yourself.
#' @inheritSection tar_meta Storage access
#' @return `NULL` except if `callr_function = callr::r_bg()`, in which case
#' a handle to the `callr` background process is returned. Either way,
#' the value is invisibly returned.
#' @inheritParams tar_make_future
#' @param workers Positive integer, number of persistent `clustermq` workers
#' to create.
#' @param log_worker Logical, whether to write a log file for each worker.
#' Same as the `log_worker` argument of `clustermq::Q()`
#' and `clustermq::workers()`.
#' @examples
#' if (!identical(tolower(Sys.info()[["sysname"]]), "windows")) {
#' if (identical(Sys.getenv("TAR_EXAMPLES"), "true")) { # for CRAN
#' tar_dir({ # tar_dir() runs code from a temp dir for CRAN.
#' tar_script({
#' library(targets)
#' library(tarchetypes)
#' options(clustermq.scheduler = "multiprocess") # Does not work on Windows.
#' tar_option_set()
#' list(tar_target(x, 1 + 1))
#' }, ask = FALSE)
#' tar_make_clustermq()
#' })
#' }
#' }
tar_make_clustermq <- function(
  names = NULL,
  shortcut = targets::tar_config_get("shortcut"),
  reporter = targets::tar_config_get("reporter_make"),
  seconds_meta_append = targets::tar_config_get("seconds_meta_append"),
  seconds_meta_upload = targets::tar_config_get("seconds_meta_upload"),
  seconds_reporter = targets::tar_config_get("seconds_reporter"),
  seconds_interval = targets::tar_config_get("seconds_interval"),
  workers = targets::tar_config_get("workers"),
  log_worker = FALSE,
  callr_function = callr::r,
  callr_arguments = targets::tar_callr_args_default(callr_function, reporter),
  envir = parent.frame(),
  script = targets::tar_config_get("script"),
  store = targets::tar_config_get("store"),
  garbage_collection = NULL
) {
  # Need to suppress tests on covr only, due to
  # https://github.com/r-lib/covr/issues/315.
  # Cannot use multicore clustermq backend
  # due to https://github.com/ropensci/targets/discussions/780
  # nocov start
  tar_assert_allow_meta("tar_make_clustermq", store)
  # Evaluate the `envir` promise now so that the default `parent.frame()`
  # is resolved here rather than at some later point of use.
  force(envir)
  tar_assert_package("clustermq (>= 0.9.2)")
  # Validate user-supplied arguments before anything is launched.
  tar_assert_scalar(shortcut)
  tar_assert_lgl(shortcut)
  tar_assert_scalar(workers)
  tar_assert_dbl(workers)
  tar_assert_ge(workers, 1)
  tar_assert_callr_function(callr_function)
  # The default of `callr_arguments` refers to `reporter` and is evaluated
  # lazily on first use (presumably here), i.e. with the value of `reporter`
  # as supplied by the caller. Keep this check ahead of the reassignment of
  # `reporter` further down.
  tar_assert_list(callr_arguments)
  tar_assert_dbl(seconds_meta_append)
  tar_assert_scalar(seconds_meta_append)
  tar_assert_none_na(seconds_meta_append)
  tar_assert_ge(seconds_meta_append, 0)
  tar_assert_dbl(seconds_meta_upload)
  tar_assert_scalar(seconds_meta_upload)
  tar_assert_none_na(seconds_meta_upload)
  tar_assert_ge(seconds_meta_upload, 0)
  # NOTE(review): `seconds_reporter` is accepted in the signature but is not
  # referenced anywhere in this body -- confirm whether that is intended.
  tar_deprecate_seconds_interval(seconds_interval)
  # Warn only when the caller explicitly supplied `garbage_collection`.
  # The message names this function so users know which call to change.
  if_any(
    is.null(garbage_collection),
    NULL,
    tar_warn_deprecate(
      "The garbage_collection argument of tar_make_clustermq() was deprecated ",
      "in targets version 1.8.0.9004 (2024-10-22). The garbage_collection ",
      "argument of tar_option_set() is more unified and featureful now. ",
      "Please have a look at its documentation."
    )
  )
  reporter <- tar_make_reporter(reporter)
  # Arguments forwarded to tar_make_clustermq_inner(). `names` is captured
  # unevaluated as a quosure; the inner function evaluates it with
  # tidyselect against the pipeline's target names.
  targets_arguments <- list(
    path_store = store,
    names_quosure = rlang::enquo(names),
    shortcut = shortcut,
    reporter = reporter,
    seconds_meta_append = seconds_meta_append,
    seconds_meta_upload = seconds_meta_upload,
    workers = workers,
    log_worker = log_worker
  )
  out <- callr_outer(
    targets_function = tar_make_clustermq_inner,
    targets_arguments = targets_arguments,
    callr_function = callr_function,
    callr_arguments = callr_arguments,
    envir = envir,
    script = script,
    store = store,
    fun = "tar_make_clustermq"
  )
  invisible(out)
}
# Internal worker for tar_make_clustermq(): resolves the requested target
# names against the pipeline, builds the clustermq algorithm object, and
# runs it. Receives its arguments from the `targets_arguments` list built
# in tar_make_clustermq(). Returns NULL invisibly.
tar_make_clustermq_inner <- function(
  pipeline,
  path_store,
  names_quosure,
  shortcut,
  reporter,
  seconds_meta_append,
  seconds_meta_upload,
  workers,
  log_worker
) {
  # Turn the captured `names` expression into a selection of target names.
  selected <- tar_tidyselect_eval(
    names_quosure,
    pipeline_get_names(pipeline)
  )
  algorithm <- clustermq_init(
    pipeline = pipeline,
    meta = meta_init(path_store = path_store),
    names = selected,
    shortcut = shortcut,
    queue = "parallel",
    reporter = reporter,
    seconds_meta_append = seconds_meta_append,
    seconds_meta_upload = seconds_meta_upload,
    envir = tar_option_get("envir"),
    workers = workers,
    log_worker = log_worker
  )
  algorithm$run()
  invisible()
}
# nocov end
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.