R/wrapper_get_quantiles.R

Defines functions estimate.quantiles

Documented in estimate.quantiles

#' Estimate quantiles of a climate time series
#'
#' This is a wrapper for \code{\link{get.quantiles}} that allows for file
#' management, batch processing, bootstrapping, and saving of output, if all the
#' directory and filename/structure defaults are followed, for the estimation of
#' quantiles across model time series, with potentially multiple runs of data
#' over the same time frame.
#'
#' \code{estimate.quantiles} runs \code{\link{get.quantiles}} on a
#' pixel-by-pixel basis. These pixels are loaded and processed on a
#' latitude-by-region/subset basis - in other words, a 'block' of pixels is
#' every pixel in one subset/region (determined by a separate .nc file in the
#' \code{mod.data.dir} set by the \code{\link{set.defaults}} function) with the
#' same latitude. This 'block' of pixels is loaded and sent through
#' \code{\link{get.quantiles}} one at a time; the resultant coefficients are
#' saved in files 'block' by 'block'. The 'blocks' are identified through the
#' saved output of \code{\link{get.process.chunks}}.
#'
#' @section Loading basis functions:
#'  This function assumes that needed basis functions have already
#'  been created using \code{\link{get.predictors}} and attempts
#'  to load them (this is the most computationally efficient way
#'  of dealing with them). It otherwise regenerates them.
#'
#' @section On \code{\link{get.quantiles}} parameters:
#'  Inputs to \code{\link{get.quantiles}} are governed through the
#'  \code{defaults} object, generated by the \code{\link{set.defaults}}
#'  function. These include which quantiles to estimate (\code{q_norm},
#'  \code{q_bulk}, \code{q_tail}), how many degrees of freedom to use in the
#'  predictor variables (\code{df.x}, \code{df.t}, \code{df.xt}), what years to
#'  process (\code{year.range}), and whether to add the volcanic forcing data
#'  (\code{get.volc}). See \code{\link{get.quantiles}} and
#'  \code{\link{set.defaults}} for details. Additionally, bootstrap/uncertainty
#'  quantification parameters(\code{bootstrapping}, \code{nboots}, and
#'  \code{block.size}) are set; these are used in this wrapper to govern
#'  bootstrapping.
#'
#' @section On processing efficiency:
#' 	\code{estimate.quantiles} does not explicitly support parallel processing
#' 	\strong{within} the function (various \code{mclapply} instances were tried
#' 	without promising results). \strong{However}, the creation of temporary
#' 	output files at the start of processing and built-in testing for the
#' 	existance of the output files means that this function can be called from
#' 	multiple instances of R (for example in running in batch) without overwriting
#' 	work done by other instances.
#'
#' 	This is the most computationally-intensive step in the
#' 	package.
#'
#' @section WARNING WHEN BOOTSTRAPPING: 
#'	The loaded model data is copied \code{nboots} times within this code if 
#'	\code{defaults$bootstrapping=T}. Make sure to reduce the size of processing
#' 	chunks when running with bootstrapping turned on to reduce the risk of 
#'	memory issues... (some future version of this code may have a more 
#'	sophisticated treatment of this issue)
#'
#' @param defaults the output from \code{\link{set.defaults}}, used to set input
#'   parameters (see Section "On \code{get.quantiles} parameters," below)
#' @param log whether to log the output using \code{sink()} in the directory
#'   \code{[defaults$aux.data.dir]/run_logs/}. By default, true. Log filenames
#'   are (using run start time):
#'   \code{estimate_quantiles_run_YYYY-MM-DD_HH-MM-SS.txt}.
#' @param process.inputs add custom process.inputs list. By default, the code
#' 	 attempts to load [defaults$mod.data.dir]/process_inputs.RData or tries to
#'	 generate its own using \code{get.process.chunks}. If this is not desired 
#'	 (i.e. you want to just process a subset of process chunks), put a 
#'	 subsetted output of \code{get.process.chunks} here , or make your own - 
#' 	 just make sure that it's a list of process chunks, with every element 
#' 	 containing at least the fields [global_loc] (used to load the correct
#'	 [params] file), [lat] (just one lat, by lat band), [lon] (all the lon
#'   values desired), and [fn] (the raw data filename from which the params
#'   were calculated).
#' @param max.runtime if batch processing on a server with a maximum allocated
#'   runtime, you can set it here, and the run will stop when the next block of
#'   time might take longer to run than the remaining allocated time. This
#'   prevents empty, un-processed temporary output files from preventing the
#'   complete processing of all data 'blocks'. Explicitly, the next block isn't
#'   processed and the function run is interrupted if the current system time if
#'   \code{start.time + max.runtime - Sys.time() < (2 * 160)*[num pixels in
#'   block] seconds}, assuming that it takes roughly 160 seconds per pixel to
#'   run this code (for 40 runs, 121 years of data, on the server this code was
#'   written on etc.); this time assumption can be changed with the
#'   \code{assumed.avg.processing.time} option, detailed below. Set to 0 if 
#'	 you don't want this feature interefering.
#' @param assumed.avg.processing.time by default 160 (seconds), the estimated
#'   processing time per pixel. Used in interrupting batch run if time is
#'   running out.
#'
#' @return Nothing. Output is saved instead.
#'
#' @importFrom tictoc tic toc 


# SO FAR THIS ASSUMES THAT LINEAR INDEXING IS ACROSS LONS
# (i.e. if using [lon,lat], it goes 1=(1,1),2=(2,1), etc.)


estimate.quantiles <- function(defaults,log=T,
							   max.runtime=4*60*60,assumed.avg.processing.time=160,
							   process.inputs=list()) {

	# ----- SETUP  ----------------------------------------------------------------------
	# Set server-allocated wall time, to cancel jobs when it gets
	# too close to the end (to make sure that no empty output files
	# are created because we ran out of server time halfway through
	# calculating one of these guys) (in seconds)
	start.time <- Sys.time()

	# Write error output to file
	if (log) {
		sink(paste0(defaults$aux.dir,"run_logs/estimate_quantiles_run_",gsub('\\:','-',gsub('\\s','_',as.character(start.time))),".txt"))
	}

	# Load process inputs (lists of inputs / filepaths / etc.
	# necessary to run qmapping procedure)
	if (length(process.inputs)==0){
		if (file.exists(paste0(defaults$mod.data.dir,"process_inputs.RData"))) {
			load(paste0(defaults$mod.data.dir,"process_inputs.RData"))
		} else {
			process.inputs <- get.process.chunks(defaults,save.output=TRUE,search.dir=defaults$mod.data.dir)
		}
	}
	

	# Shuffle process inputs for processing below
	process.inputs <- process.inputs[sample(seq(1,length(process.inputs)),length(process.inputs))]


	# ----- LOAD BASIS FUNCTIONS  ------------------------------------------------------
	# Load bulk basis function (just one run)
	basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
		process.inputs[[1]]$nruns,"runs_",paste0(defaults$bulk.x.df,collapse="-"),"df.RData")
	if (file.exists(basis.fn)) {
		load(basis.fn); bulk.x <- X; rm(list=c("X","basis.fn"))
	} else {
		bulk.x <- get.predictors(n_files=process.inputs[[1]]$nruns,dfs=defaults$bulk.x.df,year.range=defaults$mod.year.range,save.predictors=T)
	}

	# Load tail basis function (just one run)
	basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
		process.inputs[[1]]$nruns,"runs_",paste0(defaults$tail.x.df,collapse="-"),"df.RData")
	if (file.exists(basis.fn)) {
		load(basis.fn); tail.x <- X; rm(list=c("X","basis.fn"))
	} else {
		tail.x <- get.predictors(n_files=process.inputs[[1]]$nruns,dfs=defaults$tail.x.df,year.range=defaults$mod.year.range,save.predictors=T)
	}

	# Load normalization basis function if not using volcanic data
	# (and therefore it doesn't need to be latitude-dependent)
	if (!defaults$get.volc) {
		basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",
		process.inputs[[1]]$nruns,"runs_",paste0(defaults$norm.x.df,collapse="-"),"df.RData")
		if (file.exists(basis.fn)) {
			load(basis.fn); norm.x <- X; rm(list=c("X","basis.fn"))
		} else {
			norm.x <- get.predictors(n_files=process.inputs[[1]]$nruns,dfs=defaults$norm.x.df,year.range=defaults$mod.year.range,save.predictors=T)
		}
	} else {
		norm.x <- numeric()
	}

	# ----- SET A BY-PROCESS-CHUNK WRAPPER FUNCTION ------------------------------------------------------
	run.qmap <- function(process.inputs.tmp,
	                     defaults,
	                     bulk.x,tail.x,norm.x=numeric(),
	                     max.runtime,assumed.avg.processing.time) {
		
	  output.fn <- paste0(defaults$mod.data.dir,"params/",defaults$filevar,
	                      "_day_",defaults$mod.name,"_quantfit_params_",
	                      defaults$mod.year.range[1],"-",defaults$mod.year.range[2],"_locs",
	                      process.inputs.tmp$global_loc[1],"-",
	                      process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)])

	  if (defaults$bootstrapping) {
	    output.fn <- paste0(output.fn,"_",defaults$block.size,"block",defaults$nboots,"runs")
	  }

	  output.fn <- paste0(output.fn,defaults$fn.suffix,".RData")


	  # Only process if the file doesn't already exist
	  if (!file.exists(output.fn)) {

	    # Only process if there's enough time left - this is determined by seeing
	    # if the time elapsed since the start of the run is more than 2 x the
	    # average processing time expected for the number of pixels in this run
	    # (assuming ~160 seconds / pixel)
	    if (max.runtime==0 || 
	    	difftime(Sys.time(),start.time,units="secs") < 
	    		(max.runtime)-
	    		1.2*mean(assumed.avg.processing.time)*length(process.inputs.tmp$global_loc)*
	    			max(1,defaults$bootstrapping*defaults$nboots)) {

	      # Save a placeholder file in the directory; to make sure no
	      # double-processing happens
	      status <- 'processing'
	      save(file=output.fn,
	           status)
	      rm(status)

	      # Input into a tryCatch statement to allow deletion of the temporary file
	      # in the case of an error (which hopefully will never happen, but
	      # yaknow...)
	      tryCatch({

	        # ----- SETUP --------------------------------------------------------------------
	        cat("\n\n")
	        cat(paste0("Beginning processing for pixels with lat=",process.inputs.tmp$lat,
	                     " in region ",process.inputs.tmp$reg," (global locations ",process.inputs.tmp$global_loc[1],"-",
	                     process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)],")"),fill=TRUE)

	        # Get file year range (from position in filename, FILENAME MUST BE IN CMIP5 STANDARD BY YEAR, but can be either YYYYMMDD or YYYY)
	        fn.year.range <- strtoi(substr(strsplit(strsplit(process.inputs.tmp$fn,'\\_|\\.')[[1]][6],'\\-')[[1]],1,4))

	        # ----- LOAD LAT-DEPENDENT BASES -------------------------------------------------
	        # Get spline basis functions for normalization fit
	        if (length(norm.x)==0) {
	          if (defaults$get.volc) {
	            lat.volc <- volc.data$lat
	            # Load basis file
	            basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$base.year.range)+1,"years_",
	                               process.inputs.tmp$nruns,"runs_",paste0(defaults$base.norm.x.df,collapse="-"),
	                               "df_volc",gsub("\\.","-",round(lat.volc[which.min(abs(as.vector(lat.volc)-as.vector(process.inputs.tmp$lat)))],1)),".RData")
	            if (file.exists(basis.fn)) {load(basis.fn);norm.x<-X;rm(x)} else {norm.x <- get.predictors(n_files=process.inputs.tmp$nruns,dfs=defaults$norm.x.df,year.range=defaults$mod.year.range,get.volc=TRUE,lat=process.inputs.tmp$lat,save.predictors=T)}
	            rm(list=c("basis.fn","lat.volc"))
	          } else {
	            basis.fn <- paste0(defaults$aux.dir,"bases/spline_basis_functions_",diff(defaults$mod.year.range)+1,"years_",process.inputs.tmp$nruns,"runs_",paste0(defaults$norm.x.df,collapse="-"),"df.RData")
	            if (file.exists(basis.fn)) {load(basis.fn);norm.x<-X;rm(x)} else {norm.x <- get.predictors(n_files=process.inputs.tmp$nruns,dfs=defaults$norm.x.df,year.range=defaults$mod.year.range,save.predictors=T)}
	          }
	        }


	        # ----- LOAD AND SET UP RAW MODEL DATA --------------------------------------------
	        # Load raw netcdf data, by latitude band within a region. [get.ncdf] gets the 
	        # desired pixels, and outputs them as a named list, with each element containing for 
	        # that pixel the raw data ("Raw"), and the lat / lon coordinate. 
	        tic(paste0("loading ",defaults$mod.name," data for ",length(process.inputs.tmp$lon)," pixel(s)"))
	        Raw <- get.ncdf(defaults,process.inputs.tmp,year.range=defaults$mod.year.range)
	        toc()

	        # If resampling (for uncertainty analysis), get the run indices, and
	        # repeat the base data so that every bootstrap run is its own "pixel"
	        # in the process list
	        if (defaults$bootstrapping) {
	          set.seed(42) #For replicability
	          run.idxs <- matrix(sample(process.inputs.tmp$nruns,
	                                    length(process.inputs.tmp$local_idxs)*defaults$nboots*defaults$block.size,
	                                    replace=TRUE),
	                             nrow=defaults$nboots*length(process.inputs.tmp$local_idxs))

	          # Repeat time series out for each element
	          Raw <- rep(Raw,each=defaults$nboots)

	          # Now, resample it by list
	          Raw <- lapply(1:length(Raw),function(x) list(Raw=as.vector(matrix(Raw[[x]]$Raw,nrow=(diff(defaults$mod.year.range)+1)*365)[,run.idxs[x,]]),
	                                                       lat=Raw[[x]]$lat,lon=Raw[[x]]$lon))

	          # ADD SOME RUN ID SO AS TO MAKE IT READABLE BELOW IN THE PRINT CALL
	          # ALSO - DOES THE CANCELLING RUN THING DUE TO TIME TAKE INTO ACCOUNT BOOTSTRAPPING?

	          # Correspondingly change in the process.inputs.tmp list to make the
	          # parameter information assigning below easier
	          process.inputs.tmp$global_loc <- rep(process.inputs.tmp$global_loc,each=defaults$nboots)
	          process.inputs.tmp$run.idx <- rep(1:defaults$nboots,length(process.inputs.tmp$lon))

	          # Set basis function subsetting functions (only the length of as many
	          # runs as are used in this bootstrapping run, set by
	          # defaults$block.size)
	          base.idxs <- 1:(365*(diff(defaults$mod.year.range)+1)*defaults$block.size)
	        } else {
	          base.idxs <- 1:length(Raw[[1]]$Raw)
	        }


	        # ----- ESTIMATE DATA USING GET.QUANTILES ---------------------------------
	        params <- lapply(Raw,function(input.list) {
	          cat("\n") #Insert newlines to add space between messages of different processing chunks
	          if (defaults$bootstrapping) {
					cat(paste0("Processing lon = ",input.list$lon,", run ",input.list$params$run.idx),fill=TRUE)
			  } else {
					cat(paste0("Processing lon = ",input.list$lon),fill=TRUE)
			  }
	          get.quantiles(model.y=input.list$Raw,
	          				norm.x.df=defaults$norm.x.df,bulk.x.df=defaults$bulk.x.df,tail.x.df=defaults$tail.x.df,
	                        q_norm=defaults$q_norm,q_bulk=defaults$q_bulk,q_tail=defaults$q_tail,
	                        year.range=defaults$mod.year.range,
	                        norm.x=norm.x[base.idxs,],
	                        bulk.x=bulk.x[base.idxs,],
	                        tail.x=tail.x[base.idxs,],
	                        lat=input.list$lat,lon=input.list$lon,
	                        bases.dir=paste0(defaults$aux.dir,"bases/"),get.volc=defaults$get.volc)
	        })

	        # ----- CLEAN UP AND SAVE -------------------------------------------------
	        # Add a few extra identifiers to outputted list
	        for (loc.idx in 1:length(params)) {
	          params[[loc.idx]]$global_loc <- process.inputs.tmp$global_loc[loc.idx]
	          params[[loc.idx]]$reg <- process.inputs.tmp$reg
	          if (defaults$get.volc) {
	            params[[loc.idx]]$inc.volc <- TRUE
	          } else {
	            params[[loc.idx]]$inc.volc <- FALSE
	          }
	          if (defaults$bootstrapping) {
	            params[[loc.idx]]$run.idx <- process.inputs.tmp$run.idx[loc.idx]
	          }
	        }

	        # Save fit parameters
	        save(file=output.fn,
	             params)
	        rm(list=c("Raw","params"))
	        cat(paste0(defaults$mod.name," quantile fits complete and saved for pixels with lat=",process.inputs.tmp$lat,
	                     " in region ",process.inputs.tmp$reg," (global locations ",process.inputs.tmp$global_loc[1],"-",
	                     process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)],")!"),fill=TRUE)
	        cat(paste0("Parameters saved in ",output.fn),fill=TRUE)
	        cat("\n")

	      },error=function(e) {
	        print(e)
	        # Remove the temporary file if the run has an error
	        file.remove(output.fn)
	        cat(paste0("Error occured on the ",defaults$mod.name," quantile fits run for pixels with lat=",process.inputs.tmp$lat,
	                     " in region ",process.inputs.tmp$reg," (global locations ",process.inputs.tmp$global_loc[1],"-",
	                     process.inputs.tmp$global_loc[length(process.inputs.tmp$global_loc)],")!"),fill=TRUE)
	        cat("\n")
	      })
	    } else {
	      stop(paste0("Run stopped at ",Sys.time()," after ",format(difftime(Sys.time(),start.time)),
	                  " due to set server time limits."))
	    }
	  } else {
	    cat(paste0(output.fn," already exists, was skipped."),fill=TRUE)
	    cat("\n")
	  }

	  # Return nothing
	  invisible()
	}

	#----- RUN ------------------------------------------------------
	lapply(process.inputs,run.qmap,
		defaults=defaults,
		bulk.x=bulk.x,tail.x=tail.x,norm.x=norm.x,
		max.runtime=max.runtime,assumed.avg.processing.time=assumed.avg.processing.time)

	#----- SOME HOUSEKEEPING ----------------------------------------
	# CLose file output connection for logging
	if (log) {sink()}

}
ks905383/quantproj documentation built on Nov. 1, 2020, 9:12 p.m.