#' @title Create a launcher with local process workers.
#' @export
#' @family plugin_local
#' @description Create an `R6` object to launch and maintain
#'   local process workers.
#' @details The deprecated arguments `local_log_directory` and
#'   `local_log_join`, when supplied, take precedence over the
#'   corresponding fields of `options_local`. The arguments
#'   `seconds_exit` and `launch_max` are deprecated and are not
#'   forwarded to the launcher object.
#' @return An `R6` object of class `crew_class_launcher_local`
#'   on which `validate()` has already been called.
#' @inheritParams crew_launcher
#' @param options_local An object generated by [crew_options_local()]
#'   with options specific to the local controller.
#' @param local_log_directory Deprecated on 2024-10-08. Use
#'   `options_local` instead.
#' @param local_log_join Deprecated on 2024-10-08. Use
#'   `options_local` instead.
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
#' client$start()
#' launcher <- crew_launcher_local(name = client$name)
#' launcher$start(url = client$url, profile = client$profile)
#' launcher$launch()
#' task <- mirai::mirai("result", .compute = client$profile)
#' mirai::call_mirai_(task)
#' task$data
#' client$terminate()
#' }
crew_launcher_local <- function(
  name = NULL,
  workers = 1L,
  seconds_interval = 1,
  seconds_timeout = 60,
  seconds_launch = 30,
  seconds_idle = Inf,
  seconds_wall = Inf,
  seconds_exit = NULL,
  tasks_max = Inf,
  tasks_timers = 0L,
  reset_globals = TRUE,
  reset_packages = FALSE,
  reset_options = FALSE,
  garbage_collection = FALSE,
  crashes_error = 5L,
  launch_max = NULL,
  tls = crew::crew_tls(),
  r_arguments = c("--no-save", "--no-restore"),
  options_metrics = crew::crew_options_metrics(),
  options_local = crew::crew_options_local(),
  local_log_directory = NULL,
  local_log_join = NULL
) {
  # Report each deprecated argument through crew_deprecate().
  # NOTE(review): presumably a warning is only raised when `value` is
  # non-NULL -- confirm against crew_deprecate().
  crew_deprecate(
    name = "seconds_exit",
    date = "2023-09-21",
    version = "0.5.0.9002",
    alternative = "none (no longer necessary)",
    condition = "warning",
    value = seconds_exit
  )
  crew_deprecate(
    name = "local_log_directory",
    date = "2024-10-08",
    version = "0.9.5.9012",
    alternative = "options_local argument",
    condition = "warning",
    value = local_log_directory
  )
  crew_deprecate(
    name = "local_log_join",
    date = "2024-10-08",
    version = "0.9.5.9012",
    alternative = "options_local argument",
    condition = "warning",
    value = local_log_join
  )
  crew_deprecate(
    name = "launch_max",
    date = "2024-11-04",
    version = "0.10.1.9000",
    alternative = "crashes_error",
    condition = "warning",
    value = launch_max
  )
  # Let the deprecated arguments override the matching options_local
  # fields. Only assign when the resolved value is non-NULL: in R,
  # `x$field <- NULL` removes `field` from a list, which would silently
  # strip the element from the options object.
  log_directory <- local_log_directory %|||% options_local$log_directory
  if (!is.null(log_directory)) {
    options_local$log_directory <- log_directory
  }
  log_join <- local_log_join %|||% options_local$log_join
  if (!is.null(log_join)) {
    options_local$log_join <- log_join
  }
  launcher <- crew_class_launcher_local$new(
    name = name,
    workers = workers,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    crashes_error = crashes_error,
    tls = tls,
    r_arguments = r_arguments,
    options_metrics = options_metrics,
    options_local = options_local
  )
  # Fail early on invalid settings before handing the object back.
  launcher$validate()
  launcher
}
#' @title Local process launcher class
#' @export
#' @family plugin_local
#' @description `R6` class to launch and manage local process workers.
#' @details See [crew_launcher_local()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
#' client$start()
#' launcher <- crew_launcher_local(name = client$name)
#' launcher$start(url = client$url, profile = client$profile)
#' launcher$launch()
#' task <- mirai::mirai("result", .compute = client$profile)
#' mirai::call_mirai_(task)
#' task$data
#' client$terminate()
#' }
crew_class_launcher_local <- R6::R6Class(
  classname = "crew_class_launcher_local",
  inherit = crew_class_launcher,
  cloneable = FALSE,
  private = list(
    .options_local = NULL,
    # Create the log directory, but only when one is configured.
    .log_prepare = function() {
      log_directory <- private$.options_local$log_directory
      if (is.null(log_directory)) {
        return(invisible())
      }
      dir_create(log_directory)
    },
    # Path of the stdout log file for a worker, or NULL when no log
    # directory is configured. The file stem gets a "-stdout" suffix
    # unless stdout and stderr are joined into one file.
    .log_stdout = function(name) {
      log_directory <- private$.options_local$log_directory
      if (is.null(log_directory)) {
        return(NULL)
      }
      stem <- if (!private$.options_local$log_join) {
        paste0(name, "-stdout")
      } else {
        name
      }
      log_file <- file.path(log_directory, paste0(stem, ".log"))
      path.expand(log_file)
    },
    # stderr destination for a worker: NULL when no log directory is
    # configured, the "2>&1" redirection string when logs are joined,
    # otherwise the path of a dedicated "-stderr.log" file.
    .log_stderr = function(name) {
      log_directory <- private$.options_local$log_directory
      if (is.null(log_directory)) {
        return(NULL)
      }
      if_any(
        private$.options_local$log_join,
        "2>&1",
        path.expand(file.path(log_directory, paste0(name, "-stderr.log")))
      )
    }
  ),
  active = list(
    #' @field options_local See [crew_launcher_local()].
    options_local = function() {
      private[[".options_local"]]
    }
  ),
  public = list(
    #' @description Local launcher constructor.
    #' @return An `R6` object with the local launcher.
    #' @param name See [crew_launcher()].
    #' @param workers See [crew_launcher()].
    #' @param seconds_interval See [crew_launcher()].
    #' @param seconds_timeout See [crew_launcher()].
    #' @param seconds_launch See [crew_launcher()].
    #' @param seconds_idle See [crew_launcher()].
    #' @param seconds_wall See [crew_launcher()].
    #' @param seconds_exit See [crew_launcher()].
    #' @param tasks_max See [crew_launcher()].
    #' @param tasks_timers See [crew_launcher()].
    #' @param reset_globals See [crew_launcher()].
    #' @param reset_packages See [crew_launcher()].
    #' @param reset_options See [crew_launcher()].
    #' @param garbage_collection See [crew_launcher()].
    #' @param crashes_error See [crew_launcher()].
    #' @param tls See [crew_launcher()].
    #' @param processes See [crew_launcher()].
    #' @param r_arguments See [crew_launcher()].
    #' @param options_metrics See [crew_launcher_local()].
    #' @param options_local See [crew_launcher_local()].
    #' @examples
    #' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
    #' client <- crew_client()
    #' client$start()
    #' launcher <- crew_launcher_local(name = client$name)
    #' launcher$start(url = client$url, profile = client$profile)
    #' launcher$launch()
    #' task <- mirai::mirai("result", .compute = client$profile)
    #' mirai::call_mirai_(task)
    #' task$data
    #' client$terminate()
    #' }
    initialize = function(
      name = NULL,
      workers = NULL,
      seconds_interval = NULL,
      seconds_timeout = NULL,
      seconds_launch = NULL,
      seconds_idle = NULL,
      seconds_wall = NULL,
      seconds_exit = NULL,
      tasks_max = NULL,
      tasks_timers = NULL,
      reset_globals = NULL,
      reset_packages = NULL,
      reset_options = NULL,
      garbage_collection = NULL,
      crashes_error = NULL,
      tls = NULL,
      processes = NULL,
      r_arguments = NULL,
      options_metrics = NULL,
      options_local = NULL
    ) {
      # Every setting except options_local is handled by the parent
      # launcher class.
      super$initialize(
        name = name,
        workers = workers,
        seconds_interval = seconds_interval,
        seconds_timeout = seconds_timeout,
        seconds_launch = seconds_launch,
        seconds_idle = seconds_idle,
        seconds_wall = seconds_wall,
        seconds_exit = seconds_exit,
        tasks_max = tasks_max,
        tasks_timers = tasks_timers,
        reset_globals = reset_globals,
        reset_packages = reset_packages,
        reset_options = reset_options,
        garbage_collection = garbage_collection,
        crashes_error = crashes_error,
        tls = tls,
        processes = processes,
        r_arguments = r_arguments,
        options_metrics = options_metrics
      )
      # The local-specific options are stored on this subclass.
      private$.options_local <- options_local
    },
    #' @description Validate the local launcher.
    #' @details Runs the parent class validation first and then
    #'   checks the local options object.
    #' @return `NULL` (invisibly).
    validate = function() {
      super$validate()
      crew_options_local_validate(private$.options_local)
    },
    #' @description Launch a local process worker which will
    #'   dial into a socket.
    #' @details The `call` argument is R code that will run to
    #'   initiate the worker. Together, the `launcher`, `worker`,
    #'   and `instance` arguments are useful for
    #'   constructing informative job names.
    #' @return A handle object to allow the termination of the worker
    #'   later on.
    #' @param call Character of length 1 with a namespaced call to
    #'   [crew_worker()] which will run in the worker and accept tasks.
    #' @param name Character of length 1 with a long informative worker name
    #'   which contains the `launcher` and `worker` arguments
    #'   described below.
    #' @param launcher Character of length 1, name of the launcher.
    #' @param worker Character string, name of the worker within the launcher.
    launch_worker = function(call, name, launcher, worker) {
      # The Rscript executable carries an ".exe" suffix on Windows.
      on_windows <- tolower(Sys.info()[["sysname"]]) == "windows"
      executable <- if_any(on_windows, "Rscript.exe", "Rscript")
      rscript <- file.path(R.home("bin"), executable)
      # Make sure the log directory exists before the process starts.
      private$.log_prepare()
      log_out <- private$.log_stdout(name = name)
      log_err <- private$.log_stderr(name = name)
      processx::process$new(
        command = rscript,
        args = c(private$.r_arguments, "-e", call),
        cleanup = TRUE,
        stdout = log_out,
        stderr = log_err
      )
    },
    #' @description Terminate a local process worker.
    #' @return A list with the process ID of the worker.
    #' @param handle A process handle object previously
    #'   returned by `launch_worker()`.
    terminate_worker = function(handle) {
      # Signal the process first, then report which PID was signalled.
      handle$signal(signal = crew_terminate_signal())
      pid <- handle$get_pid()
      list(pid = pid)
    }
  )
)
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.