tests/testthat/test-plugins.R

crew_test("custom launcher", {
  skip_on_cran()
  skip_on_os("windows")
  skip_if_not_installed("processx")
  if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
    skip_on_os("mac")
  }
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      launch_worker = function(call, name, launcher, worker) {
        bin <- if_any(
          tolower(Sys.info()[["sysname"]]) == "windows",
          "R.exe",
          "R"
        )
        path <- file.path(R.home("bin"), bin)
        processx::process$new(command = path, args = c("-e", call))
      },
      terminate_worker = function(handle) {
        handle$signal(signal = crew::crew_terminate_signal())
      }
    )
  )
  crew_controller_custom <- function(
    name = "custom controller name",
    workers = 1L,
    host = "127.0.0.1",
    port = NULL,
    tls = crew::crew_tls(mode = "none"),
    seconds_interval = 0.5,
    seconds_timeout = 5,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE,
    crashes_error = 5L
  ) {
    client <- crew::crew_client(
      host = host,
      port = port,
      tls = tls,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout
    )
    launcher <- custom_launcher_class$new(
      name = name,
      workers = workers,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout,
      seconds_launch = seconds_launch,
      seconds_idle = seconds_idle,
      seconds_wall = seconds_wall,
      tasks_max = tasks_max,
      tasks_timers = tasks_timers,
      reset_globals = reset_globals,
      reset_packages = reset_packages,
      reset_options = reset_options,
      garbage_collection = garbage_collection,
      crashes_error = crashes_error,
      tls = tls
    )
    controller <- crew::crew_controller(client = client, launcher = launcher)
    controller$validate()
    controller
  }
  controller <- crew_controller_custom()
  controller$start()
  on.exit({
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  controller$push(name = "pid", command = ps::ps_pid())
  controller$wait(seconds_timeout = 10)
  out <- controller$pop()$result[[1]]
  handle <- controller$launcher$instances$handle[[1]]
  exp <- handle$get_pid()
  expect_equal(out, exp)
  expect_true(handle$is_alive())
  controller$launcher$terminate()
  crew_retry(
    ~!handle$is_alive(),
    seconds_interval = 0.1,
    seconds_timeout = 5
  )
  expect_false(handle$is_alive())
  controller$terminate()
})

crew_test("custom launcher with local asyncs launch errors", {
  skip_on_cran()
  skip_on_covr() # Avoid clashes with NNG and covr child processes.
  skip_on_os("windows")
  skip_if_not_installed("processx")
  if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
    skip_on_os("mac")
  }
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      launch_worker = function(call, name, launcher, worker) {
        self$async$eval(
          command = "okay value",
          packages = "this package does not exist"
        )
      },
      terminate_worker = function(handle) {
      }
    )
  )
  crew_controller_custom <- function(
    name = "custom controller name",
    workers = 1L,
    host = "127.0.0.1",
    port = NULL,
    tls = crew::crew_tls(mode = "none"),
    seconds_interval = 0.5,
    seconds_timeout = 5,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE,
    crashes_error = 5L,
    processes = NULL
  ) {
    client <- crew::crew_client(
      host = host,
      port = port,
      tls = tls,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout
    )
    launcher <- custom_launcher_class$new(
      name = name,
      workers = workers,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout,
      seconds_launch = seconds_launch,
      seconds_idle = seconds_idle,
      seconds_wall = seconds_wall,
      tasks_max = tasks_max,
      tasks_timers = tasks_timers,
      reset_globals = reset_globals,
      reset_packages = reset_packages,
      reset_options = reset_options,
      garbage_collection = garbage_collection,
      crashes_error = crashes_error,
      tls = tls,
      processes = processes
    )
    controller <- crew::crew_controller(
      client = client,
      launcher = launcher
    )
    controller$validate()
    controller
  }
  controller <- crew_controller_custom(processes = 1L)
  controller$start()
  on.exit({
    try(suppressWarnings(controller$terminate()), silent = TRUE)
    rm(controller)
    gc()
    crew_test_sleep()
  })
  envir <- new.env(parent = emptyenv())
  crew_retry(
    ~ tryCatch(
      expr = {
        controller$push(command = TRUE)
        envir$result <- FALSE
        FALSE
      },
      crew_error = function(condition) {
        envir$result <- TRUE
        TRUE
      }
    ),
    seconds_interval = 1,
    seconds_timeout = 30
  )
  expect_true(envir$result)
})

crew_test("custom launcher with local asyncs termination errors", {
  skip_on_cran()
  skip_on_covr() # Avoid clashes with NNG and covr child processes.
  skip_on_os("windows")
  skip_if_not_installed("processx")
  if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
    skip_on_os("mac")
  }
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      launch_worker = function(call, name, launcher, worker) {
        list(abstract = TRUE)
      },
      terminate_worker = function(handle) {
        self$async$eval(
          command = stop("termination error"),
          packages = "processx"
        )
      }
    )
  )
  crew_controller_custom <- function(
    name = "custom controller name",
    workers = 1L,
    host = "127.0.0.1",
    port = NULL,
    tls = crew::crew_tls(mode = "none"),
    seconds_interval = 0.5,
    seconds_timeout = 5,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE,
    crashes_error = 5L,
    processes = NULL
  ) {
    client <- crew::crew_client(
      host = host,
      port = port,
      tls = tls,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout
    )
    launcher <- custom_launcher_class$new(
      name = name,
      workers = workers,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout,
      seconds_launch = seconds_launch,
      seconds_idle = seconds_idle,
      seconds_wall = seconds_wall,
      tasks_max = tasks_max,
      tasks_timers = tasks_timers,
      reset_globals = reset_globals,
      reset_packages = reset_packages,
      reset_options = reset_options,
      garbage_collection = garbage_collection,
      crashes_error = crashes_error,
      tls = tls,
      processes = processes
    )
    controller <- crew::crew_controller(
      client = client,
      launcher = launcher
    )
    controller$validate()
    controller
  }
  controller <- crew_controller_custom(processes = 1L)
  controller$start()
  on.exit({
    try(suppressWarnings(controller$terminate()), silent = TRUE)
    rm(controller)
    gc()
    crew_test_sleep()
  })
  controller$launch(n = 1L)
  expect_warning(controller$terminate(), class = "crew_warning")
})

crew_test("custom launcher with async internal launcher tasks", {
  skip_on_cran()
  skip_on_ci()
  skip_on_covr() # Avoid clashes with NNG and covr child processes.
  skip_on_os("windows")
  skip_if_not_installed("processx")
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      launch_worker = function(call, name, launcher, worker, instance) {
        bin <- if_any(
          tolower(Sys.info()[["sysname"]]) == "windows",
          "R.exe",
          "R"
        )
        path <- file.path(R.home("bin"), bin)
        self$async$eval(
          command = {
            handle <- process$new(
              command = path,
              args = c("-e", call),
              cleanup = FALSE
            )
            while (!handle$is_alive()) {
              Sys.sleep(0.025)
            }
            list(pid = handle$get_pid(), status = "started")
          },
          data = list(
            path = path,
            call = call
          ),
          packages = "processx"
        )
      },
      terminate_worker = function(handle) {
        pid <- handle$pid
        self$async$eval(
          command = {
            crew::crew_terminate_process(pid)
            list(pid = pid, status = "terminated")
          },
          data = list(pid = pid)
        )
      }
    )
  )
  crew_controller_custom <- function(
    name = "custom controller name",
    workers = 1L,
    host = "127.0.0.1",
    port = NULL,
    tls = crew::crew_tls(mode = "none"),
    seconds_interval = 0.5,
    seconds_timeout = 5,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE,
    crashes_error = 5L,
    processes = NULL
  ) {
    client <- crew::crew_client(
      host = host,
      port = port,
      tls = tls,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout
    )
    launcher <- custom_launcher_class$new(
      name = name,
      workers = workers,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout,
      seconds_launch = seconds_launch,
      seconds_idle = seconds_idle,
      seconds_wall = seconds_wall,
      tasks_max = tasks_max,
      tasks_timers = tasks_timers,
      reset_globals = reset_globals,
      reset_packages = reset_packages,
      reset_options = reset_options,
      garbage_collection = garbage_collection,
      crashes_error = crashes_error,
      tls = tls,
      processes = processes
    )
    controller <- crew::crew_controller(
      client = client,
      launcher = launcher
    )
    controller$validate()
    controller
  }
  controller <- crew_controller_custom(processes = 1L)
  controller$start()
  on.exit({
    controller$terminate()
    rm(controller)
    gc()
    crew_test_sleep()
  })
  controller$push(name = "pid", command = ps::ps_pid())
  controller$wait(seconds_timeout = 10)
  envir <- new.env(parent = emptyenv())
  crew_retry(
    ~ {
      envir$pid <- controller$pop()$result[[1L]]
      !is.null(envir$pid)
    },
    seconds_interval = 0.25,
    seconds_timeout = 15
  )
  handle <- controller$launcher$instances$handle[[1L]]
  pid <- handle$pid
  expect_equal(envir$pid, pid)
  expect_equal(handle$status, "started")
  controller$launcher$terminate()
})

crew_test("can terminate a lost worker with an async launch", {
  skip_on_cran()
  skip_on_os("windows")
  skip_on_covr() # Avoid clashes with NNG and covr child processes.
  if (isTRUE(as.logical(Sys.getenv("CI", "false")))) {
    skip_on_os("mac")
  }
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      launch_worker = function(call, name, launcher, worker) {
      },
      terminate_worker = function(handle) {
        ps::ps_kill(p = ps::ps_handle(handle$pid))
      }
    )
  )
  crew_controller_custom <- function(
    name = "custom controller name",
    workers = 1L,
    host = "127.0.0.1",
    port = NULL,
    tls = crew::crew_tls(mode = "none"),
    seconds_interval = 0.5,
    seconds_timeout = 5,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE,
    crashes_error = 5L
  ) {
    client <- crew::crew_client(
      host = host,
      port = port,
      tls = tls,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout
    )
    launcher <- custom_launcher_class$new(
      name = name,
      workers = workers,
      seconds_interval = seconds_interval,
      seconds_timeout = seconds_timeout,
      seconds_launch = seconds_launch,
      seconds_idle = seconds_idle,
      seconds_wall = seconds_wall,
      tasks_max = tasks_max,
      tasks_timers = tasks_timers,
      reset_globals = reset_globals,
      reset_packages = reset_packages,
      reset_options = reset_options,
      garbage_collection = garbage_collection,
      crashes_error = crashes_error,
      tls = tls
    )
    controller <- crew::crew_controller(
      client = client,
      launcher = launcher
    )
    controller$validate()
    controller
  }
  x <- crew_controller_custom()
  x$start()
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  private <- crew_private(x$launcher)
  bin <- ifelse(
    tolower(Sys.info()[["sysname"]]) == "windows",
    "R.exe",
    "R"
  )
  path <- file.path(R.home("bin"), bin)
  handle <- mirai::mirai(
    .expr = {
      Sys.sleep(0.5)
      handle <- processx::process$new(
        command = path,
        args = c("-e", "Sys.sleep(90)"),
        cleanup = FALSE
      )
      list(pid = handle$get_pid())
    },
    .args = list(path = path)
  )
  private$.instances <- tibble::add_row(
    private$.instances,
    handle = list(handle),
    id = 99L,
    start = - Inf,
    online = FALSE,
    discovered = FALSE
  )
  crew_retry(
    ~ {
      x$scale()
      !nanonext::.unresolved(handle)
    },
    seconds_interval = 0.1,
    seconds_timeout = 15
  )
  expect_true(is.integer(handle$data$pid))
  crew_retry(
    ~ {
      x$scale()
      tryCatch(
        !ps::ps_is_running(p = ps::ps_handle(handle$data$pid)),
        error = function(condition) {
          TRUE
        }
      )
    },
    seconds_interval = 0.1,
    seconds_timeout = 15
  )
  expect_equal(nrow(x$launcher$instances), 0L)
  x$client$terminate()
})
wlandau/crew documentation built on Feb. 8, 2025, 10:12 a.m.