R/quantile_estimation_functions_master.R

Defines functions: get_metrics, get_county_data, write_zipped_aggregation_files, pull_file_names, calc_metrics, get_prob_labels, process_raw_data, get_cumul_expr, get_configuration

Documented in: calc_metrics, get_configuration, get_county_data, get_metrics, get_prob_labels, process_raw_data, pull_file_names, write_zipped_aggregation_files

#' Get the base configuration
#'
#' This function prepares the base configuration
#' @param geoids_data_table data table containing list of states for each geoid
#' @param cumul_cols vector of character column names that should be cumulated (i.e. have cumulative sums computed); default is c("incidI", "incidD", "incidH")
#' @return Returns a configuration, indicating the target columns, cols to cumulate, target quantiles, and by columns for
#' each geographical level
#' @export
#' @examples
#' get_configuration(geoids)
#' get_configuration(geoids, cumul_cols=c("incidI"))
#' get_configuration(geoids, cumul_cols=c("incidI", "incidD", "incidH"))
get_configuration <- function(geoids_data_table, cumul_cols = c("incidI", "incidD","incidH")) {
  config <- list(
    us = list(
      target_columns = c("incidI","incidH","hosp_curr","icu_curr", "vent_curr", "incidICU", "incidVent", "incidD"),
      target_quantiles = c(0.01,0.025, seq(0.05,.95,.05),.975,.999),
      by_columns = c("date_inds")
    ),
    state = list(
      target_columns = c("incidI","incidH","hosp_curr","icu_curr", "vent_curr", "incidICU", "incidVent", "incidD"),
      target_quantiles = c(0.01,0.025, seq(0.05,.95,.05),.975,.999),
      by_columns = c("USPS", "date_inds")
    ),
    county = list(
      target_columns = c("incidI","incidH","hosp_curr","icu_curr", "vent_curr", "incidICU", "incidVent", "incidD"),
      target_quantiles = c(0.025,0.05, 0.25,0.50,0.75,0.95,.975),
      by_columns = c("geoid", "date_inds")
    ),
    cols_to_cumul = cumul_cols
  )


  # append the cumulative column names to each level's target columns
  for(level in c("us", "state", "county")) {
    config[[level]][["target_columns"]] <- c(config[[level]][["target_columns"]],
                                             paste0("cum", config[["cols_to_cumul"]]))
  }
  #add the cumulation expression
  config[["cc"]] <- get_cumul_expr(config[["cols_to_cumul"]])
  config[["geoids"]] <-geoids_data_table
  return(config)
}
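
# Illustrative sketch (not run): building a configuration from a minimal,
# hypothetical geoids table. The columns assumed here (geoid, USPS) reflect how
# config[["geoids"]] is used downstream: merged on "geoid" and grouped by "USPS".
if (FALSE) {
  geoids <- data.table::data.table(
    geoid = c("24005", "24510", "51059"),
    USPS  = c("MD", "MD", "VA")
  )
  config <- get_configuration(geoids, cumul_cols = c("incidI", "incidD"))
  config$us$target_columns       # original targets plus "cumincidI" and "cumincidD"
  config$county$target_quantiles # c(0.025, 0.05, 0.25, 0.50, 0.75, 0.95, 0.975)
}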

# Internal helper: given a vector of column names, return a two-element list:
# the cumulative column names (prefixed with "cum") and an unevaluated
# expression that computes the corresponding cumulative sums (evaluated by
# group in process_raw_data() and get_county_data()).
get_cumul_expr <- function(cc) {
  cumul_colnames <- paste0("cum", cc)
  cumul_expr <- paste0("list(", paste0("cumsum(", cc, ")", collapse = ","), ")")
  return(list(cumul_colnames, rlang::parse_expr(cumul_expr)))
}
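
# Illustrative sketch (not run): what get_cumul_expr() produces and how the
# aggregation code uses it with data.table's := by group. The toy table is hypothetical.
if (FALSE) {
  ce <- get_cumul_expr(c("incidI", "incidD"))
  ce[[1]] # "cumincidI" "cumincidD"
  ce[[2]] # the unevaluated call: list(cumsum(incidI), cumsum(incidD))

  dt <- data.table::data.table(
    geoid  = c("24005", "24005", "24510", "24510"),
    incidI = c(1, 2, 3, 4),
    incidD = c(0, 1, 0, 2)
  )
  # add the cumulative columns within each geoid, as process_raw_data() does
  dt[, eval(ce[[1]]) := eval(ce[[2]]), by = geoid]
}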


############################################################
######################  Get Raw Data - General #######################
############################################################

#' Process the Raw Data
#'
#' This function processes the raw model output data
#' @param config aggregation configuration list
#' @param fnames vector of filenames (absolute paths, required)
#' @param parallelize defaults to TRUE, indicates whether or not processing should be done in parallel
#' @param write_geoid defaults to FALSE; indicates whether per-geoid data should also be written out as temporary csv files for later county-level processing
#' @param geoid_save_location must be provided if write_geoid=TRUE; path to the location where the temporary per-geoid csv files are written
#' @return Returns a list of data tables, one at the us/national level (summed over all geoids),
#' and one at the state level (summed over all geoids within each state)
#' @import data.table
#' @importFrom foreach %dopar%
#' @export
#' @examples
#' process_raw_data(config, fnames, parallelize=TRUE, write_geoid=FALSE)
#' process_raw_data(config, fnames, parallelize=TRUE, write_geoid=TRUE, geoid_save_location="tempfolder/")
process_raw_data <- function(config, fnames, parallelize=T, write_geoid=F, geoid_save_location=NULL) {
  cat(paste0("Pulling Raw Data and Processing to State / National",
  dplyr::if_else(write_geoid," / County "," "),"Level ... may take some time...\n"))

  if(parallelize) doParallel::registerDoParallel()

  result = foreach::foreach(i=fnames) %dopar% {
      #get the parquet file as data table
      usdf <- data.table::data.table(arrow::read_parquet(i))

      #if need to write off separately by geoid, do so
      if(write_geoid) usdf[, data.table::fwrite(.SD, paste0(geoid_save_location,.BY,".csv"), append=T,col.names=F), by=geoid]

      #get any cumulative columns necessary
      usdf[,eval(config[["cc"]][[1]]):=eval(config[["cc"]][[2]]), by=geoid]

      #make a version that has the state information
      statedf <- merge(usdf,config[["geoids"]],by="geoid")
      #sum up geoids for state
      statedf <- statedf[,lapply(.SD, sum), by=eval(config[["state"]][["by_columns"]]), .SDcols = eval(config[["state"]][["target_columns"]])]
      #sum up geoids for nation
      usdf <- usdf[,lapply(.SD, sum), by=eval(config[["us"]][["by_columns"]]), .SDcols = eval(config[["us"]][["target_columns"]])]

      #return aggregated data
      list(statedf,usdf)
  }
  us <- data.table::rbindlist(lapply(result, function(x) x[[2]]))
  state <- data.table::rbindlist(lapply(result,function(x) x[[1]]))

  return(list("us" = us, "state" = state))

}
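
# Illustrative sketch (not run): the by-group summation idiom used above, applied
# to a tiny hypothetical table. Each state/day row is the sum over its geoids.
if (FALSE) {
  toy <- data.table::data.table(
    geoid     = c("24005", "24510", "24005", "24510"),
    USPS      = c("MD", "MD", "MD", "MD"),
    date_inds = c(1, 1, 2, 2),
    incidI    = c(2, 3, 4, 5),
    incidD    = c(0, 1, 1, 0)
  )
  toy[, lapply(.SD, sum), by = c("USPS", "date_inds"), .SDcols = c("incidI", "incidD")]
  #    USPS date_inds incidI incidD
  # 1:   MD         1      5      1
  # 2:   MD         2      9      1
}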

#' Get the labels for quantile
#'
#' This function returns a set of string labels given a vector of probabilities
#' @param probs a vector of probabilities (see get_configuration() for an example of a vector)
#' @return Returns a vector of string labels, each starting with "p" (e.g. c("p250", "p500", "p750"))
#' @export
#' @examples
#' get_prob_labels(c(0.025,0.05, 0.25,0.5,0.75,0.95,.975))
get_prob_labels <- function(probs) {
  plabel <- paste0("p", stringr::str_pad(stringr::str_remove(probs, "0\\."), 3, "right", "0"))
  return(plabel)
}
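
# Quick illustration (not run): the labels drop the leading "0." and are
# right-padded with zeros to three characters.
if (FALSE) {
  get_prob_labels(c(0.05, 0.5, 0.975)) # "p050" "p500" "p975"
}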


############################################################
######################  get metrics ######################
############################################################
#' Calculate the aggregation metrics (mean, quantiles)
#'
#' This function estimates metrics, by group, given a data table, desired quantiles, and by columns
#' @param dt a data table containing the raw data
#' @param cols_to_metric a vector of column names on which to calculate metrics
#' @param probs a vector of probabilities that are the target quantiles
#' @param bycols a vector of column names that serve as the grouping variables over which the metrics should be
#' calculated
#' @return Returns a single data table that contains the input data summarized (means and quantiles) over the bycols groups
#' @export
#' @examples
#' calc_metrics(mytable, c("incidI","incidD"), c(0.05,0.50, 0.95), bycols="groupvar")
calc_metrics <- function(dt,cols_to_metric, probs, bycols) {
  probs_label = get_prob_labels(probs)

  #grab means first (fast)
  means = dt[,lapply(.SD, mean), by=bycols, .SDcols=cols_to_metric]

  # ADD the mean label variable
  means[,q := rep("mean",length.out=.N)]

  # OKAY, now calculate the quantiles, over bycols
  dt = dt[,lapply(.SD,quantile, probs),by=bycols,.SDcols = cols_to_metric]
  # ADD the quantile label variable
  dt[,q := rep(probs_label,length.out=.N)]

  return(rbind(means,dt))

}
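
# Illustrative sketch (not run): calc_metrics() on a tiny hypothetical table.
# The result stacks one "mean" row per group on top of one row per requested
# quantile per group, with the label carried in column q.
if (FALSE) {
  toy <- data.table::data.table(
    USPS      = rep("MD", 6),
    date_inds = rep(c(1, 2), each = 3),
    incidI    = c(1, 2, 3, 10, 20, 30)
  )
  calc_metrics(toy,
               cols_to_metric = "incidI",
               probs = c(0.25, 0.5, 0.75),
               bycols = c("USPS", "date_inds"))
  # two mean rows (one per date_inds) followed by six quantile rows
  # labelled q = "p250", "p500", "p750" within each group
}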

#' Get a vector of source file names
#'
#' This function returns a vector of string file names, given a source directory and a pattern
#' @param srcdir path to where the source parquet files are located
#' @param pattern_to_search character string that will serve as the pattern (e.g. "high") used to select a specific subset of files
#' @param file_num integer value. If this value is less than or equal to the number of matching files, a random sample of size file_num
#' will be drawn from the pool of possible source files (small numbers aid in testing, prior to aggregating the entire
#' set of 1000 files)
#' @return vector of source file names
#' @export
#' @examples
#' pull_file_names("modeloutput/scenario_X/", "med", 50)
#' pull_file_names("modeloutput/scenario_Y/", "high", 1000)

pull_file_names <- function(srcdir, pattern_to_search = NULL, file_num) {
  fnames = dir(path=srcdir, pattern = pattern_to_search, full.names = T)
  if(file_num<=length(fnames)) fnames = sample(fnames, file_num, replace=F)
  return(fnames)
}

#' Write aggregated files
#'
#' Convenience function to write a compressed version of results, given a list of results, and a destination directory
#' @param result a named list of data tables; each will be written to a separate compressed csv file
#' @param dest_dir a path to folder where these files will be written
#' @param scenario a description of scenario - will be included in the file name
#' @param ifr_level character string that indicates the ifr level - will be included in the file name
#' @return NULL
#' @export
#' @examples
#' write_zipped_aggregation_files(result, "my_dest_path/", "scenarioX", "med")
write_zipped_aggregation_files <- function(result, dest_dir, scenario, ifr_level) {

  dir.create(file.path(dest_dir), showWarnings = FALSE)

  lapply(names(result), function(res_name) {
    targ_fname <- paste0(dest_dir, scenario, "_", ifr_level, "_", res_name, ".csv.gz")
    data.table::fwrite(result[[res_name]], targ_fname, compress="gzip")
  })
  cat("Output files written.\n")
  return(invisible(NULL))
}
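
# Illustrative sketch (not run): given result = list(us = ..., state = ...), a call
# like the one below writes "my_dest_path/scenarioX_med_us.csv.gz" and
# "my_dest_path/scenarioX_med_state.csv.gz" (dest_dir is assumed to end in "/").
if (FALSE) {
  write_zipped_aggregation_files(result, "my_dest_path/", "scenarioX", "med")
}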

#' Estimate quantiles/metrics on county days
#'
#' This function takes the individual geoid-level csv files and estimates metrics over the set of simulations and days
#' @param config aggregation configuration list (provides the cumulative-column expression)
#' @param geoid_dir location of the per-geoid csv files
#' @param tquant a vector of target quantiles (i.e. values between 0 and 1)
#' @param tcols a vector of column names that are the targets of the estimation
#' @return Returns a datatable of county level aggregated information
#' @importFrom foreach %dopar%
#' @export
#' @examples
#' get_county_data(config, "path_to_my_csvs/", c(0.25, 0.5, 0.75), c("col1", "col2"))
get_county_data <- function(config, geoid_dir, tquant, tcols) {

  doParallel::registerDoParallel()
  fnames <- dir(path=geoid_dir, full.names = F)

  probs_label = get_prob_labels(tquant)

  county_list <- data.table::rbindlist(
    foreach::foreach(f=fnames, .inorder = F, .verbose = F) %dopar% {

      df <- data.table::fread(paste0(geoid_dir, f))
      # the per-geoid csvs are written without headers (col.names=F above), so assign column names
      data.table::setnames(df, c("time","uid","sim_num","comp","incidI","hosp_curr","icu_curr","vent_curr","incidH","incidICU","incidVent","incidD","date_inds","geo_ind"))
      data.table::setorder(df,sim_num,date_inds)

      #need to compute the sim-specific cumulative columns (e.g. cumincidI, cumincidD)
      df[,eval(config[["cc"]][[1]]):=eval(config[["cc"]][[2]]), by=sim_num]

      #get metrics
      df <- rbind(
        df[,lapply(.SD, quantile,tquant),by=date_inds,.SDcols = tcols][,q := rep(probs_label,length.out=.N)],
        df[,lapply(.SD,mean), by=date_inds, .SDcols = tcols][,q := rep("mean",length.out=.N)]
      )

      #add a geoid label
      geoid = stringr::str_sub(f,1,5)
      df[,geoid := rep(geoid,length.out=.N)]

      return(df)
  })

  doParallel::stopImplicitCluster()
  return(county_list)
}


############################################################
######################  Wrapper Function ######################
############################################################
#' Wrapper function to simplify workflow
#'
#' Function will call other package functions to simplify the workflow of creating all aggregations for a scenario
#' @param config aggregation configuration list for this aggregation (see get_configuration())
#' @param srcdir a path to model output
#' @param scenario a scenario name (character string)
#' @param ifr_level a character string that indicates the ifr level (subset of parquet files)
#' @param pattern_to_search character string pattern used to select the source parquet files (passed to pull_file_names())
#' @param file_num number of source files on which to perform the aggregation (1000 for the full set)
#' @param dest_dir a path to save the aggregated files
#' @param anchor_date a string date in the form "YYYY-MM-DD" that indicates the first day of the model
#' output (this defaults to "2020-01-01")
#' @param include_county a boolean (default is FALSE) indicating whether county level aggregation should be done
#' @param geoid_save_path a string path where temp csvs should be stored if include_county is TRUE
#' @return NULL; the aggregated files are written to dest_dir as a side effect
#' @export
#' @examples
#' config <- get_configuration(geoid_table)
#' get_metrics(config, "modeloutput/", "scenarioX", "med", "med", 1000, "my_save_path/", "2020-01-01", include_county=FALSE)
#' get_metrics(config, "modeloutput/", "scenarioX", "med", "med", 1000, "my_save_path/", "2020-01-01", include_county=TRUE, "tempfolder/")
get_metrics <- function(config,
                        srcdir,
                        scenario,
                        ifr_level,
                        pattern_to_search,
                        file_num,
                        dest_dir,
                        anchor_date="2020-01-01",
                        include_county=F,
                        geoid_save_path = NULL) {


  # First, get all the source file fnames
  fnames = pull_file_names(srcdir, pattern_to_search, file_num)
  if(length(fnames)==0) {
    stop("No files found, check pattern_to_search and srcdir.")
  }

  # Get the raw data for both us and state (and prepare county geoid files, if requested)
  system.time({result = process_raw_data(config, fnames,parallelize = T,write_geoid = include_county,geoid_save_location = geoid_save_path)})


  cat("Estimating State and National Quantiles and Means ... may take some time...\n")

  # Third, get the us and state quantile estimation
  result = lapply(c("us","state"), function(geo) {
    calc_metrics(dt = result[[geo]],
                 cols_to_metric =  config[[geo]][["target_columns"]],
                 probs = config[[geo]][["target_quantiles"]],
                 bycols = config[[geo]][["by_columns"]])
  })
  names(result) <- c("us", "state")


  #Fourth, ADD County Information, if requested
  if(include_county) {
    cat("Estimating County Quantiles and Means ... may take some time...\n")
    result[["county"]] <- get_county_data(geoid_dir = geoid_save_path,
                                          tquant = config$county$target_quantiles,
                                          tcols = config$county$target_columns)
    #remove the temporary per-geoid csv files
    unlink(paste0(geoid_save_path, "*"))
  }

  #Fifth, ADD Date Variable
  for(geoname in names(result)) {
    result[[geoname]] <- result[[geoname]][,date:=as.Date(anchor_date)-1+date_inds]
  }

  #Sixth write aggregation files
  write_zipped_aggregation_files(result, dest_dir,scenario,ifr_level)

  return(NULL)

}
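
# Illustrative end-to-end sketch (not run): a typical call sequence, assuming a
# hypothetical geoid/USPS lookup csv and a directory of model-output parquet
# files whose names contain "med".
if (FALSE) {
  geoids <- data.table::fread("geoid_lookup.csv")  # hypothetical file; columns: geoid, USPS
  config <- get_configuration(geoids)
  get_metrics(config,
              srcdir            = "modeloutput/scenario_X/",
              scenario          = "scenarioX",
              ifr_level         = "med",
              pattern_to_search = "med",
              file_num          = 100,
              dest_dir          = "aggregated/",
              anchor_date       = "2020-01-01",
              include_county    = TRUE,
              geoid_save_path   = "tempfolder/")
}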