R/shiny_get_download_csv.R

Defines functions: split_ids_into_equal_buckets, get_api_downloadable, get_download_csv, get_x_values

Documented in split_ids_into_equal_buckets

get_x_values <- function(dates) {
  # Date vectors are stored as doubles internally, so reformat them as dd-mm-yy;
  # anything else is coerced to character and returned unchanged.
  if (typeof(dates) == "double") {
    return(as.character(format(dates, "%d-%m-%y")))
  }
  return(as.character(dates))
}
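
An illustrative sketch of the intended behaviour of get_x_values; the example inputs are assumptions, not values from the package:

# Date vectors are doubles internally, so they come back as dd-mm-yy strings
get_x_values(as.Date(c("2021-06-30", "2021-09-30")))
# [1] "30-06-21" "30-09-21"

# Character labels (e.g. quarter names) pass through unchanged
get_x_values(c("2021 Q2", "2021 Q3"))
# [1] "2021 Q2" "2021 Q3"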

get_download_csv <- function(data_store, indicator_definitions, date_range = NULL) {
  downloadable_indicators <- names(indicator_definitions)[
    as.vector(sapply(indicator_definitions, function(x) (!is.null(x$download) && x$download)))
  ]

  output <- data.frame(
    class = c(),
    category = c(),
    indicator_name = c(),
    series_name = c(),
    sub_series_name = c(),
    parameter = c(),
    value = c(),
    units = c(),
    date_last_updated = c()
  )
  counter <- 0
  counter_width <- length(downloadable_indicators)
  for (item_name in downloadable_indicators) {
    shiny::setProgress(counter / counter_width)
    counter <- counter + 1
    item <- indicator_definitions[[item_name]]

    sub_series <- data.frame(
      series_name = c(),
      sub_series_name = c(),
      value = c(),
      parameter = c(),
      units = c(),
      date_last_updated = c()
    )
    for (i in seq_along(item$groups)) {
      group <- item$groups[[i]]
      key <- paste(item$class, item$type, item$indicator_name, group$name, sep = "_")
      if ("TimeSeries" %in% class(data_store[[key]])) {
        sub_sub_series <- data_store[[key]]$get_csv_content(date_range)
      } else {
        sub_sub_series <- data_store[[key]]$get_csv_content()
      }

      sub_sub_series$series_name <- gsub(" \U2012 ", " - ", group$name)
      sub_sub_series$date_last_updated <- format(data_store[[key]]$update_date, "%d-%m-%y")

      if (!is.null(group$units)) {
        sub_sub_series$units <- group$units
      } else if (!is.null(item$units)) {
        sub_sub_series$units <- item$units
      } else {
        sub_sub_series$units <- ""
      }

      if (nrow(sub_sub_series) < 1) {
        stop(paste("Download failed for the indicator-specific sub-series", key))
      }
      if (!is.numeric(sub_sub_series$value)) {
        stop(paste("Non-numeric values in the dataset", key))
      }

      sub_series <- rbind(sub_series, sub_sub_series)
    }

    sub_series$indicator_name <- gsub(" \U2012 ", " - ", item$indicator_name)
    sub_series$class <- item$class
    sub_series$category <- item$type
    sub_series <- mutate(sub_series, parameter = as.character(parameter))

    if (nrow(sub_series) < 1) {
      stop(paste("Download failed for the indicator", item$indicator_name))
    }

    output <- rbind(sub_series, output)
  }

  output <- output %>%
    select(c(
      "class",
      "category",
      "indicator_name",
      "series_name",
      "sub_series_name",
      "parameter",
      "value",
      "units",
      "date_last_updated"
    ))

  return(output)
}
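
get_download_csv reports progress through shiny::setProgress, so it is typically called inside a Shiny progress context. A minimal usage sketch, assuming data_store and indicator_definitions are built elsewhere in the app (the output filename is illustrative):

csv_data <- shiny::withProgress(message = "Preparing download", {
  get_download_csv(data_store, indicator_definitions)
})
utils::write.csv(csv_data, "indicators.csv", row.names = FALSE)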

get_api_downloadable <- function(indicator, min_date) {

  # Collect OData API resource IDs from the indicator definitions.
  # A nested loop is needed because IDs may also sit inside group definitions.
  api_ids <- c()
  id_counter <- 0
  for (i in seq_along(indicator)) {
    if (!is.null(indicator[[i]]$api_resource_id)) {
      id_counter <- id_counter + 1
      api_ids[id_counter] <- indicator[[i]]$api_resource_id
    } else {
      for (j in seq_along(indicator[[i]]$groups)) {
        if (!is.null(indicator[[i]]$groups[[j]]$api_resource_id)) {
          id_counter <- id_counter + 1
          api_ids[id_counter] <- indicator[[i]]$groups[[j]]$api_resource_id
        }
      }
    }
  }
  api_ids <- unique(na.omit(api_ids))

  NUM_IDS_PER_BUNCH <- 10
  MAX_CORES <- 4
  NUM_ROWS_PER_BUCKET <- 20000

  # If no indicator exposes an API resource ID, there is nothing to download
  if (is.null(api_ids)) {
    return(list(metadata = NULL, data = NULL))
  }

  resources <- get_api_df("Resources", api_ids)

  # Sends bunches of ResourceIDs to the get_api_df function concurrently.
  # Leaves 1 core spare for other processes, but caps at MAX_CORES to not be greedy
  n_cores <- max(future::availableCores() - 1, 1)
  future::plan("multisession", workers = min(n_cores, MAX_CORES))
  bunched_ids <- split_ids_into_equal_buckets(api_ids, NUM_ROWS_PER_BUCKET)

  bunched_filter <- paste0(
    " and (Period gt ", min_date, ")",
    "&$select=Period,ResourceID,Label1,Label2,Geo,Value,Unit,Measure,Multiplier"
  )
  bunched_observations <- furrr::future_map(
    .x = bunched_ids,
    .f = function(id_bunch) {
      get_api_df("Observations", id_bunch, extra_filter = bunched_filter)
    }
  )
  merged_observations <- bind_rows(bunched_observations)

  # Check that no rows have been dropped along the way
  # TODO: unit test for this?
  expected_num_obs <- get_api_df(
    "Observations",
    api_ids,
    extra_filter = paste0(
      " and (Period gt ", min_date, ")&$apply=aggregate($count as count)"
    )
  )[1, 1]
  if (nrow(merged_observations) < expected_num_obs) {
    warning(paste(
      "API-download process returned", nrow(merged_observations), "rows,",
      "but expected", expected_num_obs, "rows."
    ))
  }

  output_list <- list(metadata = resources, data = merged_observations)
  return(output_list)
}
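
A hedged sketch of calling get_api_downloadable, assuming the same indicator_definitions list used by get_download_csv; get_api_df() is an internal helper defined elsewhere in the package, and the min_date value below assumes an OData-compatible date literal:

api_download <- get_api_downloadable(indicator_definitions, min_date = "2020-01-01")
str(api_download$metadata)  # "Resources" rows describing each ResourceID
str(api_download$data)      # merged "Observations" rows newer than min_date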

split_ids_into_equal_buckets <- function(api_ids, num_rows_per_bucket) {
  #' Return ResourceIDs grouped with equal(ish) number of rows in Observations.
  #'
  #' Example of return value:
  #' list(
  #'   `1` = c("id1", "id2", "id3", "id4", "id5"),
  #'   `2` = c("id6", "id7")
  #' )
  #' (This would indicate that id6 & id7 *each* have more rows than *each* of
  #' the first 5 ids. A toy illustration follows the function definition.)
  count_filter <- "&$apply=groupby((ResourceID),aggregate($count as count))"
  info <- get_api_df("Observations", api_ids, extra_filter = count_filter) %>%
    arrange(count) %>%
    mutate(cumul_weight = cumsum(count))

  n_buckets <- max(ceiling(sum(info$count) / num_rows_per_bucket),
                   2)  # cut requires breaks >=2
  info <- info %>%
    mutate(
      bucket_num = cut(cumul_weight, n_buckets) %>% as.numeric()
    )

  bucket_num_to_ids <- split(info$ResourceID, info$bucket_num)

  return(bucket_num_to_ids)
}
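
To illustrate the bucketing, here is a self-contained toy sketch with invented per-ResourceID row counts: IDs are sorted by count, the cumulative total is cut into roughly equal intervals, and IDs that fall in the same interval are requested together.

library(dplyr)

info <- data.frame(
  ResourceID = paste0("id", 1:5),
  count = c(100, 200, 400, 5000, 9000)  # invented Observations row counts
) %>%
  arrange(count) %>%
  mutate(cumul_weight = cumsum(count))

n_buckets <- max(ceiling(sum(info$count) / 10000), 2)  # toy num_rows_per_bucket of 10000
info <- info %>%
  mutate(bucket_num = as.numeric(cut(cumul_weight, n_buckets)))

split(info$ResourceID, info$bucket_num)
# $`1`: "id1" "id2" "id3" "id4"    $`2`: "id5"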