# tests/testthat/test-req-perform-parallel.R

# Supplies a bare request together with `letters` as the second argument and
# expects an error; the error output is captured in the snapshot.
test_that("request and paths must match", {
  req <- request("http://example.com")
  # The snapshot records this call verbatim, so keep the expression unchanged.
  expect_snapshot(req_perform_parallel(req, letters), error = TRUE)
})

test_that("can perform zero requests", {
  # An empty list of requests should give back an empty list
  no_reqs <- list()
  expect_equal(req_perform_parallel(no_reqs), list())
})

test_that("can perform a single request", {
  # A one-element request list should come back as a one-element list
  out <- req_perform_parallel(list(request_test("/get")))
  expect_length(out, 1)
  expect_type(out, "list")
})

test_that("requests happen in parallel", {
  # test works best if webfakes has ample threads and keepalive

  # One immediate request followed by five that each ask for a 0.25s delay
  delays <- c(0, rep(0.25, 5))
  reqs <- lapply(delays, function(secs) {
    request_test("/delay/:secs", secs = secs)
  })

  # The five delays sum to 1.25s, so finishing in under 1s of wall-clock
  # time means the requests did not run strictly one after another
  elapsed <- system.time(req_perform_parallel(reqs))[["elapsed"]]
  expect_lt(elapsed, 1)
})

test_that("can perform >128 file uploads in parallel", {
  # Small temporary file used as the body of every upload
  upload_path <- withr::local_tempfile(lines = letters)
  upload_req <- req_body_file(request(example_url()), upload_path)
  reqs <- rep(list(upload_req), times = 130)

  expect_no_error(req_perform_parallel(reqs, on_error = "continue"))
})

test_that("can download files", {
  json_path <- withr::local_tempfile()
  html_path <- withr::local_tempfile()
  paths <- c(json_path, html_path)

  resps <- req_perform_parallel(
    list(request_test("/json"), request_test("/html")),
    paths
  )

  for (i in seq_along(paths)) {
    # The response body points at the destination path ...
    expect_equal(resps[[i]]$body, new_path(paths[[i]]))
    # ... and something was actually written there
    expect_gt(file.size(paths[[i]]), 0)
  }
})

test_that("can download 0 byte file", {
  dest <- withr::local_tempfile()
  req_perform_parallel(list(request_test("/bytes/0")), paths = dest)

  # The destination must exist with size zero
  expect_equal(file.size(dest), 0)
})

test_that("objects are cached", {
  cache_dir <- withr::local_tempdir()
  req <- req_cache(request_test("etag/:etag", etag = "abcd"), cache_dir)

  # First round trip should signal that the response was saved
  expect_condition(
    req_perform_parallel(list(req)),
    class = "httr2_cache_save"
  )

  # Second round trip should signal a not-modified cache hit
  expect_condition(
    req_perform_parallel(list(req)),
    class = "httr2_cache_not_modified"
  )
})

test_that("immutable objects retrieved from cache", {
  # Hand-built response whose Expires header lies far in the future
  cached <- response(
    200,
    headers = "Expires: Wed, 01 Jan 3000 00:00:00 GMT",
    body = charToRaw("abc")
  )
  req <- req_cache(request("http://example.com"), tempfile())
  cache_set(req, cached)

  expect_condition(
    out <- req_perform_parallel(list(req)),
    class = "httr2_cache_cached"
  )
  # What comes back is exactly the response placed in the cache
  expect_equal(out[[1]], cached)
})

test_that("errors by default", {
  # HTTP-level failure
  not_found <- request_test("/status/:status", status = 404)
  http_err <- expect_error(req_perform_parallel(list(not_found)))
  expect_s3_class(http_err, "httr2_http_404")

  # Wraps and forwards curl errors
  unresolvable <- request("INVALID")
  curl_err <- expect_error(req_perform_parallel(list(unresolvable)))
  expect_s3_class(curl_err, "httr2_failure")
  expect_s3_class(curl_err$parent, "curl_error_couldnt_resolve_host")
})

test_that("both curl and HTTP errors become errors on continue", {
  http_req <- request_test("/status/:status", status = 404)
  curl_req <- request("INVALID")

  out <- req_perform_parallel(list(http_req, curl_req), on_error = "continue")
  expect_s3_class(out[[1]], "httr2_http_404")
  expect_s3_class(out[[2]], "httr2_failure")

  # and each error carries the request that produced it
  expect_equal(out[[1]]$request, http_req)
  expect_equal(out[[2]]$request, curl_req)
})

test_that("errors can cancel outstanding requests", {
  reqs <- list(
    request_test("/status/:status", status = 404),
    request_test("/delay/:secs", secs = 1),
    request_test("/delay/:secs", secs = 1)
  )
  out <- req_perform_parallel(reqs, max_active = 1, on_error = "return")

  expect_s3_class(out[[1]], "httr2_http_404")
  # The second request might succeed or fail depending on the timing, so it
  # is not checked; the third request should definitely not have run
  expect_null(out[[3]])
})

test_that("req_perform_parallel respects http_error() error override", {
  # Both requests carry `is_error = ~FALSE`, so the 404 and 500 statuses are
  # expected to come back as ordinary responses instead of raising.
  reqs <- list2(
    req_error(request_test("/status/:status", status = 404), is_error = ~FALSE),
    req_error(request_test("/status/:status", status = 500), is_error = ~FALSE)
  )
  resps <- req_perform_parallel(reqs)

  expect_equal(resp_status(resps[[1]]), 404)
  expect_equal(resp_status(resps[[2]]), 500)
})

# A 404 request configured with `body = ~"hello"` is expected to error; the
# resulting error output is captured in the snapshot.
test_that("req_perform_parallel respects http_error() body message", {
  reqs <- list2(
    req_error(request_test("/status/:status", status = 404), body = ~"hello")
  )
  # The snapshot records this call verbatim, so keep the expression unchanged.
  expect_snapshot(req_perform_parallel(reqs), error = TRUE)
})

test_that("requests are throttled", {
  withr::defer(throttle_reset())

  # Fake clock: sleeping advances `clock` instead of blocking
  clock <- 0
  local_mocked_bindings(
    unix_time = function() clock,
    Sys.sleep = function(seconds) clock <<- clock + seconds
  )

  throttled <- req_throttle(
    request_test("/status/:status", status = 200),
    capacity = 1,
    fill_time_s = 1
  )

  queue <- RequestQueue$new(rep(list(throttled), 5), progress = FALSE)
  queue$process()

  # Five requests through the throttle should leave the fake clock at 4
  expect_equal(clock, 4)
})


# Tests of lower-level operation -----------------------------------------------

test_that("can retry an OAuth failure", {
  # Server answers the first hit with an invalid-token 401 and later hits
  # with a "done" payload
  req <- local_app_request(function(req, res) {
    attempt <- res$app$locals$i %||% 1
    if (attempt != 1) {
      res$send_json(list(status = "done"), auto_unbox = TRUE)
    } else {
      res$app$locals$i <- 2
      res$
        set_status(401)$
        set_header("WWW-Authenticate", 'Bearer realm="example", error="invalid_token"')$
        send_json(list(status = "failed"), auto_unbox = TRUE)
    }
  })
  req <- req_policies(req, auth_oauth = TRUE)

  # Count how often the auth cache is cleared
  n_clears <- 0
  local_mocked_bindings(
    req_auth_clear_cache = function(...) n_clears <<- n_clears + 1
  )

  queue <- RequestQueue$new(list(req), progress = FALSE)
  queue$process()

  final <- queue$resps[[1]]
  expect_equal(n_clears, 1)
  expect_equal(resp_body_json(final), list(status = "done"))
})

test_that("but multiple failures causes an error", {
  # Server answers every hit with an invalid-token 401
  req <- local_app_request(function(req, res) {
    rejected <- res$set_status(401)
    rejected <- rejected$set_header(
      "WWW-Authenticate",
      'Bearer realm="example", error="invalid_token"'
    )
    rejected$send_json(list(status = "failed"), auto_unbox = TRUE)
  })
  req <- req_policies(req, auth_oauth = TRUE)

  queue <- RequestQueue$new(list(req), progress = FALSE)
  queue$process()

  # The stored result is the 401 error
  outcome <- queue$resps[[1]]
  expect_s3_class(outcome, "httr2_http_401")
})

# Walks a RequestQueue through a 429-then-success exchange one `process1()`
# call at a time, checking the queue's bookkeeping fields after every step.
test_that("can retry a transient error", {
  # Test server: the first hit answers 429 with `retry-after: 2`; any later
  # hit answers with a "done" payload.
  req <- local_app_request(function(req, res) {
    i <- res$app$locals$i %||% 1
    if (i == 1) {
      res$app$locals$i <- 2
      res$
        set_status(429)$
        set_header("retry-after", 2)$
        send_json(list(status = "waiting"), auto_unbox = TRUE)
    } else {
      res$send_json(list(status = "done"), auto_unbox = TRUE)
    }
  })
  req <- req_retry(req, max_tries = 2)

  # Mocked clock starting at 1; the mocked Sys.sleep() advances it instead of
  # actually sleeping.
  mock_time <- 1
  local_mocked_bindings(
    unix_time = function() mock_time,
    Sys.sleep = function(seconds) mock_time <<- mock_time + seconds
  )

  queue <- RequestQueue$new(list(req), progress = FALSE)

  # submit the request
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "working")
  expect_equal(queue$n_active, 1)
  expect_equal(queue$n_pending, 0)
  expect_equal(queue$status[[1]], "active")

  # process the response and capture the retry
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "waiting")
  # deadline is the mocked "now" plus the 2 from the retry-after header
  expect_equal(queue$rate_limit_deadline, mock_time + 2)
  expect_equal(queue$n_pending, 1)
  # until the retry happens, the stored result is the 429 error, which still
  # exposes the server's response via `$resp`
  expect_s3_class(queue$resps[[1]], "httr2_http_429")
  expect_equal(resp_body_json(queue$resps[[1]]$resp), list(status = "waiting"))

  # Start waiting: the mocked sleep moves the clock from 1 to 3
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "waiting")
  expect_equal(mock_time, 3)

  # Finish waiting: the queue goes back to "working" with the request pending
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "working")
  expect_equal(queue$n_active, 0)
  expect_equal(queue$n_pending, 1)

  # Resubmit
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "working")
  expect_equal(queue$n_active, 1)
  expect_equal(queue$n_pending, 0)

  # Process the response
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "working")
  expect_equal(queue$n_active, 0)
  expect_equal(queue$n_pending, 0)
  expect_s3_class(queue$resps[[1]], "httr2_response")
  expect_equal(resp_body_json(queue$resps[[1]]), list(status = "done"))

  # So we're finally done
  expect_null(queue$process1())
  expect_equal(queue$queue_status, "done")
  # once the queue is "done", process1() returns FALSE rather than NULL
  expect_false(queue$process1())
})

# Drives a single throttled request through `process1(1)` with a mocked clock
# and a mocked `throttle_deadline()`, then inspects the clock and the "test"
# realm's token count.
test_that("throttling is limited by deadline", {
  # Clean up the "test" throttle realm when the test exits
  withr::defer(throttle_reset("test"))

  # Mocked clock starting at 0; the mocked Sys.sleep() advances it instead of
  # actually sleeping.
  mock_time <- 0
  local_mocked_bindings(
    unix_time = function() mock_time,
    Sys.sleep = function(seconds) mock_time <<- mock_time + seconds
  )

  req <- request_test("/status/:status", status = 200)
  req <- req_throttle(req, capacity = 1, fill_time_s = 1, realm = "test")
  queue <- RequestQueue$new(list(req), progress = FALSE)

  # Check time only advances by one second, and token is returned to bucket
  # (the mocked throttle deadline is 2 in the future, while process1() is
  # called with an argument of 1)
  local_mocked_bindings(throttle_deadline = function(...) mock_time + 2)
  queue$process1(1)
  expect_equal(queue$queue_status, "waiting")
  queue$process1(1)
  expect_equal(mock_time, 1)
  expect_equal(the$throttle[["test"]]$tokens, 1)

  # NOTE(review): nothing is processed after the two statements below, so the
  # expectations that follow re-check values already asserted above. Confirm
  # whether a `queue$process1(...)` call was meant to sit in between.
  local_mocked_bindings(throttle_deadline = function(...) mock_time)
  queue$rate_limit_deadline <- mock_time + 2
  expect_equal(mock_time, 1)
  expect_equal(the$throttle[["test"]]$tokens, 1)
})

# Pool helpers ----------------------------------------------------------------

test_that("wait for deadline waits after pool complete", {
  pool <- curl::new_pool()

  # Freeze the clock at 0 and record the requested sleep instead of blocking.
  # Only `slept` is super-assigned: it is defined in this test, so `<<-`
  # updates it here and nothing leaks into the global environment.
  slept <- 0
  local_mocked_bindings(
    unix_time = function() 0,
    Sys.sleep = function(seconds) slept <<- seconds
  )

  # The pool has no handles added, so with "now" at 0 and the deadline at 1
  # the helper is expected to sleep for the full 1 and return TRUE.
  expect_true(pool_wait_for_deadline(pool, deadline = 1))
  expect_equal(slept, 1)
})

# Deprecations ----------------------------------------------------------------

# Calls the old `multi_req_perform()` name with an empty request list and
# snapshots whatever it emits.
test_that("multi_req_perform is deprecated", {
  # The snapshot records this call verbatim, so keep the expression unchanged.
  expect_snapshot(multi_req_perform(list()))
})

# Passes a `pool` argument to `req_perform_parallel()` and snapshots whatever
# the call emits.
test_that("pool argument is deprecated", {
  # The result is assigned to `.`, presumably so the returned list is not
  # printed into the snapshot. The snapshot records this call verbatim, so
  # keep the expression unchanged.
  expect_snapshot(. <- req_perform_parallel(list(), pool = curl::new_pool()))
})
# r-lib/httr2 documentation built on Feb. 28, 2025, 7:51 a.m.