tests/testthat/test-call-function.R

test_that("nested event loops", {
  ## Create a function that finishes while its event loop is inactive
  sleeper <- function(x) { Sys.sleep(x); Sys.getpid() }
  afun1 <- async(function(x) { x; call_function(sleeper, args = list(x)) })
  afun2 <- async(function(x1, x2) {
    x1; x2
    p1 <- afun1(x1)
    p2 <- delay(0)$then(function() {
      synchronise(afun1(x2))
    })
    when_all(p1, p2)
  })

  res <- synchronise(afun2(1, 2))
  expect_equal(length(res), 2)
  expect_true(res[[1]]$result %in% async_env$worker_pool$list_workers()$pid)
  expect_true(res[[2]]$result %in% async_env$worker_pool$list_workers()$pid)
})

test_that("successful call", {
  afun <- async(function(x) {
    call_function(function() 100)$
      then(function(x) x$result)
  })
  res <- synchronise(afun())
  expect_identical(res, 100)
})

test_that("successful calls", {
  afun <- async(function(x) {
    when_all(
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid())
    )
  })

  res <- synchronise(afun())
  expect_true(is.integer(viapply(res, "[[", "result")))
})

test_that("calls that error", {
  skip_without_package("processx", "3.4.1.9001")
  afun <- async(function(x) {
    when_all(
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() stop("nope"))
    )
  })

  expect_error(synchronise(afun()), "nope", class = "error")
})

test_that("calls that crash", {
  skip_without_package("processx", "3.4.1.9001")
  afun <- async(function(x) {
    when_all(
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() asNamespace("callr")$crash())
    )
  })

  ## Either might happen here
  err <- tryCatch(synchronise(afun()), error = function(x) x)
  expect_true(
    grepl("R session crashed with exit code", err$message) ||
    grepl("R session closed the process connection", err$message))

  afun <- async(function(x) {
    when_all(
      call_function(function() asNamespace("callr")$crash()),
      call_function(function() asNamespace("callr")$crash()),
      call_function(function() asNamespace("callr")$crash()),
      call_function(function() asNamespace("callr")$crash())
    )
  })

  ## Either might happen here
  err <- tryCatch(synchronise(afun()), error = function(x) x)
  expect_true(
    grepl("R session crashed with exit code", err$message) ||
    grepl("R session closed the process connection", err$message))
})

test_that("handling call errors", {
  skip_without_package("processx", "3.4.1.9001")
  skip_without_package("callr", "3.7.1")

  worker_pid <- async(function() {
    call_function(function() Sys.getpid())$then(function(x) x$result)
  })

  afun <- async(function(x) {
    when_all(
      worker_pid(),
      worker_pid(),
      worker_pid(),
      call_function(function() stop("nope"))$
        catch(error = function(e) e)
    )
  })

  res <- synchronise(afun())
  expect_true(is_count(res[[1]]))
  expect_true(is_count(res[[2]]))
  expect_true(is_count(res[[3]]))
  expect_s3_class(res[[4]], "async_rejected")
  expect_match(res[[4]]$message, "in callr subprocess")
})

test_that("mix calls with others", {

  skip_on_cran()

  px <- asNamespace("processx")$get_tool("px")

  afun <- async(function() {
    when_all(
      delay = delay(1/1000)$
        then(function() 1),
      http = http_get(http$url("/status/418"))$
        then(function(x) x$status_code),
      process = run_process(px, c("outln", "foobar"))$
        then(function(x) str_trim(x$stdout)),
      r_process = run_r_process(function() 2)$
        then(function(x) x$result),
      call = call_function(function() 3)$
        then(function(x) x$result)
    )
  })

  res <- synchronise(afun())

  expect_equal(
    res,
    list(delay = 1,
         http = 418,
         process = "foobar",
         r_process = 2,
         call = 3)
  )
})
gaborcsardi/async documentation built on Nov. 20, 2024, 4:51 p.m.