#' Spatially interpolate precipitation gauge data to whole watersheds (i.e. basins).
#'
#' Using precipitation gauge recordings (depth and/or chemistry) and locations,
#' this function interpolates an estimated value at each cell of an automatically retrieved digital
#' elevation model (DEM), then averages cell values to generate a watershed average
#' at every time step. The local relationship between
#' elevation and precipitation depth may also be leveraged for better prediction.
#' If precipitation chemistry is supplied in addition to depth, then watershed-average
#' precipitation depth, chemistry, and chemical flux estimates will all be generated.
#'
#' @keywords internal
#' @author Spencer Rhea
#' @author Mike Vlah, \email{vlahm13@@gmail.com}
#' @author Wes Slaughter
#' @param precip optional \code{data.frame} in MacroSheds format (see details),
#' containing precipitation depth time series in mm. A \code{tibble} fitting these specifications
#' can be generated by [ms_load_product()]. If \code{precip} is
#' supplied and \code{pchem} is not, only watershed-average precip is returned.
#' If both are supplied, both are returned, as well as watershed-average chemical flux,
#' computed from the cell-wise product of precip and pchem after imputation.
#' @param pchem optional \code{data.frame} in MacroSheds format (see details),
#' containing precipitation chemistry time series in mg/L. A \code{tibble} fitting these specifications
#' can be generated by by [ms_load_product()]. If \code{pchem} is
#' supplied and \code{precip} is not, only watershed-average pchem is returned.
#' If both are supplied, both are returned, as well as watershed-average precipitation chemical flux,
#' computed from the cell-wise product of precip and pchem after imputation.
#' @param ws_boundary \code{sf} object or path to a directory \emph{containing} directories, each in turn containing individual shapefiles
#' of watershed boundaries. Object/files must have site_code and geometry features (columns).
#' See [ms_delineate_watershed()].
#' @param precip_gauge \code{sf} object or path to a directory \emph{containing} directories, each in turn containing shapefiles
#' of precipitation gauge locations. Object/files must have site_code and geometry features (columns).
#' All supplied precip gauges will be used for imputation, even if they are far from
#' the watershed of interest.
#' @param out_path character. Directory to which output data will be saved.
#' @param parallel logical. If TRUE (default), interpolation attempts to use \code{maxcores} CPU cores/threads.
#' @param maxcores numeric. Default \code{Inf}; the maximum number of cores (or hyperthreads)
#' used in parallel processing.
#' Ignored if \code{parallel} is FALSE. If greater than the available number of cores/hyperthreads,
#' all available cores/hyperthreads will be used.
#' @param elevation_agnostic Logical. Should elevation data be ignored when interpolating
#' precipitation data? TRUE means "ignore elevation." See details for more information.
#' @param verbose Logical. If TRUE (default), prints more information during run.
#' @return Saves watershed-average precipitation depth and/or chemistry, and chemical flux (if both precip
#' and pchem are supplied) to \code{out_path} as feather files in MacroSheds format (see details).
#' Output units for flux are kg/d. See [ms_scale_flux_by_area()] to convert to kg/ha/d.
#' @details This function uses inverse distance weighted interpolation to extend gauge measurements
#' to every cell of a watershed DEM. If \code{elevation_agnostic} is set to TRUE, the resulting
#' watershed average will be a simple arithmetic mean of all watershed cell values, ignoring
#' the local relationship between precipitation and elevation.
#' If FALSE (the default), the resulting watershed average will be a weighted mean of two separate estimates: the
#' mean of cell values (weight: 1) \emph{and} the prediction of a linear regression with elevation (weight: absolute value of adjusted R^2 of modeled data).
#' At least 3 gauges must record at a given time step for the regression to be leveraged in generating the final prediction.
#' Note that these computations are performed at every time step, using all available
#' (non-missing, non-NA) gauges.
#'
#' This function will interpolate only precipitation depth if no
#' pchem is supplied. It will only interpolate precipitation chemistry
#' if no precip data is supplied. If both precip and pchem are supplied, then
#' precipitation depth, chemical concentrations, and precipitation chemical fluxes
#' will all be calculated.
#'
#' In general, elevation is a relevant predictor for precipitation depth, but not necessarily
#' concentration or flux of precip chemical components. As such, \code{elevation_agnostic}
#' is always set to TRUE (ignore the elev-precip relationship) when determining watershed-average precipitation chemistry \emph{in the case of both}
#' \code{precip} and \code{pchem} being supplied. If only \code{pchem} or \code{precip}is
#' supplied and \code{elevation_agnostic} is FALSE, then the elevation-pchem relationship
#' \emph{will} be incorporated into the final estimate of the watershed average
#' for each time step.
#'
#' Note that this function does not return the imputed watershed grid for each
#' timestep--only the watershed-average estimate(s) as tibbles.
#'
#' MacroSheds format (only date, site_code, var, and val are required as inputs. all are written to output files):
#' + "date" in YYYY-mm-dd format
#' + "site_code": Short name for MacroSheds site. See [ms_load_sites()].
#' + "grab_sample": Boolean integer indicating whether the observation was obtained via grab sample or installed sensor. 1 = TRUE (grab sample), 0 = FALSE (installed sensor).
#' + "var": Variable code. See [ms_load_variables()].
#' + "val": Data value. See [ms_load_variables()] for units.
#' + "ms_status": Boolean integer. 0 = clean value. 1 = questionable value. See [the MacroSheds data paper](https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325) for details.
#' + "ms_interp": Boolean integer. 0 = measured or imputed by primary source. 1 = interpolated by MacroSheds. See [the MacroSheds data paper](https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325) for details.
#' + "val_err": The combined standard uncertainty associated with the corresponding data point, or NA if unknown. See "Detection limits and propagation of uncertainty" section of See [the MacroSheds data paper](https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325) for details.
#' @seealso [ms_load_product()], [ms_delineate_watershed()], [ms_scale_flux_by_area()]
#' @importFrom data.table ':='
#' @examples
#' See vignette: https://github.com/MacroSHEDS/macrosheds/blob/master/vignettes/ms_interpolate_precip.md
ms_calc_watershed_precip <- function(precip,
ws_boundary,
precip_gauge,
pchem,
parallel = TRUE,
maxcores = Inf,
out_path,
elevation_agnostic = TRUE,
verbose = TRUE){
library("dplyr", quietly = TRUE)
check_suggested_pkgs(c('terra', 'elevatr', 'parallel', 'data.table', 'raster'))
precip_only <- missing(pchem)
pchem_only <- missing(precip)
requireNamespace('macrosheds', quietly = TRUE)
ms_vars <- macrosheds::ms_vars_ts %>%
filter(unit != 'kg/ha/d') %>%
dplyr::select(variable_code, flux_convertible) %>%
distinct()
#### Load in data if file path supplied ####
# load watershed boundaries
if(! inherits(ws_boundary, 'sf') && inherits(ws_boundary, 'character')){
if(grepl('\\.sh[a-z]$', ws_boundary)){
stop('ws_boundary must be a directory if it is not an sf object')
}
wb_path <- list.files(ws_boundary, full.names = TRUE)
wb <- try(purrr::map_dfr(wb_path, sf::st_read))
if(inherits(wb, 'try-error')){
stop('ws_boundary file failed to load, check file path is correct')
}
} else{
wb <- ws_boundary
}
# Load in precipitation gauge locations
if(! inherits(precip_gauge, 'sf') && inherits(precip_gauge, 'character')){
if(grepl('\\.sh[a-z]$', precip_gauge)){
stop('precip_gauge must be a directory if it is not an sf object')
}
rg_path <- list.files(precip_gauge, full.names = TRUE)
rg <- try(purrr::map_dfr(rg_path, sf::st_read))
if(inherits(rg, 'try-error')){
stop('precip_gauge file failed to load, check file path is correct')
}
} else{
rg <- precip_gauge
}
# Load in precipitation data
if(pchem_only){
if(verbose){
print('Only interpolating precipitation chemistry')
}
} else {
if(! inherits(precip, 'data.frame')){
precip_path <- list.files(precip, full.names = TRUE)
precip <- try(purrr::map_dfr(precip_path, feather::read_feather))
if(inherits(precip, 'try-error')){
stop('precip files failed to load; check file path is correct')
}
}
}
# Load in precip chemistry
if(precip_only){
if(verbose){
print('Only interpolating precipitation depth')
}
} else {
if(! inherits(pchem, 'data.frame')){
pchem_path <- list.files(pchem, full.names = TRUE)
pchem <- try(purrr::map_dfr(pchem_path, feather::read_feather))
if(inherits(pchem, 'try-error')){
stop('pchem files failed to load; check file path is correct')
}
}
}
#### Checks ####
if(!inherits(rg, 'sf')){
stop('precip_gauge file must be an sf object.')
}
if(!inherits(wb, 'sf')){
stop('ws_boundary file must be an sf object.')
}
if(!missing(precip) && !all(c('datetime', 'val', 'var', 'site_code', 'val_err') %in% names(precip))){
stop('precip must be in MacroSheds format (required columns: datetime, site_code, var, val, val_err)')
}
if(!missing(pchem) && !all(c('datetime', 'val', 'var', 'site_code', 'val_err') %in% names(pchem))){
stop('precip_chem must be in MacroSheds format (required columns: datetime, site_code, var, val, val_err)')
}
if(! length(unique(rg$site_code)) == length(rg$site_code)){
stop('precip_gauge contains duplicate entries for the same gauge')
}
if(! pchem_only){
if(! all(rg$site_code %in% unique(precip$site_code))){
missing_gauge <- rg$site_code[! rg$site_code %in% unique(precip$site_code)]
stop(paste0('a precip gauge location exists in precip_gauge for the gauge(s): ',
paste0(missing_gauge, collapse = ', '),
', but no corresponding data exist in precip.',
' Either add data to precip for these gauges or',
' remove them from precip_gauge.'))
}
if(! all(unique(precip$site_code) %in% rg$site_code)){
missing_gauge <- unique(precip$site_code)[!unique(precip$site_code) %in% rg$site_code]
stop(paste0('data exist in precip for the gauge(s): ', paste0(missing_gauge, collapse = ', '),
', but have no corresponding location in precip_gauge.',
' Either add these gauge(s) to precip_gauge or remove corresponding data from precip.'))
}
}
if(! precip_only){
if(! all(rg$site_code %in% unique(pchem$site_code))){
missing_gauge <- rg$site_code[! rg$site_code %in% unique(pchem$site_code)]
stop(paste0('a gauge location exists in precip_gauge for the gauge(s): ',
paste0(missing_gauge, collapse = ', '),
', but no corresponding data exist in pchem',
' Either add data to pchem for these gauges or',
' remove them from precip_gauge.'))
}
if(! all(unique(pchem$site_code) %in% rg$site_code)){
missing_gauge <- unique(pchem$site_code)[!unique(pchem$site_code) %in% rg$site_code]
stop(paste0('data exist in pchem for the gauge(s): ', paste0(missing_gauge, collapse = ', '),
', but have no corresponding location in precip_gauge.',
' Either add these gauge(s) to precip_gauge or remove corresponding data from pchem.'))
}
}
# get precip var name
if(! pchem_only){
precip_var_name <- unique(precip$var)
if(! length(precip_var_name) == 1){
stop('Only one unique variable code is allowed in the var column of precip.')
}
}
# Apply val_err as errors
if(! pchem_only){
errors::errors(precip$val) <- precip$val_err
precip <- precip %>%
dplyr::select(-val_err)
}
if(! precip_only){
errors::errors(pchem$val) <- pchem$val_err
pchem <- pchem %>%
dplyr::select(-val_err)
}
if(pchem_only) precip <- NULL
#### Set up spatial ####
#project based on average latlong of watershed boundaries
bbox <- as.list(sf::st_bbox(wb))
projstring <- choose_projection(lat = mean(bbox$ymin, bbox$ymax),
long = mean(bbox$xmin, bbox$xmax))
wb <- sf::st_transform(wb, projstring)
rg <- sf::st_transform(rg, projstring)
# get a DEM that encompasses all watersheds and gauges and buffer because
# gauges on the edge of bbox can error in extracting elevation
wb_rg_bbox <- sf::st_as_sf(sf::st_as_sfc(sf::st_bbox(bind_rows(wb, rg)))) %>%
sf::st_buffer(50)
if('area' %in% names(wb)){
wb <- wb %>%
mutate(area = as.numeric(sf::st_area(geometry)/10000))
}
# Maybe change this
dem_res <- ifelse(any(wb$area < 5), 9, 8)
dem <- expo_backoff(
expr = {
suppressWarnings(elevatr::get_elev_raster(locations = wb_rg_bbox,
z = dem_res,
clip = 'bbox',
expand = 0.005,
verbose = verbose,
override_size_check = TRUE))
},
max_attempts = 5
)
#add elev column to rain gauges
rg$elevation <- terra::extract(dem, rg)
#this avoids a lot of slow summarizing
if(! pchem_only){
status_cols <- precip %>%
dplyr::select(datetime, ms_status, ms_interp) %>%
group_by(datetime) %>%
summarize(
ms_status = numeric_any(ms_status),
ms_interp = numeric_any(ms_interp))
day_durations_byproduct <- datetimes_to_durations(
datetime_vec = precip$datetime,
variable_prefix_vec = ms_extract_var_prefix_(precip$var),
unit = 'days',
installed_maxgap = 2,
grab_maxgap = 30)
precip$val[is.na(day_durations_byproduct)] <- NA
precip <- precip %>%
dplyr::select(-ms_status, -ms_interp, -var) %>%
tidyr::pivot_wider(names_from = site_code,
values_from = val) %>%
left_join(status_cols, #they get lumped anyway
by = 'datetime') %>%
arrange(datetime)
day_durations <- datetimes_to_durations(
datetime_vec = precip$datetime,
unit = 'days')
}
if(! precip_only){
#determine which variables can be flux converted (prefix handling clunky here)
flux_vars <- ms_vars %>%
filter(as.logical(flux_convertible)) %>%
pull(variable_code)
pchem_vars <- unique(pchem$var)
pchem_vars_fluxable0 <- base::intersect(ms_drop_var_prefix_(pchem_vars),
flux_vars)
pchem_vars_fluxable <- pchem_vars[ms_drop_var_prefix_(pchem_vars) %in%
pchem_vars_fluxable0]
#this avoids a lot of slow summarizing
status_cols <- pchem %>%
dplyr::select(datetime, ms_status, ms_interp) %>%
group_by(datetime) %>%
summarize(
ms_status = numeric_any(ms_status),
ms_interp = numeric_any(ms_interp))
#clean pchem one variable at a time, matrixify it, insert it into list
nvars <- length(pchem_vars)
pchem_setlist <- as.list(rep(NA, nvars))
for(i in 1:nvars){
v <- pchem_vars[i]
#clean data and arrange for matrixification
pchem_setlist[[i]] <- pchem %>%
filter(var == v) %>%
dplyr::select(-var, -ms_status, -ms_interp) %>%
tidyr::pivot_wider(names_from = site_code,
values_from = val) %>%
left_join(status_cols,
by = 'datetime') %>%
arrange(datetime)
}
} #end conditional pchem+pflux block (1)
#make sure old quickref data aren't still sitting around
unlink(glue::glue('{ms}/precip_idw_quickref',
ms = out_path),
recursive = TRUE)
#send vars into interpolator with precip, one at a time. if var is flux-
#convertible, interpolate precip, pchem, and pflux. otherwise, just precip
#and pchem. combine and write outputs by site
for(i in 1:nrow(wb)){
wbi <- slice(wb, i)
site_code <- wbi$site_code
wbi_area_ha <- as.numeric(sf::st_area(wbi)) / 10000
idw_log_wb(verbose = verbose,
site_code = site_code,
i = i,
nw = nrow(wb))
nthreads <- parallel::detectCores()
if(maxcores > nthreads){
if(! is.infinite(maxcores)){
message(paste('Requested more cores than are available. Using',
nthreads, 'instead.'))
}
maxcores <- nthreads
}
## IDW INTERPOLATE PRECIP FOR ALL TIMESTEPS. STORE CELL VALUES
## SO THEY CAN BE USED FOR PFLUX INTERP
# Conditional switch between running in parallel and series
`%parcond%` <- ifelse(parallel, `%dopar%`, `%do%`)
if(! pchem_only){
ntimesteps_precip <- nrow(precip)
nsuperchunks <- ceiling(ntimesteps_precip / 25000 * 2)
nchunks_precip <- nthreads * nsuperchunks
precip_superchunklist <- chunk_df(d = precip,
nchunks = nsuperchunks,
create_index_column = TRUE)
ws_mean_precip <- tibble()
for(s in 1:length(precip_superchunklist)){
# ws_mean_precip <- foreach::foreach(
# s = 1:length(precip_superchunklist),
# .combine = idw_parallel_combine,
# .init = 'first iter') %:% {
precip_superchunk <- precip_superchunklist[[s]]
precip_chunklist <- chunk_df(d = precip_superchunk,
nchunks = nthreads,
create_index_column = FALSE)
idw_log_var(verbose = verbose,
site_code = site_code,
v = 'precipitation',
j = paste('chunk', s),
ntimesteps = nrow(precip_superchunk),
nvars = nsuperchunks)
clst <- ms_parallelize(maxcores = nthreads)
# doFuture::registerDoFuture()
# ncores <- min(parallel::detectCores(), maxcores)
# clst <- parallel::makeCluster(nthreads, type='FORK')
# future::plan(future::multicore, workers = 48)
# future::plan(future::multisession, workers = 48)
# parallel::stopCluster(clst)
# fe_junk <- foreach:::.foreachGlobals
# rm(list = ls(name = fe_junk),
# pos = fe_junk)
ws_mean_precip_chunk <- foreach::foreach(
j = 1:min(nthreads, nrow(precip_superchunk)),
.combine = idw_parallel_combine,
.init = 'first iter',
.packages = c('dplyr', 'raster')) %parcond% {
pchunk <- precip_chunklist[[j]]
# idw_log_var(verbose = verbose,
# site_code = site_code,
# v = 'precipitation',
# j = paste('chunk', j + (nthreads * (s - 1))),
# ntimesteps = nrow(pchunk),
# nvars = nchunks_precip)
foreach_return <- shortcut_idw(
encompassing_dem = dem,
wshd_bnd = wbi,
data_locations = rg,
data_values = pchunk,
durations_between_samples = day_durations[pchunk$ind],
stream_site_code = site_code,
output_varname = 'SPECIAL CASE PRECIP',
save_precip_quickref = ! precip_only,
elev_agnostic = elevation_agnostic,
p_var = precip_var_name,
verbose = verbose,
macrosheds_root = out_path)
foreach_return
}
ms_unparallelize(clst)
rm(precip_chunklist); gc()
ws_mean_precip <- bind_rows(ws_mean_precip, ws_mean_precip_chunk)
}
rm(precip_superchunklist); gc()
if(any(is.na(ws_mean_precip$datetime))){
stop('NA datetime found in ws_mean_precip')
}
# #restore original varnames by site and dt
# ws_mean_precip <- ws_mean_precip %>%
# arrange(datetime) %>%
# dplyr::select(-var) %>% #just a placeholder
# left_join(precip_varnames,
# by = c('datetime', 'site_code'))
ws_mean_precip <- ws_mean_precip %>%
dplyr::rename_all(dplyr::recode, concentration = 'val') %>%
# rename(val = concentration) %>%
arrange(datetime)
# ws_mean_precip <- apply_detection_limit_t(
# X = ws_mean_precip,
# network = network,
# domain = domain,
# prodname_ms = precursor_prodnames[grepl('^precipitation',
# precursor_prodnames)])
write_ms_file(ws_mean_precip,
macrosheds_root = out_path,
prodname_ms = 'precipitation__ms900',
site_code = site_code,
shapefile = FALSE)
rm(ws_mean_precip); gc()
}
## NOW IDW INTERPOLATE PCHEM (IF PRECIP CHEMISTRY DATA EXIST)
## AND PFLUX (FOR VARIABLES THAT ARE FLUXABLE).
if(! precip_only){
ws_mean_chemflux <- tibble()
for(j in 1:nvars){
v <- pchem_vars[j]
jd <- pchem_setlist[[j]]
ntimesteps_chemflux <- nrow(jd)
if(v %in% pchem_vars_fluxable && ! pchem_only){
is_fluxable <- TRUE
} else {
is_fluxable <- FALSE
}
idw_log_var(verbose = verbose,
site_code = site_code,
v = v,
j = paste('var', j),
nvars = nvars,
ntimesteps = ntimesteps_chemflux,
is_fluxable = is_fluxable)
nsuperchunks <- ceiling(ntimesteps_chemflux / 5000 * 2)
nchunks_chemflux <- nthreads * nsuperchunks
chemflux_superchunklist <- chunk_df(d = jd,
nchunks = nsuperchunks)
nsuperchunks <- length(chemflux_superchunklist)
ws_mean_chemflux_var <- tibble()
for(s in 1:nsuperchunks){
chemflux_superchunk <- chemflux_superchunklist[[s]]
chemflux_chunklist <- chunk_df(d = chemflux_superchunk,
nchunks = nthreads)
idw_log_var(verbose = verbose,
site_code = site_code,
v = v,
j = paste('chunk', s),
ntimesteps = nrow(chemflux_superchunk),
nvars = nsuperchunks)
clst <- ms_parallelize(maxcores = nthreads)
foreach_out <- foreach::foreach(
l = 1:length(chemflux_chunklist),
# l = 1:min(nthreads, nrow(chemflux_superchunk)),
.combine = idw_parallel_combine,
.init = 'first iter',
.packages = 'dplyr') %parcond% {
if(is_fluxable){
foreach_chunk <- shortcut_idw_concflux_v2(
encompassing_dem = dem,
wshd_bnd = wbi,
ws_area = wbi_area_ha,
data_locations = rg,
precip_values = precip,
chem_values = chemflux_chunklist[[l]],
stream_site_code = site_code,
output_varname = v,
out_path = out_path,
verbose = verbose)
} else {
foreach_chunk <- shortcut_idw(
encompassing_dem = dem,
wshd_bnd = wbi,
data_locations = rg,
data_values = chemflux_chunklist[[l]],
stream_site_code = site_code,
output_varname = v,
elev_agnostic = elevation_agnostic,
macrosheds_root = out_path,
verbose = verbose)
}
foreach_chunk
}
ms_unparallelize(clst)
rm(chemflux_chunklist); gc()
ws_mean_chemflux_var <- bind_rows(ws_mean_chemflux_var,
foreach_out)
}
rm(chemflux_superchunklist); gc()
ws_mean_chemflux <- bind_rows(ws_mean_chemflux,
ws_mean_chemflux_var)
}
if(any(is.na(ws_mean_chemflux$datetime))){
stop('NA datetime found in ws_mean_chemflux')
}
if(! pchem_only){
ws_mean_pflux <- ws_mean_chemflux %>%
dplyr::select(-concentration) %>%
rename(val = flux) %>%
arrange(var, datetime)
write_ms_file(ws_mean_pflux,
macrosheds_root = out_path,
prodname_ms = 'precip_flux_inst__ms902',
site_code = site_code,
shapefile = FALSE)
rm(ws_mean_pflux)
}
ws_mean_pchem <- ws_mean_chemflux %>%
dplyr::select(-any_of('flux')) %>%
rename(val = concentration) %>%
arrange(var, datetime)
write_ms_file(ws_mean_pchem,
macrosheds_root = out_path,
prodname_ms = 'precip_chemistry__ms901',
site_code = site_code,
shapefile = FALSE)
rm(ws_mean_pchem); gc()
} #end conditional pchem+pflux block (2)
}
unlink(glue::glue('{ms}/precip_idw_quickref',
ms = out_path),
recursive = TRUE)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.