R/tar_make.R

Defines functions tar_make_as_job tar_make_inner tar_make

Documented in tar_make

#' @title Run a pipeline of targets.
#' @export
#' @family pipeline
#' @description Run the pipeline you defined in the targets
#'   script file (default: `_targets.R`). `tar_make()`
#'   runs the correct targets in the correct order and stores the return
#'   values in `_targets/objects/`. Use [tar_read()] to read a target
#'   back into R, and see
#'   <https://docs.ropensci.org/targets/reference/index.html#clean>
#'   to manage output files.
#' @inheritSection tar_meta Storage access
#' @return `NULL` except if `callr_function = callr::r_bg()`, in which case
#'   a handle to the `callr` background process is returned. Either way,
#'   the value is invisibly returned.
#' @inheritParams tar_validate
#' @param names Names of the targets to run or check. Set to `NULL` to
#'   check/run all the targets (default).
#'   The object supplied to `names` should be a
#'   `tidyselect` expression like [any_of()] or [starts_with()]
#'   from `tidyselect` itself, or [tar_described_as()] to select target names
#'   based on their descriptions.
#' @param shortcut Logical of length 1, how to interpret the `names` argument.
#'   If `shortcut` is `FALSE` (default) then the function checks
#'   all targets upstream of `names` as far back as the dependency graph goes.
#'   `shortcut = TRUE` increases speed if there are a lot of
#'   up-to-date targets, but it assumes all the dependencies
#'   are up to date, so please use with caution.
#'   It relies on stored metadata for information about upstream dependencies.
#'   `shortcut = TRUE` only works if you set `names`.
#' @param reporter Character of length 1, name of the reporter to user.
#'   Controls how messages are printed as targets run in the pipeline.
#'   Defaults to `tar_config_get("reporter_make")`. Choices:
#'   * `"silent"`: print nothing.
#'   * `"summary"`: print a running total of the number of each targets in
#'     each status category (queued, dispatched, skipped, completed, canceled,
#'     or errored). Also show a timestamp (`"%H:%M %OS2"` `strptime()` format)
#'     of the last time the progress changed and printed to the screen.
#'   * `"timestamp"`: same as the `"verbose"` reporter except that each
#'     .message begins with a time stamp.
#'   * `"timestamp_positives"`: same as the `"timestamp"` reporter
#'     except without messages for skipped targets.
#'   * `"verbose"`: print messages for individual targets
#'     as they start, finish, or are skipped. Each individual
#'     target-specific time (e.g. "3.487 seconds") is strictly the
#'     elapsed runtime of the target and does not include
#'     steps like data retrieval and output storage.
#'   * `"verbose_positives"`: same as the `"verbose"` reporter
#'     except without messages for skipped targets.
#' @param seconds_interval Deprecated on 2023-08-24 (version 1.2.2.9001).
#'   Use `seconds_meta_append`, `seconds_meta_upload`,
#'   and `seconds_reporter` instead.
#' @param seconds_meta_append Positive numeric of length 1 with the minimum
#'   number of seconds between saves to the local metadata and progress files
#'   in the data store.
#'   Higher values generally make the pipeline run faster, but unsaved
#'   work (in the event of a crash) is not up to date.
#'   When the pipeline ends,
#'   all the metadata and progress data is saved immediately,
#'   regardless of `seconds_meta_append`.
#' @param seconds_meta_upload Positive numeric of length 1 with the minimum
#'   number of seconds between uploads of the metadata and progress data
#'   to the cloud
#'   (see <https://books.ropensci.org/targets/cloud-storage.html>).
#'   Higher values generally make the pipeline run faster, but unsaved
#'   work (in the event of a crash) may not be backed up to the cloud.
#'   When the pipeline ends,
#'   all the metadata and progress data is uploaded immediately,
#'   regardless of `seconds_meta_upload`.
#' @param seconds_reporter Positive numeric of length 1 with the minimum
#'   number of seconds between times when the reporter prints progress
#'   messages to the R console.
#' @param garbage_collection Logical of length 1. For a `crew`-integrated
#'   pipeline, whether to run garbage collection on the main process
#'   before sending a target
#'   to a worker. Ignored if `tar_option_get("controller")` is `NULL`.
#'   Independent from the `garbage_collection` argument of [tar_target()],
#'   which controls garbage collection on the worker.
#' @param use_crew Logical of length 1, whether to use `crew` if the
#'   `controller` option is set in `tar_option_set()` in the target script
#'   (`_targets.R`). See <https://books.ropensci.org/targets/crew.html>
#'   for details.
#' @param terminate_controller Logical of length 1. For a `crew`-integrated
#'   pipeline, whether to terminate the controller after stopping
#'   or finishing the pipeline. This should almost always be set to `TRUE`,
#'   but `FALSE` combined with `callr_function = NULL`
#'   will allow you to get the running controller using
#'   `tar_option_get("controller")` for debugging purposes.
#'   For example, `tar_option_get("controller")$summary()` produces a
#'   worker-by-worker summary of the work assigned and completed,
#'   `tar_option_get("controller")$queue` is the list of unresolved tasks,
#'   and `tar_option_get("controller")$results` is the list of
#'   tasks that completed but were not collected with `pop()`.
#'   You can manually terminate the controller with
#'   `tar_option_get("controller")$summary()` to close down the dispatcher
#'   and worker processes.
#' @param as_job `TRUE` to run as an RStudio IDE / Posit Workbench job,
#'   if running on RStudio IDE / Posit Workbench.
#'   `FALSE` to run as a `callr` process in the main R session
#'   (depending on the `callr_function` argument).
#'   If `as_job` is `TRUE`, then the `rstudioapi` package must be installed.
#' @examples
#' if (identical(Sys.getenv("TAR_EXAMPLES"), "true")) { # for CRAN
#' tar_dir({ # tar_dir() runs code from a temp dir for CRAN.
#' tar_script({
#'   library(targets)
#'   library(tarchetypes)
#'   list(
#'     tar_target(y1, 1 + 1),
#'     tar_target(y2, 1 + 1),
#'     tar_target(z, y1 + y2)
#'   )
#' }, ask = FALSE)
#' tar_make(starts_with("y")) # Only processes y1 and y2.
#' # Distributed computing with crew:
#' if (requireNamespace("crew", quietly = TRUE)) {
#' tar_script({
#'   library(targets)
#'   library(tarchetypes)
#'   tar_option_set(controller = crew::controller_local())
#'   list(
#'     tar_target(y1, 1 + 1),
#'     tar_target(y2, 1 + 1),
#'     tar_target(z, y1 + y2)
#'   )
#' }, ask = FALSE)
#' tar_make()
#' }
#' })
#' }
tar_make <- function(
  names = NULL,
  shortcut = targets::tar_config_get("shortcut"),
  reporter = targets::tar_config_get("reporter_make"),
  seconds_meta_append = targets::tar_config_get("seconds_meta_append"),
  seconds_meta_upload = targets::tar_config_get("seconds_meta_upload"),
  seconds_reporter = targets::tar_config_get("seconds_reporter"),
  seconds_interval = targets::tar_config_get("seconds_interval"),
  callr_function = callr::r,
  callr_arguments = targets::tar_callr_args_default(callr_function, reporter),
  envir = parent.frame(),
  script = targets::tar_config_get("script"),
  store = targets::tar_config_get("store"),
  garbage_collection = targets::tar_config_get("garbage_collection"),
  use_crew = targets::tar_config_get("use_crew"),
  terminate_controller = TRUE,
  as_job = targets::tar_config_get("as_job")
) {
  tar_assert_allow_meta("tar_make", store)
  force(envir)
  tar_assert_scalar(shortcut)
  tar_assert_lgl(shortcut)
  tar_assert_flag(reporter, tar_reporters_make())
  tar_assert_callr_function(callr_function)
  tar_assert_list(callr_arguments)
  tar_assert_dbl(seconds_meta_append)
  tar_assert_scalar(seconds_meta_append)
  tar_assert_none_na(seconds_meta_append)
  tar_assert_ge(seconds_meta_append, 0)
  tar_assert_dbl(seconds_meta_upload)
  tar_assert_scalar(seconds_meta_upload)
  tar_assert_none_na(seconds_meta_upload)
  tar_assert_ge(seconds_meta_upload, 0)
  tar_assert_dbl(seconds_reporter)
  tar_assert_scalar(seconds_reporter)
  tar_assert_none_na(seconds_reporter)
  tar_assert_ge(seconds_reporter, 0)
  tar_deprecate_seconds_interval(seconds_interval)
  tar_assert_lgl(garbage_collection)
  tar_assert_scalar(garbage_collection)
  tar_assert_none_na(garbage_collection)
  tar_assert_lgl(terminate_controller)
  tar_assert_scalar(terminate_controller)
  tar_assert_none_na(terminate_controller)
  tar_assert_lgl(as_job)
  tar_assert_scalar(as_job)
  tar_assert_none_na(as_job)
  # Tested in tests/interactive/test-job.R.
  # nocov start
  if (as_job && !rstudio_available(verbose = reporter != "silent")) {
    as_job <- FALSE
  }
  if (as_job) {
    call <- match.call()
    tar_make_as_job(call = call)
    return(invisible())
  }
  # nocov end
  targets_arguments <- list(
    path_store = store,
    names_quosure = rlang::enquo(names),
    shortcut = shortcut,
    reporter = reporter,
    seconds_meta_append = seconds_meta_append,
    seconds_meta_upload = seconds_meta_upload,
    seconds_reporter = seconds_reporter,
    garbage_collection = garbage_collection,
    use_crew = use_crew,
    terminate_controller = terminate_controller
  )
  out <- callr_outer(
    targets_function = tar_make_inner,
    targets_arguments = targets_arguments,
    callr_function = callr_function,
    callr_arguments = callr_arguments,
    envir = envir,
    script = script,
    store = store,
    fun = "tar_make"
  )
  invisible(out)
}

tar_make_inner <- function(
  pipeline,
  path_store,
  names_quosure,
  shortcut,
  reporter,
  seconds_meta_append,
  seconds_meta_upload,
  seconds_reporter,
  garbage_collection,
  use_crew,
  terminate_controller
) {
  names <- tar_tidyselect_eval(names_quosure, pipeline_get_names(pipeline))
  controller <- tar_option_get("controller")
  if (is.null(controller) || (!use_crew)) {
    pipeline_reset_deployments(pipeline)
    queue <- if_any(
      pipeline_uses_priorities(pipeline),
      "parallel",
      "sequential"
    )
    local_init(
      pipeline = pipeline,
      meta = meta_init(path_store = path_store),
      names = names,
      shortcut = shortcut,
      queue = queue,
      reporter = reporter,
      seconds_meta_append = seconds_meta_append,
      seconds_meta_upload = seconds_meta_upload,
      seconds_reporter = seconds_reporter,
      envir = tar_option_get("envir")
    )$run()
  } else {
    tar_assert_package("crew (>= 0.9.0)")
    crew_init(
      pipeline = pipeline,
      meta = meta_init(path_store = path_store),
      names = names,
      shortcut = shortcut,
      queue = "parallel",
      reporter = reporter,
      seconds_meta_append = seconds_meta_append,
      seconds_meta_upload = seconds_meta_upload,
      seconds_reporter = seconds_reporter,
      garbage_collection = garbage_collection,
      envir = tar_option_get("envir"),
      controller = controller,
      terminate_controller = terminate_controller
    )$run()
  }
  invisible()
}

# Tested in tests/interactive/test-job.R.
# nocov start
tar_make_as_job <- function(call) {
  args <- as.list(call)[-1L]
  args$as_job <- FALSE
  args$callr_function <- NULL
  args <- paste(names(args), "=", map_chr(args, tar_deparse_safe))
  args <- c(args, "callr_function = NULL")
  args <- paste0(args, collapse = ", ")
  text <- sprintf("targets::tar_make(%s)", args)
  script <- tempfile()
  writeLines(text, script)
  rstudioapi::jobRunScript(
    path = script,
    name = paste("targets", Sys.time()),
    workingDir = getwd(),
    importEnv = FALSE,
    exportEnv = ""
  )
}
# nocov end

Try the targets package in your browser

Any scripts or data that you put into this service are public.

targets documentation built on Oct. 3, 2024, 1:11 a.m.