get_x_values <- function(dates) {
# Date objects are stored internally as doubles; format them as dd-mm-yy.
# Anything else (e.g. pre-formatted period labels) is returned as character.
if (typeof(dates) == "double") {
return(format(dates, "%d-%m-%y"))
}
return(as.character(dates))
}
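# A minimal usage sketch: a Date vector (stored as double) is reformatted to
# dd-mm-yy, while character labels pass through unchanged.
# get_x_values(as.Date(c("2021-06-30", "2021-09-30")))  # "30-06-21" "30-09-21"
# get_x_values(c("2020 Q1", "2020 Q2"))                 # "2020 Q1" "2020 Q2"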
get_download_csv <- function(data_store, indicator_definitions, date_range = NULL) {
downloadable_indicators <- names(indicator_definitions)[
as.vector(sapply(indicator_definitions, function(x) (!is.null(x$download) && x$download)))
]
# Start with an empty frame; rbind() below appends the per-indicator rows and
# the final select() fixes the column set and order.
output <- data.frame()
counter <- 0
counter_width <- length(downloadable_indicators)
for (item_name in downloadable_indicators) {
shiny::setProgress(counter / counter_width)
counter <- counter + 1
item <- indicator_definitions[[item_name]]
# Empty frame; columns come from get_csv_content() via rbind() below.
sub_series <- data.frame()
for (group in item$groups) {
key <- paste(item$class, item$type, item$indicator_name, group$name, sep = "_")
if ("TimeSeries" %in% class(data_store[[key]])) {
sub_sub_series <- data_store[[key]]$get_csv_content(date_range)
} else {
sub_sub_series <- data_store[[key]]$get_csv_content()
}
sub_sub_series$series_name <- gsub(" \U2012 ", " - ", group$name)
sub_sub_series$date_last_updated <- format(data_store[[key]]$update_date, "%d-%m-%y")
if (!is.null(group$units)) {
sub_sub_series$units <- group$units
} else if (!is.null(item$units)) {
sub_sub_series$units <- item$units
} else {
sub_sub_series$units <- ""
}
if (nrow(sub_sub_series) < 1) {
stop(paste("Download failed for the indicator-specific sub-series", key))
}
if (!is.numeric(sub_sub_series$value)) {
stop(paste("Non-numeric values in the dataset", key))
}
sub_series <- rbind(sub_series, sub_sub_series)
}
sub_series$indicator_name <- gsub(" \U2012 ", " - ", item$indicator_name)
sub_series$class <- item$class
sub_series$category <- item$type
sub_series <- mutate(sub_series, parameter = as.character(parameter))
if (nrow(sub_series) < 1) {
stop(paste("Download failed for the indicator", item$indicator_name))
}
output <- rbind(sub_series, output)
}
output <- output %>%
select(c(
"class",
"category",
"indicator_name",
"series_name",
"sub_series_name",
"parameter",
"value",
"units",
"date_last_updated"
))
return(output)
}
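# A minimal sketch of wiring get_download_csv() into a Shiny download handler.
# Because it calls shiny::setProgress(), it must run inside shiny::withProgress().
# The output id "download_csv" and the filename are hypothetical, and
# `data_store` / `indicator_definitions` are assumed to exist in server scope.
# output$download_csv <- shiny::downloadHandler(
#   filename = "indicators.csv",
#   content = function(file) {
#     shiny::withProgress(message = "Preparing download", value = 0, {
#       csv_data <- get_download_csv(data_store, indicator_definitions)
#       utils::write.csv(csv_data, file, row.names = FALSE)
#     })
#   }
# )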
get_api_downloadable <- function(indicator, min_date) {
# Collect OData API resource ids from the indicator definitions.
# A nested loop is needed because ids can also appear in group definitions.
api_ids <- c()
id_counter <- 0
for (i in seq_along(indicator)) {
if (!is.null(indicator[[i]]$api_resource_id)) {
id_counter <- id_counter + 1
api_ids[id_counter] <- indicator[[i]]$api_resource_id
} else {
for (j in seq_along(indicator[[i]]$groups)) {
if (!is.null(indicator[[i]]$groups[[j]]$api_resource_id)) {
id_counter <- id_counter + 1
api_ids[id_counter] <- indicator[[i]]$groups[[j]]$api_resource_id
}
}
}
}
api_ids <- unique(na.omit(api_ids))
MAX_CORES <- 4
NUM_ROWS_PER_BUCKET <- 20000
if (length(api_ids) == 0) {
# Nothing to download; return early so the checks below have data to work on.
return(list(metadata = NULL, data = NULL))
}
resources <- get_api_df("Resources", api_ids)
# Sends bunches of ResourceIDs to the get_api_df function concurrently.
# Leaves 1 core spare for other processes, capped at MAX_CORES to not be greedy.
n_cores <- max(future::availableCores() - 1, 1)
future::plan("multisession", workers = min(n_cores, MAX_CORES))
bunched_ids <- split_ids_into_equal_buckets(api_ids, NUM_ROWS_PER_BUCKET)
bunched_filter <- paste0(
" and (Period gt ", min_date, ")",
"&$select=Period,ResourceID,Label1,Label2,Geo,Value,Unit,Measure,Multiplier"
)
bunched_observations <- furrr::future_map(
.x = bunched_ids,
.f = function(id_bunch) {
get_api_df("Observations", id_bunch, extra_filter = bunched_filter)
}
)
merged_observations <- bind_rows(bunched_observations)
# Check that no rows have been dropped along the way
# TODO: unit test for this?
expected_num_obs <- get_api_df(
"Observations",
api_ids,
extra_filter = paste0(
" and (Period gt ", min_date, ")&$apply=aggregate($count as count)"
)
)[1, 1]
if (nrow(merged_observations) < expected_num_obs) {
warning(paste(
"API-download process returned", nrow(merged_observations), "rows,",
"but expected", expected_num_obs, "rows."
))
}
output_list <- list(metadata = resources, data = merged_observations)
return(output_list)
}
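# An illustrative call, assuming `indicator_definitions` as above and an ISO
# date string for the OData Period filter (get_api_df is defined elsewhere):
# api_data <- get_api_downloadable(indicator_definitions, min_date = "2020-01-01")
# api_data$metadata  # one row per resource, from the Resources endpoint
# api_data$data      # Observations rows merged across all the bunches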
split_ids_into_equal_buckets <- function(api_ids, num_rows_per_bucket) {
#' Return ResourceIDs grouped with equal(ish) number of rows in Observations.
#'
#' example of return value:
#' list(
#'   "1" = c("id1", "id2", "id3", "id4", "id5"),
#'   "2" = c("id6", "id7")
#' )
#' (this would indicate that id6 and id7 *each* have more rows than *each* of
#' the first 5 ids)
count_filter <- "&$apply=groupby((ResourceID),aggregate($count as count))"
info <- get_api_df("Observations", api_ids, extra_filter = count_filter) %>%
arrange(count) %>%
mutate(cumul_weight = cumsum(count))
# cut() requires at least 2 breaks
n_buckets <- max(ceiling(sum(info$count) / num_rows_per_bucket), 2)
info <- info %>%
mutate(
bucket_num = cut(cumul_weight, n_buckets) %>% as.numeric()
)
bucket_num_to_ids <- split(info$ResourceID, info$bucket_num)
return(bucket_num_to_ids)
}
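# A toy illustration of the bucketing arithmetic above (no API call; the row
# counts are made up). After sorting by count and taking the cumulative sum,
# cut() assigns each ResourceID to a bucket of roughly equal total rows:
# info <- data.frame(ResourceID = c("id1", "id6", "id7"), count = c(500, 9000, 12000))
# info$cumul_weight <- cumsum(info$count)  # 500 9500 21500
# as.numeric(cut(info$cumul_weight, 2))    # 1 1 2
# split(info$ResourceID, as.numeric(cut(info$cumul_weight, 2)))
# # $`1`: "id1" "id6"   $`2`: "id7"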