tests/testthat/test-cancel-early.R

test_that("auto-cancellation", {

  httpx <- NULL
  idx <- 0

  do <- async(function() {
    req_done <- 0L

    response_time <- async(function(url) {
      idx <<- idx + 1
      httpx[[idx]] <<- http_head(url)
      httpx[[idx]]$
        then(function(x) { req_done <<- req_done + 1L ; x })$
        then(http_stop_for_status)$
        then(function() setNames(.[["times"]][["total"]], url))$
        catch(error = function(.) setNames(Inf, url))
    })

    urls <- http$url(c("/delay/5", "/get"))
    reqs <- lapply(urls, response_time)
    when_any(.list = reqs)
  })

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()
  expect_true(toc - tic < as.difftime(2, units = "secs"))
  expect_true(get_private(httpx[[1]])$cancelled)
})

test_that("detect, if one is done", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() TRUE)
      dx <<- c(dx, list(nx))
      nx
    }
    async_detect(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  synchronise(do())
  toc <- Sys.time()

  expect_equal(pinged, 1L)
  expect_true(get_private(dx[[2]])$cancelled)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("detect, if one errors",  {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    async_detect(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[2]])$cancelled)
  expect_equal(conditionMessage(err), "foobar")
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("every, if one is FALSE", {
  pinged <- 0L
  dx <-  list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() FALSE)
      dx <<- c(dx, list(nx))
      nx
    }
    async_every(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  synchronise(do())
  toc <- Sys.time()
  expect_true(get_private(dx[[2]])$cancelled)
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("every, if one errors", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    async_every(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[2]])$cancelled)
  expect_equal(conditionMessage(err), "foobar")
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("filter, if one errors", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    async_filter(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[2]])$cancelled)
  expect_equal(conditionMessage(err), "foobar")
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("map, if one errors", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    async_map(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[2]])$cancelled)
  expect_equal(conditionMessage(err), "foobar")
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("some, if one is TRUE", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() TRUE)
      dx <<- c(dx, list(nx))
      nx
    }
    async_some(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  synchronise(do())
  toc <- Sys.time()
  expect_equal(pinged, 1L)
  expect_true(get_private(dx[[2]])$cancelled)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("every, if one errors", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    async_some(c(1/1000, 5), f)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[2]])$cancelled)
  expect_equal(conditionMessage(err), "foobar")
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("when_all, if one errors", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    defs <- lapply(c(1/1000, 5, 5), f)
    when_all(.list = defs)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[2]])$cancelled)
  expect_true(get_private(dx[[3]])$cancelled)
  expect_equal(conditionMessage(err), "foobar")
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("when_some, if enough are done", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() "yep")
      dx <<- c(dx, list(nx))
      nx
    }
    defs <- lapply(c(1/1000, 5, 1/1000, 5), f)
    when_some(2, .list = defs)
  }

  tic <- Sys.time()
  res <- synchronise(do())
  toc <- Sys.time()

  expect_equal(res, list("yep", "yep"))
  expect_true(get_private(dx[[2]])$cancelled)
  expect_true(get_private(dx[[4]])$cancelled)
  expect_equal(pinged, 2L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("when_some, if some error", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() stop("foobar"))
      dx <<- c(dx, list(nx))
      nx
    }
    defs <- lapply(c(5, 1/1000, 1/1000, 5), f)
    when_some(3, .list = defs)
  }

  tic <- Sys.time()
  err <- tryCatch(synchronise(do()), error = identity)
  toc <- Sys.time()

  expect_s3_class(err, "async_rejected")
  expect_true(get_private(dx[[1]])$cancelled)
  expect_true(get_private(dx[[4]])$cancelled)
  expect_equal(conditionMessage(err), "when_some / when_any failed")
  expect_equal(length(err$errors), 2)
  expect_equal(conditionMessage(err$errors[[1]]), "foobar")
  expect_equal(conditionMessage(err$errors[[2]]), "foobar")
  expect_equal(pinged, 2L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})

test_that("when_any, if one is done", {
  pinged <- 0L
  dx <- list()
  do <- function() {
    f <- function(n) {
      force(n)
      nx <- delay(n)$
        then(function() pinged <<- pinged + 1)$
        then(function() "yep")
      dx <<- c(dx, list(nx))
      nx
    }
    defs <- lapply(c(5, 5, 1/1000, 5), f)
    when_any(.list = defs)
  }

  tic <- Sys.time()
  res <- synchronise(do())
  toc <- Sys.time()

  expect_equal(res, "yep")
  expect_true(get_private(dx[[1]])$cancelled)
  expect_true(get_private(dx[[2]])$cancelled)
  expect_true(get_private(dx[[4]])$cancelled)
  expect_equal(pinged, 1L)
  expect_true(toc - tic < as.difftime(4.5, units = "secs"))
})
gaborcsardi/async documentation built on Nov. 20, 2024, 4:51 p.m.