R/crew_launcher_local.R

Defines functions crew_launcher_local

Documented in crew_launcher_local

#' @title Create a launcher with local process workers.
#' @export
#' @family plugin_local
#' @description Create an `R6` object to launch and maintain
#'   local process workers.
#' @inheritParams crew_launcher
#' @param options_local An object generated by [crew_options_local()]
#'   with options specific to the local controller.
#' @param local_log_directory Deprecated on 2024-10-08. Use
#'   `options_local` instead.
#' @param local_log_join Deprecated on 2024-10-08. Use
#'   `options_local` instead.
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
#' client$start()
#' launcher <- crew_launcher_local(name = client$name)
#' launcher$start(url = client$url, profile = client$profile)
#' launcher$launch()
#' task <- mirai::mirai("result", .compute = client$profile)
#' mirai::call_mirai_(task)
#' task$data
#' client$terminate()
#' }
crew_launcher_local <- function(
  name = NULL,
  workers = 1L,
  seconds_interval = 1,
  seconds_timeout = 60,
  seconds_launch = 30,
  seconds_idle = Inf,
  seconds_wall = Inf,
  seconds_exit = NULL,
  tasks_max = Inf,
  tasks_timers = 0L,
  reset_globals = TRUE,
  reset_packages = FALSE,
  reset_options = FALSE,
  garbage_collection = FALSE,
  crashes_error = 5L,
  launch_max = NULL,
  tls = crew::crew_tls(),
  r_arguments = c("--no-save", "--no-restore"),
  options_metrics = crew::crew_options_metrics(),
  options_local = crew::crew_options_local(),
  local_log_directory = NULL,
  local_log_join = NULL
) {
  crew_deprecate(
    name = "seconds_exit",
    date = "2023-09-21",
    version = "0.5.0.9002",
    alternative = "none (no longer necessary)",
    condition = "warning",
    value = seconds_exit
  )
  crew_deprecate(
    name = "local_log_directory",
    date = "2024-10-8",
    version = "0.9.5.9012",
    alternative = "options_local argument",
    condition = "warning",
    value = local_log_directory
  )
  crew_deprecate(
    name = "local_log_join",
    date = "2024-10-8",
    version = "0.9.5.9012",
    alternative = "options_local argument",
    condition = "warning",
    value = local_log_join
  )
  crew_deprecate(
    name = "launch_max",
    date = "2024-11-04",
    version = "0.10.1.9000",
    alternative = "crashes_error",
    condition = "warning",
    value = launch_max
  )
  options_local$log_directory <- local_log_directory %|||%
    options_local$log_directory
  options_local$log_join <- local_log_join %|||%
    options_local$log_join
  launcher <- crew_class_launcher_local$new(
    name = name,
    workers = workers,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    crashes_error = crashes_error,
    tls = tls,
    r_arguments = r_arguments,
    options_metrics = options_metrics,
    options_local = options_local
  )
  launcher$validate()
  launcher
}

#' @title Local process launcher class
#' @export
#' @family plugin_local
#' @description `R6` class to launch and manage local process workers.
#' @details See [crew_launcher_local()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
#' client$start()
#' launcher <- crew_launcher_local(name = client$name)
#' launcher$start(url = client$url, profile = client$profile)
#' launcher$launch()
#' task <- mirai::mirai("result", .compute = client$profile)
#' mirai::call_mirai_(task)
#' task$data
#' client$terminate()
#' }
crew_class_launcher_local <- R6::R6Class(
  classname = "crew_class_launcher_local",
  inherit = crew_class_launcher,
  cloneable = FALSE,
  private = list(
    .options_local = NULL,
    .log_prepare = function() {
      if (!is.null(private$.options_local$log_directory)) {
        dir_create(private$.options_local$log_directory)
      }
    },
    .log_stdout = function(name) {
      directory <- private$.options_local$log_directory
      if (is.null(directory)) {
        return(NULL)
      }
      if (!private$.options_local$log_join) {
        name <- paste0(name, "-stdout")
      }
      path.expand(file.path(directory, paste0(name, ".log")))
    },
    .log_stderr = function(name) {
      directory <- private$.options_local$log_directory
      if (is.null(directory)) {
        return(NULL)
      }
      if_any(
        private$.options_local$log_join,
        "2>&1",
        path.expand(file.path(directory, paste0(name, "-stderr.log")))
      )
    }
  ),
  active = list(
    #' @field options_local See [crew_launcher_local()].
    options_local = function() {
      .subset2(private, ".options_local")
    }
  ),
  public = list(
    #' @description Local launcher constructor.
    #' @return An `R6` object with the local launcher.
    #' @param name See [crew_launcher()].
    #' @param workers See [crew_launcher()].
    #' @param seconds_interval See [crew_launcher()].
    #' @param seconds_timeout See [crew_launcher()].
    #' @param seconds_launch See [crew_launcher()].
    #' @param seconds_idle See [crew_launcher()].
    #' @param seconds_wall See [crew_launcher()].
    #' @param seconds_exit See [crew_launcher()].
    #' @param tasks_max See [crew_launcher()].
    #' @param tasks_timers See [crew_launcher()].
    #' @param reset_globals See [crew_launcher()].
    #' @param reset_packages See [crew_launcher()].
    #' @param reset_options See [crew_launcher()].
    #' @param garbage_collection See [crew_launcher()].
    #' @param crashes_error See [crew_launcher()].
    #' @param tls See [crew_launcher()].
    #' @param processes See [crew_launcher()].
    #' @param r_arguments See [crew_launcher()].
    #' @param options_metrics See [crew_launcher_local()].
    #' @param options_local See [crew_launcher_local()].
    #' @examples
    #' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
    #' client <- crew_client()
    #' client$start()
    #' launcher <- crew_launcher_local(name = client$name)
    #' launcher$start(url = client$url, profile = client$profile)
    #' launcher$launch()
    #' task <- mirai::mirai("result", .compute = client$profile)
    #' mirai::call_mirai_(task)
    #' task$data
    #' client$terminate()
    #' }
    initialize = function(
      name = NULL,
      workers = NULL,
      seconds_interval = NULL,
      seconds_timeout = NULL,
      seconds_launch = NULL,
      seconds_idle = NULL,
      seconds_wall = NULL,
      seconds_exit = NULL,
      tasks_max = NULL,
      tasks_timers = NULL,
      reset_globals = NULL,
      reset_packages = NULL,
      reset_options = NULL,
      garbage_collection = NULL,
      crashes_error = NULL,
      tls = NULL,
      processes = NULL,
      r_arguments = NULL,
      options_metrics = NULL,
      options_local = NULL
    ) {
      super$initialize(
        name = name,
        workers = workers,
        seconds_interval = seconds_interval,
        seconds_timeout = seconds_timeout,
        seconds_launch = seconds_launch,
        seconds_idle = seconds_idle,
        seconds_wall = seconds_wall,
        seconds_exit = seconds_exit,
        tasks_max = tasks_max,
        tasks_timers = tasks_timers,
        reset_globals = reset_globals,
        reset_packages = reset_packages,
        reset_options = reset_options,
        garbage_collection = garbage_collection,
        crashes_error = crashes_error,
        tls = tls,
        processes = processes,
        r_arguments = r_arguments,
        options_metrics = options_metrics
      )
      private$.options_local <- options_local
    },
    #' @description Validate the local launcher.
    #' @return `NULL` (invisibly).
    validate = function() {
      super$validate()
      crew_options_local_validate(private$.options_local)
    },
    #' @description Launch a local process worker which will
    #'   dial into a socket.
    #' @details The `call` argument is R code that will run to
    #'   initiate the worker. Together, the `launcher`, `worker`,
    #'   and `instance` arguments are useful for
    #'   constructing informative job names.
    #' @return A handle object to allow the termination of the worker
    #'   later on.
    #' @param call Character of length 1 with a namespaced call to
    #'   [crew_worker()] which will run in the worker and accept tasks.
    #' @param name Character of length 1 with a long informative worker name
    #'   which contains the `launcher` and `worker` arguments
    #'   described below.
    #' @param launcher Character of length 1, name of the launcher.
    #' @param worker Character string, name of the worker within the launcher.
    launch_worker = function(call, name, launcher, worker) {
      bin <- if_any(
        tolower(Sys.info()[["sysname"]]) == "windows",
        "Rscript.exe",
        "Rscript"
      )
      path <- file.path(R.home("bin"), bin)
      private$.log_prepare()
      processx::process$new(
        command = path,
        args = c(private$.r_arguments, "-e", call),
        cleanup = TRUE,
        stdout = private$.log_stdout(name = name),
        stderr = private$.log_stderr(name = name)
      )
    },
    #' @description Terminate a local process worker.
    #' @return A list with the process ID of the worker.
    #' @param handle A process handle object previously
    #'   returned by `launch_worker()`.
    terminate_worker = function(handle) {
      handle$signal(signal = crew_terminate_signal())
      list(pid = handle$get_pid())
    }
  )
)
wlandau/crew documentation built on Feb. 8, 2025, 10:12 a.m.