#' Get the base configuration
#'
#' This function prepares the base configuration
#' @param geoids_data_table data table containing the state for each geoid
#' @param cumul_cols vector of character column names for which cumulative sums should be computed (default is c("incidI", "incidD", "incidH"))
#' @return Returns a configuration, indicating the target columns, cols to cumulate, target quantiles, and by columns for
#' each geographical level
#' @export
#' @examples
#' get_configuration(geoids)
#' get_configuration(geoids, cumul_cols=c("incidI"))
#' get_configuration(geoids, cumul_cols=c("incidI", "incidD", "incidH"))
get_configuration <- function(geoids_data_table, cumul_cols = c("incidI", "incidD", "incidH")) {
  config <- list(
    us = list(
      target_columns = c("incidI", "incidH", "hosp_curr", "icu_curr", "vent_curr", "incidICU", "incidVent", "incidD"),
      target_quantiles = c(0.01, 0.025, seq(0.05, 0.95, 0.05), 0.975, 0.999),
      by_columns = c("date_inds")
    ),
    state = list(
      target_columns = c("incidI", "incidH", "hosp_curr", "icu_curr", "vent_curr", "incidICU", "incidVent", "incidD"),
      target_quantiles = c(0.01, 0.025, seq(0.05, 0.95, 0.05), 0.975, 0.999),
      by_columns = c("USPS", "date_inds")
    ),
    county = list(
      target_columns = c("incidI", "incidH", "hosp_curr", "icu_curr", "vent_curr", "incidICU", "incidVent", "incidD"),
      target_quantiles = c(0.025, 0.05, 0.25, 0.50, 0.75, 0.95, 0.975),
      by_columns = c("geoid", "date_inds")
    ),
    cols_to_cumul = cumul_cols
  )
  # append the cumulative column names (e.g. "cumincidI") to each level's target columns
  for (level in c("us", "state", "county")) {
    for (cumulcol in config[["cols_to_cumul"]]) {
      config[[level]][["target_columns"]] <- c(config[[level]][["target_columns"]], paste0("cum", cumulcol))
    }
  }
  # add the cumulation expression (column names plus the unevaluated cumsum() call)
  config[["cc"]] <- get_cumul_expr(config[["cols_to_cumul"]])
  config[["geoids"]] <- geoids_data_table
  return(config)
}
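# Usage sketch for get_configuration(): the geoid lookup table and its values
# below are hypothetical, assumed only to contain a "geoid" column plus the
# "USPS" state column used later by the state-level merge.
# geoids <- data.table::data.table(geoid = c("01001", "01003"), USPS = c("AL", "AL"))
# config <- get_configuration(geoids, cumul_cols = c("incidI", "incidD"))
# config$state$target_columns  # now also includes "cumincidI" and "cumincidD"
# config$cc                    # column names plus the unevaluated cumsum() expression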
get_cumul_expr <- function(cc) {
  cumul_colnames <- paste0("cum", cc)
  # build an unevaluated call such as list(cumsum(incidI), cumsum(incidD), cumsum(incidH))
  cumul_expr <- paste0("list(", paste0("cumsum(", cc, ")", collapse = ","), ")")
  return(list(cumul_colnames, rlang::parse_expr(cumul_expr)))
}
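# Illustration of what get_cumul_expr() returns (a sketch of the pair built above):
# cc_parts <- get_cumul_expr(c("incidI", "incidD"))
# cc_parts[[1]]  # c("cumincidI", "cumincidD")
# cc_parts[[2]]  # the unevaluated call list(cumsum(incidI), cumsum(incidD))
# The pair is later used inside a data.table as
# dt[, eval(cc_parts[[1]]) := eval(cc_parts[[2]]), by = geoid]
# to add per-geoid cumulative sums.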
############################################################
###################### Get Raw Data - General #######################
############################################################
#' Process the Raw Data
#'
#' This function processes the raw model output data
#' @param config aggregation configuration list
#' @param fnames vector of filenames (absolute paths, required)
#' @param parallelize defaults to TRUE, indicates whether or not processing should be done in parallel
#' @param write_geoid defaults to FALSE, indicates whether per-geoid (county level) csv files should also be written
#' @param geoid_save_location must be provided if write_geoid=TRUE; path to the location for the temporary geoid csv files
#' @return Returns a list of data tables, one at the us/national level (summed over all geoids),
#' and one at the state level (summed over all geoids within each state)
#' @import data.table
#' @importFrom foreach %dopar%
#' @export
#' @examples
#' process_raw_data(config, fnames, parallelize = TRUE, write_geoid = FALSE)
#' process_raw_data(config, fnames, parallelize = TRUE, write_geoid = TRUE, geoid_save_location = "tempfolder/")
process_raw_data <- function(config, fnames, parallelize = TRUE, write_geoid = FALSE, geoid_save_location = NULL) {
  cat(paste0("Pulling Raw Data and Processing to State / National",
             dplyr::if_else(write_geoid, " / County ", " "), "Level ... may take some time...\n"))
  if (parallelize) doParallel::registerDoParallel()
  result <- foreach::foreach(i = fnames) %dopar% {
    # get the parquet file as a data table
    usdf <- data.table::data.table(arrow::read_parquet(i))
    # if we need to write off separately by geoid, do so
    if (write_geoid) usdf[, data.table::fwrite(.SD, paste0(geoid_save_location, .BY, ".csv"), append = TRUE, col.names = FALSE), by = geoid]
    # get any cumulative columns necessary
    usdf[, eval(config[["cc"]][[1]]) := eval(config[["cc"]][[2]]), by = geoid]
    # make a version that has the state information
    statedf <- merge(usdf, config[["geoids"]], by = "geoid")
    # sum up geoids for the state level
    statedf <- statedf[, lapply(.SD, sum), by = eval(config[["state"]][["by_columns"]]), .SDcols = eval(config[["state"]][["target_columns"]])]
    # sum up geoids for the national level
    usdf <- usdf[, lapply(.SD, sum), by = eval(config[["us"]][["by_columns"]]), .SDcols = eval(config[["us"]][["target_columns"]])]
    # return the aggregated data
    list(statedf, usdf)
  }
  us <- data.table::rbindlist(lapply(result, function(x) x[[2]]))
  state <- data.table::rbindlist(lapply(result, function(x) x[[1]]))
  return(list("us" = us, "state" = state))
}
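# Sketch of the returned structure (assumes fnames points at model output parquet
# files and config comes from get_configuration()):
# raw <- process_raw_data(config, fnames)
# raw$us     # national totals: one row per date_inds for each input file
# raw$state  # state totals: one row per USPS state per date_inds for each input file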
#' Get the labels for quantiles
#'
#' This function returns a set of string labels given a vector of probabilities
#' @param probs a vector of probabilities (see get_configuration() for an example of a vector)
#' @return Returns a vector of string labels starting with a p (e.g. c("p250", "p500", "p750"))
#' @export
#' @examples
#' get_prob_labels(c(0.025,0.05, 0.25,0.5,0.75,0.95,.975))
get_prob_labels <- function(probs) {
  # strip the leading "0." and right-pad with zeros to three characters
  plabel <- paste0("p", stringr::str_pad(stringr::str_remove(probs, "0\\."), 3, "right", "0"))
  return(plabel)
}
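# Example of the label construction above (values traced from the code):
# get_prob_labels(c(0.025, 0.5, 0.975))
# #> "p025" "p500" "p975"
# The leading "0." is stripped and the remainder is right-padded with zeros to
# three characters, so 0.5 becomes "p500" and 0.05 becomes "p050".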
############################################################
###################### get metrics ######################
############################################################
#' Calculate the aggregation metrics (mean, quantiles)
#'
#' This function estimates metrics, by group, given a data table, desired quantiles, and by columns
#' @param dt a data table containing the raw data
#' @param cols_to_metric a vector of column names on which to calculate metrics
#' @param probs a vector of probabilities that are the target quantiles
#' @param bycols a vector of column names that serve as the grouping variables over which the metrics should be
#' calculated
#' @return Returns a single data table that contains the raw input data summarized over the bycols
#' @export
#' @examples
#' calc_metrics(mytable, c("incidI","incidD"), c(0.05,0.50, 0.95), bycols="groupvar")
calc_metrics <- function(dt, cols_to_metric, probs, bycols) {
  probs_label <- get_prob_labels(probs)
  # grab the means first (fast)
  means <- dt[, lapply(.SD, mean), by = bycols, .SDcols = cols_to_metric]
  # add the mean label variable
  means[, q := rep("mean", length.out = .N)]
  # now calculate the quantiles, over bycols
  dt <- dt[, lapply(.SD, quantile, probs), by = bycols, .SDcols = cols_to_metric]
  # add the quantile label variable
  dt[, q := rep(probs_label, length.out = .N)]
  return(rbind(means, dt))
}
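# Sketch of the long format produced by calc_metrics() (toy data; the column and
# group names here are hypothetical):
# dt <- data.table::data.table(groupvar = rep(c("a", "b"), each = 100),
#                              incidI = stats::rpois(200, 10))
# calc_metrics(dt, "incidI", c(0.05, 0.5, 0.95), bycols = "groupvar")
# The result stacks one mean row per group (q == "mean") on top of one row per
# group per requested quantile (q == "p050", "p500", "p950").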
#' Get a vector of source file names
#'
#' This function returns a vector of string file names, given a source directory and a pattern
#' @param srcdir path to where the source parquet files are located
#' @param pattern_to_search character string that will serve as the pattern (e.g. "high") to get a specific subset of files
#' @param file_num integer value. If this value is at most the number of matching files, a random sample of size file_num
#' will be drawn from the pool of possible source files (small numbers aid in testing, prior to aggregating the entire
#' set of 1000 files)
#' @return vector of source file names
#' @export
#' @examples
#' pull_file_names("modeloutput/scenario_X/", "med", 50)
#' pull_file_names("modeloutput/scenario_Y/", "high", 1000)
pull_file_names <- function(srcdir, pattern_to_search = NULL, file_num) {
  fnames <- dir(path = srcdir, pattern = pattern_to_search, full.names = TRUE)
  if (file_num <= length(fnames)) fnames <- sample(fnames, file_num, replace = FALSE)
  return(fnames)
}
#' Write aggregated files
#'
#' Convenience function to write a compressed version of results, given a list of results, and a destination directory
#' @param result a named list of data tables; each will be written to a separate compressed csv file
#' @param dest_dir a path to folder where these files will be written
#' @param scenario a description of scenario - will be included in the file name
#' @param ifr_level character string that indicates the ifr level - will be included in the file name
#' @return NULL
#' @export
#' @examples
#' write_zipped_aggregation_files(result, "my_dest_path/", "scenarioX", "med")
write_zipped_aggregation_files <- function(result, dest_dir, scenario, ifr_level) {
  dir.create(file.path(dest_dir), showWarnings = FALSE)
  lapply(names(result), function(res_name) {
    targ_fname <- paste0(dest_dir, scenario, "_", ifr_level, "_", res_name, ".csv.gz")
    data.table::fwrite(result[[res_name]], targ_fname, compress = "gzip")
  })
  cat("Output files written.\n")
  invisible(NULL)
}
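# File-naming sketch (hypothetical paths): with result = list(us = ..., state = ...),
# write_zipped_aggregation_files(result, "aggregated/", "scenarioX", "med")
# writes "aggregated/scenarioX_med_us.csv.gz" and "aggregated/scenarioX_med_state.csv.gz".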
#' Estimate quantiles/metrics on county days
#'
#' This function takes individual geoid-level csv files and estimates metrics over the set of simulations and days
#' @param config aggregation configuration list (provides the cumulation expression)
#' @param geoid_dir location of csv files
#' @param tquant a vector of target quantiles (i.e. all between 0 and 1)
#' @param tcols a vector of column names that are the targets of the estimation
#' @return Returns a data table of county level aggregated information
#' @importFrom foreach %dopar%
#' @export
#' @examples
#' get_county_data(config, "path_to_my_csvs/", c(0.25, 0.5, 0.75), c("col1", "col2"))
get_county_data <- function(config, geoid_dir, tquant, tcols) {
  doParallel::registerDoParallel()
  fnames <- dir(path = geoid_dir, full.names = FALSE)
  probs_label <- get_prob_labels(tquant)
  county_list <- data.table::rbindlist(
    foreach::foreach(f = fnames, .inorder = FALSE, .verbose = FALSE) %dopar% {
      df <- data.table::fread(paste0(geoid_dir, f))
      # the temporary geoid csvs are written without headers, so restore the column names
      data.table::setnames(df, old = c("time", "uid", "sim_num", "comp", "incidI", "hosp_curr", "icu_curr", "vent_curr", "incidH", "incidICU", "incidVent", "incidD", "date_inds", "geo_ind"))
      data.table::setorder(df, sim_num, date_inds)
      # estimate the sim-specific cumulative columns
      df[, eval(config[["cc"]][[1]]) := eval(config[["cc"]][[2]]), by = sim_num]
      # get metrics (quantiles and means) by day
      df <- rbind(
        df[, lapply(.SD, quantile, tquant), by = date_inds, .SDcols = tcols][, q := rep(probs_label, length.out = .N)],
        df[, lapply(.SD, mean), by = date_inds, .SDcols = tcols][, q := rep("mean", length.out = .N)]
      )
      # add a geoid label (the first five characters of the file name)
      geoid <- stringr::str_sub(f, 1, 5)
      df[, geoid := rep(geoid, length.out = .N)]
      return(df)
    })
  doParallel::stopImplicitCluster()
  return(county_list)
}
############################################################
###################### Wrapper Function ######################
############################################################
#' Wrapper function to simplify workflow
#'
#' Function will call other package functions to simplify the workflow of creating all aggregations for a scenario
#' @param config aggregation configuration list for this aggregation (from get_configuration())
#' @param srcdir a path to model output
#' @param scenario a scenario name (character string)
#' @param ifr_level a character string that indicates the ifr level - will be included in the output file names
#' @param pattern_to_search character string pattern (e.g. "med") used to select the subset of source parquet files
#' @param file_num number of source files on which to perform the aggregation (1000 for the full set)
#' @param dest_dir a path to save the aggregated files
#' @param anchor_date a date string in the form "YYYY-MM-DD" that indicates the first day of the model
#' output (this defaults to "2020-01-01")
#' @param include_county a boolean (default is FALSE) indicating whether county level aggregation should be done
#' @param geoid_save_path a string path where temp csvs should be stored if include_county is TRUE
#' @return Returns NULL; the aggregated files are written to dest_dir
#' @export
#' @examples
#' config <- get_configuration(geoid_table)
#' get_metrics(config, "modeloutput/", "scenarioX", "med", "med", 1000, "my_save_path/", "2020-01-01", include_county = FALSE)
#' get_metrics(config, "modeloutput/", "scenarioX", "med", "med", 1000, "my_save_path/", "2020-01-01", include_county = TRUE, geoid_save_path = "tempfolder/")
get_metrics <- function(config,
                        srcdir,
                        scenario,
                        ifr_level,
                        pattern_to_search,
                        file_num,
                        dest_dir,
                        anchor_date = "2020-01-01",
                        include_county = FALSE,
                        geoid_save_path = NULL) {
  # First, get all the source file names
  fnames <- pull_file_names(srcdir, pattern_to_search, file_num)
  if (length(fnames) == 0) {
    stop("No files found, check pattern_to_search and srcdir.")
  }
  # Second, get the raw data for both us and state (and prepare county geoid files, if requested)
  system.time({
    result <- process_raw_data(config, fnames, parallelize = TRUE, write_geoid = include_county, geoid_save_location = geoid_save_path)
  })
  cat("Estimating State and National Quantiles and Means ... may take some time...\n")
  # Third, get the us and state quantile estimation
  result <- lapply(c("us", "state"), function(geo) {
    calc_metrics(dt = result[[geo]],
                 cols_to_metric = config[[geo]][["target_columns"]],
                 probs = config[[geo]][["target_quantiles"]],
                 bycols = config[[geo]][["by_columns"]])
  })
  names(result) <- c("us", "state")
  # Fourth, add county information, if requested
  if (include_county) {
    cat("Estimating County Quantiles and Means ... may take some time...\n")
    result[["county"]] <- get_county_data(config = config,
                                          geoid_dir = geoid_save_path,
                                          tquant = config$county$target_quantiles,
                                          tcols = config$county$target_columns)
    # remove the temporary csv files
    unlink(paste0(geoid_save_path, "*"))
  }
  # Fifth, add the date variable
  for (geoname in names(result)) {
    result[[geoname]] <- result[[geoname]][, date := as.Date(anchor_date) - 1 + date_inds]
  }
  # Sixth, write the aggregated files
  write_zipped_aggregation_files(result, dest_dir, scenario, ifr_level)
  return(NULL)
}