R/req-perform-parallel.R

#' Perform a list of requests in parallel
#'
#' @description
#' This variation on [req_perform_sequential()] performs multiple requests in
#' parallel. Never use it without [req_throttle()]; otherwise it's too easy to
#' pummel a server with a very large number of simultaneous requests.
#'
#' ## Limitations
#'
#' The main limitation of `req_perform_parallel()` is that it assumes that
#' [req_throttle()] and [req_retry()] apply across all requests. This means,
#' for example, that if request 1 is throttled, but request 2 is not,
#' `req_perform_parallel()` will wait for request 1 before performing request
#' 2, as the sketch below illustrates. This makes it most suitable for
#' performing many parallel requests to the same host, rather than a mix of
#' different hosts. It's probably possible to remove this limitation, but it's
#' enough work that I'm unlikely to do it unless I know that people would find
#' it useful: so please let me know!
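#'
#' ```r
#' # Sketch (not run), with hypothetical hosts: if the bucket for
#' # slow.example.com is empty (e.g. from earlier requests), the request to
#' # the unthrottled fast.example.com still waits behind request 1:
#' reqs <- list(
#'   request("https://slow.example.com") |>
#'     req_throttle(capacity = 1, fill_time_s = 60),
#'   request("https://fast.example.com")
#' )
#' resps <- req_perform_parallel(reqs)
#' ```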
#'
#' @inherit req_perform_sequential params return
#' @param pool `r lifecycle::badge("deprecated")`. No longer supported;
#'   to control the maximum number of concurrent requests, set `max_active`.
#' @param max_active Maximum number of concurrent requests.
#' @export
#' @examples
#' # Requesting these 4 pages one at a time would take 2 seconds:
#' request_base <- request(example_url()) |>
#'   req_throttle(capacity = 100, fill_time_s = 60)
#' reqs <- list(
#'   request_base |> req_url_path("/delay/0.5"),
#'   request_base |> req_url_path("/delay/0.5"),
#'   request_base |> req_url_path("/delay/0.5"),
#'   request_base |> req_url_path("/delay/0.5")
#' )
#' # But it's much faster if you request in parallel
#' system.time(resps <- req_perform_parallel(reqs))
#'
#' # By default, req_perform_parallel() will fail on the first error
#' reqs <- list(
#'   request_base |> req_url_path("/status/200"),
#'   request_base |> req_url_path("/status/400"),
#'   request("FAILURE")
#' )
#' try(resps <- req_perform_parallel(reqs))
#'
#' # but you can use on_error to capture all successful results
#' resps <- req_perform_parallel(reqs, on_error = "continue")
#'
#' # Inspect the successful responses
#' resps |> resps_successes()
#'
#' # And the failed responses
#' resps |> resps_failures() |> resps_requests()
req_perform_parallel <- function(
  reqs,
  paths = NULL,
  pool = deprecated(),
  on_error = c("stop", "return", "continue"),
  progress = TRUE,
  max_active = 10
) {
  check_paths(paths, reqs)
  if (lifecycle::is_present(pool)) {
    lifecycle::deprecate_warn(
      when = "1.1.0",
      what = "req_perform_parallel(pool)"
    )
  }
  on_error <- arg_match(on_error)
  check_number_whole(max_active, min = 1)

  queue <- RequestQueue$new(
    reqs = reqs,
    paths = paths,
    max_active = max_active,
    on_error = on_error,
    progress = progress,
    error_call = environment()
  )

  tryCatch(
    queue$process(),
    interrupt = function(cnd) {
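      # Drain in-flight requests without submitting new ones, then report
      # how many responses were collected before the interrupt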
      queue$queue_status <- "errored"
      queue$process()

      n <- sum(!map_lgl(queue$resps, is.null))
      cli::cli_alert_warning("Terminating iteration; returning {n} response{?s}.")
    }
  )

  if (on_error == "stop") {
    errors <- keep(queue$resps, is_error)
    if (length(errors) > 0) {
      cnd_signal(errors[[1]])
    }
  }

  queue$resps
}

RequestQueue <- R6::R6Class(
  "RequestQueue",
  public = list(
    pool = NULL,
    rate_limit_deadline = 0,
    max_active = NULL,

    # Overall status for the queue
    queue_status = NULL,
    deadline = Inf,
    n_pending = 0,
    n_active = 0,
    n_complete = 0,
    on_error = "stop",
    progress = NULL,
    error_call = NULL,

    # Vectorised along reqs
    reqs = list(),
    pooled_reqs = list(),
    resps = list(),
    status = character(),
    tries = integer(),

    # Requests that have failed due to OAuth expiration; used to ensure that we
    # don't retry repeatedly, but still allow all active requests to retry one
    oauth_failed = integer(),

    initialize = function(
      reqs,
      paths = NULL,
      max_active = 10,
      on_error = "stop",
      progress = FALSE,
      error_call = caller_env()
    ) {
      n <- length(reqs)

      if (isTRUE(progress)) {
        self$progress <- cli::cli_progress_bar(
          total = n,
          format = paste0(
            "{self$n_pending} -> {self$n_active} -> {self$n_complete} | ",
            "{cli::pb_bar} {cli::pb_percent} | ETA: {cli::pb_eta}"
          ),
          .envir = error_call
        )
      }

      # goal is for pool to not do any queueing; i.e. the curl pool will
      # only ever contain requests that we actually want to process. Any
      # throttling is done by `req_throttle()`
      self$max_active <- max_active
      self$pool <- curl::new_pool(
        total_con = 100,
        host_con = 100,
        max_streams = 100
      )
      self$on_error <- on_error
      self$error_call <- error_call

      self$queue_status <- "working"
      self$n_pending <- n
      self$n_active <- 0
      self$n_complete <- 0

      self$reqs <- reqs
      self$pooled_reqs <- map(seq_along(reqs), function(i) {
        pooled_request(
          req = reqs[[i]],
          path = paths[[i]],
          on_success = function(resp) self$done_success(i, resp),
          on_failure = function(error) self$done_failure(i, error),
          on_error = function(error) self$done_error(i, error),
          error_call = error_call
        )
      })
      self$resps <- vector("list", n)
      self$status <- rep("pending", n)
      self$tries <- rep(0L, n)
    },

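    # Drive the queue for up to `timeout` seconds. Returns FALSE once the
    # queue is done, or TRUE if the timeout elapses first.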
    process = function(timeout = Inf) {
      deadline <- unix_time() + timeout

      while (unix_time() <= deadline) {
        out <- self$process1(deadline)
        if (!is.null(out)) {
          return(out)
        }
      }

      TRUE
    },

    # Exposed for testing, so we can manually work through one step at a time
    process1 = function(deadline = Inf) {
      if (self$queue_status == "done") {
        FALSE
      } else if (self$queue_status == "working") {
        if (self$n_pending == 0) {
          self$queue_status <- "finishing"
        } else if (self$n_active < self$max_active) {
          self$submit_next(deadline)
        } else {
          pool_wait_for_one(self$pool, deadline)
        }
        NULL
      } else if (self$queue_status == "finishing") {
        pool_wait_for_one(self$pool, deadline)

        if (self$n_pending > 0) {
          # we had to retry
          self$queue_status <- "working"
        } else if (self$n_active > 0) {
          # keep going
          self$queue_status <- "finishing"
        } else {
          self$queue_status <- "done"
        }
        NULL
      } else if (self$queue_status == "errored") {
        # Finish out any active request but don't add any more
        pool_wait_for_one(self$pool, deadline)
        self$queue_status <- if (self$n_active > 0) "errored" else "done"
        NULL
      }
    },

    submit_next = function(deadline) {
      next_i <- which(self$status == "pending")[[1]]

      # Need to wait for a token from the bucket AND for any rate limits.
      # The ordering is important here because requests will complete
      # while we wait and that might change the rate_limit_deadline
      token_deadline <- throttle_deadline(self$reqs[[next_i]])
      pool_wait_for_deadline(self$pool, min(token_deadline, deadline))
      if (token_deadline >= deadline) {
        throttle_return_token(self$reqs[[next_i]])
        return()
      }

      while (unix_time() < self$rate_limit_deadline) {
        pool_wait_for_deadline(self$pool, min(self$rate_limit_deadline, deadline))
        if (self$rate_limit_deadline >= deadline) {
          throttle_return_token(self$reqs[[next_i]])
          return()
        }
      }
      self$submit(next_i)
    },

    submit = function(i) {
      retry_check_breaker(self$reqs[[i]], self$tries[[i]], error_call = self$error_call)

      self$set_status(i, "active")
      self$resps[i] <- list(NULL)
      self$tries[[i]] <- self$tries[[i]] + 1

      self$pooled_reqs[[i]]$submit(self$pool)
    },

    done_success = function(i, resp) {
      self$set_status(i, "complete")
      self$resps[[i]] <- resp

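      # Reset the re-auth guard: a success means the current token works, so
      # later failures may attempt re-auth again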
      self$oauth_failed <- NULL
    },

    done_error = function(i, error) {
      self$resps[[i]] <- error
      self$set_status(i, "complete")
      if (self$on_error != "continue") {
        self$queue_status <- "errored"
      }
    },

    done_failure = function(i, error) {
      req <- self$reqs[[i]]
      resp <- error$resp
      self$resps[[i]] <- error
      tries <- self$tries[[i]]

      if (retry_is_transient(req, resp) && self$can_retry(i)) {
        # Do we need to somehow expose this to the user? Because if they're
        # hitting it a bunch, it's a sign that the throttling is too low
        delay <- retry_after(req, resp, tries)
        self$rate_limit_deadline <- unix_time() + delay
        self$set_status(i, "pending")
      } else if (resp_is_invalid_oauth_token(req, resp) && self$can_reauth(i)) {
        # This isn't quite right, because if there are (e.g.) four requests in
        # the queue and the first one fails, we'll clear the cache for all four,
        # causing a token refresh more often than necessary. This shouldn't
        # affect correctness, but it does make it slower than necessary.
        self$oauth_failed <- c(self$oauth_failed, i)
        req_auth_clear_cache(self$reqs[[i]])
        self$set_status(i, "pending")
      } else {
        self$set_status(i, "complete")
        if (self$on_error != "continue") {
          self$queue_status <- "errored"
        }
      }
    },

    set_status = function(i, status) {
      switch( # old status
        self$status[[i]],
        pending = self$n_pending <- self$n_pending - 1,
        active = self$n_active <- self$n_active - 1
      )
      switch( # new status
        status,
        pending = self$n_pending <- self$n_pending + 1,
        active = self$n_active <- self$n_active + 1,
        complete = self$n_complete <- self$n_complete + 1
      )

      self$status[[i]] <- status

      if (!is.null(self$progress)) {
        cli::cli_progress_update(id = self$progress, set = self$n_complete)
      }
    },

    can_retry = function(i) {
      self$tries[[i]] < retry_max_tries(self$reqs[[i]])
    },
    can_reauth = function(i) {
      !i %in% self$oauth_failed
    }
  )
)
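
# A minimal sketch (not run) of driving a RequestQueue by hand, as a test
# might: process1() performs one state transition at a time, so you can
# watch the pending -> active -> complete bookkeeping. Assumes the local
# test server from example_url().
#
#   reqs <- list(request(example_url()) |> req_url_path("/delay/0.1"))
#   queue <- RequestQueue$new(reqs, max_active = 1)
#   queue$process1()                      # submits request 1: pending -> active
#   while (is.null(queue$process1())) {}  # step until the queue is done
#   queue$status                          # "complete"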

pool_wait_for_one <- function(pool, deadline) {
  timeout <- deadline - unix_time()
  pool_wait(pool, poll = TRUE, timeout = timeout)
}

pool_wait_for_deadline <- function(pool, deadline) {
  now <- unix_time()
  timeout <- deadline - now
  if (timeout <= 0) {
    return(TRUE)
  }

  complete <- pool_wait(pool, poll = FALSE, timeout = timeout)

  # pool might finish early; we still want to wait out the full time
  remaining <- timeout - (unix_time() - now)
  if (remaining > 0) {
    Sys.sleep(remaining)
  }

  complete
}

pool_wait <- function(pool, poll, timeout) {
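  # Signal a condition first so callers (e.g. the test suite) can intercept
  # and inspect the effective timeout without waiting on the pool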
  signal("", class = "httr2_pool_wait", timeout = timeout)
  done <- curl::multi_run(pool = pool, poll = poll, timeout = timeout)
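  # TRUE if at least one request finished (successfully or not), or if
  # nothing is left pending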
  (done$success + done$error) > 0 || done$pending == 0
}


#' Perform a list of requests in parallel
#'
#' @description
#' `r lifecycle::badge("deprecated")`
#'
#' Please use [req_perform_parallel()] instead, and note:
#'
#' * `cancel_on_error = FALSE` is now `on_error = "continue"`
#' * `cancel_on_error = TRUE` is now `on_error = "return"`
#'
#' @export
#' @param cancel_on_error Should all pending requests be cancelled when you
#'   hit an error? Set this to `TRUE` to stop all requests as soon as you
#'   hit an error. Responses that were never performed will be `NULL` in the
#'   result.
#' @inheritParams req_perform_parallel
#' @keywords internal
multi_req_perform <- function(
  reqs,
  paths = NULL,
  pool = deprecated(),
  cancel_on_error = FALSE
) {
  lifecycle::deprecate_warn(
    "1.0.0",
    "multi_req_perform()",
    "req_perform_parallel()"
  )
  check_bool(cancel_on_error)

  req_perform_parallel(
    reqs = reqs,
    paths = paths,
    pool = pool,
    on_error = if (cancel_on_error) "return" else "continue"
  )
}
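
# Migration sketch (not run), matching the deprecation notes above:
#
#   multi_req_perform(reqs, cancel_on_error = TRUE)    # old
#   req_perform_parallel(reqs, on_error = "return")    # new
#
#   multi_req_perform(reqs, cancel_on_error = FALSE)   # old
#   req_perform_parallel(reqs, on_error = "continue")  # new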