crew_test("custom launcher", {
skip_on_cran()
skip_on_os("windows")
skip_if_not_installed("processx")
if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
skip_on_os("mac")
}
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call, name, launcher, worker) {
bin <- if_any(
tolower(Sys.info()[["sysname"]]) == "windows",
"R.exe",
"R"
)
path <- file.path(R.home("bin"), bin)
processx::process$new(command = path, args = c("-e", call))
},
terminate_worker = function(handle) {
handle$signal(signal = crew::crew_terminate_signal())
}
)
)
crew_controller_custom <- function(
name = "custom controller name",
workers = 1L,
host = "127.0.0.1",
port = NULL,
tls = crew::crew_tls(mode = "none"),
seconds_interval = 0.5,
seconds_timeout = 5,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_error = 5L
) {
client <- crew::crew_client(
host = host,
port = port,
tls = tls,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- custom_launcher_class$new(
name = name,
workers = workers,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
crashes_error = crashes_error,
tls = tls
)
controller <- crew::crew_controller(client = client, launcher = launcher)
controller$validate()
controller
}
controller <- crew_controller_custom()
controller$start()
on.exit({
controller$terminate()
rm(controller)
gc()
crew_test_sleep()
})
controller$push(name = "pid", command = ps::ps_pid())
controller$wait(seconds_timeout = 10)
out <- controller$pop()$result[[1]]
handle <- controller$launcher$instances$handle[[1]]
exp <- handle$get_pid()
expect_equal(out, exp)
expect_true(handle$is_alive())
controller$launcher$terminate()
crew_retry(
~!handle$is_alive(),
seconds_interval = 0.1,
seconds_timeout = 5
)
expect_false(handle$is_alive())
controller$terminate()
})
crew_test("custom launcher with local asyncs launch errors", {
skip_on_cran()
skip_on_covr() # Avoid clashes with NNG and covr child processes.
skip_on_os("windows")
skip_if_not_installed("processx")
if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
skip_on_os("mac")
}
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call, name, launcher, worker) {
self$async$eval(
command = "okay value",
packages = "this package does not exist"
)
},
terminate_worker = function(handle) {
}
)
)
crew_controller_custom <- function(
name = "custom controller name",
workers = 1L,
host = "127.0.0.1",
port = NULL,
tls = crew::crew_tls(mode = "none"),
seconds_interval = 0.5,
seconds_timeout = 5,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_error = 5L,
processes = NULL
) {
client <- crew::crew_client(
host = host,
port = port,
tls = tls,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- custom_launcher_class$new(
name = name,
workers = workers,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
crashes_error = crashes_error,
tls = tls,
processes = processes
)
controller <- crew::crew_controller(
client = client,
launcher = launcher
)
controller$validate()
controller
}
controller <- crew_controller_custom(processes = 1L)
controller$start()
on.exit({
try(suppressWarnings(controller$terminate()), silent = TRUE)
rm(controller)
gc()
crew_test_sleep()
})
envir <- new.env(parent = emptyenv())
crew_retry(
~ tryCatch(
expr = {
controller$push(command = TRUE)
envir$result <- FALSE
FALSE
},
crew_error = function(condition) {
envir$result <- TRUE
TRUE
}
),
seconds_interval = 1,
seconds_timeout = 30
)
expect_true(envir$result)
})
crew_test("custom launcher with local asyncs termination errors", {
skip_on_cran()
skip_on_covr() # Avoid clashes with NNG and covr child processes.
skip_on_os("windows")
skip_if_not_installed("processx")
if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
skip_on_os("mac")
}
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call, name, launcher, worker) {
list(abstract = TRUE)
},
terminate_worker = function(handle) {
self$async$eval(
command = stop("termination error"),
packages = "processx"
)
}
)
)
crew_controller_custom <- function(
name = "custom controller name",
workers = 1L,
host = "127.0.0.1",
port = NULL,
tls = crew::crew_tls(mode = "none"),
seconds_interval = 0.5,
seconds_timeout = 5,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_error = 5L,
processes = NULL
) {
client <- crew::crew_client(
host = host,
port = port,
tls = tls,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- custom_launcher_class$new(
name = name,
workers = workers,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
crashes_error = crashes_error,
tls = tls,
processes = processes
)
controller <- crew::crew_controller(
client = client,
launcher = launcher
)
controller$validate()
controller
}
controller <- crew_controller_custom(processes = 1L)
controller$start()
on.exit({
try(suppressWarnings(controller$terminate()), silent = TRUE)
rm(controller)
gc()
crew_test_sleep()
})
controller$launch(n = 1L)
expect_warning(controller$terminate(), class = "crew_warning")
})
crew_test("custom launcher with async internal launcher tasks", {
skip_on_cran()
skip_on_ci()
skip_on_covr() # Avoid clashes with NNG and covr child processes.
skip_on_os("windows")
skip_if_not_installed("processx")
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call, name, launcher, worker, instance) {
bin <- if_any(
tolower(Sys.info()[["sysname"]]) == "windows",
"R.exe",
"R"
)
path <- file.path(R.home("bin"), bin)
self$async$eval(
command = {
handle <- process$new(
command = path,
args = c("-e", call),
cleanup = FALSE
)
while (!handle$is_alive()) {
Sys.sleep(0.025)
}
list(pid = handle$get_pid(), status = "started")
},
data = list(
path = path,
call = call
),
packages = "processx"
)
},
terminate_worker = function(handle) {
pid <- handle$pid
self$async$eval(
command = {
crew::crew_terminate_process(pid)
list(pid = pid, status = "terminated")
},
data = list(pid = pid)
)
}
)
)
crew_controller_custom <- function(
name = "custom controller name",
workers = 1L,
host = "127.0.0.1",
port = NULL,
tls = crew::crew_tls(mode = "none"),
seconds_interval = 0.5,
seconds_timeout = 5,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_error = 5L,
processes = NULL
) {
client <- crew::crew_client(
host = host,
port = port,
tls = tls,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- custom_launcher_class$new(
name = name,
workers = workers,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
crashes_error = crashes_error,
tls = tls,
processes = processes
)
controller <- crew::crew_controller(
client = client,
launcher = launcher
)
controller$validate()
controller
}
controller <- crew_controller_custom(processes = 1L)
controller$start()
on.exit({
controller$terminate()
rm(controller)
gc()
crew_test_sleep()
})
controller$push(name = "pid", command = ps::ps_pid())
controller$wait(seconds_timeout = 10)
envir <- new.env(parent = emptyenv())
crew_retry(
~ {
envir$pid <- controller$pop()$result[[1L]]
!is.null(envir$pid)
},
seconds_interval = 0.25,
seconds_timeout = 15
)
handle <- controller$launcher$instances$handle[[1L]]
pid <- handle$pid
expect_equal(envir$pid, pid)
expect_equal(handle$status, "started")
controller$launcher$terminate()
})
crew_test("can terminate a lost worker with an async launch", {
skip_on_cran()
skip_on_os("windows")
skip_on_covr() # Avoid clashes with NNG and covr child processes.
if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
skip_on_os("mac")
}
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call, name, launcher, worker) {
},
terminate_worker = function(handle) {
ps::ps_kill(p = ps::ps_handle(handle$pid))
}
)
)
crew_controller_custom <- function(
name = "custom controller name",
workers = 1L,
host = "127.0.0.1",
port = NULL,
tls = crew::crew_tls(mode = "none"),
seconds_interval = 0.5,
seconds_timeout = 5,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_error = 5L
) {
client <- crew::crew_client(
host = host,
port = port,
tls = tls,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- custom_launcher_class$new(
name = name,
workers = workers,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
crashes_error = crashes_error,
tls = tls
)
controller <- crew::crew_controller(
client = client,
launcher = launcher
)
controller$validate()
controller
}
x <- crew_controller_custom()
x$start()
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
private <- crew_private(x$launcher)
bin <- ifelse(
tolower(Sys.info()[["sysname"]]) == "windows",
"R.exe",
"R"
)
path <- file.path(R.home("bin"), bin)
handle <- mirai::mirai(
.expr = {
Sys.sleep(0.5)
handle <- processx::process$new(
command = path,
args = c("-e", "Sys.sleep(90)"),
cleanup = FALSE
)
list(pid = handle$get_pid())
},
.args = list(path = path)
)
private$.instances <- tibble::add_row(
private$.instances,
handle = list(handle),
id = 99L,
start = - Inf,
online = FALSE,
discovered = FALSE
)
crew_retry(
~ {
x$scale()
!nanonext::.unresolved(handle)
},
seconds_interval = 0.1,
seconds_timeout = 15
)
expect_true(is.integer(handle$data$pid))
crew_retry(
~ {
x$scale()
tryCatch(
!ps::ps_is_running(p = ps::ps_handle(handle$data$pid)),
error = function(condition) {
TRUE
}
)
},
seconds_interval = 0.1,
seconds_timeout = 15
)
expect_equal(nrow(x$launcher$instances), 0L)
x$client$terminate()
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.