
Defines functions check_weatherDB find_sites_with_bad_weather

Documented in check_weatherDB find_sites_with_bad_weather

#' Determine which site_ids in the weather database do not have weather data for
#' every requested climate scenario
#' @return A logical vector of length \code{site_labels} respectively
#'   \code{siteID_by_dbW}. The \code{TRUE} elements indicate sites for which the
#'   weather database lacks some or all data. If all data are in the weather
#'   database, then \code{FALSE}.
#' @export
find_sites_with_bad_weather <- function(fdbWeather, site_labels = NULL,
  siteID_by_dbW = NULL, scen_labels = NULL, scenID_by_dbW = NULL,
  chunk_size = 500L, verbose = FALSE) {

  if (verbose) {
    t1 <- Sys.time()
    temp_call <- shQuote(match.call()[1])
    print(paste0("rSFSW2's ", temp_call, ": started at ", t1))

      print(paste0("rSFSW2's ", temp_call, ": ended after ",
      round(difftime(Sys.time(), t1, units = "secs"), 2), " s"))
      cat("\n")}, add = TRUE)

  # The pairs of arguments should be both NULL; if they both are not NULL,
  # then they must of identical length
  si_ltemp <- c(length(site_labels), length(siteID_by_dbW))
  si_ntemp <- c(is.null(site_labels), is.null(siteID_by_dbW))
  sc_ltemp <- c(length(scen_labels), length(scenID_by_dbW))
  sc_ntemp <- c(is.null(scen_labels), is.null(scenID_by_dbW))

  stopifnot(!(si_ntemp[1] && si_ntemp[2]),
    si_ntemp[1] || si_ntemp[2] || identical(si_ltemp[1], si_ltemp[2]))
  stopifnot(!(sc_ntemp[1] && sc_ntemp[2]),
    sc_ntemp[1] || sc_ntemp[2] || identical(sc_ltemp[1], sc_ltemp[2]))

  req_scenN <- max(sc_ltemp)
  n_todos <- max(si_ltemp)
  todos <- rep(TRUE, n_todos)

  con <- dbConnect(SQLite(), dbname = fdbWeather,
    flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)
  rSOILWAT2::dbW_setConnection(dbFilePath = fdbWeather, FALSE)
  on.exit(rSOILWAT2::dbW_disconnectConnection(), add = TRUE)

  if (dbExistsTable(con, "WeatherData")) {
    if (is.null(siteID_by_dbW)) {
      if (verbose) {
        print(paste0("rSFSW2's ", temp_call, ": is calling the potentially ",
          "time-consuming function 'rSOILWAT2::dbW_getSiteId'"))
      siteID_by_dbW <- rSOILWAT2::dbW_getSiteId(Labels = site_labels)
    if (anyNA(siteID_by_dbW)) {
      stop("Not all sites (labels) available in weather database.")

    if (is.null(scenID_by_dbW)) {
      scenID_by_dbW <- rSOILWAT2::dbW_getScenarioId(Scenario = scen_labels)
    if (anyNA(scenID_by_dbW)) {
      stop("Not all sites (labels) available in weather database.")

    do_chunks <- parallel::splitIndices(n_todos, ceiling(n_todos / chunk_size))

    sql <- paste0("SELECT Site_id, Scenario FROM WeatherData ",
      "WHERE Site_id IN (:x1) ",
      "AND Scenario IN (:x2) ORDER BY Site_id, Scenario")
    rs <- dbSendStatement(con, sql)
    on.exit(dbClearResult(rs), add = TRUE)

    for (k in seq_along(do_chunks)) {
      if (verbose) {
        print(paste0("'find_sites_with_bad_weather': ", Sys.time(),
          " is checking availability of climate scenarios in 'dbWeather': ",
          "chunk ", k, " out of ", length(do_chunks), " chunks of 'sites'"))

      # Get site_id, scenario_id from dbWeather for chunked requested site_ids
      dbBind(rs, params = as.list(expand.grid(
        x1 = siteID_by_dbW[do_chunks[[k]]],
        x2 = scenID_by_dbW, KEEP.OUT.ATTRS = FALSE, stringsAsFactors = FALSE)))
      res <- dbFetch(rs)

      if (dim(res)[1] > 0) {
        temp <- tapply(res[, "Scenario"], res[, "Site_id"], length)
        temp <- cbind(Site_id = as.integer(names(temp)), scenN = temp)

        # Compare expected with what is available in dbWeather
        # Good: all requested scenarios are available: set their todos to FALSE
        i_good <- temp[, "scenN"] == req_scenN

        if (any(i_good)) {
          todos[siteID_by_dbW %in% temp[i_good, "Site_id"]] <- FALSE

  } else {
    stop("'find_sites_with_bad_weather': table 'WeatherData' is missing ",
      "from 'dbWeather'")


#' Checks data in a weather database
#' @param dir_prj A character string. The directory path the \pkg{rSFSW2}
#'   simulation project.
#' @param fdbWeather A character string. The file path of weather database.
#' @param repeats An integer value. The number of times each weather object
#'   is extracted (repeats > 1 enable comparison of the duplicates).
#' @param do_preprocess_tempfiles A logical value. Set to TRUE, for instance,
#'   if a previous run was prematurely aborted.
#' @param seed A seed set, \code{NULL}, or \code{NA}. \code{NA} will not affect
#'  the state of the \var{RNG}; \code{NULL} will re-initialize the \var{RNG};
#'  and all  other values are passed to \code{\link{set.seed}}.
#' @export
check_weatherDB <- function(dir_prj, fdbWeather, repeats = 2L,
  do_preprocess_tempfiles = TRUE, n_cores = 20L, startyear = 1979,
  endyear = 2010, seed = NA) {

  if (!is.na(seed)) set.seed(seed)
  name_wid <- ".wid"

  vars <- c("MAP_mm", "aPPT_mm_sd", "MAT_C", "MATmax_C", "MATmin_C")
  vars_mult <- "MAP_mm"

  dir.create(dir_out <- file.path(dir_prj, "6_Results", "Weather_summary"),
    recursive = TRUE, showWarnings = FALSE)
  dir.create(dir_temp <- file.path(dir_out, "temp"), recursive = TRUE,
    showWarnings = FALSE)

  # Basename of temporary files, one for each worker: storing extracted results
  pattern_temp <- "Summary_climate_dbWeatherData_temp_worker"
  # Temporary file, aggregated from worker output temp files
  ftemp <- file.path(dir_out, "Summary_climate_dbWeatherData_temp.csv")
  # Duplicated extractions with non-identical output
  fdups <- file.path(dir_out, "Summary_climate_dbWeatherData_duplicated.csv")
  # Successful extractions
  fout <- file.path(dir_out, "Summary_climate_dbWeatherData.csv")
  fclimate <- file.path(dir_out, paste0("Summary_climate_dbWeatherData_",
    format(Sys.Date(), "%Y%m%d"), ".rds"))  # 'climate'

  #---Calculate in parallel
  cl <- parallel::makeCluster(n_cores, type = "PSOCK",
    outfile = "workers_log.txt")
  temp <- parallel::clusterExport(cl, "name_wid")

  # TODO (drs): it is ok to load into globalenv() because this happens on
  # workers and not on master -> R CMD CHECK reports this nevertheless as issue
  temp <- parallel::clusterApply(cl, seq_len(n_cores), function(x)
    assign(name_wid, x, pos = 1L)) # worker identification number

  merge_workers_tempfiles <- function(dir_temp, pattern, file) {
    ftemps <- list.files(dir_temp, pattern = pattern, full.names = TRUE)
    if (length(ftemps) > 0) {
      temps <- lapply(ftemps, function(f) utils::read.csv(f, header = TRUE))
      res <- do.call("rbind", temps)

      stopifnot(nrow(res) == sum(vapply(temps, nrow, NA_integer_)))

      utils::write.table(res, file = file, append = TRUE, sep = ",", dec = ".",
        qmethod = "double", row.names = FALSE, col.names = FALSE)

  #---Connect to weather database
  rSOILWAT2::dbW_setConnection(dbFilePath = fdbWeather, FALSE)
  on.exit(rSOILWAT2::dbW_disconnectConnection(), add = TRUE)

  fsite <- file.path(file.path(dir_out, "Sites.cvs"))
  if (!file.exists(fsite)) {
    dbW_iSiteTable <- rSOILWAT2::dbW_getSiteTable()
    utils::write.csv(dbW_iSiteTable, file = fsite, row.names = FALSE)
  } else {
    dbW_iSiteTable <- utils::read.csv(fsite, header = TRUE)
  fscen <- file.path(file.path(dir_out, "Scenarios.cvs"))
  if (!file.exists(fscen)) {
    dbW_iScenarioTable <- rSOILWAT2::dbW_getScenariosTable()
    utils::write.csv(dbW_iScenarioTable, file = fscen, row.names = FALSE)
  } else {
    dbW_iScenarioTable <- utils::read.csv(fscen, header = TRUE)

  #---Define output
  #  Non-empty rows in 'climate' will be extracted
  used_sites <- dbW_iSiteTable$Latitude > -90 & dbW_iSiteTable$Latitude > -180
  sitesN <- sum(used_sites)

  climate <- as.data.frame(matrix(NA, nrow = nrow(dbW_iScenarioTable) * sitesN,
    ncol = 3 + length(vars),
    dimnames = list(NULL, c("Site_id_by_dbW", "Scenario_id", "Status", vars))))
  climate[, "Site_id_by_dbW"] <- dbW_iSiteTable[used_sites, "Site_id"]
  climate[, "Scenario_id"] <- rep(dbW_iScenarioTable[, "id"], each = sitesN)

  ids_todo <- seq_len(nrow(climate))

  #---Check progress
  make_ids <- compiler::cmpfun(function(data,
    id_vars = c("Site_id_by_dbW", "Scenario_id")) {
    as.vector(apply(data[, id_vars, drop = FALSE], 1, paste0, collapse = "_"))

  revert_ids <- compiler::cmpfun(function(ids,
    id_vars = c("Site_id_by_dbW", "Scenario_id"), id_class = "integer") {
    temp <- t(simplify2array(strsplit(ids, split = "_", fixed = TRUE)))
    temp <- as.data.frame(temp, stringsAsFactors = FALSE)
    stopifnot(length(id_vars) == ncol(temp))
    colnames(temp) <- id_vars

    id_class <- rep_len(id_class, length.out = ncol(temp))
    for (i in seq_len(ncol(temp))) {
      temp[, i] <- as(temp[, i], id_class[i])


  copy_matches <- compiler::cmpfun(function(out, data, match_vars, copy_vars,
    ids_out = NULL) {

    out_ids <- make_ids(out, id_vars = match_vars)
    data_ids <- make_ids(data, id_vars = match_vars)

    im_data <- match(out_ids, data_ids, nomatch = 0)
    im_out <- out_ids %in% data_ids

    out[im_out, copy_vars] <- data[im_data, copy_vars]

    list(out = out, ids_out = if (!is.null(ids_out)) ids_out[!im_out] else NA)

  process_good_extractions <- function(climate, ids_todo, var_out = vars,
    file = fout) {

    if (file.exists(fout)) {
      climate_good <- utils::read.csv(fout, header = TRUE)

      # Transfer to output
      if (nrow(climate_good) > 0) {
        temp <- copy_matches(out = climate, data = climate_good,
                  match_vars = c("Site_id_by_dbW", "Scenario_id"),
                  copy_vars = c("Status", vars),
                  ids_out = ids_todo)
        climate <- temp[["out"]]
        ids_todo <- temp[["ids_out"]]
    } else {
      climate_good <- climate[0, ]
      utils::write.table(climate_good, file = file, append = FALSE,
        sep = ",", dec = ".", qmethod = "double", row.names = FALSE,
        col.names = TRUE)

    list(climate = climate, ids_todo = ids_todo)

  paste0(Sys.time(), ": process previous progress")

  progress <- process_good_extractions(climate, ids_todo)
  climate <- progress[["climate"]]
  ids_todo <- progress[["ids_todo"]]

  check_entry <- compiler::cmpfun(function(i, idss, data, vars, repeats) {
    # 3 types of entries in 'data'
    #  - # of duplicates < repeats ==> (-1) repeat: do not copy to 'climate';
    #     leave lines in 'ftemp'
    #  - # of duplicates >= repeats and
    #    - identical output ==> (1) good extraction: copy to 'fout' and
    #      'climate'; remove lines from 'ftemp'
    #    - varied output ==> (0) repeat: do not copy to 'climate';
    #      move duplicated entries to 'fdups'

    if (i %% 1000 == 1) print(paste0(Sys.time(), ": checking ", i, "-th entry"))

    idss <- unlist(idss)
    temp <- data[, "Site_id_by_dbW"] == idss["Site_id_by_dbW"] &
      data[, "Scenario_id"] == idss["Scenario_id"]
    tr <- data[temp, ]

    res <- unlist(c(seq_id = i, Dups_N = nrow(tr),
          tr[1, c("Site_id_by_dbW", "Scenario_id")],
          Status = -1L))

    if (NROW(tr) >= repeats) {
      variation <- apply(tr[, c("Status", vars)], 2, function(x)
        # duplicates with and without NAs OR with different values among non-NAs
        anyNA(x) && !all(is.na(x)) ||
          any(abs(diff(stats::na.omit(x))) > sqrt(.Machine$double.eps)))
      res["Status"] <- if (any(variation)) 0L else 1L


  process_tempfiles <- function(climate, ids_todo, var_out = vars, ftemp,
    fdups, fout, repeats, dir_temp, pattern_temp) {

    merge_workers_tempfiles(dir_temp, pattern = pattern_temp, file = ftemp)

    if (file.exists(ftemp)) {
      climate_progress <- utils::read.csv(ftemp, header = TRUE)

      if (nrow(climate_progress) > 0) {
        ids_progress <- as.vector(apply(
          climate_progress[, c("Site_id_by_dbW", "Scenario_id")], 1,
          paste0, collapse = "_"))
        ids_unique <- sort(unique(ids_progress))
        idus_ss <- revert_ids(ids_unique)

        fstat1 <- sub(".csv", "_uniqueIDs.rds", ftemp)
        fstat2 <- sub(".csv", "_status.rds", ftemp)
        do_calc_status <- TRUE

        if (file.exists(fstat1) && file.exists(fstat2)) {
          ids_unique_prev <- readRDS(fstat1)
          do_calc_status <- !identical(ids_unique_prev, ids_unique)

        if (do_calc_status) {
          saveRDS(ids_unique, file = fstat1)
          parallel::clusterExport(cl, c("climate_progress", "idus_ss",
            "repeats", "vars", "check_entry"), envir = environment())

          print(paste0(Sys.time(), ": 'process_tempfiles' check status of n = ",
            length(ids_unique), " extractions; this may take a while."))
          status <- data.frame(t(parallel::parSapply(cl,
            X = seq_along(ids_unique), FUN = function(i)
            check_entry(i, idss = idus_ss[i, ], data = climate_progress,
              vars = vars, repeats = repeats))))
          saveRDS(status, file = fstat2)
          temp <- parallel::clusterEvalQ(cl, rm(list = ls()))
        } else {
          status <- readRDS(fstat2)

        print(paste0(Sys.time(), ": 'process_tempfiles' process ",
          "successful database extractions"))
        #  - # of duplicates < repeats ==> (-1) repeat: do not copy to
        #     'climate'; leave lines in 'ftemp'
        ids_keep <- rep(TRUE, nrow(climate_progress))
        #  - # of duplicates >= repeats and
        #    - identical output ==> (1) good extraction: write one copy to
        #      'fout' and add to 'climate'; remove all lines from 'ftemp'
        igood <- status[, "Status"] == 1L
        ids_good <- ids_progress %in% ids_unique[status[igood, "seq_id"]]
        climate_good <- unique(climate_progress[ids_good, ])
        stopifnot(nrow(climate_good) == sum(igood),
          sum(ids_good) == sum(status[igood, "Dups_N"]))

        if (nrow(climate_good) > 0) {
          utils::write.table(climate_good, file = fout, append = TRUE,
            sep = ",", dec = ".", qmethod = "double", row.names = FALSE,
            col.names = FALSE)

          temp <- copy_matches(out = climate, data = climate_good,
                    match_vars = c("Site_id_by_dbW", "Scenario_id"),
                    copy_vars = c("Status", vars),
                    ids_out = ids_todo)
          climate <- temp[["out"]]
          ids_todo <- temp[["ids_out"]]

        ids_keep <- ids_keep & !ids_good

        print(paste0(Sys.time(), ": 'process_tempfiles' process database ",
          "extractions with variation among repeats"))
        #    - varied output ==> (0) repeat: do not copy to 'climate';
        #      move duplicated entries to 'fdups'
        ids_vdups <- ids_unique[status[status[, "Status"] == 0L, "seq_id"]]
        idups <- ids_progress %in% ids_vdups

        climate_dups <- climate_progress[idups, ]
        appD <- file.exists(fdups)
        utils::write.table(climate_dups, file = fdups, append = appD,
          sep = ",", dec = ".", qmethod = "double", row.names = FALSE,
          col.names = !appD)

        ids_keep <- ids_keep & !idups
        utils::write.table(climate_progress[ids_keep, ], file = ftemp,
          append = FALSE, sep = ",", dec = ".", qmethod = "double",
          row.names = FALSE, col.names = TRUE)

    } else {
      climate_progress <- climate[0, ]
      utils::write.table(climate_progress, file = ftemp, append = FALSE,
        sep = ",", dec = ".", qmethod = "double", row.names = FALSE,
        col.names = TRUE)

    list(climate = climate, ids_todo = ids_todo)

  paste0(Sys.time(), ": process previous output")

  if (do_preprocess_tempfiles) {
    progress2 <- process_tempfiles(climate, ids_todo, vars, ftemp, fdups,
      fout, repeats, dir_temp, pattern_temp)
    climate <- progress2[["climate"]]
    ids_todo <- progress2[["ids_todo"]]

  #---Go through the weather database
  summarize_weather <- compiler::cmpfun(function(i, iclimate, scen, startyear,
    endyear, db_name) {

    stopifnot(exists("outfile"), file.exists(outfile))

    if (i %% 1000 == 1)
      print(paste0(Sys.time(), ": run = ", i, ": site_id/scenario = ",
        iclimate["Site_id_by_dbW"], "/", scen))

    # Access data from database
    wtemp <- try(rSOILWAT2::dbW_getWeatherData(
      Site_id = iclimate["Site_id_by_dbW"], startYear = startyear,
      endYear = endyear, Scenario = scen), silent = TRUE)

    if (inherits(wtemp, "try-error")) {
      # Maybe the connection to the database failed? Re-set connection and
      # attempt extraction once more
      rSOILWAT2::dbW_setConnection(dbFilePath = db_name, FALSE)

      wtemp <- try(rSOILWAT2::dbW_getWeatherData(
        Site_id = iclimate["Site_id_by_dbW"], startYear = startyear,
        endYear = endyear, Scenario = scen), silent = TRUE)

    if (inherits(wtemp, "try-error")) {
      iclimate["Status"] <- 0
      print(paste0(Sys.time(), ": run = ", i, ": site_id/scenario = ",
        iclimate["Site_id_by_dbW"], "/", scen, " failed:", wtemp))

    } else {
      iclimate["Status"] <- 1
      wd <- rSOILWAT2::dbW_weatherData_to_dataframe(wtemp)

      # Calculate climate variables (this is the slow part of the function)
      wy_ppt <- tapply(wd[, "PPT_cm"], wd[, "Year"], sum)
      iclimate["MAP_mm"] <- round(mean(wy_ppt))
      iclimate["aPPT_mm_sd"] <- round(stats::sd(wy_ppt), 2)

      wy_tmax <- tapply(wd[, "Tmax_C"], wd[, "Year"], mean)
      iclimate["MATmax_C"] <- round(mean(wy_tmax), 2)

      wy_tmin <- tapply(wd[, "Tmin_C"], wd[, "Year"], mean)
      iclimate["MATmin_C"] <- round(mean(wy_tmin), 2)

      wy_tmean <- apply(cbind(wy_tmax, wy_tmin), 1, mean)
      iclimate["MAT_C"] <- round(mean(wy_tmean), 2)

    # Temporary output
    utils::write.table(iclimate, file = outfile, append = TRUE, sep = ",",
      dec = ".", qmethod = "double", row.names = FALSE, col.names = FALSE)


  # Calculate in parallel
  if (length(ids_todo) > 0) {
    print(paste0(Sys.time(), ": # run =", length(ids_todo), " out of",

    parallel::clusterExport(cl, c("climate", "dir_temp", "pattern_temp",
      "name_wid", "summarize_weather", "fdbWeather", "dbW_iScenarioTable",
      "startyear", "endyear"))
    temp <- parallel::clusterEvalQ(cl, {
      rSOILWAT2::dbW_setConnection(dbFilePath = fdbWeather, FALSE)
    temp <- parallel::clusterEvalQ(cl, {
      outfile <- file.path(dir_temp, paste0(pattern_temp, "-",
        get(name_wid), ".csv"))
      utils::write.table(climate[0, ], file = outfile, append = FALSE,
        sep = ",", dec = ".", qmethod = "double", row.names = FALSE,
        col.names = TRUE)

    itests <- unlist(lapply(seq_len(repeats), function(i) sample(x = ids_todo,
      size = length(ids_todo))))

    idone <- parallel::parSapply(cl, X = itests, FUN = function(i)
      summarize_weather(i, iclimate = climate[i, ],
      scen = dbW_iScenarioTable[as.integer(climate[i, "Scenario_id"]),
      startyear = startyear, endyear = endyear, db_name = fdbWeather))

    temp <- parallel::clusterEvalQ(cl, rSOILWAT2::dbW_disconnectConnection())

  #---Final save
  if (length(ids_todo) > 0) {
    print(paste0(Sys.time(), ": process new output"))

    progress3 <- process_tempfiles(climate, ids_todo, vars, ftemp, fdups,
      fout, repeats, dir_temp, pattern_temp)
    climate <- progress3[["climate"]]
    ids_todo <- progress3[["ids_todo"]]

  print(paste0(Sys.time(), ": script completed"))
  if (length(ids_todo) > 0) {
    print(paste("# run =", length(ids_todo), "out of", nrow(climate),
      "still to do"))

  saveRDS(climate, file = fclimate)


  #---Report on extracted 'climate'
  # Failures
  failed <- climate$Status == 0
  iNAs <- apply(climate, 1, anyNA)
  identical(failed, iNAs)
  print(paste0("Unsuccessful extractions: n = ", sum(failed), "; f = ",
    signif(sum(failed) / length(failed), 2)))
  tmp <- climate[failed, ]
  graphics::plot(tmp[, "Site_id_by_dbW"], tmp[, "Scenario_id"])

  failed_siteID <- climate[failed, "Site_id_by_dbW"]
  print(paste0("Sites with at least one unsuccessful extractions: n = ",
  failed_siteID_freq <- tapply(rep(1, sum(failed)), failed_siteID, sum)
  probs <- c(0, 0.01, 0.5, 0.99, 1)
  print(paste0("Unsuccessful extractions per scenario: quantiles: ",
    paste(probs, stats::quantile(failed_siteID_freq, probs = probs),
      sep = "% = ", collapse = ", ")))

  failed_scenID <- climate[failed, "Scenario_id"]
  print(paste0("Scenarios with at least one unsuccessful extractions: n = ",
  print(paste0("Unsuccessful extractions per scenario: n = "))

  # Variation among downscaled scenarios as difference to current

  dat <- climate[!failed & climate$Scenario_id > 1, ]
  dat_cur <- climate[climate$Site_id_by_dbW %in% dat$Site_id_by_dbW &
      climate$Scenario_id == 1, ]
  dat <- dat[do.call(order, dat), ]

  dat_cur <- copy_matches(out = dat, data = dat_cur[do.call(order, dat_cur), ],
    match_vars = "Site_id_by_dbW", copy_vars = vars)[["out"]]

  dat_diff <- dat
  for (iv in vars) {
    dat_diff[, iv] <- if (iv %in% vars_mult) {
        dat[, iv] / dat_cur[, iv]
      } else {
        dat[, iv] - dat_cur[, iv]

  for (iv in vars) {
    print(paste0("Mean variation within sites among downscaled scenarios ",
      "for variable ", iv))
    temp <- stats::aggregate(dat_diff[, iv], by = list(dat_diff$Site_id_by_dbW),
      FUN = function(x) {
        rx <- range(x)
        c(mean = mean(x), min = min(rx), max = max(rx), range = diff(rx))
    print(round(apply(temp[[2]], 2, mean), 2))

