# R6 class "ecmwfr_ds": submits a data request over HTTP, polls the
# resulting job, downloads the produced file and deletes the job.
#
# Every authenticated call sends the key returned by wf_get_key() in a
# "PRIVATE-TOKEN" header.
#
# Inherits from `service`. The private fields used below (status, code, name,
# url, file_url, request, user, service, verbose, retry, next_retry, path,
# file, downloaded) are not declared in this class; presumably they are
# provided by the parent class -- TODO confirm.
ds_service <- R6::R6Class("ecmwfr_ds",
  inherit = service,
  public = list(
    # Submit the request, unless it has been submitted before.
    #
    # On success the status, http code and job id of the response are stored
    # and private$url is switched to the url of the scheduled job.
    # Stops on an http error. Returns the object itself.
    submit = function() {
      if (private$status != "unsubmitted") {
        return(self)
      }

      # get key
      key <- wf_get_key(
        user = private$user
      )

      # sanitize request before submission
      # some product requests can't handle the
      # presence of extra fields (e.g. target
      # dataset_short_name)
      request <- private$request
      request$dataset_short_name <- NULL
      request$target <- NULL

      # get the response for the query provided
      response <- httr::VERB(
        private$http_verb,
        private$request_url(),
        httr::add_headers(
          "PRIVATE-TOKEN" = key
        ),
        body = list(inputs = request),
        encode = "json"
      )

      # trap general http error
      if (httr::http_error(response)) {
        stop(httr::content(response),
          call. = FALSE
        )
      }

      # grab content, to look at the status
      # and code
      ct <- httr::content(response)
      ct$code <- httr::status_code(response)

      # some verbose feedback
      if (private$verbose) {
        message("- staging data transfer at url endpoint or request id:")
        message(" ", ct$jobID, "\n")
        message(" on server: ", wf_server(service = private$service), "\n")
      }

      private$status <- ct$status
      private$code <- ct$code
      private$name <- ct$jobID
      private$next_retry <- Sys.time() + private$retry

      # update url from collection to scheduled job
      private$url <- wf_server(id = ct$jobID, service = private$service)
      return(self)
    },

    # Poll the job url and refresh status, code and file_url.
    #
    # fail_is_error: forwarded to warn_or_error() as `error` whenever a
    #   deleted or failed request is reported.
    # verbose: accepted but not used in this method; output is controlled
    #   by private$verbose.
    #
    # An unsubmitted request is submitted instead of polled. Stops on an
    # http error of the polling call. Returns the object itself.
    update_status = function(
      fail_is_error = TRUE,
      verbose = NULL) {
      if (private$status == "unsubmitted") {
        self$submit()
        return(self)
      }

      if (private$status == "deleted") {
        warn_or_error(
          "Request was previously deleted from queue",
          call. = FALSE,
          error = fail_is_error
        )
        return(self)
      }

      if (private$status == "failed") {
        warn_or_error(
          "Request has failed, please check the online request queue for more details!",
          call. = FALSE,
          error = fail_is_error
        )
        return(self)
      }

      key <- wf_get_key(
        user = private$user
      )

      # wait until the next allowed poll time
      retry_in <- as.numeric(private$next_retry) - as.numeric(Sys.time())
      if (retry_in > 0) {
        if (private$verbose) {
          # let a spinner spin for "retry" seconds
          spinner(retry_in)
        } else {
          # sleep
          Sys.sleep(retry_in)
        }
      }

      response <- httr::GET(
        private$url,
        httr::add_headers(
          "PRIVATE-TOKEN" = key
        ),
        encode = "json"
      )

      ct <- httr::content(response)

      # trap general http error most likely
      # will fail on spamming the service too fast
      # with a high retry rate
      #
      # this is checked before private$status is touched, so the body of
      # an error response can never overwrite the status of the job
      if (httr::http_error(response)) {
        stop(paste0(
          ct,
          "--- check your retry rate!"),
          call. = FALSE
        )
      }

      # only take over a status which is actually present, a NULL status
      # would break every later comparison against private$status
      status <- if (is.list(ct)) ct$status else NULL
      if (!is.null(status)) {
        private$status <- status
      }

      # checks the status of the true download, not the http status
      # of the call itself
      #
      # "failed" has to be tested before the catch-all branch, otherwise
      # a failure would be treated as a job which is still pending
      if (identical(private$status, "successful")) {
        if (private$verbose) message(" file ready to download... ")
        private$code <- 302
        private$file_url <- private$get_location(ct)
      } else if (identical(private$status, "failed")) {
        private$code <- 404
        private$file_url <- NA

        # the error details are optional, fall back to the generic
        # failure message when the response does not carry them
        err <- ct$error
        if (is.list(err)) {
          permanent <- if (isTRUE(err$permanent)) "permanent " else ""
          error_msg <- paste0(
            "Data transfer failed with ", permanent, err$who, " error: ",
            err$message, ".\nReason given: ", err$reason, ".\n",
            "More information at ", err$url
          )
        } else {
          error_msg <-
            "Request has failed, please check the online request queue for more details!"
        }

        warn_or_error(error_msg, call. = FALSE, error = fail_is_error)
      } else {
        private$code <- 202
        private$file_url <- NA # just to be on the safe side
      }

      # NOTE(review): submit() schedules the next poll at
      # Sys.time() + private$retry, whereas this resets it to "now", so
      # later polls are not delayed here -- confirm this is intended.
      private$next_retry <- Sys.time()
      return(self)
    },

    # Download the result file to private$file.
    #
    # force_redownload: download again even if the file was downloaded
    #   before and still exists.
    # fail_is_error: forwarded to update_status() and to warn_or_error()
    #   as `error` when the download or the final move fails.
    # verbose: accepted but not used in this method; output is controlled
    #   by private$verbose.
    #
    # The file is first written to a temporary file in private$path and
    # then moved to private$file. private$downloaded is only set once the
    # file is in place. Returns the object itself.
    download = function(
      force_redownload = FALSE,
      fail_is_error = TRUE,
      verbose = NULL
    ) {
      # Check if download is actually needed
      if (isTRUE(private$downloaded) &&
        file.exists(private$file) &&
        !force_redownload) {
        if (private$verbose) message("File already downloaded")
        return(self)
      }

      # Check status
      self$update_status(fail_is_error = fail_is_error)

      if (private$status != "successful") {
        # if (private$verbose) message("\nRequest not completed")
        return(self)
      }

      # If it's completed, begin download
      if (private$verbose) message("\nDownloading file")

      temp_file <- tempfile(pattern = "ecmwfr_", tmpdir = private$path)

      # never leave the temporary file behind, whatever the exit path;
      # this is a no-op once the file has been renamed
      on.exit(unlink(temp_file), add = TRUE)

      # formally download the file
      response <- httr::GET(
        private$file_url,
        httr::write_disk(temp_file, overwrite = TRUE),
        if (private$verbose) {httr::progress()}
      )

      # trap (http) errors on download, return a general error statement
      if (httr::http_error(response)) {
        warn_or_error(
          paste0("Download failed with error ", response$status_code),
          call. = FALSE,
          error = fail_is_error
        )
        # reached when warn_or_error() did not stop: the temporary file
        # holds the body of the error response, so it must not be moved
        # to the final location nor be flagged as downloaded
        return(self)
      }

      # Move data from temporary file to final location
      moved <- suppressWarnings(file.rename(temp_file, private$file))

      # check if the move was successful
      # fails for separate disks/partitions
      # then copy (the temporary file is removed on exit)
      if (!moved) {
        moved <- file.copy(temp_file, private$file, overwrite = TRUE)
      }

      if (!moved) {
        warn_or_error(
          paste0("Could not move downloaded file to ", private$file),
          call. = FALSE,
          error = fail_is_error
        )
        return(self)
      }

      private$downloaded <- TRUE

      if (private$verbose) {
        message(sprintf("- moved temporary file to -> %s", private$file))
      }

      return(self)
    },

    # Send a DELETE for the job url and flag the request as deleted
    # (status "deleted", code 204). Stops on an http error.
    # Returns the object itself.
    delete = function() {
      # get key
      key <- wf_get_key(user = private$user)

      # get the response for the query provided
      response <- httr::DELETE(
        private$url,
        httr::add_headers(
          "PRIVATE-TOKEN" = key
        )
      )

      # trap general http error
      if (httr::http_error(response)) {
        stop(httr::content(response),
          call. = FALSE
        )
      }

      # some verbose feedback
      if (private$verbose) {
        message("- Delete data from queue for url endpoint or request id:")
        message(" ", private$url, "\n")
      }

      private$status <- "deleted"
      private$code <- 204
      return(self)
    },

    # Open the "requests" page next to the job url in the browser.
    # Returns the object itself, invisibly.
    browse_request = function() {
      url <- paste0(dirname(private$url), "/requests?tab=all")
      utils::browseURL(url)
      return(invisible(self))
    }
  ),
  private = list(
    # http verb used by submit()
    http_verb = "POST",

    # Build the execute url for the dataset named in the request.
    request_url = function() {
      sprintf(
        "%s/retrieve/v1/processes/%s/execute",
        private$url,
        private$request$dataset_short_name
      )
    },

    # Query "<job url>/results" and return the href of the asset listed
    # there. Stops on an http error.
    #
    # content: not used, the location is fetched with a separate call.
    get_location = function(content) {
      # get key
      key <- wf_get_key(
        user = private$user
      )

      # fetch download location from results URL
      # this is now a two step process
      response <- httr::GET(
        file.path(private$url, "results"),
        httr::add_headers(
          "PRIVATE-TOKEN" = key
        )
      )

      # trap general http error
      if (httr::http_error(response)) {
        stop(httr::content(response),
          call. = FALSE
        )
      }

      # grab content
      ct <- httr::content(response)

      # return the asset location
      return(ct$asset$value$href)
    }
  )
)
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.