tests/async/test-call-function.R

test_that("nested event loops", {
  ## Create a function that finishes while its event loop is inactive
  sleeper <- function(x) {
    Sys.sleep(x)
    Sys.getpid()
  }
  afun1 <- async(function(x) {
    x
    call_function(sleeper, args = list(x))
  })
  afun2 <- async(function(x1, x2) {
    x1
    x2
    p1 <- afun1(x1)
    p2 <- delay(0)$then(function() {
      synchronise(afun1(x2))
    })
    when_all(p1, p2)
  })

  res <- synchronise(afun2(1, 2))
  expect_equal(length(res), 2)
  expect_true(res[[1]]$result %in% async_env$worker_pool$list_workers()$pid)
  expect_true(res[[2]]$result %in% async_env$worker_pool$list_workers()$pid)
})

test_that("successful call", {
  afun <- async(function(x) {
    call_function(function() 100)$then(function(x) x$result)
  })
  res <- synchronise(afun())
  expect_identical(res, 100)
})

test_that("successful calls", {
  afun <- async(function(x) {
    when_all(
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid())
    )
  })

  res <- synchronise(afun())
  expect_true(is.integer(viapply(res, "[[", "result")))
})

test_that("calls that error", {
  skip_without_package("processx", "3.4.1.9001")
  testthat::local_edition(3)
  afun <- async(function(x) {
    when_all(
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() stop("nope"))
    )
  })

  expect_snapshot(error = TRUE, synchronise(afun()))
})

test_that("calls that crash", {
  skip_without_package("processx", "3.4.1.9001")
  skip_without_package("callr", "3.7.6.9000")
  afun <- async(function(x) {
    when_all(
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() Sys.getpid()),
      call_function(function() asNamespace("callr")$crash())
    )
  })

  ## Either might happen here
  err <- tryCatch(synchronise(afun()), error = function(x) x)
  expect_true(
    grepl("R session crashed with exit code", err$message) ||
      grepl("R session closed the process connection", err$message)
  )

  afun <- async(function(x) {
    when_all(
      call_function(function() asNamespace("callr")$crash()),
      call_function(function() asNamespace("callr")$crash()),
      call_function(function() asNamespace("callr")$crash()),
      call_function(function() asNamespace("callr")$crash())
    )
  })

  ## Either might happen here
  err <- tryCatch(synchronise(afun()), error = function(x) x)
  expect_true(
    grepl("R session crashed with exit code", err$message) ||
      grepl("R session closed the process connection", err$message)
  )
})

test_that("handling call errors", {
  skip_without_package("processx", "3.4.1.9001")
  skip_without_package("callr", "3.7.1")

  worker_pid <- async(function() {
    call_function(function() Sys.getpid())$then(function(x) x$result)
  })

  afun <- async(function(x) {
    when_all(
      worker_pid(),
      worker_pid(),
      worker_pid(),
      call_function(function() stop("nope"))$catch(error = function(e) e)
    )
  })

  res <- synchronise(afun())
  expect_true(is_count(res[[1]]))
  expect_true(is_count(res[[2]]))
  expect_true(is_count(res[[3]]))
  expect_s3_class(res[[4]], "async_rejected")
  expect_match(res[[4]]$message, "in callr subprocess")
})

test_that("mix calls with others", {
  skip_on_cran()

  px <- asNamespace("processx")$get_tool("px")

  afun <- async(function() {
    when_all(
      delay = delay(1 / 1000)$then(function() 1),
      http = http_get(http$url("/status/418"))$then(function(x) x$status_code),
      process = run_process(px, c("outln", "foobar"))$then(
        function(x) str_trim(x$stdout)
      ),
      r_process = run_r_process(function() 2)$then(function(x) x$result),
      call = call_function(function() 3)$then(function(x) x$result)
    )
  })

  res <- synchronise(afun())

  expect_equal(
    res,
    list(delay = 1, http = 418, process = "foobar", r_process = 2, call = 3)
  )
})

Try the pkgcache package in your browser

Any scripts or data that you put into this service are public.

pkgcache documentation built on June 8, 2025, 10:49 a.m.