tests/testthat/test-crew_controller_local.R

crew_test("crew_controller_local()", {
  skip_on_os("windows")
  x <- crew_controller_local(
    workers = 1L,
    seconds_idle = 360
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  expect_silent(x$validate())
  expect_false(x$client$started)
  expect_false(x$started())
  expect_null(x$summary())
  expect_false(x$autoscaling)
  expect_equal(length(x$pids()), 1L)
  x$start()
  expect_true(x$empty())
  expect_false(x$saturated())
  crew_retry(
    ~{
      x$wait(mode = "all", seconds_timeout = 30)
      TRUE
    },
    seconds_interval = 0.5,
    seconds_timeout = 5
  )
  s <- x$summary()
  expect_true(is.data.frame(s))
  expect_equal(nrow(s), 1L)
  expect_equal(
    sort(colnames(s)),
    sort(
      c(
        "controller",
        "tasks",
        "seconds",
        "success",
        "error",
        "crash",
        "cancel",
        "warning"
      )
    )
  )
  expect_equal(s$tasks, 0L)
  expect_true(x$client$started)
  expect_true(x$started())
  expect_equal(length(x$pids()), 2L)
  # first task
  expect_equal(x$pushed, 0L)
  expect_equal(x$popped, 0L)
  task <- x$push(
    command = Sys.getenv("CREW_WORKER"),
    name = "task"
  )
  expect_s3_class(task, "mirai")
  expect_equal(x$pushed, 1L)
  expect_equal(x$popped, 0L)
  expect_false(x$empty())
  expect_true(x$nonempty())
  x$wait(mode = "one", seconds_timeout = 5)
  expect_false(x$empty())
  expect_true(x$nonempty())
  envir <- new.env(parent = emptyenv())
  crew_retry(
    ~{
      envir$out <- x$pop(scale = TRUE)
      !is.null(envir$out)
    },
    seconds_interval = 0.5,
    seconds_timeout = 10
  )
  expect_equal(x$pushed, 1L)
  expect_equal(x$popped, 1L)
  out <- envir$out
  expect_true(x$empty())
  expect_false(x$nonempty())
  expect_equal(x$summary()$tasks, 1L)
  expect_equal(x$summary()$error, 0L)
  expect_equal(x$summary()$warning, 0L)
  expect_equal(out$name, "task")
  expect_equal(out$command, "Sys.getenv(\"CREW_WORKER\")")
  expect_equal(out$result[[1]], out$worker)
  expect_false(any(out$worker == Sys.getenv("CREW_WORKER")))
  expect_true(is.numeric(out$seconds))
  expect_false(anyNA(out$seconds))
  expect_true(out$seconds >= 0)
  expect_true(anyNA(out$error))
  expect_true(anyNA(out$trace))
  expect_true(anyNA(out$warnings))
  windows_or_cran <- identical(tolower(Sys.info()[["sysname"]]), "windows") ||
    !identical(Sys.getenv("NOT_CRAN"), "true")
  if (!windows_or_cran) {
    # data task
    expect_false(exists(x = ".crew_y", envir = globalenv()))
    x$push(
      command = {
        paste0("a", x, .crew_y, sample.int(n = 1e9L, size = 1L))
      },
      data = list(x = "b"),
      globals = list(.crew_y = "c"),
      seed = 0L,
      algorithm = "L'Ecuyer-CMRG",
      seconds_timeout = 100
    )
    x$wait(seconds_timeout = 5)
    out <- x$pop()
    set.seed(seed = 0L, kind = "L'Ecuyer-CMRG")
    exp <- paste0("abc", sample.int(n = 1e9L, size = 1L))
    expect_false(anyNA(out$command))
    expect_equal(out$result[[1]], exp)
    expect_equal(out$error, NA_character_)
    expect_false(exists(x = ".crew_y", envir = globalenv()))
    # package task
    x$push(
      command = paste0(arg, "y"),
      data = list(arg = "x"),
      packages = "nanonext"
    )
    x$wait(seconds_timeout = 5)
    out <- x$pop()
    expect_equal(out$result[[1]], "xy")
  }
  # terminate
  handle <- x$launcher$instances$handle[[1]]
  x$terminate()
  expect_false(x$client$started)
  expect_false(x$started())
  crew_retry(
    ~!handle$is_alive(),
    seconds_interval = 0.1,
    seconds_timeout = 5
  )
})

crew_test("crew_controller_local() substitute = FALSE and quick push", {
  skip_on_cran()
  skip_on_os("windows")
  x <- crew_controller_local(
    seconds_idle = 360,
    r_arguments = c("--no-save", "--no-restore")
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  expect_silent(x$validate())
  expect_false(x$client$started)
  x$start()
  expect_equal(x$summary()$tasks, 0L)
  expect_equal(x$summary()$error, 0L)
  expect_equal(x$summary()$warning, 0L)
  command <- quote(sqrt(4L) + sqrt(9L))
  # regular push
  x$push(command = command, substitute = FALSE, name = "substitute")
  x$wait(seconds_timeout = 10)
  # just the mirai task data
  out <- as.list(x$tasks)[[1L]]$data
  expect_equal(out$result[[1L]], 5L)
  expect_equal(out$name, "substitute")
  expect_true(is.numeric(out$seconds))
  expect_false(anyNA(out$seconds))
  expect_true(out$seconds >= 0)
  expect_true(anyNA(out$error))
  expect_true(anyNA(out$warnings))
  expect_true(anyNA(out$trace))
  # full pop
  out <- x$pop(scale = FALSE)
  expect_equal(out$result[[1]], 5L)
  expect_equal(out$name, "substitute")
  expect_true(is.numeric(out$seconds))
  expect_false(anyNA(out$seconds))
  expect_true(out$seconds >= 0)
  expect_true(anyNA(out$error))
  expect_true(anyNA(out$warnings))
  expect_true(anyNA(out$trace))
  # cleanup
  handle <- x$launcher$instances$handle[[1]]
  x$terminate()
  expect_false(x$client$started)
  crew_retry(
    ~!handle$is_alive(),
    seconds_interval = 0.1,
    seconds_timeout = 5
  )
})

crew_test("crew_controller_local() launch method", {
  skip_on_cran()
  skip_on_os("windows")
  x <- crew_controller_local(
    seconds_idle = 360
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  x$start()
  x$launch(n = 1L)
  handle <- x$launcher$instances$handle[[1]]
  crew_retry(
    ~handle$is_alive(),
    seconds_interval = 0.1,
    seconds_timeout = 5
  )
  expect_true(handle$is_alive())
  x$terminate()
  crew_retry(
    ~!handle$is_alive(),
    seconds_interval = 0.1,
    seconds_timeout = 5
  )
  expect_false(handle$is_alive())
})

crew_test("exit status and code", {
  skip_on_cran()
  skip_on_os("windows")
  x <- crew_controller_local()
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  x$start()
  x$push(Sys.sleep(0.01), name = "short")
  x$wait(mode = "one")
  task <- x$pop()
  expect_equal(task$status, "success")
  expect_equal(task$code, 0L)
  x$push(stop("message"), name = "broken")
  x$wait(mode = "one")
  task <- x$pop()
  expect_equal(task$status, "error")
  expect_equal(task$code, -1L)
  x$push(Sys.sleep(10000), name = "long")
  x$cancel(names = "long")
  x$wait()
  task <- x$pop()
  expect_equal(task$status, "cancel")
  expect_equal(task$code, 20L)
  expect_equal(x$client$resolved(), 3L)
})

crew_test("crew_controller_local() resource usage metric logging", {
  skip_on_cran()
  skip_on_os("windows")
  skip_if_not_installed("autometric", minimum_version = "0.1.0")
  log <- tempfile()
  x <- crew_controller_local(
    seconds_idle = 360,
    options_metrics = crew_options_metrics(
      path = log,
      seconds_interval = 0.25
    )
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  x$start()
  x$push(name = "monitored_task", Sys.sleep(2))
  x$wait(mode = "all", seconds_timeout = 30)
  x$terminate()
  expect_true(dir.exists(log))
  data <- autometric::log_read(log)
  expect_true(any("monitored_task" %in% data$phase))
  expect_true(is.data.frame(data))
  expect_gt(nrow(data), 0L)
  expect_equal(unique(data$status), 0L)
})

crew_test("crew_controller_local() resource usage metrics with stdout", {
  skip_on_cran()
  skip_on_os("windows")
  skip_if_not_installed("autometric", minimum_version = "0.1.0")
  log <- tempfile()
  x <- crew_controller_local(
    seconds_idle = 360,
    options_metrics = crew_options_metrics(
      path = "/dev/stdout",
      seconds_interval = 0.25
    ),
    options_local = crew_options_local(
      log_directory = log
    )
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
  })
  x$start()
  x$push(Sys.sleep(2))
  x$wait(mode = "all", seconds_timeout = 30)
  x$terminate()
  expect_true(dir.exists(log))
  data <- autometric::log_read(log)
  expect_true(is.data.frame(data))
  expect_gt(nrow(data), 0L)
  expect_equal(unique(data$status), 0L)
})

crew_test("joined logs", {
  skip_on_cran()
  skip_on_covr()
  skip_on_os("windows")
  dir <- tempfile()
  x <- crew_controller_local(
    workers = 1L,
    seconds_idle = 60,
    options_local = crew_options_local(
      log_directory = dir,
      log_join = TRUE
    )
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
    unlink(dir, recursive = TRUE)
  })
  x$start()
  x$push(print("this-print"))
  x$push(message("this-message"))
  x$push(warning("this-warning"))
  x$push(stop("this-stop"))
  x$wait(mode = "all")
  Sys.sleep(0.25)
  dir <- x$launcher$options_local$log_directory
  logs <- list.files(dir, full.names = TRUE)
  expect_equal(length(logs), 1L)
  lines <- readLines(logs)
  expect_true(any(grepl("this-print", lines, fixed = TRUE)))
  expect_true(any(grepl("this-message", lines, fixed = TRUE)))
  expect_true(any(grepl("Warning: this-warning", lines, fixed = TRUE)))
  expect_true(any(grepl("Error: this-stop", lines, fixed = TRUE)))
})

crew_test("separate logs", {
  skip_on_cran()
  skip_on_covr()
  skip_on_os("windows")
  dir <- tempfile()
  x <- crew_controller_local(
    workers = 1L,
    seconds_idle = 60,
    options_local = crew_options_local(
      log_directory = dir,
      log_join = FALSE
    )
  )
  on.exit({
    x$terminate()
    rm(x)
    gc()
    crew_test_sleep()
    unlink(dir, recursive = TRUE)
  })
  x$start()
  x$push(print("this-print"))
  x$push(message("this-message"))
  x$push(warning("this-warning"))
  x$push(stop("this-stop"))
  x$wait(mode = "all")
  Sys.sleep(0.25)
  logs <- list.files(dir, full.names = TRUE)
  expect_equal(length(logs), 2L)
  stderr <- readLines(logs[1L])
  stdout <- readLines(logs[2L])
  expect_true(any(grepl("this-print", stdout, fixed = TRUE)))
  expect_true(any(grepl("this-message", stderr, fixed = TRUE)))
  expect_true(any(grepl("Warning: this-warning", stderr, fixed = TRUE)))
  expect_true(any(grepl("Error: this-stop", stderr, fixed = TRUE)))
})

crew_test("deprecate seconds_exit", {
  expect_warning(
    x <- crew_controller_local(
      workers = 1L,
      seconds_idle = 360,
      seconds_exit = 1
    ),
    class = "crew_deprecate"
  )
})
wlandau/crew documentation built on Feb. 8, 2025, 10:12 a.m.