test_that("nested event loops", {
## Create a function that finishes while its event loop is inactive
sleeper <- function(x) { Sys.sleep(x); Sys.getpid() }
afun1 <- async(function(x) { x; call_function(sleeper, args = list(x)) })
afun2 <- async(function(x1, x2) {
x1; x2
p1 <- afun1(x1)
p2 <- delay(0)$then(function() {
synchronise(afun1(x2))
})
when_all(p1, p2)
})
res <- synchronise(afun2(1, 2))
expect_equal(length(res), 2)
expect_true(res[[1]]$result %in% async_env$worker_pool$list_workers()$pid)
expect_true(res[[2]]$result %in% async_env$worker_pool$list_workers()$pid)
})
test_that("successful call", {
afun <- async(function(x) {
call_function(function() 100)$
then(function(x) x$result)
})
res <- synchronise(afun())
expect_identical(res, 100)
})
test_that("successful calls", {
afun <- async(function(x) {
when_all(
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid())
)
})
res <- synchronise(afun())
expect_true(is.integer(viapply(res, "[[", "result")))
})
test_that("calls that error", {
skip_without_package("processx", "3.4.1.9001")
afun <- async(function(x) {
when_all(
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid()),
call_function(function() stop("nope"))
)
})
expect_error(synchronise(afun()), "nope", class = "error")
})
test_that("calls that crash", {
skip_without_package("processx", "3.4.1.9001")
afun <- async(function(x) {
when_all(
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid()),
call_function(function() Sys.getpid()),
call_function(function() asNamespace("callr")$crash())
)
})
## Either might happen here
err <- tryCatch(synchronise(afun()), error = function(x) x)
expect_true(
grepl("R session crashed with exit code", err$message) ||
grepl("R session closed the process connection", err$message))
afun <- async(function(x) {
when_all(
call_function(function() asNamespace("callr")$crash()),
call_function(function() asNamespace("callr")$crash()),
call_function(function() asNamespace("callr")$crash()),
call_function(function() asNamespace("callr")$crash())
)
})
## Either might happen here
err <- tryCatch(synchronise(afun()), error = function(x) x)
expect_true(
grepl("R session crashed with exit code", err$message) ||
grepl("R session closed the process connection", err$message))
})
test_that("handling call errors", {
skip_without_package("processx", "3.4.1.9001")
skip_without_package("callr", "3.7.1")
worker_pid <- async(function() {
call_function(function() Sys.getpid())$then(function(x) x$result)
})
afun <- async(function(x) {
when_all(
worker_pid(),
worker_pid(),
worker_pid(),
call_function(function() stop("nope"))$
catch(error = function(e) e)
)
})
res <- synchronise(afun())
expect_true(is_count(res[[1]]))
expect_true(is_count(res[[2]]))
expect_true(is_count(res[[3]]))
expect_s3_class(res[[4]], "async_rejected")
expect_match(res[[4]]$message, "in callr subprocess")
})
test_that("mix calls with others", {
skip_on_cran()
px <- asNamespace("processx")$get_tool("px")
afun <- async(function() {
when_all(
delay = delay(1/1000)$
then(function() 1),
http = http_get(http$url("/status/418"))$
then(function(x) x$status_code),
process = run_process(px, c("outln", "foobar"))$
then(function(x) str_trim(x$stdout)),
r_process = run_r_process(function() 2)$
then(function(x) x$result),
call = call_function(function() 3)$
then(function(x) x$result)
)
})
res <- synchronise(afun())
expect_equal(
res,
list(delay = 1,
http = 418,
process = "foobar",
r_process = 2,
call = 3)
)
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.