# R/retrieve.R
#
# Defines functions: cds_download_jobs, .cds_estimate_costs, cds_estimate_costs, .cds_constraints, cds_build_request, cds_submit_job
#
# Documented in: cds_build_request, cds_download_jobs, cds_estimate_costs, cds_submit_job

#' Submit a download job for a dataset
#' 
#' Submit a request to the Copernicus Climate Data Service to download
#' (part of) a dataset. If the request is successful, a job identifier is
#' returned which can be used to actually download the data (`cds_download_jobs()`).
#' 
#' @param dataset The dataset name to be downloaded, or a `list` returned by
#' `cds_build_request()`. When this argument is missing the function will
#' attempt to build a request with text on the clipboard using `cds_python_to_r()`.
#' @param ... Subsetting parameters passed onto `cds_build_request()`. Should
#' be empty when `dataset` is missing or a `list`.
#' @param wait A `logical` value indicating if the function should wait for the
#' submitted job to finish. Set it to `FALSE` to continue without waiting.
#' @param check_quota Each account has a quota of data that can be downloaded.
#' If this argument is set to `TRUE` (default) it is checked if the request doesn't
#' exceed your quota. Set it to `FALSE` to skip this step and speed up the submission.
#' @param check_licence Datasets generally require you to accept certain terms of use.
#' If this argument is set to `TRUE` (default), it will be checked if you have accepted all
#' required licences for the submitted request. Set it to `FALSE` to skip this step and
#' speed up the submission.
#' @inheritParams cds_check_authentication
#' @returns Returns a `data.frame` containing information about the submitted job.
#' @examples
#' if (interactive() && cds_token_works()) {
#'   job <- cds_submit_job(
#'       dataset        = "reanalysis-era5-pressure-levels",
#'       variable       = "geopotential",
#'       product_type   = "reanalysis",
#'       area           = c(n = 55, w = -1, s = 50, e = 10),
#'       year           = "2024",
#'       month          = "03",
#'       day            = "01",
#'       pressure_level = "1000",
#'       data_format    = "netcdf"
#'     )
#'
#'   ## Or split into two separate steps:
#'   
#'   req <- cds_build_request(
#'       dataset        = "reanalysis-era5-pressure-levels",
#'       variable       = "geopotential",
#'       product_type   = "reanalysis",
#'       area           = c(n = 55, w = -1, s = 50, e = 10),
#'       year           = "2024",
#'       month          = "03",
#'       day            = "01",
#'       pressure_level = "1000",
#'       data_format    = "netcdf"
#'     )
#'   job <- cds_submit_job(req)
#' }
#' @include helpers.R
#' @export
cds_submit_job <- function(
    dataset, ..., wait = TRUE, check_quota = TRUE, check_licence = TRUE,
    token = cds_get_token()) {
  # API reference: https://cds.climate.copernicus.eu/api/retrieve/v1/docs
  message("Building request")
  if (missing(dataset)) dataset <- NULL
  
  form <- cds_build_request(dataset, ...)
  # Without a dataset name, take it from the attribute stored on the request
  if (is.null(dataset) || is.list(dataset)) dataset <- attr(form, "dataset")
  
  if (check_quota) {
    message("Checking quota")
    quota <- .cds_estimate_costs(form, token = token)
    if (quota$cost > quota$limit) {
      # Format with `%s`: `%i` raises an error for doubles that are not
      # whole numbers, which would mask the actual quota message.
      rlang::abort(c(x = sprintf("This request (%s) exceeds your quota (%s)",
                                 format(quota$cost, scientific = FALSE),
                                 format(quota$limit, scientific = FALSE)),
                     i = "Try narrowing your request"))
    }
  }
  
  # Only report on licences when the caller asked for the check
  if (check_licence) {
    required_licences <- attributes(form)$licences
    if (is.null(required_licences)) {
      message("Required license unknown, skipping check")
    } else {
      message("Checking license")
      # NOTE(review): `cds_accepted_licences()` is called without `token`;
      # confirm whether the `token` argument should be forwarded here.
      missing_licence <- required_licences$id
      missing_licence <-
        missing_licence[!missing_licence %in% cds_accepted_licences()$id]
      if (length(missing_licence) > 0) {
        rlang::abort(c(
          x = sprintf("Dataset requires you to accept these licences: %s",
                      paste(missing_licence, collapse = ", ")),
          i = "Use `cds_accept_licence()` to accept the required licences and try again"
        ))
      }
    }
  }
  
  message("Submitting job")
  job <-
    .base_url |>
    paste("retrieve/v1/processes", dataset, "execution", sep = "/") |>
    .execute_request(token, "POST", list(inputs = form)) |>
    list() |>
    .simplify()
  
  if (wait) {
    wait_anim <- c("-", "\\", "|", "/")
    i <- 1
    repeat {
      job <- cds_list_jobs(job$jobID, token = token)
      # A job is considered done as soon as it has a `finished` field
      is_finished <- "finished" %in% names(job)
      if (is_finished) {
        # Blank out the spinner for the final status line
        wait_anim <- " "
        i <- 1
      }
      # "\r" rewrites the same console line on every poll
      message(paste("\rChecking job status:", wait_anim[i], job$status, "    "),
              appendLF = FALSE)
      if (is_finished) {
        message("")
        break
      }
      i <- i + 1
      if (i > length(wait_anim)) i <- 1
      Sys.sleep(1)
    }
  }
  job
}

#' Prepare a request for downloading a dataset
#' 
#' This function is used by `cds_estimate_costs()` and `cds_submit_job()`
#' to subset a dataset before downloading. It will also help you to explore
#' which parameters are available for subsetting.
#' @param dataset The dataset name to be used for setting up a request.
#' When it is missing (or `NULL`) and `...` is empty, the request is obtained
#' from `cds_python_to_r()`. When it is a `list` and `...` is empty, it is
#' returned as is.
#' @param ... Parameters for subsetting the dataset. Use `cds_dataset_form()` to inquire
#' which parameters and parameter values are available for a specific dataset.
#' If left blank it will take default parameter values.
#' @returns Returns a named list, which can be used to submit a job (`cds_submit_job()`)
#' or inquire its cost (`cds_estimate_costs()`). The dataset name and the
#' required licences are attached as attributes `dataset` and `licences`.
#' @examples
#' if (interactive()) {
#'   cds_build_request(
#'     dataset        = "reanalysis-era5-pressure-levels",
#'     variable       = "geopotential",
#'     product_type   = "reanalysis",
#'     area           = c(n = 55, w = -1, s = 50, e = 10),
#'     year           = "2024",
#'     month          = "03",
#'     day            = "01",
#'     pressure_level = "1000",
#'     data_format    = "netcdf"
#'   )
#' }
#'@export
cds_build_request <- function(dataset, ...) {
  form_result <- list(...)
  no_dataset <- missing(dataset) || is.null(dataset)
  if (length(form_result) == 0) {
    if (no_dataset) {
      form_result <- cds_python_to_r()
      dataset <- attr(form_result, "dataset")
    } else if (is.list(dataset)) {
      ## Assume that if dataset is a list and there are no
      ## further arguments, 'dataset' is already parsed by
      ## cds_build_request and can be returned as is.
      return(dataset)
    }
  } else if (no_dataset || is.list(dataset)) {
    ## Subsetting parameters need a dataset name to be validated against;
    ## fail early instead of building a malformed request URL.
    rlang::abort(c(
      x = "Parameters in `...` can only be combined with a dataset name",
      i = "Provide `dataset` as a name, or leave `...` empty"
    ))
  }

  # NOTE(review): `constraints_all` is never used below. The request is kept
  # because it may double as an early check on `dataset` - confirm, then drop.
  constraints_all <- .cds_constraints(dataset,  structure(list(), names = character(0)))
  form <- cds_dataset_form(dataset) |>
    dplyr::mutate(
      required = ifelse(is.na(.data$required), FALSE, .data$required)
    )

  # Drop fields that the dataset form does not know; validate the others
  known <- names(form_result) %in% form$name
  unknown <- names(form_result)[!known]
  for (nm in names(form_result)) {
    if (nm %in% unknown) {
      message("Removing unknown field ", nm)
      form_result[[nm]] <- NULL
    } else {
      details <- form |> dplyr::filter(.data$name == nm)
      details <- details$details[[1]]$details
      if (!is.null(details$groups)) {
        # Group labels (lower case, spaces as underscores) may be used as
        # shorthand for all values in that group
        my_groups <- lapply(details$groups, `[[`, "label") |> unlist() |>
          tolower() |> stringr::str_replace_all(" ", "_")
        sel <- my_groups %in% form_result[[nm]]
        if (any(sel)) {
          details$values <-
            lapply(details$groups[sel], `[[`, "values") |>
            unlist() |>
            unique() |>
            list()
          form_result[[nm]] <- details$values |> unlist()
        }
      }
      if (!is.null(details$values) &&
          any(!form_result[[nm]] %in% unlist(details$values))) {
        rlang::abort(c(x = sprintf("Unknown value '%s'", paste(unlist(form_result[[nm]]),
                                                               collapse = ", ")),
                       i = paste("Expected one of", paste(unlist(details$values),
                                                          collapse = ", "))))
      }
      if (!is.null(details$values)) {
        # Coerce the user input to the type of the values listed in the form
        form_result[[nm]] <- methods::as(unlist(form_result[[nm]]),
                                         typeof(details$values[[1]]))
      }
    }
  }
  required_elements <- form$name[form$required]
  not_included <- required_elements[!required_elements %in% names(form_result)]
  
  # Check lengths and shape each field according to its widget type
  for (nm in names(form_result)) {
    details <- form |> dplyr::filter(.data$name == nm)
    type <- details$type
    switch(
      type,
      StringListWidget = {
        form_result[[nm]] <- as.list(unlist(form_result[[nm]]))
      },
      StringListArrayWidget = {
        form_result[[nm]] <- as.list(unlist(form_result[[nm]]))
      },
      StringChoiceWidget = {
        if (length(form_result[[nm]]) != 1) {
          rlang::abort(c(x = sprintf("Found multiple values for field '%s'", nm),
                         i = "Select only one value and try again"))
        } else {
          form_result[[nm]] <- unlist(form_result[[nm]])
        }
      },
      GeographicExtentWidget = {
        geo_details <- details$details[[1]]$details
        current <- unlist(form_result[[nm]])
        if (inherits(current, "bbox")) {
          if (requireNamespace("sf", quietly = TRUE)) {
            if (!is.na(sf::st_crs(form_result[[nm]]))) {
              current <- sf::st_transform(form_result[[nm]], 4326)
            }
            current <- c(n = current[["ymax"]],
                         w = current[["xmin"]],
                         s = current[["ymin"]],
                         e = current[["xmax"]])
            
          } else {
            rlang::abort(
              c(x = "Using a `bbox` as `area` arguments needs package `sf`",
                i = "Install package `sf` and try again")
            )
          }
        }
        if (length(current) != 4)
          rlang::abort(c(x = sprintf("Expected rectangular bounding box, with 4 values (%s)",
                                     paste(names(geo_details$default), collapse = ", ")),
                         i = "Provide a correct bounding box"))
        bbox_names <- c("n", "w", "s", "e")
        if (is.null(names(current))) names(current) <- bbox_names
        # Unexpected names would otherwise surface as an obscure `NA` error
        # in the comparisons below
        if (!setequal(names(current), bbox_names))
          rlang::abort(c(x = "Bounding box should have the names `n`, `w`, `s` and `e`",
                         i = "Rename the elements of your bounding box, or omit the names"))
        # Put named input in the same n, w, s, e order that unnamed input
        # is assumed to have
        current <- current[bbox_names]
        if (current["s"] > current["n"] || current["e"] < current["w"])
          rlang::abort(c(x = "North should be larger than South. East should be larger than West",
                         i = "Check your bounding box for correctness"))
        if (current[["n"]] > geo_details$range$n ||
            current[["s"]] < geo_details$range$s ||
            current[["e"]] > geo_details$range$e ||
            current[["w"]] < geo_details$range$w)
          rlang::abort(c(x = "Coordinates of `area` out of range",
                         i = sprintf("Check if your coordinates are in range (%s)",
                                     paste(unlist(geo_details$range), collapse = ", "))))
        form_result[[nm]] <- current
      },
      LicenceWidget = {
        form_result[[nm]] <- NULL
      },
      rlang::abort(c(x = paste("Unknown field type", type),
                     i = "Please report at <https://github.com/pepijn-devries/CopernicusClimate/issues> with a reprex"))
    )
  }

  # Make sure an empty request is still a *named* (empty) list
  if (length(form_result) == 0) names(form_result) <- character(0)
  constraints <- .cds_constraints(dataset, form_result)
  # Fill required fields that were not specified, using the form's default
  # when there is one and the values returned by the constraints otherwise
  for (missing_element in not_included) {
    if (!is.null(constraints[[missing_element]])) {
      details <-
        form |>
        dplyr::filter(.data$name == missing_element)
      type <- details$type
      details <- details$details[[1]]$details
      if (!is.null(details$default)) {
        form_result[[missing_element]] <- details$default |> unlist() |> unique()
      } else {
        dat <- constraints[[missing_element]]
        if (type %in% "StringChoiceWidget") dat <- dat[[1]]

        if (length(dat) == 0) form_result[[missing_element]] <- NULL else
          form_result[[missing_element]] <- dat
      }
    }
  }
  
  licence_details <- form |>
    dplyr::filter(.data$name == "licences") |>
    dplyr::pull("details")
  # A form without a `licences` row yields no `licences` attribute, instead
  # of failing with a subscript error
  licences <- if (length(licence_details) > 0) {
    licence_details[[1]]$details$licences |> .simplify()
  } else {
    NULL
  }
  attr(form_result, "licences") <- licences
  attr(form_result, "dataset")  <- dataset
  form_result
}

# POST the (partial) request `form` to the constraints endpoint of `dataset`
# and return whatever `.execute_request()` returns. An empty token is sent.
.cds_constraints <- function(dataset, form) {
  endpoint <- paste(
    .base_url, "retrieve/v1/processes", dataset, "constraints",
    sep = "/"
  )
  payload <- list(inputs = form)
  .execute_request(endpoint, token = "", "POST", payload)
}

#' Check the cost of a request against your quota
#' 
#' Each account has a limit to the amount of data that can be downloaded.
#' Use this function to check if a request exceeds your quota.
#' @param dataset A dataset name to be inspected. When missing, `NULL` is
#' passed on, leaving it to `cds_build_request()` to obtain the request.
#' @param ... Parameters passed on to `cds_build_request()`
#' @inheritParams cds_check_authentication
#' @returns Returns a named list indicating the available quota and
#' the estimated cost for a request specified with `...`-arguments.
#' @examples
#' if (interactive() && cds_token_works()) {
#'   cds_estimate_costs(
#'     dataset        = "reanalysis-era5-pressure-levels",
#'     variable       = "geopotential",
#'     product_type   = "reanalysis",
#'     area           = c(n = 55, w = -1, s = 50, e = 10),
#'     year           = "2024",
#'     month          = "03",
#'     day            = "01",
#'     pressure_level = "1000",
#'     data_format    = "netcdf"
#'   )
#'   
#'   cds_estimate_costs(dataset = "reanalysis-era5-pressure-levels")
#' }
#' @include helpers.R
#' @export
cds_estimate_costs <- function(dataset, ..., token = cds_get_token()) {
  # The internal worker has no notion of a missing argument, so hand it NULL
  target <- if (missing(dataset)) NULL else dataset
  .cds_estimate_costs(target, ..., token = token)
}

# Build the request for `dataset` (a name, a pre-built request list, or NULL)
# and POST it to the costing endpoint of the dataset.
.cds_estimate_costs <- function(dataset, ..., token) {
  request <- cds_build_request(dataset, ...)
  # Without a dataset name, take it from the attribute stored on the request
  name_unknown <- is.null(dataset) || is.list(dataset)
  if (name_unknown) dataset <- attr(request, "dataset")
  endpoint <- paste(
    .base_url, "retrieve/v1/processes", dataset, "costing",
    sep = "/"
  )
  .execute_request(endpoint, token, "POST", list(inputs = request))
}

#' Download specific jobs
#' 
#' After submitting one or more jobs with `cds_submit_job()`, you can download the resulting
#' files with `cds_download_jobs()`. See `vignette("download")` for more details.
#' @param job_id If a specific job identifier is listed here, only the files resulting
#' from those jobs are downloaded. If left blank, all successful jobs are downloaded.
#' @param destination Destination path to store downloaded files.
#' @param names File names for the downloaded files. If missing, the cryptic hexadecimal
#' file name is taken from the job.
#' @param ... Ignored
#' @inheritParams cds_check_authentication
#' @returns A `data.frame` of all downloaded files. Contains a column `local` with the path
#' to the locally stored files.
#' @examples
#' if (interactive() && cds_token_works()) {
#'   job <- cds_submit_job(
#'       dataset        = "reanalysis-era5-pressure-levels",
#'       variable       = "geopotential",
#'       product_type   = "reanalysis",
#'       area           = c(n = 55, w = -1, s = 50, e = 10),
#'       year           = "2024",
#'       month          = "03",
#'       day            = "01",
#'       pressure_level = "1000",
#'       data_format    = "netcdf"
#'     )
#'   cds_download_jobs(job$jobID, tempdir())
#' }
#' @include helpers.R
#' @export
cds_download_jobs <- function(job_id, destination, names, ..., token = cds_get_token()) {
  if (!missing(names) && length(job_id) != length(names))
    stop("Argument `names` should have the same length as `job_id`.")
  # NOTE(review): `names` is validated above but is not used when building
  # the destination paths below; files are stored under the base name of
  # their download link.
  
  # Poll until none of the jobs is still queued or running. The caller's
  # token is used, rather than silently falling back to the default.
  repeat {
    jobs <- cds_list_jobs(job_id, limit = 1000, token = token)
    busy_jobs <- jobs$status %in% c("accepted", "running")
    if (!any(busy_jobs)) break
    message("\rWaiting for ", sum(busy_jobs), " job(s) to complete   ", appendLF = FALSE)
    Sys.sleep(1)
  }
  message("")
  
  # NOTE(review): `cds_job_results()` is called without `token`; confirm
  # whether the `token` argument should be forwarded here as well.
  jobs <-
    cds_job_results(job_id) |>
    dplyr::mutate(
      name = basename(.data$href),
      # Jobs without a download link are treated as unsuccessful
      success = !is.na(.data$href))
  if (any(!jobs$success)) message("Skipping ", sum(!jobs$success), " unsuccessful jobs")
  jobs <- jobs |> dplyr::filter(.data$success)
  if (nrow(jobs) == 0) stop("No successful jobs found to download")
  
  # Download every distinct link only once
  dupes <- duplicated(jobs$href)
  if (any(dupes)) message("Skipped ", sum(dupes), " identical files")
  jobs <- jobs |> dplyr::filter(!.env$dupes)
  
  requests <- lapply(jobs$href, httr2::request)
  message("Start downloading files:")
  responses <- httr2::req_perform_parallel(
    requests,
    paths = file.path(destination, jobs$name)
  )
  jobs |>
    dplyr::mutate(
      responses = responses,
      # The response body is unclassed to obtain the plain local path
      local = lapply(responses, \(x) x$body |> unclass()) |> unlist()
    )
}

# Try the CopernicusClimate package in your browser
#
# Any scripts or data that you put into this service are public.
#
# CopernicusClimate documentation built on Jan. 8, 2026, 1:08 a.m.