#' Estimate quantiles of a climate time series
#'
#' This is a wrapper for \code{\link{get.quantiles}} that adds file
#' management, batch processing, bootstrapping, and saving of output
#' (provided the directory and filename/structure defaults are followed). It
#' estimates quantiles across model time series, potentially with multiple
#' runs of data over the same time frame.
#'
#' \code{estimate.quantiles} runs \code{\link{get.quantiles}} on a
#' pixel-by-pixel basis. These pixels are loaded and processed on a
#' latitude-by-region/subset basis - in other words, a 'block' of pixels is
#' every pixel in one subset/region (determined by a separate .nc file in the
#' \code{mod.data.dir} set by the \code{\link{set.defaults}} function) with the
#' same latitude. Each 'block' of pixels is loaded at once and sent through
#' \code{\link{get.quantiles}} one pixel at a time; the resultant coefficients are
#' saved in files 'block' by 'block'. The 'blocks' are identified through the
#' saved output of \code{\link{get.process.chunks}}.
#'
#' @section Loading basis functions:
#' This function assumes that needed basis functions have already
#' been created using \code{\link{get.predictors}} and attempts
#' to load them (this is the most computationally efficient approach).
#' If they are not found, it regenerates them.
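#'
#' If you want to pre-generate the bases yourself, a call along these lines
#' should produce (and save) a compatible basis (a sketch; \code{nruns} here
#' stands in for the number of runs in your process chunks):
#' \preformatted{
#' X <- get.predictors(n_files = nruns,
#'                     dfs = defaults$bulk.x.df,
#'                     year.range = defaults$mod.year.range,
#'                     save.predictors = TRUE)
#' }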
#'
#' @section On \code{\link{get.quantiles}} parameters:
#' Inputs to \code{\link{get.quantiles}} are governed through the
#' \code{defaults} object, generated by the \code{\link{set.defaults}}
#' function. These include which quantiles to estimate (\code{q_norm},
#' \code{q_bulk}, \code{q_tail}), how many degrees of freedom to use in the
#' predictor variables (\code{norm.x.df}, \code{bulk.x.df}, \code{tail.x.df}),
#' what years to process (\code{mod.year.range}), and whether to add the
#' volcanic forcing data
#' (\code{get.volc}). See \code{\link{get.quantiles}} and
#' \code{\link{set.defaults}} for details. Additionally, bootstrap/uncertainty
#' quantification parameters (\code{bootstrapping}, \code{nboots}, and
#' \code{block.size}) are set there; this wrapper uses them to govern
#' bootstrapping.
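#'
#' As a minimal sketch, the fields this function reads off the
#' \code{defaults} object include (names as used in the function body):
#' \preformatted{
#' defaults$q_norm; defaults$q_bulk; defaults$q_tail   # quantiles to estimate
#' defaults$norm.x.df; defaults$bulk.x.df; defaults$tail.x.df  # spline dfs
#' defaults$mod.year.range                             # years to process
#' defaults$get.volc                                   # add volcanic forcing?
#' defaults$bootstrapping; defaults$nboots; defaults$block.size # uncertainty
#' }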
#'
#' @section On processing efficiency:
#' \code{estimate.quantiles} does not explicitly support parallel processing
#' \strong{within} the function (various \code{mclapply} instances were tried
#' without promising results). \strong{However}, the creation of temporary
#' output files at the start of processing and built-in testing for the
#' existence of the output files mean that this function can be called from
#' multiple instances of R (for example, when running in batch) without
#' overwriting work done by other instances.
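#'
#' A sketch of the intended batch pattern: save a couple of lines like the
#' following as a script and submit several copies of it to your scheduler;
#' the output-file existence checks keep the instances from redoing each
#' other's 'blocks' (this assumes \code{set.defaults} has been configured for
#' your directory structure):
#' \preformatted{
#' ## batch_job.R - submit N copies of this script
#' defaults <- set.defaults()
#' estimate.quantiles(defaults, max.runtime = 4*60*60)
#' }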
#'
#' This is the most computationally intensive step in the package.
#'
#' @section WARNING WHEN BOOTSTRAPPING:
#' The loaded model data is copied \code{nboots} times within this code if
#' \code{defaults$bootstrapping=T}. Make sure to reduce the size of processing
#' chunks when running with bootstrapping turned on, to lower the risk of
#' memory issues (a future version of this code may treat this issue more
#' gracefully).
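#'
#' As a rough back-of-the-envelope check before enabling bootstrapping (a
#' sketch; 8 bytes per double and 365-day years, as assumed elsewhere in this
#' code):
#' \preformatted{
#' n.days <- 365 * (diff(defaults$mod.year.range) + 1)
#' ## approximate memory held by the copied data for one 'block' of
#' ## [n.pixels] pixels, in bytes:
#' n.pixels * defaults$nboots * defaults$block.size * n.days * 8
#' }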
#'
#' @param defaults the output from \code{\link{set.defaults}}, used to set input
#' parameters (see Section "On \code{get.quantiles} parameters," below)
#' @param log whether to log the output using \code{sink()} in the directory
#' \code{[defaults$aux.dir]/run_logs/}. By default, \code{TRUE}. Log filenames
#' are (using run start time):
#' \code{estimate_quantiles_run_YYYY-MM-DD_HH-MM-SS.txt}.
#' @param process.inputs an optional custom process.inputs list. By default, the code
#' attempts to load [defaults$mod.data.dir]/process_inputs.RData or tries to
#' generate its own using \code{get.process.chunks}. If this is not desired
#' (i.e., you only want to process a subset of process chunks), put a
#' subsetted output of \code{get.process.chunks} here, or make your own -
#' just make sure that it's a list of process chunks, with every element
#' containing at least the fields [global_loc] (used to load the correct
#' [params] file), [lat] (just one lat, by lat band), [lon] (all the lon
#' values desired), and [fn] (the raw data filename from which the params
#' were calculated).
#' @param max.runtime if batch processing on a server with a maximum allocated
#' runtime, you can set it here, and the run will stop when the next block of
#' pixels might take longer to run than the remaining allocated time. This
#' prevents empty, un-processed temporary output files from blocking the
#' complete processing of all data 'blocks'. Explicitly, the next block isn't
#' processed and the function run is interrupted if
#' \code{start.time + max.runtime - Sys.time() < 1.2 *
#' assumed.avg.processing.time * [num pixels in block]} seconds (further
#' multiplied by \code{nboots} when bootstrapping), assuming it takes roughly
#' 160 seconds per pixel to run this code (for 40 runs, 121 years of data, on
#' the server this code was written on, etc.); this time assumption can be
#' changed with the \code{assumed.avg.processing.time} option, detailed below.
#' Set to 0 if you don't want this feature interfering.
#' @param assumed.avg.processing.time by default 160 (seconds), the estimated
#' processing time per pixel. Used in interrupting batch run if time is
#' running out.
#'
#' @return Nothing. Output is saved instead.
#'
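#' @examples
#' \dontrun{
#' ## A sketch of typical use (assumes set.defaults has been configured
#' ## for your directory and file structure):
#' defaults <- set.defaults()
#' estimate.quantiles(defaults, log = TRUE, max.runtime = 4*60*60)
#' }
#'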
#' @importFrom tictoc tic toc
# SO FAR THIS ASSUMES THAT LINEAR INDEXING IS ACROSS LONS
# (i.e. if using [lon,lat], it goes 1=(1,1),2=(2,1), etc.)
estimate.quantiles <- function(defaults,log=T,
max.runtime=4*60*60,assumed.avg.processing.time=160,
process.inputs=list()) {
# ----- SETUP ----------------------------------------------------------------------
# Record the run start time; together with [max.runtime] (the
# server-allocated wall time, in seconds) this is used to cancel the run
# when it gets too close to the time limit, so that no empty output files
# are left behind because server time ran out halfway through a chunk
start.time <- Sys.time()
# Write error output to file
if (log) {
sink(paste0(defaults$aux.dir,"run_logs/estimate_quantiles_run_",gsub('\\:','-',gsub('\\s','_',as.character(start.time))),".txt"))
}
# Load process inputs (lists of inputs / filepaths / etc.
# necessary to run qmapping procedure)
if (length(process.inputs)==0){
if (file.exists(paste0(defaults$mod.data.dir,"process_inputs.RData"))) {
load(paste0(defaults$mod.data.dir,"process_inputs.RData"))
} else {
process.inputs <- get.process.chunks(defaults,save.output=TRUE,search.dir=defaults$mod.data.dir)
}
}
# Shuffle process inputs so that concurrent instances (e.g., batch jobs)
# are unlikely to start on the same chunk
process.inputs <- process.inputs[sample(length(process.inputs))]
# ----- LOAD BASIS FUNCTIONS ------------------------------------------------------
# Load bulk basis function (just one run)
basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
process.inputs[[1]]$nruns,"runs_",paste0(defaults$bulk.x.df,collapse="-"),"df.RData")
if (file.exists(basis.fn)) {
load(basis.fn); bulk.x <- X; rm(list=c("X","basis.fn"))
} else {
bulk.x <- get.predictors(n_files=process.inputs[[1]]$nruns,dfs=defaults$bulk.x.df,year.range=defaults$mod.year.range,save.predictors=T)
}
# Load tail basis function (just one run)
basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
process.inputs[[1]]$nruns,"runs_",paste0(defaults$tail.x.df,collapse="-"),"df.RData")
if (file.exists(basis.fn)) {
load(basis.fn); tail.x <- X; rm(list=c("X","basis.fn"))
} else {
tail.x <- get.predictors(n_files=process.inputs[[1]]$nruns,dfs=defaults$tail.x.df,year.range=defaults$mod.year.range,save.predictors=T)
}
# Load normalization basis function if not using volcanic data
# (and therefore it doesn't need to be latitude-dependent)
if (!defaults$get.volc) {
basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
process.inputs[[1]]$nruns,"runs_",paste0(defaults$norm.x.df,collapse="-"),"df.RData")
if (file.exists(basis.fn)) {
load(basis.fn); norm.x <- X; rm(list=c("X","basis.fn"))
} else {
norm.x <- get.predictors(n_files=process.inputs[[1]]$nruns,dfs=defaults$norm.x.df,year.range=defaults$mod.year.range,save.predictors=T)
}
} else {
norm.x <- numeric()
}
# ----- SET A BY-PROCESS-CHUNK WRAPPER FUNCTION ------------------------------------------------------
run.qmap <- function(process.inputs.tmp,
defaults,
bulk.x,tail.x,norm.x=numeric(),
max.runtime,assumed.avg.processing.time) {
output.fn <- paste0(defaults$mod.data.dir,"params/",defaults$filevar,
"_day_",defaults$mod.name,"_quantfit_params_",
defaults$mod.year.range[1],"-",defaults$mod.year.range[2],"_locs",
process.inputs.tmp$global_loc[1],"-",
process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)])
if (defaults$bootstrapping) {
output.fn <- paste0(output.fn,"_",defaults$block.size,"block",defaults$nboots,"runs")
}
output.fn <- paste0(output.fn,defaults$fn.suffix,".RData")
# Only process if the file doesn't already exist
if (!file.exists(output.fn)) {
# Only process if there's enough time left - i.e., if the time elapsed
# since the start of the run leaves more than 1.2 x the expected average
# processing time for the number of pixels in this chunk (times the number
# of bootstrap runs, if bootstrapping) before max.runtime is reached
if (max.runtime==0 ||
difftime(Sys.time(),start.time,units="secs") <
(max.runtime)-
1.2*mean(assumed.avg.processing.time)*length(process.inputs.tmp$global_loc)*
max(1,defaults$bootstrapping*defaults$nboots)) {
# Save a placeholder file in the directory; to make sure no
# double-processing happens
status <- 'processing'
save(file=output.fn,
status)
rm(status)
# Input into a tryCatch statement to allow deletion of the temporary file
# in the case of an error (which hopefully will never happen, but just in
# case)
tryCatch({
# ----- SETUP --------------------------------------------------------------------
cat("\n\n")
cat(paste0("Beginning processing for pixels with lat=",process.inputs.tmp$lat,
" in region ",process.inputs.tmp$reg," (global locations ",process.inputs.tmp$global_loc[1],"-",
process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)],")"),fill=TRUE)
# Get file year range (from position in filename, FILENAME MUST BE IN CMIP5 STANDARD BY YEAR, but can be either YYYYMMDD or YYYY)
fn.year.range <- strtoi(substr(strsplit(strsplit(process.inputs.tmp$fn,'\\_|\\.')[[1]][6],'\\-')[[1]],1,4))
# ----- LOAD LAT-DEPENDENT BASES -------------------------------------------------
# Get spline basis functions for normalization fit
if (length(norm.x)==0) {
if (defaults$get.volc) {
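# NB: volc.data (the volcanic forcing data) is assumed to already be
# available in the enclosing environment here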
lat.volc <- volc.data$lat
# Load basis file
basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$base.year.range)+1,"years_",
process.inputs.tmp$nruns,"runs_",paste0(defaults$base.norm.x.df,collapse="-"),
"df_volc",gsub("\\.","-",round(lat.volc[which.min(abs(as.vector(lat.volc)-as.vector(process.inputs.tmp$lat)))],1)),".RData")
if (file.exists(basis.fn)) {
load(basis.fn); norm.x <- X; rm(X)
} else {
norm.x <- get.predictors(n_files=process.inputs.tmp$nruns,dfs=defaults$norm.x.df,year.range=defaults$mod.year.range,get.volc=TRUE,lat=process.inputs.tmp$lat,save.predictors=T)
}
rm(list=c("basis.fn","lat.volc"))
} else {
basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
process.inputs.tmp$nruns,"runs_",paste0(defaults$norm.x.df,collapse="-"),"df.RData")
if (file.exists(basis.fn)) {
load(basis.fn); norm.x <- X; rm(X)
} else {
norm.x <- get.predictors(n_files=process.inputs.tmp$nruns,dfs=defaults$norm.x.df,year.range=defaults$mod.year.range,save.predictors=T)
}
}
}
# ----- LOAD AND SET UP RAW MODEL DATA --------------------------------------------
# Load raw netcdf data, by latitude band within a region. [get.ncdf] gets the
# desired pixels, and outputs them as a named list, with each element containing for
# that pixel the raw data ("Raw"), and the lat / lon coordinate.
tic(paste0("loading ",defaults$mod.name," data for ",length(process.inputs.tmp$lon)," pixel(s)"))
Raw <- get.ncdf(defaults,process.inputs.tmp,year.range=defaults$mod.year.range)
toc()
# If resampling (for uncertainty analysis), get the run indices, and
# repeat the base data so that every bootstrap run is its own "pixel"
# in the process list
if (defaults$bootstrapping) {
set.seed(42) #For replicability
run.idxs <- matrix(sample(process.inputs.tmp$nruns,
length(process.inputs.tmp$local_idxs)*defaults$nboots*defaults$block.size,
replace=TRUE),
nrow=defaults$nboots*length(process.inputs.tmp$local_idxs))
# Repeat time series out for each element
Raw <- rep(Raw,each=defaults$nboots)
# Now, resample by run within each list element, tagging each bootstrapped
# series with its bootstrap index (run.idx) so the print call below is readable
Raw <- lapply(1:length(Raw),function(x) list(Raw=as.vector(matrix(Raw[[x]]$Raw,nrow=(diff(defaults$mod.year.range)+1)*365)[,run.idxs[x,]]),
lat=Raw[[x]]$lat,lon=Raw[[x]]$lon,
run.idx=((x-1) %% defaults$nboots)+1))
# (The time-based run cancellation above already accounts for bootstrapping
# through the max(1,defaults$bootstrapping*defaults$nboots) factor.)
# Correspondingly change the process.inputs.tmp list to make the
# parameter information assignment below easier
process.inputs.tmp$global_loc <- rep(process.inputs.tmp$global_loc,each=defaults$nboots)
process.inputs.tmp$run.idx <- rep(1:defaults$nboots,length(process.inputs.tmp$lon))
# Set basis function subsetting functions (only the length of as many
# runs as are used in this bootstrapping run, set by
# defaults$block.size)
base.idxs <- 1:(365*(diff(defaults$mod.year.range)+1)*defaults$block.size)
} else {
base.idxs <- 1:length(Raw[[1]]$Raw)
}
# ----- ESTIMATE DATA USING GET.QUANTILES ---------------------------------
params <- lapply(Raw,function(input.list) {
cat("\n") #Insert newlines to add space between messages of different processing chunks
if (defaults$bootstrapping) {
cat(paste0("Processing lon = ",input.list$lon,", run ",input.list$params$run.idx),fill=TRUE)
} else {
cat(paste0("Processing lon = ",input.list$lon),fill=TRUE)
}
get.quantiles(model.y=input.list$Raw,
norm.x.df=defaults$norm.x.df,bulk.x.df=defaults$bulk.x.df,tail.x.df=defaults$tail.x.df,
q_norm=defaults$q_norm,q_bulk=defaults$q_bulk,q_tail=defaults$q_tail,
year.range=defaults$mod.year.range,
norm.x=norm.x[base.idxs,],
bulk.x=bulk.x[base.idxs,],
tail.x=tail.x[base.idxs,],
lat=input.list$lat,lon=input.list$lon,
bases.dir=paste0(defaults$aux.dir,"bases/"),get.volc=defaults$get.volc)
})
# ----- CLEAN UP AND SAVE -------------------------------------------------
# Add a few extra identifiers to outputted list
for (loc.idx in 1:length(params)) {
params[[loc.idx]]$global_loc <- process.inputs.tmp$global_loc[loc.idx]
params[[loc.idx]]$reg <- process.inputs.tmp$reg
params[[loc.idx]]$inc.volc <- defaults$get.volc
if (defaults$bootstrapping) {
params[[loc.idx]]$run.idx <- process.inputs.tmp$run.idx[loc.idx]
}
}
# Save fit parameters
save(file=output.fn,
params)
rm(list=c("Raw","params"))
cat(paste0(defaults$mod.name," quantile fits complete and saved for pixels with lat=",process.inputs.tmp$lat,
" in region ",process.inputs.tmp$reg," (global locations ",process.inputs.tmp$global_loc[1],"-",
process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)],")!"),fill=TRUE)
cat(paste0("Parameters saved in ",output.fn),fill=TRUE)
cat("\n")
},error=function(e) {
print(e)
# Remove the temporary file if the run has an error
file.remove(output.fn)
cat(paste0("Error occured on the ",defaults$mod.name," quantile fits run for pixels with lat=",process.inputs.tmp$lat,
" in region ",process.inputs.tmp$reg," (global locations ",process.inputs.tmp$global_loc[1],"-",
process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)],")!"),fill=TRUE)
cat("\n")
})
} else {
stop(paste0("Run stopped at ",Sys.time()," after ",format(difftime(Sys.time(),start.time)),
" due to set server time limits."))
}
} else {
cat(paste0(output.fn," already exists, was skipped."),fill=TRUE)
cat("\n")
}
# Return nothing
invisible()
}
#----- RUN ------------------------------------------------------
lapply(process.inputs,run.qmap,
defaults=defaults,
bulk.x=bulk.x,tail.x=tail.x,norm.x=norm.x,
max.runtime=max.runtime,assumed.avg.processing.time=assumed.avg.processing.time)
#----- SOME HOUSEKEEPING ----------------------------------------
# Close file output connection for logging
if (log) {sink()}
}