#' Download requests in a queue
#'
#' @export
#' @param ... any number of [occ_download()] requests
#' @param .list a list of [occ_download_prep()] requests
#' @param status_ping (integer) seconds between pings checking the status of
#' the download request; generally use larger numbers for larger requests.
#' Default: 10 (i.e., 10 seconds). Must be 10 or greater.
#' @return a list of `occ_download` class objects, see [occ_download_get()]
#' to fetch data
#' @details This function is a convenience wrapper around [occ_download()],
#' allowing the user to kick off any number of requests while abiding by
#' the GBIF rule of 3 concurrent requests per user.
#' @note see [downloads] for an overview of GBIF downloads methods
#' @family downloads
#' @section How it works:
#' It works by using lazy evaluation to collect your requests into a queue
#' (lazy evaluation is not used when requests are passed via the `.list`
#' parameter). It then kicks off the first 3 requests. Then, in a while
#' loop, we check the status of those requests, and when any request
#' finishes (see `When is a job done?` below), we kick off the next, and
#' so on. So there may not always be exactly 3 requests running
#' concurrently, but the function aims to keep 3 running at a time.
#'
#' @section When is a job done?:
#' We mark a job as done by checking the `/occurrence/download/` API route
#' with our [occ_download_meta()] function. If the status of the job is
#' any of "succeeded", "killed", or "cancelled", then we mark the job as done
#' and move on to other jobs in the queue.
#'
#' @section Beware:
#' This function is still in development. There's a lot of complexity
#' to this problem. We'll be rolling out fixes and improvements in future
#' versions of the package, so expect to have to adjust your code
#' with new versions.
#'
#' @examples \dontrun{
#' if (interactive()) { # don't run in automated example runs, too costly
#' # passing occ_download() requests via ...
#' out <- occ_download_queue(
#'   occ_download(pred("taxonKey", 3119195), pred("year", 1976)),
#'   occ_download(pred("taxonKey", 3119195), pred("year", 2001)),
#'   occ_download(pred("taxonKey", 3119195), pred("year", 2001),
#'     pred_lte("month", 8)),
#'   occ_download(pred("taxonKey", 5229208), pred("year", 2011)),
#'   occ_download(pred("taxonKey", 2480946), pred("year", 2015)),
#'   occ_download(pred("country", "NZ"), pred("year", 1999),
#'     pred("month", 3)),
#'   occ_download(pred("catalogNumber", "Bird.27847588"),
#'     pred("year", 1998), pred("month", 2))
#' )
#'
#' # supports <= 3 requests too
#' out <- occ_download_queue(
#'   occ_download(pred("country", "NZ"), pred("year", 1999), pred("month", 3)),
#'   occ_download(pred("catalogNumber", "Bird.27847588"), pred("year", 1998),
#'     pred("month", 2))
#' )
#'
#' # using pre-prepared requests via .list
#' keys <- c(7905507, 5384395, 8911082)
#' queries <- list()
#' for (i in seq_along(keys)) {
#'   queries[[i]] <- occ_download_prep(
#'     pred("taxonKey", keys[i]),
#'     pred_in("basisOfRecord", c("HUMAN_OBSERVATION", "OBSERVATION")),
#'     pred("hasCoordinate", TRUE),
#'     pred("hasGeospatialIssue", FALSE),
#'     pred("year", 1993)
#'   )
#' }
#' out <- occ_download_queue(.list = queries)
#' out
#'
#' # another pre-prepared example
#' yrs <- 1930:1934
#' queries <- list()
#' for (i in seq_along(yrs)) {
#'   queries[[i]] <- occ_download_prep(
#'     pred("taxonKey", 2877951),
#'     pred_in("basisOfRecord", c("HUMAN_OBSERVATION", "OBSERVATION")),
#'     pred("hasCoordinate", TRUE),
#'     pred("hasGeospatialIssue", FALSE),
#'     pred("year", yrs[i])
#'   )
#' }
#' out <- occ_download_queue(.list = queries)
#' out
#' }}
occ_download_queue <- function(..., .list = list(), status_ping = 10) {
  # number of max concurrent requests
  # hard-coded due to GBIF limits
  max_concurrent <- 3
  # status_ping must be 10 sec or greater
  assert(status_ping, c("integer", "numeric"))
  stopifnot(status_ping >= 10)
  # collect requests
  que <- GbifQueue$new(..., .list = .list)
  # stop if no requests submitted
  if (length(que$reqs) == 0) stop("no requests submitted")
  # initialize bucket to collect data
  results <- list()
  # add all jobs to the queue
  que$add_all()
  # start the first `max_concurrent` jobs
  kickoff <- min(length(que$reqs), max_concurrent)
  reqend <- if (kickoff > 1) "s" else ""
  message(sprintf("kicking off first %s request%s", kickoff, reqend))
  res <- invisible(lapply(
    rgbif_compact(que$queue[seq_len(max_concurrent)]), function(x) {
      # remove from queue
      que$remove(x)
      # run job
      x$run()
    }))
  # credentials check: a character return here signals an error message
  # from GBIF rather than a started download
  if (any(
    vapply(res, function(x) is.character(x) && !inherits(x, "occ_download"),
      logical(1))
  )) {
    if (is.character(res[[1]])) stop(res[[1]], call. = FALSE)
  }
  # keep only occ_download objects
  res <- Filter(function(x) inherits(x, "occ_download"), res)
  # handle 3 requests or fewer
  if (que$jobs() == 0) {
    message(sprintf("<= %s request%s, waiting for completion", kickoff, reqend))
    still_running <- TRUE
    while (still_running) {
      metas <- lapply(res, occ_download_meta)
      status <- vapply(metas, "[[", "", "status", USE.NAMES = FALSE)
      still_running <-
        !all(tolower(status) %in% c("succeeded", "killed", "cancelled"))
      Sys.sleep(status_ping)
    }
    results <- res
  } else {
    message("> 3 requests, waiting for completion")
    # cycle through until all jobs have run and completed
    while (que$jobs() > 0 || length(res) > 0) {
      metas <- lapply(rgbif_compact(res), occ_download_meta)
      status <- vapply(metas, "[[", "", "status", USE.NAMES = FALSE)
      statusbool <- tolower(status) %in% c("succeeded", "killed", "cancelled")
      if (any(statusbool)) {
        message(paste0(
          lapply(metas[statusbool], function(w) {
            sprintf(" %s: %s", w$key, tolower(w$status))
          }),
          collapse = "\n"
        ))
        # stash results of finished jobs
        results <- c(results, res[statusbool])
        # drop finished jobs from the `res` pool
        res <- res[!statusbool]
        # kick off new job(s)
        if (que$jobs() > 0) {
          # take the minimum of (max_concurrent - number still running) and
          # the number of jobs remaining - we don't want to start 2 jobs if
          # there's only 1 left to start
          jobs_to_start <- min(max_concurrent - length(res), que$jobs())
          # kick off N = jobs_to_start jobs
          for (i in seq_len(jobs_to_start)) {
            ## get job
            x <- que$next_()
            ## remove from queue
            que$remove(x[[1]])
            ## run job
            message(sprintf(" running %s of %s",
              length(que$reqs) - length(que$queue), length(que$reqs)))
            res_single <- x[[1]]$run()
            ## stash into the `res` pool
            res <- c(res, stats::setNames(list(res_single), names(x)))
          }
        }
      }
      Sys.sleep(status_ping)
    }
  }
  return(results)
}
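
# The sketch below illustrates the queue pattern described in the "How it
# works" section above: keep up to `max_concurrent` jobs running, poll their
# status, and start the next job as a slot frees up. A minimal sketch, not
# part of the rgbif API; `submit` is a caller-supplied function (an
# assumption of this sketch) that starts one prepared request and returns
# an occ_download object.
queue_sketch <- function(reqs, submit, max_concurrent = 3, ping = 10) {
  pool <- list() # jobs currently running on GBIF
  done <- list() # jobs that have reached a terminal status
  while (length(reqs) > 0 || length(pool) > 0) {
    # top up the running pool to the concurrency limit
    while (length(pool) < max_concurrent && length(reqs) > 0) {
      pool <- c(pool, list(submit(reqs[[1]])))
      reqs <- reqs[-1]
    }
    # poll GBIF for the status of each running job
    status <- vapply(pool, function(x) occ_download_meta(x)$status, "")
    finished <- tolower(status) %in% c("succeeded", "killed", "cancelled")
    # move finished jobs out of the pool, freeing slots for the next ones
    done <- c(done, pool[finished])
    pool <- pool[!finished]
    Sys.sleep(ping)
  }
  done
}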
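
# A minimal helper mirroring the "When is a job done?" rule above: a job is
# treated as done once occ_download_meta() reports a terminal status. A
# sketch for illustration; `download_is_done` is not an exported rgbif
# function.
download_is_done <- function(x) {
  meta <- occ_download_meta(x)
  tolower(meta$status) %in% c("succeeded", "killed", "cancelled")
}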