#' Internal functions
#'
#' Not intended to be called directly by the user
#'
#' @noRd
# Function aliases
sw <- suppressWarnings
sm <- suppressMessages
`%dopar%` <- foreach::`%dopar%`
`%do%` <- foreach::`%do%`
# Unit conversion helpers ####
parse_molecular_formulae <- function(formulae){
#`formulae` is a vector
# formulae = c('C', 'C4', 'Cl', 'Cl2', 'CCl', 'C2Cl', 'C2Cl2', 'C2Cl2B2')
# formulae = 'BCH10He10PLi2'
# formulae='Mn'
conc_vars = stringr::str_match(formulae, '^(?:OM|TM|DO|TD|UTK|UT|TK|TI|TO|DI)?([A-Za-z0-9]+)_?')[,2] #UTK must precede UT: regex alternation is ordered
two_let_symb_num = stringr::str_extract_all(conc_vars, '([A-Z][a-z][0-9]+)')
conc_vars = stringr::str_remove_all(conc_vars, '([A-Z][a-z][0-9]+)')
one_let_symb_num = stringr::str_extract_all(conc_vars, '([A-Z][0-9]+)')
conc_vars = stringr::str_remove_all(conc_vars, '([A-Z][0-9]+)')
two_let_symb = stringr::str_extract_all(conc_vars, '([A-Z][a-z])')
conc_vars = stringr::str_remove_all(conc_vars, '([A-Z][a-z])')
one_let_symb = stringr::str_extract_all(conc_vars, '([A-Z])')
constituents = mapply(c, SIMPLIFY=FALSE,
two_let_symb_num, one_let_symb_num, two_let_symb, one_let_symb)
return(constituents) # a list of vectors
}
combine_atomic_masses <- function(molecular_constituents){
#`molecular_constituents` is a vector
xmat = stringr::str_match(molecular_constituents,
'([A-Z][a-z]?)([0-9]+)?')[, -1, drop=FALSE]
elems = xmat[,1]
mults = as.numeric(xmat[,2])
mults[is.na(mults)] = 1
molecular_mass = sum(PeriodicTable::mass(elems) * mults)
return(molecular_mass) #a scalar
}
calculate_molar_mass <- function(molecular_formula){
if(length(molecular_formula) > 1){
stop('molecular_formula must be a string of length 1')
}
parsed_formula = parse_molecular_formulae(molecular_formula)[[1]]
molar_mass = combine_atomic_masses(parsed_formula)
return(molar_mass)
}
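#e.g. (a quick sanity check; masses are approximate and come from
# PeriodicTable::mass):
# calculate_molar_mass('NH4') #14.007 + 4 * 1.008 = ~18.04
# calculate_molar_mass('CaCO3') #~100.09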
convert_molecule <- function(x, from, to){
#e.g. convert_molecule(1.54, 'NH4', 'N')
from_mass <- calculate_molar_mass(from)
to_mass <- calculate_molar_mass(to)
converted_mass <- x * to_mass / from_mass
return(converted_mass)
}
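#e.g. (illustrative only; converts 1.54 mg/L NH4 to its mass as N):
# convert_molecule(1.54, 'NH4', 'N') #1.54 * 14.007 / 18.039 = ~1.20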
convert_to_gl <- function(x, input_unit, formula, ms_vars){
if(grepl('eq', input_unit)) {
valence = ms_vars$valence[ms_vars$variable_code %in% formula]
if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', formula, 'unknown'))}
x = (x * calculate_molar_mass(formula)) / valence
return(x)
}
if(grepl('mol', input_unit)) {
x = x * calculate_molar_mass(formula)
return(x)
}
return(x)
}
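#e.g. (ms_vars_demo is a hypothetical stand-in for the macrosheds variable
# catalog, which carries variable_code and valence columns; note that the
# prefix of the input unit is retained):
# ms_vars_demo <- tibble(variable_code = 'Ca', valence = 2)
# convert_to_gl(1, 'ueq/L', 'Ca', ms_vars_demo) #(1 * 40.08) / 2 = ~20.04 (ug/L)
# convert_to_gl(0.1, 'mmol/L', 'NO3', ms_vars_demo) #0.1 * 62.00 = ~6.2 (mg/L)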
convert_from_gl <- function(x, input_unit, output_unit, molecule, g_conver, ms_vars){
molecule_real <- ms_vars %>%
filter(variable_code == !!molecule) %>%
pull(molecule)
if(!is.na(molecule_real)) {
formula <- molecule_real
} else {
formula <- molecule
}
if(grepl('eq', output_unit) && grepl('g', input_unit) ||
grepl('eq', output_unit) && g_conver) {
valence = ms_vars$valence[ms_vars$variable_code %in% molecule]
if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', molecule, 'unknown'))}
x = (x * valence) / calculate_molar_mass(formula)
return(x)
}
if(grepl('mol', output_unit) && grepl('g', input_unit) ||
grepl('mol', output_unit) && g_conver) {
x = x / calculate_molar_mass(formula)
return(x)
}
if(grepl('mol', output_unit) && grepl('eq', input_unit) && !g_conver) {
valence = ms_vars$valence[ms_vars$variable_code %in% molecule]
if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', molecule, 'unknown'))}
x = (x * calculate_molar_mass(formula)) / valence
x = x / calculate_molar_mass(formula)
return(x)
}
if(grepl('eq', output_unit) && grepl('mol', input_unit) && !g_conver) {
x = x * calculate_molar_mass(formula)
valence = ms_vars$valence[ms_vars$variable_code %in% molecule]
if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', molecule, 'unknown'))}
x = (x * valence)/calculate_molar_mass(formula)
return(x)
}
return(x)
}
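#e.g. (ms_vars_demo is hypothetical; the real ms_vars maps variable codes to
# molecular formulae and valences):
# ms_vars_demo <- tibble(variable_code = 'NO3', molecule = NA_character_, valence = 1)
# convert_from_gl(6.2, 'mg/L', 'mmol/L', 'NO3', g_conver = FALSE,
#                 ms_vars = ms_vars_demo) #6.2 / 62.00 = ~0.1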
convert_unit <- function(x, input_unit, output_unit){
units <- tibble(prefix = c('n', "u", "m", "c", "d", "h", "k", "M"),
convert_factor = c(0.000000001, 0.000001, 0.001, 0.01, 0.1, 100,
1000, 1000000))
old_fraction <- as.vector(stringr::str_split_fixed(input_unit, "/", n = Inf))
old_top <- as.vector(stringr::str_split_fixed(old_fraction[1], "", n = Inf))
if(length(old_fraction) == 2) {
old_bottom <- as.vector(stringr::str_split_fixed(old_fraction[2], "", n = Inf))
}
new_fraction <- as.vector(stringr::str_split_fixed(output_unit, "/", n = Inf))
new_top <- as.vector(stringr::str_split_fixed(new_fraction[1], "", n = Inf))
if(length(new_fraction) == 2) {
new_bottom <- as.vector(stringr::str_split_fixed(new_fraction[2], "", n = Inf))
}
old_top_unit <- tolower(stringr::str_split_fixed(old_top, "", 2)[1])
if(old_top_unit %in% c('g', 'e', 'q', 'l') || old_fraction[1] == 'mol') {
old_top_conver <- 1
} else {
old_top_conver <- as.numeric(filter(units, prefix == old_top_unit)[,2])
}
old_bottom_unit <- tolower(stringr::str_split_fixed(old_bottom, "", 2)[1])
if(old_bottom_unit %in% c('g', 'e', 'q', 'l') || old_fraction[2] == 'mol') {
old_bottom_conver <- 1
} else {
old_bottom_conver <- as.numeric(filter(units, prefix == old_bottom_unit)[,2])
}
new_top_unit <- tolower(stringr::str_split_fixed(new_top, "", 2)[1])
if(new_top_unit %in% c('g', 'e', 'q', 'l') || new_fraction[1] == 'mol') {
new_top_conver <- 1
} else {
new_top_conver <- as.numeric(filter(units, prefix == new_top_unit)[,2])
}
new_bottom_unit <- tolower(stringr::str_split_fixed(new_bottom, "", 2)[1])
if(new_bottom_unit %in% c('g', 'e', 'q', 'l') || new_fraction[2] == 'mol') {
new_bottom_conver <- 1
} else {
new_bottom_conver <- as.numeric(filter(units, prefix == new_bottom_unit)[,2])
}
new_val <- x*old_top_conver
new_val <- new_val/new_top_conver
new_val <- new_val/old_bottom_conver
new_val <- new_val*new_bottom_conver
return(new_val)
}
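#e.g. (prefix conversion only; the base unit must already match):
# convert_unit(1, 'mg/L', 'ug/L') #1 * 0.001 / 1e-06 = 1000
# convert_unit(250, 'ueq/L', 'meq/L') #0.25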
# EGRET helpers ####
get_days_since_1850 <- function(dates){
return_dates <- as.numeric(as_date(dates)-lubridate::ymd('1850-01-01'))
return(return_dates)
}
get_DecYear <- function(dates){
year <- lubridate::year(dates)
get_days <- function(year){
if((year %% 4) == 0) {
if((year %% 100) == 0) {
day_years <- ifelse((year %% 400) == 0, 366, 365)
} else {
day_years <- 366
}
} else {
day_years <- 365
}
return(day_years)
}
days_in_year <- purrr::map_dbl(year, get_days)
DecYear <- (lubridate::yday(dates)/days_in_year)+lubridate::year(dates)
return(DecYear)
}
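#e.g.:
# get_DecYear(as.Date('2020-07-01')) #183/366 + 2020 = 2020.5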
get_MonthSeq <- function(dates){
years <- lubridate::year(dates)-1850
MonthSeq <- years*12
MonthSeq <- MonthSeq + lubridate::month(dates)
return(MonthSeq)
}
get_start_end <- function(d){
start_date <- min(d$datetime)
start_year <- lubridate::year(start_date)
start_wy <- ifelse(lubridate::month(start_date) %in% c(10, 11, 12), start_year+1, start_year)
filter_start_date <- lubridate::ymd(paste0(start_wy, '-10-01'))
end_date <- max(d$datetime)
end_year <- lubridate::year(end_date)
end_wy <- ifelse(lubridate::month(end_date) %in% c(10, 11, 12), end_year+1, end_year)
filter_end_date <- lubridate::ymd(paste0(end_wy, '-10-01'))
fin_dates <- c(filter_start_date, filter_end_date)
return(fin_dates)
}
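#e.g. (clips a record to complete water years, Oct 1 through Sep 30;
# d_demo is hypothetical):
# d_demo <- tibble(datetime = as.Date(c('2009-06-15', '2020-03-10')))
# get_start_end(d_demo) #"2009-10-01" "2020-10-01"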
drop_var_prefix <- function(x){
unprefixed <- substr(x, 4, nchar(x))
return(unprefixed)
}
decimalDate <- function(rawData){
dateTime <- as.POSIXlt(rawData)
year <- dateTime$year + 1900
startYear <- as.POSIXct(paste0(year,"-01-01 00:00"))
endYear <- as.POSIXct(paste0(year+1,"-01-01 00:00"))
DecYear <- year + as.numeric(difftime(dateTime, startYear, units = "secs"))/as.numeric(difftime(endYear, startYear, units = "secs"))
return(DecYear)
}
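#e.g.:
# decimalDate('2020-07-01 12:00:00') #~2020.5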
# precip interpolation helpers ####
read_combine_shapefiles <- function(network, domain, prodname_ms){
#TODO: sometimes multiple locations are listed for the same rain gauge.
# if there's a date associated with those locations, we should take that
# into account when performing IDW, and when plotting gauges on the map.
# (maybe previous locations could show up semitransparent). for now,
# we're just grabbing the last location listed (by order). We don't store
# date columns with our raingauge shapes yet, so that's the place to start.
#TODO: also, this may be a problem for other spatial stuff.
level <- ifelse(is_derived_product(prodname_ms),
'derived',
'munged')
prodpaths <- list.files(glue::glue('data/{n}/{d}/{l}/{p}',
n = network,
d = domain,
l = level,
p = prodname_ms),
recursive = TRUE,
full.names = TRUE,
pattern = '*.shp')
shapes <- lapply(prodpaths,
function(x){
sf::st_read(x,
stringsAsFactors = FALSE,
quiet = TRUE) %>%
slice_tail()
})
# wb <- sw(Reduce(sf::st_union, wbs)) %>%
combined <- sw(Reduce(bind_rows, shapes))
# sf::st_transform(projstring)
return(combined)
}
read_combine_feathers <- function(network,
domain,
prodname_ms){
#read all data feathers associated with a network-domain-product,
#row bind them, arrange by site_code, var, datetime. insert val_err column
#into the val column as errors attribute and then remove val_err column
#(error/uncertainty is handled by the errors package as an attribute,
#so it must be written/read as a separate column).
#the processing level is determined automatically from prodname_ms.
# If the product code is "msXXX" where X is a numeral, the processing
# level is assumed to be "derived". otherwise "munged"
#handled in ms_list_files
# level <- ifelse(is_derived_product(prodname_ms),
# 'derived',
# 'munged')
prodpaths <- ms_list_files(network = network,
domain = domain,
prodname_ms = prodname_ms)
combined <- tibble()
for(i in seq_along(prodpaths)){
part <- feather::read_feather(prodpaths[i])
combined <- bind_rows(combined, part)
}
combined <- combined %>%
mutate(val = errors::set_errors(val, val_err)) %>%
dplyr::select(-val_err) %>%
arrange(site_code, var, datetime)
return(combined)
}
datetimes_to_durations <- function(datetime_vec,
variable_prefix_vec = NULL,
unit,
sensor_maxgap = Inf,
nonsensor_maxgap = Inf,
grab_maxgap = Inf,
installed_maxgap = Inf){
#datetime_vec: POSIXct. a vector of datetimes
#variable_prefix_vec: POSIXct. a vector of macrosheds variable prefixes,
# e.g. 'IS'. This vector is generated by calling extract_var_prefix
# on the var column of a macrosheds data.frame. Only required if you
# supply one or more of the maxgap parameters.
#unit: string. The desired datetime unit. Must be expressed in a form
# that's readable by base::difftime
#sensor_maxgap: numeric. the largest data gap that should return
# a value. Durations longer than this gap will return NA. Expressed in
# the same units as unit (see above). This applies
# to sensor data only (IS and GS).
#nonsensor_maxgap: numeric. the largest data gap that should return
# a value. Durations longer than this gap will return NA. Expressed in
# the same units as unit (see above). This applies
# to nonsensor data only (IN and GN).
#grab_maxgap: numeric. the largest data gap that should return
# a value. Durations longer than this gap will return NA. Expressed in
# the same units as unit (see above). This applies
# to grab data only (GS and GN).
#installed_maxgap: numeric. the largest data gap that should return
# a value. Durations longer than this gap will return NA. Expressed in
# the same units as unit (see above). This applies
# only to data from installed units (IS and IN).
#an NA is prepended to the output, so that its length is the same as
# datetime_vec
if(! is.null(variable_prefix_vec) &&
! all(variable_prefix_vec %in% c('GN', 'GS', 'IN', 'IS'))){
stop(paste('all elements of variable_prefix_vec must be one of "GN",',
'"GS", "IN", "IS"'))
}
durs <- diff(datetime_vec)
units(durs) <- unit
durs <- c(NA_real_, as.numeric(durs))
if(! is.null(variable_prefix_vec)){
is_sensor_data <- grepl('^.S', variable_prefix_vec)
is_installed_data <- grepl('^I', variable_prefix_vec)
}
if(! is.infinite(sensor_maxgap)){
durs[is_sensor_data & durs > sensor_maxgap] <- NA_real_
}
if(! is.infinite(nonsensor_maxgap)){
durs[! is_sensor_data & durs > nonsensor_maxgap] <- NA_real_
}
if(! is.infinite(grab_maxgap)){
durs[! is_installed_data & durs > grab_maxgap] <- NA_real_
}
if(! is.infinite(installed_maxgap)){
durs[is_installed_data & durs > installed_maxgap] <- NA_real_
}
return(durs)
}
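#e.g. (dts is hypothetical; the 8-day gap exceeds sensor_maxgap, so it
# returns NA):
# dts <- as.POSIXct(c('2020-01-01', '2020-01-02', '2020-01-10'), tz = 'UTC')
# datetimes_to_durations(dts, rep('IS', 3), unit = 'days', sensor_maxgap = 2)
# #NA 1 NA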
chunk_df <- function(d, nchunks, create_index_column = FALSE){
nr <- nrow(d)
chunksize <- nr/nchunks
if(nr > 0){
if(create_index_column) d <- mutate(d, ind = 1:n())
chunklist <- split(d,
0:(nr - 1) %/% chunksize)
return(chunklist)
} else {
logwarn(msg = 'Trying to chunk an empty tibble. Something is probably wrong',
logger = logger_module)
return(d)
}
}
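#e.g.:
# chunks <- chunk_df(tibble(x = 1:10), nchunks = 3)
# sapply(chunks, nrow) #4 3 3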
idw_log_var <- function(verbose,
site_code,
v,
j,
nvars,
ntimesteps,
is_fluxable = NA,
note = ''){
if(! verbose) return()
flux_calc_msg <- case_when(is_fluxable == FALSE ~ '[NO FLUX]',
is_fluxable == TRUE ~ '[yes flux]',
is.na(is_fluxable) ~ '')
note <- ifelse(note == '',
note,
paste0('[', note, ']'))
msg <- glue::glue('site: {s}; var: {vv} ({jj}/{nv}) {f}; timesteps: {nt}; {n}',
s = site_code,
vv = v,
jj = j,
nv = nvars,
nt = ntimesteps,
f = flux_calc_msg,
n = note)
message(msg)
}
ms_parallelize <- function(maxcores = Inf){
#maxcores is the maximum number of processor cores to use for R tasks.
# you may want to leave a few aside for other processes.
check_suggested_pkgs(c('parallel', 'doParallel'))
clst <- NULL
ncores <- min(parallel::detectCores(), maxcores)
if(Sys.info()['sysname'] == 'Windows'){
clst <- parallel::makeCluster(ncores, type = 'PSOCK')
doParallel::registerDoParallel(clst)
} else{
clst <- parallel::makeCluster(ncores, type = 'FORK')
doParallel::registerDoParallel(clst)
}
return(clst)
}
idw_parallel_combine <- function(d1, d2){
#this is for use with foreach loops inside the 4 idw prep functions
# (precip_idw, pchem_idw, flux_idw, precip_pchem_pflux_idw)
if(is.character(d1) && d1 == 'first iter') return(d2)
d_comb <- bind_rows(d1, d2)
return(d_comb)
}
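#a minimal sketch of how the parallel pieces fit together (assuming a
# doParallel backend registers successfully; the loop body is a placeholder):
# clst <- ms_parallelize(maxcores = 4)
# out <- foreach::foreach(i = 1:4,
#                         .combine = idw_parallel_combine,
#                         .init = 'first iter') %dopar% {
#     tibble(chunk = i)
# }
# ms_unparallelize(clst)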
shortcut_idw <- function(encompassing_dem,
wshd_bnd,
data_locations,
data_values,
durations_between_samples = NULL,
stream_site_code,
output_varname,
save_precip_quickref = FALSE,
elev_agnostic = FALSE,
p_var = NULL,
verbose = FALSE,
macrosheds_root){
#encompassing_dem: RasterLayer; must cover the area of wshd_bnd and precip_gauges
#wshd_bnd: sf polygon with columns site_code and geometry
# it represents a single watershed boundary
#data_locations: sf point(s) with columns site_code and geometry.
# it represents all sites (e.g. rain gauges) that will be used in
# the interpolation
#data_values: data.frame with one column each for datetime and ms_status,
# and an additional named column of data values for each data location.
#durations_between_samples: numeric vector representing the time differences
# between rows of data_values. Must be expressed in days. Only used if
# output_varname == 'SPECIAL CASE PRECIP' and save_precip_quickref == TRUE,
# so that quickref data can be expressed in mm/day, which is the unit
# required to calculate precip flux.
# NOTE: this could be calculated internally, but the duration of measurement
# preceding the first value can't be known. Because shortcut_idw is
# often run iteratively on chunks of a dataset, we require that
# durations_between_samples be passed as an input, so as to minimize
# the number of NAs generated.
#output_varname: character; a prodname_ms, unless you're interpolating
# precipitation, in which case it must be "SPECIAL CASE PRECIP", because
# prefix information for precip is lost during the widen-by-site step
#save_precip_quickref: logical. should interpolated precip for all DEM cells
# be saved for later use. Should only be true when precip chem will be
# interpolated too. only usable when output_varname == 'SPECIAL CASE PRECIP'
#elev_agnostic: logical that determines whether elevation should be
# included as a predictor of the variable being interpolated
check_suggested_pkgs(c('parallel'))
if(output_varname != 'SPECIAL CASE PRECIP' && save_precip_quickref){
stop(paste('save_precip_quickref can only be TRUE if output_varname',
'== "SPECIAL CASE PRECIP"'))
}
if(save_precip_quickref && is.null(durations_between_samples)){
stop(paste('save_precip_quickref can only be TRUE if',
'durations_between_samples is supplied.'))
}
if(output_varname != 'SPECIAL CASE PRECIP' && ! is.null(durations_between_samples)){
logwarn(msg = paste('In shortcut_idw: ignoring durations_between_samples because',
'output_varname != "SPECIAL CASE PRECIP".'),
logger = logger_module)
}
if('ind' %in% colnames(data_values)){
timestep_indices <- data_values$ind
data_values$ind <- NULL
}
#matrixify input data so we can use matrix operations
d_status <- data_values$ms_status
d_interp <- data_values$ms_interp
d_dt <- data_values$datetime
data_matrix <- dplyr::select(data_values,
-ms_status,
-datetime,
-ms_interp) %>%
err_df_to_matrix()
#clean dem and get elevation values
dem_wb <- terra::crop(encompassing_dem, wshd_bnd)
dem_wb <- terra::mask(dem_wb, wshd_bnd)
wb_is_linear <- terra::nrow(dem_wb) == 1 || terra::ncol(dem_wb) == 1
wb_all_na <- all(is.na(terra::values(dem_wb)))
if(wb_all_na){
if(wb_is_linear){
#masking will cause trouble here (only known for niwot-MARTINELLI)
dem_wb <- terra::crop(encompassing_dem, wshd_bnd)
} else {
stop('some kind of crop/mask issue with small watersheds?')
}
}
elevs <- terra::values(dem_wb)
elevs_masked <- elevs[! is.na(elevs)]
#compute distances from all dem cells to all data locations
inv_distmat <- matrix(NA,
nrow = length(elevs_masked),
ncol = ncol(data_matrix),
dimnames = list(NULL,
colnames(data_matrix)))
dem_wb_all_na <- dem_wb
terra::values(dem_wb_all_na) <- NA
dem_wb_all_na <- terra::rast(dem_wb_all_na)
for(k in 1:ncol(data_matrix)){
dk <- filter(data_locations,
site_code == colnames(data_matrix)[k])
inv_dists_site <- 1 / terra::values(terra::distance(terra::rast(dem_wb_all_na), terra::vect(dk))^2)
inv_dists_site <- inv_dists_site[! is.na(elevs)] #drop elevs not included in mask
inv_distmat[, k] <- inv_dists_site
}
#calculate watershed mean at every timestep
if(save_precip_quickref) precip_quickref <- list()
ptm <- proc.time()
ws_mean <- rep(NA, nrow(data_matrix))
ntimesteps <- nrow(data_matrix)
for(k in 1:ntimesteps){
#assign cell weights as normalized inverse squared distances
dk <- t(data_matrix[k, , drop = FALSE])
inv_distmat_sub <- inv_distmat[, ! is.na(dk), drop = FALSE]
dk <- dk[! is.na(dk), , drop = FALSE]
weightmat <- do.call(rbind, #avoids matrix transposition
unlist(apply(inv_distmat_sub, #normalize by row
1,
function(x) list(x / sum(x))),
recursive = FALSE))
#perform vectorized idw
dk[is.na(dk)] <- 0 #allows matrix multiplication
d_idw <- weightmat %*% dk
if(nrow(dk) == 0){
ws_mean[k] <- errors::set_errors(NA_real_, NA)
if(save_precip_quickref){
precip_quickref[[k]] <- matrix(NA,
nrow = nrow(d_idw),
ncol = ncol(d_idw))
}
next
}
#reapply uncertainty dropped by `%*%`
errors::errors(d_idw) <- weightmat %*% matrix(errors::errors(dk),
nrow = nrow(dk))
#determine data-elevation relationship for interp weighting
if(! elev_agnostic && nrow(dk) >= 3){
d_elev <- tibble(site_code = rownames(dk),
d = dk[,1]) %>%
left_join(data_locations,
by = 'site_code')
mod <- lm(d ~ elevation, data = d_elev)
ab <- as.list(mod$coefficients)
#estimate raster values from elevation alone
d_from_elev <- ab$elevation * elevs_masked + ab$`(Intercept)`
# Set all negative values to 0
d_from_elev[d_from_elev < 0] <- 0
#get weighted mean of both approaches:
#weight on idw is 1; weight on elev-predicted is |R^2|
abs_r2 <- abs(cor(d_elev$d, mod$fitted.values)^2)
d_idw <- (d_idw + d_from_elev * abs_r2) / (1 + abs_r2)
}
ws_mean[k] <- mean(d_idw, na.rm=TRUE)
errors::errors(ws_mean)[k] <- mean(errors::errors(d_idw), na.rm=TRUE)
if(save_precip_quickref) precip_quickref[[k]] <- d_idw
}
if(save_precip_quickref){
#convert to mm/d
precip_quickref <- base::Map(
f = function(millimeters, days){
return(millimeters/days)
},
millimeters = precip_quickref,
days = durations_between_samples)
names(precip_quickref) <- as.character(timestep_indices)
write_precip_quickref(precip_idw_list = precip_quickref,
site_code = stream_site_code,
chunkdtrange = range(d_dt),
macrosheds_root = macrosheds_root)
}
if(output_varname == 'SPECIAL CASE PRECIP'){
ws_mean <- tibble(datetime = d_dt,
site_code = stream_site_code,
var = p_var,
val = ws_mean,
ms_status = d_status,
ms_interp = d_interp)
} else {
ws_mean <- tibble(datetime = d_dt,
site_code = stream_site_code,
var = output_varname,
concentration = ws_mean,
ms_status = d_status,
ms_interp = d_interp)
}
return(ws_mean)
}
ms_unparallelize <- function(cluster_object){
#if cluster_object is NULL, nothing will happen
check_suggested_pkgs(c('parallel'))
# tryCatch({print(site_code)},
# error=function(e) print('nope'))
if(is.null(cluster_object)){
# future::plan(future::sequential)
return()
}
parallel::stopCluster(cluster_object)
#remove foreach clutter that might compromise the next parallel run
fe_junk <- foreach:::.foreachGlobals
rm(list = ls(name = fe_junk),
pos = fe_junk)
# #remove any unneeded globals that were created during parallelization
# unneeded_globals <- c('pchem_vars', 'pchem_vars_fluxable',
# 'dem', 'wbi', 'rg', 'precip',
# 'pchem_setlist', 'first_fluxvar_ind', 'i', 'j')
# sw(rm(list = unneeded_globals,
# envir = .GlobalEnv))
# #restore globals that were overwritten during parallelization
# protected_vars <- mget('protected_vars',
# envir = protected_environment)
#
# for(i in 1:length(protected_vars)){
#
# nm <- names(protected_vars)[i]
# val <- protected_vars[[i]]
#
# if(! is.null(val)){
# assign(nm,
# value = val,
# envir = .GlobalEnv)
# } else {
#
# #or remove them if they didn't exist before parallelization
# sw(rm(list = nm,
# envir = .GlobalEnv))
# }
# }
}
write_ms_file <- function(d,
prodname_ms,
site_code,
shapefile = FALSE,
sep_errors = TRUE,
macrosheds_root){
#write an ms tibble or shapefile to its appropriate destination based on
#network, domain, prodname_ms, site_code, and processing level. If a tibble,
#write as a feather file (site_code.feather). Uncertainty (error) associated
#with the val column will be extracted into a separate column called
#val_err. Write the file to the appropriate location within the data
#acquisition repository.
#deprecated:
#if link_to_portal == TRUE, create a hard link to the
#file from the portal repository, which is assumed to be a sibling of the
#data_acquision directory and to be named "portal".
if(shapefile){
site_dir = glue::glue('{ms}/{p}/{s}',
ms = macrosheds_root,
p = prodname_ms,
s = site_code)
dir.create(site_dir,
showWarnings = FALSE,
recursive = TRUE)
if(any(! sf::st_is_valid(d))) {
d <- sf::st_make_valid(d)
}
sw(sf::st_write(obj = d,
dsn = glue::glue(site_dir, '/', site_code, '.shp'),
delete_dsn = TRUE,
quiet = TRUE))
} else {
prod_dir = glue::glue('{ms}/{p}',
ms = macrosheds_root,
p = prodname_ms)
dir.create(prod_dir,
showWarnings = FALSE,
recursive = TRUE)
site_file = glue::glue('{pd}/{s}.feather',
pd = prod_dir,
s = site_code)
if(sep_errors) {
#separate uncertainty into a new column.
#remove errors attribute from val column if it exists (it always should)
d$val_err <- errors::errors(d$val)
if('errors' %in% class(d$val)){
d$val <- errors::drop_errors(d$val)
} else {
warning(glue::glue('Uncertainty missing from val column ({n}-{d}-{s}-{p}). ',
'That means this dataset has not passed through ',
'carry_uncertainty yet. it should have.',
n = network,
d = domain,
s = site_code,
p = prodname_ms))
}
}
#make sure feather::write_feather omits attributes by default (with no artifacts)
feather::write_feather(d, site_file)
}
#return()
}
shortcut_idw_concflux_v2 <- function(encompassing_dem,
wshd_bnd,
ws_area,
data_locations,
precip_values,
chem_values,
stream_site_code,
output_varname,
out_path,
# dump_idw_precip,
verbose = FALSE){
#This replaces shortcut_idw_concflux! shortcut_idw is still used for
#variables that can't be flux-converted, and for precipitation
#this function is similar to shortcut_idw_concflux.
#if the variable represented by chem_values and output_varname is
#flux-convertible, it multiplies precip chem
#by precip volume to calculate flux for each cell and returns
#the means of IDW-interpolated precipitation, precip chem, and precip flux
#for each sample timepoint. If that variable is not flux-convertible,
#It only returns precipitation and precip chem. All interpolated products are
#returned as standard macrosheds timeseries tibbles in a single list.
#encompassing_dem: RasterLayer; must cover the area of wshd_bnd and
# precip_gauges
#wshd_bnd: sf polygon with columns site_code and geometry.
# it represents a single watershed boundary.
#ws_area: numeric scalar representing watershed area in hectares. This is
# passed so that it doesn't have to be calculated repeatedly (if
# shortcut_idw_concflux_v2 is running iteratively).
#data_locations: sf point(s) with columns site_code and geometry.
# represents all sites (e.g. rain gauges) that will be used in
# the interpolation.
#precip_values: a data.frame with datetime, ms_status, ms_interp,
# and a column of data values for each precip location.
#chem_values: a data.frame with datetime, ms_status, ms_interp,
# and a column of data values for each precip chemistry location.
#stream_site_code: character; the name of the watershed/stream, not the
# name of a precip gauge
#output_varname: character; the prodname_ms used to populate the var
# column in the returned tibble
#dump_idw_precip: logical; if TRUE, IDW-interpolated precipitation will
# be dumped to disk (data/<network>/<domain>/precip_idw_dumps/<site_code>.rds).
# this file will then be read by precip_pchem_pflux_idw, in order to
# properly build data/<network>/<domain>/derived/<precipitation_msXXX>.
# the precip_idw_dumps directory is automatically removed after it's used.
# This should only be set to TRUE for one iteration of the calling loop,
# or else time will be wasted rewriting the files.
# REMOVED; OBSOLETE
precip_quickref <- read_precip_quickref(out_path = out_path,
site_code = stream_site_code,
dtrange = range(chem_values$datetime))
if(length(precip_quickref) == 1){
just_checkin <- precip_quickref[[1]]
if(is.character(just_checkin) &&
just_checkin == 'NO QUICKREF AVAILABLE'){
return(tibble())
}
}
precip_is_highres <- Mode(diff(as.numeric(precip_values$datetime))) <= 15 * 60
if(is.na(precip_is_highres)) precip_is_highres <- FALSE
chem_is_highres <- Mode(diff(as.numeric(chem_values$datetime))) <= 15 * 60
if(is.na(chem_is_highres)) chem_is_highres <- FALSE
#if both chem and precip data are low resolution (grab samples),
# let approxjoin_datetime match up samples with a 12-hour gap. otherwise the
# gap should be 7.5 mins so that there isn't enormous duplication of
# timestamps where multiple high-res values can be snapped to the
# same low-res value
if(! chem_is_highres && ! precip_is_highres){
join_distance <- c('12:00:00')
} else {
join_distance <- c('7:30')
}
dt_match_inds <- approxjoin_datetime(x = chem_values,
y = precip_values,
rollmax = join_distance,
indices_only = TRUE)
pchem_range <- range(chem_values$datetime) #compute before subsetting, so the sanity check below sees the original range
chem_values <- chem_values[dt_match_inds$x, ]
common_datetimes <- chem_values$datetime
if(length(common_datetimes) == 0){
test <- filter(precip_values,
datetime > pchem_range[1],
datetime < pchem_range[2])
if(nrow(test) > 0){
logging::logerror('something is wrong with approxjoin_datetime')
}
return(tibble())
}
precip_values <- precip_values %>%
mutate(ind = 1:n()) %>%
slice(dt_match_inds$y) %>%
mutate(datetime = !!common_datetimes)
quickref_inds <- precip_values$ind
precip_values$ind <- NULL
#matrixify input data so we can use matrix operations
p_status <- precip_values$ms_status
p_interp <- precip_values$ms_interp
p_matrix <- dplyr::select(precip_values,
-ms_status,
-datetime,
-ms_interp) %>%
err_df_to_matrix()
c_status <- chem_values$ms_status
c_interp <- chem_values$ms_interp
c_matrix <- dplyr::select(chem_values,
-ms_status,
-datetime,
-ms_interp) %>%
err_df_to_matrix()
rm(precip_values, chem_values); gc()
d_status <- bitwOr(p_status, c_status)
d_interp <- bitwOr(p_interp, c_interp)
#clean dem and get elevation values
dem_wb <- terra::crop(encompassing_dem, wshd_bnd)
dem_wb <- terra::mask(dem_wb, wshd_bnd)
elevs <- terra::values(dem_wb)
elevs_masked <- elevs[! is.na(elevs)]
#compute distances from all dem cells to all chemistry locations
inv_distmat_c <- matrix(NA,
nrow = length(elevs_masked),
ncol = ncol(c_matrix), #ngauges
dimnames = list(NULL,
colnames(c_matrix)))
dem_wb_all_na <- dem_wb
terra::values(dem_wb_all_na) <- NA
dem_wb_all_na <- terra::rast(dem_wb_all_na)
for(k in 1:ncol(c_matrix)){
dk <- filter(data_locations,
site_code == colnames(c_matrix)[k])
inv_dists_site <- 1 / terra::values(terra::distance(dem_wb_all_na, terra::vect(dk))^2)
inv_dists_site <- inv_dists_site[! is.na(elevs)] #drop elevs not included in mask
inv_distmat_c[, k] <- inv_dists_site
}
#calculate watershed mean concentration and flux at every timestep
ptm <- proc.time()
ntimesteps <- nrow(c_matrix)
ws_mean_conc <- ws_mean_flux <- rep(NA, ntimesteps)
for(k in 1:ntimesteps){
## GET CHEMISTRY FOR ALL CELLS IN TIMESTEP k
#assign cell weights as normalized inverse squared distances (c)
ck <- t(c_matrix[k, , drop = FALSE])
inv_distmat_c_sub <- inv_distmat_c[, ! is.na(ck), drop=FALSE]
ck <- ck[! is.na(ck), , drop=FALSE]
weightmat_c <- do.call(rbind,
unlist(apply(inv_distmat_c_sub,
1,
function(x) list(x / sum(x))),
recursive = FALSE))
if(ncol(weightmat_c) == 0){
ws_mean_conc[k] <- NA_real_ %>% errors::set_errors(NA)
ws_mean_flux[k] <- NA_real_ %>% errors::set_errors(NA)
next
}
#perform vectorized idw (c)
ck[is.na(ck)] <- 0
c_idw <- weightmat_c %*% ck
#reapply uncertainty dropped by `%*%`
errors::errors(c_idw) <- weightmat_c %*% matrix(errors::errors(ck),
nrow = nrow(ck))
## GET FLUX FOR ALL CELLS; THEN AVERAGE CELLS FOR PCHEM, PFLUX, PRECIP
#calculate flux for every cell:
# This is how we'd calculate flux as kg/(ha * d):
# mm/d * mg/L * m/1000mm * kg/1,000,000mg * 1000L/m^(2 + 1) * 10,000m^2/ha
# therefore, kg/(ha * d) = mm * mg/L / d / 100 = (mm * mg) / (d * L * 100)
# But stream_flux_inst is not scaled by area when it's derived (that
# happens later), so we're going to derive precip_flux_inst
# as unscaled here, and then both can be scaled the same way in
# scale_flux_by_area. So we calculate flux in kg/(ha * d) as above,
# then multiply by watershed area in hectares.
quickref_ind <- as.character(quickref_inds[k])
# mg/L mm/day ha
flux_interp <- c_idw * precip_quickref[[quickref_ind]] * ws_area / 100
#calculate watershed averages (work around error drop)
ws_mean_conc[k] <- mean(c_idw, na.rm=TRUE)
ws_mean_flux[k] <- mean(flux_interp, na.rm=TRUE)
errors::errors(ws_mean_conc)[k] <- mean(errors::errors(c_idw), na.rm=TRUE)
errors::errors(ws_mean_flux)[k] <- mean(errors::errors(flux_interp), na.rm=TRUE)
}
# compare_interp_methods()
ws_means <- tibble(datetime = common_datetimes,
site_code = stream_site_code,
var = output_varname,
concentration = ws_mean_conc,
flux = ws_mean_flux,
ms_status = d_status,
ms_interp = d_interp)
return(ws_means)
}
idw_log_wb <- function(verbose, site_code, i, nw){
if(! verbose) return()
msg <- glue::glue('site: {s} ({ii}/{w})',
s = site_code,
ii = i,
w = nw)
message(msg)
}
err_df_to_matrix <- function(df){
if(! all(sapply(df, class) %in% c('errors', 'numeric'))){
stop('all columns of df must be of class "errors" or "numeric"')
}
errmat <- as.matrix(as.data.frame(lapply(df, errors::errors)))
M <- as.matrix(df)
errors::errors(M) <- errmat
return(M)
}
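#e.g. (df_demo is hypothetical):
# df_demo <- data.frame(a = errors::set_errors(c(1, 2), 0.1),
#                       b = errors::set_errors(c(3, 4), 0.2))
# M <- err_df_to_matrix(df_demo) #2x2 matrix; errors(M) keeps the 0.1s/0.2s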
write_precip_quickref <- function(precip_idw_list,
macrosheds_root,
site_code,
chunkdtrange){
# timestep){
#allows precip values computed by shortcut_idw for each watershed
# raster cell to be reused by shortcut_idw_concflux_v2
quickref_dir <- glue::glue('{mr}/precip_idw_quickref/',
mr = macrosheds_root)
dir.create(path = quickref_dir,
showWarnings = FALSE,
recursive = TRUE)
chunkfile <- paste(strftime(chunkdtrange[1],
format = '%Y-%m-%d %H:%M:%S',
tz = 'UTC'),
strftime(chunkdtrange[2],
format = '%Y-%m-%d %H:%M:%S',
tz = 'UTC'),
sep = '_')
chunkfile <- stringr::str_replace_all(chunkfile, ':', '-')
saveRDS(object = precip_idw_list,
file = glue::glue('{qd}/{cf}', #omitting extension for easier parsing
qd = quickref_dir,
cf = chunkfile))
}
choose_projection <- function(lat = NULL,
long = NULL,
unprojected = FALSE){
#TODO: CHOOSE PROJECTIONS MORE CAREFULLY
if(unprojected){
PROJ4 <- glue::glue('+proj=longlat +datum=WGS84 +no_defs ',
'+ellps=WGS84 +towgs84=0,0,0')
return(PROJ4)
}
if(is.null(lat) || is.null(long)){
stop('If projecting, lat and long are required.')
}
abslat <- abs(lat)
# if(abslat < 23){ #tropical
# PROJ4 = glue::glue('+proj=laea +lon_0=', long)
# # ' +datum=WGS84 +units=m +no_defs')
# } else { #temperate or polar
# PROJ4 = glue::glue('+proj=laea +lat_0=', lat, ' +lon_0=', long)
# }
#this is what the makers of https://projectionwizard.org/# use to choose
#a suitable projection: https://rdrr.io/cran/rCAT/man/simProjWiz.html
# THIS WORKS (PROJECTS STUFF), BUT CAN'T BE READ AUTOMATICALLY BY sf::st_read
if(abslat < 70){ #tropical or temperate
PROJ4 <- glue::glue('+proj=cea +lon_0={lng} +lat_ts=0 +x_0=0 +y_0=0 ',
'+ellps=WGS84 +datum=WGS84 +units=m +no_defs',
lng = long)
} else { #polar
PROJ4 <- glue::glue('+proj=laea +lat_0={lt} +lon_0={lng} +x_0=0 +y_0=0 ',
'+ellps=WGS84 +datum=WGS84 +units=m +no_defs',
lt = lat,
lng = long)
}
## UTM/UPS would be nice for watersheds that don't fall on more than two zones
## (incomplete)
# if(lat > 84 || lat < -80){ #polar; use Universal Polar Stereographic (UPS)
# PROJ4 <- glue::glue('+proj=ups +lon_0=', long)
# # ' +datum=WGS84 +units=m +no_defs')
# } else { #not polar; use UTM
# PROJ4 <- glue::glue('+proj=utm +lat_0=', lat, ' +lon_0=', long)
# }
## EXTRA CODE FOR CHOOSING PROJECTION BY LATITUDE ONLY
# if(abslat < 23){ #tropical
# PROJ4 <- 9835 #Lambert cylindrical equal area (ellipsoidal; should spherical 9834 be used instead?)
# } else if(abslat > 23 && abslat < 66){ # middle latitudes
# PROJ4 <- 5070 #albers equal area conic
# } else { #polar (abslat >= 66)
# PROJ4 <- 9820 #lambert equal area azimuthal
# # PROJ4 <- 1027 #lambert equal area azimuthal (spherical)
# }
# PROJ4 <- 3857 #WGS 84 / Pseudo-Mercator
# PROJ4 <- 2163
return(PROJ4)
}
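#e.g. (temperate latitude, so the cylindrical equal-area branch is chosen):
# choose_projection(lat = 44.2, long = -122.2)
# #"+proj=cea +lon_0=-122.2 +lat_ts=0 +x_0=0 +y_0=0 +ellps=WGS84 +datum=WGS84 +units=m +no_defs"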
read_precip_quickref <- function(out_path,
site_code,
dtrange){
# timestep){
#allows precip values computed by shortcut_idw for each watershed
# raster cell to be reused by shortcut_idw_concflux_v2. These values are
# in mm/day
quickref_dir <- glue::glue('{ms}/precip_idw_quickref',
ms = out_path)
quickref_chunks <- list.files(quickref_dir)
refranges <- lapply(quickref_chunks,
function(x){
as.POSIXct(strsplit(x, '_')[[1]],
tz = 'UTC')
}) %>%
plyr::ldply(function(y){
data.frame(startdt = y[1],
enddt = y[2])
}) %>%
mutate(ref_ind = 1:n())
refranges_sel <- refranges %>%
filter((startdt >= dtrange[1] & enddt <= dtrange[2]) |
(startdt < dtrange[1] & enddt >= dtrange[1]) |
(enddt > dtrange[2] & startdt <= dtrange[2]))
#redundant?
# (startdt > dtrange[1] & startdt <= dtrange[2] & enddt > dtrange[2]) |
# (startdt < dtrange[1] & enddt < dtrange[2] & enddt >= dtrange[1]))
if(nrow(refranges_sel) == 0){
return(list('0' = 'NO QUICKREF AVAILABLE'))
}
#handle the case where an end of dtrange falls right between the start and
#end dates of a quickref file. this is possible because precip dates can
#be shifted (replaced with pchem dates) inside precip_pchem_pflux_idw2
ref_ind_range <- range(refranges_sel$ref_ind)
if(dtrange[1] < refranges_sel$startdt[1] && ref_ind_range[1] > 1){
refranges_sel <- bind_rows(refranges[ref_ind_range[1] - 1, ],
refranges_sel)
}
if(dtrange[2] > refranges_sel$enddt[nrow(refranges_sel)] &&
ref_ind_range[2] < nrow(refranges)){
refranges_sel <- bind_rows(refranges_sel,
refranges[ref_ind_range[2] + 1, ])
}
quickref <- list()
for(i in 1:nrow(refranges_sel)){
fn <- paste(strftime(refranges_sel$startdt[i],
format = '%Y-%m-%d %H-%M-%S',
tz = 'UTC'),
strftime(refranges_sel$enddt[i],
format = '%Y-%m-%d %H-%M-%S',
tz = 'UTC'),
sep = '_')
qf <- readRDS(glue::glue('{qd}/{f}',
qd = quickref_dir,
f = fn))
quickref <- append(quickref, qf)
}
#for some reason the first ref sometimes gets duplicated?
quickref <- quickref[! duplicated(names(quickref))]
return(quickref)
}
approxjoin_datetime <- function(x,
y,
rollmax = '7:30',
keep_datetimes_from = 'x',
indices_only = FALSE){
#direction = 'forward'){
#x and y: macrosheds standard tibbles with only one site_code,
# which must be the same in x and y. Nonstandard tibbles may also work,
# so long as they have datetime columns, but the only case where we need
# this for other tibbles is inside precip_pchem_pflux_idw, in which case
# indices_only == TRUE, so it's not really set up for general-purpose joining
#rollmax: the maximum snap time for matching elements of x and y.
# either '7:30' for continuous data or '12:00:00' for grab data
#direction [REMOVED]: either 'forward', meaning elements of x will be rolled forward
# in time to match the next y, or 'backward', meaning elements of
# x will be rolled back in time to reach the previous y
#keep_datetimes_from: string. either 'x' or 'y'. the datetime column from
# the corresponding tibble will be kept, and the other will be dropped
#indices_only: logical. if TRUE, a join is not performed. rather,
# the matching indices from each tibble are returned as a named list of vectors.
#good datasets for testing this function:
# x <- tribble(
# ~datetime, ~site_code, ~var, ~val, ~ms_status, ~ms_interp,
# '1968-10-09 04:42:00', 'GSWS10', 'GN_alk', set_errors(27.75, 1), 0, 0,
# '1968-10-09 04:44:00', 'GSWS10', 'GN_alk', set_errors(21.29, 1), 0, 0,
# '1968-10-09 04:47:00', 'GSWS10', 'GN_alk', set_errors(21.29, 1), 0, 0,
# '1968-10-09 04:59:59', 'GSWS10', 'GN_alk', set_errors(16.04, 1), 0, 0,
# '1968-10-09 05:15:01', 'GSWS10', 'GN_alk', set_errors(17.21, 1), 1, 0,
# '1968-10-09 05:30:59', 'GSWS10', 'GN_alk', set_errors(16.50, 1), 0, 0) %>%
# mutate(datetime = as.POSIXct(datetime, tz = 'UTC'))
# y <- tribble(
# ~datetime, ~site_code, ~var, ~val, ~ms_status, ~ms_interp,
# '1968-10-09 04:00:00', 'GSWS10', 'GN_alk', set_errors(1.009, 1), 1, 0,
# '1968-10-09 04:15:00', 'GSWS10', 'GN_alk', set_errors(2.009, 1), 1, 1,
# '1968-10-09 04:30:00', 'GSWS10', 'GN_alk', set_errors(3.009, 1), 1, 1,
# '1968-10-09 04:45:00', 'GSWS10', 'GN_alk', set_errors(4.009, 1), 1, 1,
# '1968-10-09 05:00:00', 'GSWS10', 'GN_alk', set_errors(5.009, 1), 1, 1,
# '1968-10-09 05:15:00', 'GSWS10', 'GN_alk', set_errors(6.009, 1), 1, 1) %>%
# mutate(datetime = as.POSIXct(datetime, tz = 'UTC'))
check_suggested_pkgs(c('data.table'))
#tests
if('site_code' %in% colnames(x) && length(unique(x$site_code)) > 1){
stop('Only one site_code allowed in x at the moment')
}
if('var' %in% colnames(x) && length(unique(drop_var_prefix(x$var))) > 1){
stop('Only one var allowed in x at the moment (not including prefix)')
}
if('site_code' %in% colnames(y) && length(unique(y$site_code)) > 1){
stop('Only one site_code allowed in y at the moment')
}
if('var' %in% colnames(y) && length(unique(drop_var_prefix(y$var))) > 1){
stop('Only one var allowed in y at the moment (not including prefix)')
}
if('site_code' %in% colnames(x) &&
'site_code' %in% colnames(y) &&
x$site_code[1] != y$site_code[1]) stop('x and y site_code must be the same')
if(! rollmax %in% c('7:30', '12:00:00')) stop('rollmax must be "7:30" or "12:00:00"')
# if(! direction %in% c('forward', 'backward')) stop('direction must be "forward" or "backward"')
if(! keep_datetimes_from %in% c('x', 'y')) stop('keep_datetimes_from must be "x" or "y"')
if(! 'datetime' %in% colnames(x) || ! 'datetime' %in% colnames(y)){
stop('both x and y must have "datetime" columns containing POSIXct values')
}
if(! is.logical(indices_only)) stop('indices_only must be a logical')
#deal with the case of x or y being a specialized "flow" tibble
# x_is_flowtibble <- y_is_flowtibble <- FALSE
# if('flow' %in% colnames(x)) x_is_flowtibble <- TRUE
# if('flow' %in% colnames(y)) y_is_flowtibble <- TRUE
# if(x_is_flowtibble && ! y_is_flowtibble){
# varname <- y$var[1]
# y$var = NULL
# } else if(y_is_flowtibble && ! x_is_flowtibble){
# varname <- x$var[1]
# x$var = NULL
# } else if(! x_is_flowtibble && ! y_is_flowtibble){
# varname <- x$var[1]
# x$var = NULL
# y$var = NULL
# } else {
# stop('x and y are both "flow" tibbles. There should be no need for this')
# }
# if(x_is_flowtibble) x <- rename(x, val = flow)
# if(y_is_flowtibble) y <- rename(y, val = flow)
#data.table doesn't work with the errors package, so error needs
#to be separated into its own column. also give same-name columns suffixes
if('val' %in% colnames(x)){
x <- x %>%
mutate(err = errors::errors(val),
val = errors::drop_errors(val)) %>%
rename_with(.fn = ~paste0(., '_x'),
.cols = everything()) %>%
data.table::as.data.table()
y <- y %>%
mutate(err = errors::errors(val),
val = errors::drop_errors(val)) %>%
rename_with(.fn = ~paste0(., '_y'),
.cols = everything()) %>%
data.table::as.data.table()
} else {
if(indices_only){
x <- rename(x, datetime_x = datetime) %>%
mutate(across(where(~inherits(., 'errors')),
~errors::drop_errors(.))) %>%
data.table::as.data.table()
y <- rename(y, datetime_y = datetime) %>%
mutate(across(where(~inherits(., 'errors')),
~errors::drop_errors(.))) %>%
data.table::as.data.table()
} else {
stop('this case not yet handled')
}
}
#alternative implementation of the "on" argument in data.table joins...
#probably more flexible, so leaving it here in case we need to do something crazy
# data.table::setkeyv(x, 'datetime')
# data.table::setkeyv(y, 'datetime')
#convert the desired maximum roll distance from string to integer seconds
rollmax <- ifelse(test = rollmax == '7:30',
yes = 7 * 60 + 30,
no = 12 * 60 * 60)
#leaving this here in case the nearest neighbor join implemented below is too
#slow. then we can fall back to a basic rolling join with a maximum distance
# rollmax <- ifelse(test = direction == 'forward',
# yes = -rollmax,
# no = rollmax)
#rollends will move the first/last value of x in the opposite `direction` if necessary
# joined <- y[x, on = 'datetime', roll = rollmax, rollends = c(TRUE, TRUE)]
#create columns in x that represent the snapping window around each datetime
x[, `:=` (datetime_min = datetime_x - rollmax,
datetime_max = datetime_x + rollmax)]
y[, `:=` (datetime_y_orig = datetime_y)] #datetime col will be dropped from y
# if(indices_only){
# y_indices <- y[x,
# on = .(datetime_y <= datetime_max,
# datetime_y >= datetime_min),
# which = TRUE]
# return(y_indices)
# }
#join x rows to y if y's datetime falls within the x range
joined <- y[x, on = .(datetime_y <= datetime_max,
datetime_y >= datetime_min)]
joined <- na.omit(joined, cols = 'datetime_y_orig') #drop rows without matches
#for any datetimes in x or y that were matched more than once, keep only
#the nearest match
joined[, `:=` (datetime_match_diff = abs(datetime_x - datetime_y_orig))]
joined <- joined[, .SD[which.min(datetime_match_diff)], by = datetime_x]
joined <- joined[, .SD[which.min(datetime_match_diff)], by = datetime_y_orig]
if(indices_only){
y_indices <- which(y$datetime_y %in% joined$datetime_y_orig)
x_indices <- which(x$datetime_x %in% joined$datetime_x)
return(list(x = x_indices, y = y_indices))
}
#drop and rename columns (data.table makes weird name modifications)
if(keep_datetimes_from == 'x'){
joined[, c('datetime_y', 'datetime_y.1', 'datetime_y_orig', 'datetime_match_diff') := NULL]
data.table::setnames(joined, 'datetime_x', 'datetime')
} else {
joined[, c('datetime_x', 'datetime_y.1', 'datetime_y', 'datetime_match_diff') := NULL]
data.table::setnames(joined, 'datetime_y_orig', 'datetime')
}
#restore error objects, var column, original column names (with suffixes).
#original column order
joined <- tibble::as_tibble(joined) %>%
mutate(val_x = errors::set_errors(val_x, err_x),
val_y = errors::set_errors(val_y, err_y)) %>%
dplyr::select(-err_x, -err_y)
# mutate(var = !!varname)
# if(x_is_flowtibble) joined <- rename(joined,
# flow = val_x,
# ms_status_flow = ms_status_x,
# ms_interp_flow = ms_interp_x)
# if(y_is_flowtibble) joined <- rename(joined,
# flow = val_y,
# ms_status_flow = ms_status_y,
# ms_interp_flow = ms_interp_y)
# if(! sum(grepl('^val_[xy]$', colnames(joined))) > 1){
# joined <- rename(joined, val = matches('^val_[xy]$'))
# }
joined <- dplyr::select(joined,
datetime,
# matches('^val_?[xy]?$'),
# any_of('flow'),
starts_with('site_code'),
any_of(c(starts_with('var_'), matches('^var$'))),
any_of(c(starts_with('val_'), matches('^val$'))),
starts_with('ms_status_'),
starts_with('ms_interp_'))
return(joined)
}
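#e.g., with the x and y test tibbles sketched in the comments above:
# approxjoin_datetime(x, y, rollmax = '7:30', keep_datetimes_from = 'x')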
# general interp helpers ####
ms_linear_interpolate <- function(d, interval, max_samples_to_impute){
#d: a ms tibble with no ms_interp column (this will be created)
#TODO: prefer imputeTS::na_seadec when there are >=2 non-NA datapoints.
# There are commented sections that begin this work, but we still would
# need to calculate start and end when creating a ts() object. we'd
# also need to separate uncertainty from the val column before converting
# to ts. here is the line that could be added to this documentation
# if we ever implement na_seadec:
#For linear interpolation with
# seasonal decomposition, interval will also be used to determine
# the fraction of the sampling period between samples.
if(length(unique(d$site_code)) > 1){
stop(paste('ms_linear_interpolate is not designed to handle datasets',
'with more than one site.'))
}
if(length(unique(d$var)) > 1){
stop(paste('ms_linear_interpolate is not designed to handle datasets',
'with more than one variable'))
}
var <- ms_drop_var_prefix_(d$var[1])
d <- arrange(d, datetime)
ms_interp_column <- is.na(d$val)
d_interp <- d %>%
mutate(
#carry ms_status to any rows that have just been populated (probably
#redundant now, but can't hurt)
ms_status = imputeTS::na_locf(ms_status,
na_remaining = 'rev',
maxgap = max_samples_to_impute),
val = if(sum(! is.na(val)) > 1){
#linear interp NA vals
imputeTS::na_interpolation(val,
maxgap = max_samples_to_impute)
#unless not enough data in group; then do nothing
} else val,
val_err = if(sum(! is.na(val_err)) > 1){
#linear interp NA vals
imputeTS::na_interpolation(val_err,
maxgap = max_samples_to_impute)
#unless not enough data in group; then do nothing
} else val_err
) %>%
dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
arrange(site_code, var, datetime)
d_interp$ms_status[is.na(d_interp$ms_status)] = 0
ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
d_interp$ms_interp <- as.numeric(ms_interp_column)
d_interp <- filter(d_interp,
! is.na(val))
return(d_interp)
}
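#e.g. (d_demo is hypothetical; val_err is the separated uncertainty column,
# as written by write_ms_file):
# d_demo <- tibble(datetime = seq(as.POSIXct('2020-01-01', tz = 'UTC'),
#                                 by = '1 day', length.out = 5),
#                  site_code = 'w1', var = 'GN_NO3',
#                  val = c(1, NA, 3, NA, 5),
#                  val_err = c(0.1, NA, 0.1, NA, 0.1),
#                  ms_status = 0)
# ms_linear_interpolate(d_demo, interval = '1 day', max_samples_to_impute = 3)
# #fills rows 2 and 4 by linear interpolation and flags them with ms_interp = 1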
ms_zero_interpolate <- function(d, interval, max_samples_to_impute){
#d: a ms tibble with no ms_interp column (this will be created)
#interval: the sampling interval (either '15 min' or '1 day').
#for precip only, and only relevant at konza (so far)
#fills gaps up to maxgap (determined automatically), then removes missing values
if(length(unique(d$site_code)) > 1){
stop(paste('ms_zero_interpolate is not designed to handle datasets',
'with more than one site.'))
}
if(length(unique(d$var)) > 1){
stop(paste('ms_zero_interpolate is not designed to handle datasets',
'with more than one variable'))
}
var <- ms_drop_var_prefix_(d$var[1])
d <- arrange(d, datetime)
ms_interp_column <- is.na(d$val)
d_interp <- d %>%
mutate(
ms_status = imputeTS::na_replace(ms_status,
fill = 1,
maxgap = max_samples_to_impute),
val = if(sum(! is.na(val)) > 1){
#replace NA vals with zero
imputeTS::na_replace(val,
fill = 0,
maxgap = max_samples_to_impute)
#unless not enough data in group; then do nothing
} else val
) %>%
dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
arrange(site_code, var, datetime)
d_interp$ms_status[is.na(d_interp$ms_status)] = 0
ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
d_interp$ms_interp <- as.numeric(ms_interp_column)
d_interp <- filter(d_interp,
! is.na(val))
return(d_interp)
}
ms_nocb_interpolate <- function(d, interval, max_samples_to_impute){
#d: a ms tibble with no ms_interp column (this will be created)
#interval: the sampling interval (either '15 min' or '1 day').
#for pchem only, where measured concentrations represent aggregated
#concentration over the measurement period.
#fills gaps up to maxgap (determined automatically), then removes missing values
if(length(unique(d$site_code)) > 1){
stop(paste('ms_nocb_interpolate is not designed to handle datasets',
'with more than one site.'))
}
if(length(unique(d$var)) > 1){
stop(paste('ms_nocb_interpolate is not designed to handle datasets',
'with more than one variable'))
}
var <- ms_drop_var_prefix_(d$var[1])
d <- arrange(d, datetime)
ms_interp_column <- is.na(d$val)
d_interp <- d %>%
mutate(
#carry ms_status to any rows that have just been populated (probably
#redundant now, but can't hurt)
ms_status = imputeTS::na_locf(ms_status,
option = 'nocb',
na_remaining = 'rev',
maxgap = max_samples_to_impute),
val = if(sum(! is.na(val)) > 1){
#nocb interp NA vals
imputeTS::na_locf(val,
option = 'nocb',
na_remaining = 'keep',
maxgap = max_samples_to_impute)
#unless not enough data in group; then do nothing
} else val,
val_err = if(sum(! is.na(val_err)) > 1){
#do the same for uncertainty
imputeTS::na_locf(val_err,
option = 'nocb',
na_remaining = 'keep',
maxgap = max_samples_to_impute)
} else val_err
) %>%
dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
arrange(site_code, var, datetime)
d_interp$ms_status[is.na(d_interp$ms_status)] = 0
ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
d_interp$ms_interp <- as.numeric(ms_interp_column)
d_interp <- filter(d_interp,
! is.na(val))
return(d_interp)
}
ms_nocb_mean_interpolate <- function(d, interval, max_samples_to_impute){
#d: a ms tibble with no ms_interp column (this will be created)
#interval: the sampling interval (either '15 min' or '1 day').
#for pchem only, where measured concentrations represent aggregated
#concentration over the measurement period.
#fills gaps up to maxgap (determined automatically), then removes missing values
if(length(unique(d$site_code)) > 1){
stop(paste('ms_nocb_mean_interpolate is not designed to handle datasets',
'with more than one site.'))
}
if(length(unique(d$var)) > 1){
stop(paste('ms_nocb_mean_interpolate is not designed to handle datasets',
'with more than one variable'))
}
var <- ms_drop_var_prefix_(d$var[1])
d <- arrange(d, datetime)
ms_interp_column <- is.na(d$val)
d_interp <- d %>%
mutate(
#carry ms_status to any rows that have just been populated (probably
#redundant now, but can't hurt)
ms_status = imputeTS::na_locf(ms_status,
option = 'nocb',
na_remaining = 'rev',
maxgap = max_samples_to_impute),
val = if(sum(! is.na(val)) > 1){
#nocb interp NA vals
imputeTS::na_locf(val,
option = 'nocb',
na_remaining = 'keep',
maxgap = max_samples_to_impute)
#unless not enough data in group; then do nothing
} else val,
val_err = if(sum(! is.na(val_err)) > 1){
#do the same for uncertainty
imputeTS::na_locf(val_err,
option = 'nocb',
na_remaining = 'keep',
maxgap = max_samples_to_impute)
} else val_err
) %>%
dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
arrange(site_code, var, datetime)
#identify series of records that need to be divided by their n
laginterp <- lag(d_interp$ms_interp)
laginterp[1] <- d_interp$ms_interp[1]
laginterp <- as.numeric(laginterp | d_interp$ms_interp)
# err_ <- errors::errors(d_interp$val)
# d_interp$val <- d_interp$val
vals_interped <- d_interp$val * laginterp
err_interped <- d_interp$val_err * laginterp
#use run length encoding to do the division quickly
vals_new <- rle2(vals_interped) %>%
mutate(values = values / lengths) %>%
dplyr::select(lengths, values) %>%
as.list()
class(vals_new) <- 'rle'
vals_new <- inverse.rle(vals_new)
#same for uncertainty
err_new <- rle2(err_interped) %>%
mutate(values = values / lengths) %>%
dplyr::select(lengths, values) %>%
as.list()
class(err_new) <- 'rle'
err_new <- inverse.rle(err_new)
real_vals_new <- vals_new != 0
d_interp$val[real_vals_new] <- vals_new[real_vals_new]
d_interp$val_err <- err_new
d_interp$ms_status[is.na(d_interp$ms_status)] = 0
return(d_interp)
}
# ms_read_csv helpers ####
gsub_v <- function(pattern, replacement_vec, x){
#just like the first three arguments to gsub, except that
# replacement is now a vector of replacements.
#return a vector of the same length as replacement_vec, where
# each element in replacement_vec has been used once
subbed <- sapply(replacement_vec,
function(v) gsub(pattern = pattern,
replacement = v,
x = x),
USE.NAMES = FALSE)
return(subbed)
}
resolve_datetime <- function(d, #needs updates if we ever re-implement!
datetime_colnames,
datetime_formats,
datetime_tz,
optional){
#d: a data.frame or tibble with at least one date or time column
# (all date and/or time columns must contain character strings,
# not parsed date/time/datetime objects).
#datetime_colnames: character vector; column names that contain
# relevant datetime information.
#datetime_formats: character vector; datetime parsing tokens
# (like '%A, %Y-%m-%d %I:%M:%S %p' or '%j') corresponding to the
# elements of datetime_colnames.
#datetime_tz: character; time zone of the returned datetime column.
#optional: character vector; see dt_format_to_regex.
#return value: d, but with a "datetime" column containing POSIXct datetimes
# and without the input datetime columns
dt_tb <- tibble(basecol = rep(NA, nrow(d)))
for(i in 1:length(datetime_colnames)){
dt_comps <- stringr::str_match_all(string = datetime_formats[i],
pattern = '%([a-zA-Z])')[[1]][,2]
dt_regex <- dt_format_to_regex(datetime_formats[i],
optional = optional)
# zero-pad bare numeric date/time components (e.g. '1' -> '01', '930' -> '0930') so they match their parsing tokens
for(match in grepl("m|e|d|H|I|M|S", dt_comps)) {
if(match){
for (dt_entry in 1:nrow(d[datetime_colnames[i]])) {
if(! is.na(d[datetime_colnames[i]][dt_entry,])){
if(numbers_only(d[datetime_colnames[i]][dt_entry,])){
if(nchar(d[datetime_colnames[i]][dt_entry,]) == 1) {
d[datetime_colnames[i]][dt_entry,] <- paste0(0, d[datetime_colnames[i]][dt_entry,])
} else if(nchar(d[datetime_colnames[i]][dt_entry,]) ==3){
d[datetime_colnames[i]][dt_entry,] <- paste0(0, d[datetime_colnames[i]][dt_entry,])
}
}
}
}
}
}
dt_tb <- d %>%
dplyr::select(one_of(datetime_colnames[i])) %>%
tidyr::extract(col = !!datetime_colnames[i],
into = dt_comps,
regex = dt_regex,
remove = TRUE,
convert = FALSE) %>%
bind_cols(dt_tb)
}
dt_tb$basecol = NULL
#fill in defaults if applicable:
#(12 for hour, 00 for minute and second, PM for AM/PM)
dt_tb <- dt_tb %>%
mutate(
# across(any_of(c('H', 'I')), ~ifelse(nchar(.x) < 2, paste0(0, .x), .x)),
across(any_of(c('H', 'I')), ~ifelse(is.na(.x), '12', .x)),
# across(any_of(c('M', 'S')), ~ifelse(nchar(.x) < 2, paste0(0, .x), .x)),
across(any_of(c('M', 'S')), ~ifelse(is.na(.x), '00', .x)),
across(any_of('p'), ~ifelse(is.na(.x), 'PM', .x)))
#resolve datetime structure into POSIXct
datetime_formats_split <- stringr::str_extract_all(datetime_formats,
'%[a-zA-Z]') %>%
unlist()
dt_col_order <- match(paste0('%',
colnames(dt_tb)),
datetime_formats_split)
if('H' %in% colnames(dt_tb)){
dt_tb$H[dt_tb$H == ''] <- '00'
}
if('M' %in% colnames(dt_tb)){
dt_tb$M[dt_tb$M == ''] <- '00'
}
if('S' %in% colnames(dt_tb)){
dt_tb$S[dt_tb$S == ''] <- '00'
}
if('I' %in% colnames(dt_tb)){
dt_tb$I[dt_tb$I == ''] <- '00'
}
if('P' %in% colnames(dt_tb)){
dt_tb$P[dt_tb$P == ''] <- 'AM'
}
dt_tb <- dt_tb %>%
tidyr::unite(col = 'datetime',
everything(),
sep = ' ',
remove = TRUE) %>%
mutate(datetime = as_datetime(datetime,
format = paste(datetime_formats_split[dt_col_order],
collapse = ' '),
tz = datetime_tz) %>%
with_tz(tz = 'UTC'))
d <- d %>%
bind_cols(dt_tb) %>%
dplyr::select(-one_of(datetime_colnames), datetime) %>% #in case 'datetime' is in datetime_colnames
relocate(datetime)
return(d)
}
identify_sampling <- function(df,
is_sensor,
date_col = 'datetime',
network,
domain,
prodname_ms,
sampling_type){
#TODO: for hbef, identify_sampling is writing site names as 1, not w1
#is_sensor: named logical vector. see documentation for
# ms_read_raw_csv, but note that an unnamed logical vector of length one
# cannot be used here. also note that the original variable/flag column names
# from the raw file are converted to canonical macrosheds names by
# ms_read_raw_csv before it passes is_sensor to identify_sampling.
#checks
if(any(! is.logical(is_sensor))){
stop('all values in is_sensor must be logical.')
}
svh_names <- names(is_sensor)
if(is.null(svh_names) || any(is.na(svh_names))){
stop('all elements of is_sensor must be named.')
}
#parse is_sensor into a character vector of sample regimen codes
is_sensor <- ifelse(is_sensor, 'S', 'N')
#set up directory system to store sample regimen metadata
sampling_dir <- glue::glue('data/{n}/{d}',
n = network,
d = domain)
sampling_file <- glue::glue('data/{n}/{d}/sampling_type.json',
n = network,
d = domain)
master <- try(jsonlite::fromJSON(readr::read_file(sampling_file)),
silent = TRUE)
if('try-error' %in% class(master)){
dir.create(sampling_dir, recursive = TRUE)
file.create(sampling_file)
master <- list()
}
#determine and record sample regimen for each variable
col_names <- colnames(df)
data_cols <- grep(pattern = '__[|]dat',
col_names,
value = TRUE)
flg_cols <- grep(pattern = '__[|]flg',
col_names,
value = TRUE)
site_codes <- unique(df$site_code)
for(p in 1:length(data_cols)){
# var_name <- stringr::str_split_fixed(data_cols[p], '__', 2)[1]
# df_var <- df %>%
# dplyr::select(datetime, !!var_name := .data[[data_cols[p]]], site_code)
all_sites <- tibble()
for(i in 1:length(site_codes)){
# df_site <- df_var %>%
df_site <- df %>%
filter(site_code == !!site_codes[i]) %>%
arrange(datetime)
# ! is.na(.data[[date_col]]), #NAs here are indicative of bugs we want to fix, so let's let them through
# ! is.na(.data[[var_name]])) #NAs here are indicative of bugs we want to fix, so let's let them through
dates <- df_site[[date_col]]
dif <- diff(dates)
unit <- attr(dif, 'units')
conver_mins <- case_when(
unit %in% c('seconds', 'secs') ~ 0.01666667,
unit %in% c('minutes', 'mins') ~ 1,
unit == 'hours' ~ 60,
unit == 'days' ~ 1440,
TRUE ~ NA_real_)
if(is.na(conver_mins)) stop('Unrecognized time unit encountered. Address this.')
dif_mins <- as.numeric(dif) * conver_mins
dif_mins <- round(dif_mins)
mode_mins <- Mode(dif_mins)
mean_mins <- mean(dif_mins, na.rm = T)
prop_mode_min <- length(dif_mins[dif_mins == mode_mins])/length(dif_mins)
# remove gaps larger than 90 days (for seasonal sampling)
dif_mins <- dif_mins[dif_mins < 129600]
if(length(dif_mins) == 0){
# This is grab
g_a <- tibble('site_code' = site_codes[i],
'type' = 'G',
'starts' = min(dates, na.rm = TRUE),
'interval' = mode_mins)
} else{
if(prop_mode_min >= 0.3 && mode_mins <= 1440){
# This is installed
g_a <- tibble('site_code' = site_codes[i],
'type' = 'I',
'starts' = min(dates, na.rm = TRUE),
'interval' = mode_mins)
} else{
if(mean_mins <= 1440){
# This is installed (non standard interval like HBEF)
g_a <- tibble('site_code' = site_codes[i],
'type' = 'I',
'starts' = min(dates, na.rm = TRUE),
'interval' = mean_mins)
} else{
# This is grab
g_a <- tibble('site_code' = site_codes[i],
'type' = 'G',
'starts' = min(dates, na.rm = TRUE),
'interval' = mean_mins)
}
}
}
if(! is.null(sampling_type)){
g_a <- g_a %>%
mutate(type = sampling_type)
}
var_name_base <- stringr::str_split(string = data_cols[p],
pattern = '__\\|')[[1]][1]
g_a <- g_a %>%
mutate(
type = paste0(type,
!!is_sensor[var_name_base]),
var = as.character(glue::glue('{ty}_{vb}',
ty = type,
vb = var_name_base)))
master[[prodname_ms]][[var_name_base]][[site_codes[i]]] <-
list('startdt' = g_a$starts,
'type' = g_a$type,
'interval' = g_a$interval)
g_a <- g_a %>%
mutate(interval = as.character(interval))
all_sites <- bind_rows(all_sites, g_a)
}
#include new prefixes in df column names
prefixed_varname <- all_sites$var[1]
dat_colname <- paste0(drop_var_prefix(prefixed_varname),
'__|dat')
flg_colname <- paste0(drop_var_prefix(prefixed_varname),
'__|flg')
data_col_ind <- match(dat_colname,
colnames(df))
flag_col_ind <- match(flg_colname,
colnames(df))
colnames(df)[data_col_ind] <- paste0(prefixed_varname,
'__|dat')
colnames(df)[flag_col_ind] <- paste0(prefixed_varname,
'__|flg')
}
readr::write_file(jsonlite::toJSON(master), sampling_file)
return(df)
}
# user response helpers ####
get_response_1char <- function(msg,
possible_chars,
subsequent_prompt = FALSE,
response_from_file = NULL){
#msg: character. a message that will be used to prompt the user
#possible_chars: character vector of acceptable single-character responses
#subsequent prompt: not to be set directly. This is handled by
# get_response_1char during recursion.
if(subsequent_prompt){
cat(paste('Please choose one of:',
paste(possible_chars,
collapse = ', '),
'\n> '))
} else {
cat(msg)
}
if(! is.null(response_from_file)){
ch <- as.character(readLines(con = response_from_file, 1))
rsps <- readLines(con = response_from_file)
rsps <- rsps[-1]
writeLines(rsps, con = response_from_file)
} else {
ch <- as.character(readLines(con = stdin(), 1))
}
if(length(ch) == 1 && ch %in% possible_chars){
return(ch)
} else {
get_response_1char(msg = msg,
possible_chars = possible_chars,
subsequent_prompt = TRUE)
}
}
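#illustrative usage (values are hypothetical):
# resp <- get_response_1char('Overwrite existing file? (y/n)\n> ',
# possible_chars = c('y', 'n'))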
get_response_mchar <- function(msg,
possible_resps,
allow_alphanumeric_response = TRUE,
subsequent_prompt = FALSE,
response_from_file = NULL){
#msg: character. a message that will be used to prompt the user
#possible_resps: character vector. If length 1, each character in the response
# will be required to match a character in possible_resps, and the return
# value will be a character vector of the single-character tokens in the
# response. If length > 1, the response will be required to match an element
# of possible_resps exactly, and the response will be returned as-is.
#allow_alphanumeric_response: logical. If FALSE, the response may not
# include both numerals and letters. Only applies when possible_resps
# has length 1.
#subsequent prompt: not to be set directly. This is handled by
# get_response_mchar during recursion.
split_by_character <- length(possible_resps) == 1
if(subsequent_prompt){
if(split_by_character){
pr <- strsplit(possible_resps, split = '')[[1]]
} else {
pr <- possible_resps
}
cat(paste('Your options are:',
paste(pr,
collapse = ', '),
'\n> '))
} else {
cat(msg)
}
if(! is.null(response_from_file)){
chs <- as.character(readLines(con = response_from_file, 1))
rsps <- readLines(con = response_from_file)
rsps <- rsps[-1]
writeLines(rsps, con = response_from_file)
} else {
chs <- as.character(readLines(con = stdin(), 1))
}
if(! allow_alphanumeric_response &&
split_by_character &&
grepl('[0-9]', chs) &&
grepl('[a-zA-Z]', chs)){
cat('Response may not include both letters and numbers.\n> ')
resp <- get_response_mchar(
msg = msg,
possible_resps = possible_resps,
allow_alphanumeric_response = allow_alphanumeric_response,
subsequent_prompt = FALSE)
return(resp)
}
if(length(chs)){
if(split_by_character){
if(length(possible_resps) != 1){
stop('possible_resps must be length 1 if split_by_character is TRUE')
}
chs <- strsplit(chs, split = '')[[1]]
possible_resps_split <- strsplit(possible_resps, split = '')[[1]]
if(all(chs %in% possible_resps_split)){
return(chs)
}
} else {
if(length(possible_resps) < 2){
stop('possible_resps must have length > 1 if split_by_character is FALSE')
}
if(any(possible_resps == chs)){
return(chs)
}
}
}
resp <- get_response_mchar(
msg = msg,
possible_resps = possible_resps,
allow_alphanumeric_response = allow_alphanumeric_response,
subsequent_prompt = TRUE)
return(resp)
}
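#illustrative usage (values are hypothetical). with possible_resps of length 1,
# a response like '13' is split and returned as c('1', '3'):
# chosen <- get_response_mchar('Which columns? e.g. 123\n> ',
# possible_resps = '123456789')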
get_response_int <- function(msg,
min_val,
max_val,
subsequent_prompt = FALSE,
response_from_file = NULL){
#msg: character. a message that will be used to prompt the user
#min_val: int. minimum allowable value, inclusive
#max_val: int. maximum allowable value, inclusive
#subsequent prompt: not to be set directly. This is handled by
# get_response_int during recursion.
if(subsequent_prompt){
cat(glue::glue('Please choose an integer in the range [{minv}, {maxv}].',
minv = min_val,
maxv = max_val))
} else {
cat(msg)
}
if(! is.null(response_from_file)){
nm <- as.numeric(as.character(readLines(con = response_from_file, 1)))
rsps <- readLines(con = response_from_file)
rsps <- rsps[-1]
writeLines(rsps, con = response_from_file)
} else {
nm <- as.numeric(as.character(readLines(con = stdin(), 1)))
}
if(! is.na(nm) && nm %% 1 == 0 && nm >= min_val && nm <= max_val){
return(nm)
} else {
get_response_int(msg = msg,
min_val = min_val,
max_val = max_val,
subsequent_prompt = TRUE)
}
}
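#illustrative usage (values are hypothetical):
# n_sites <- get_response_int('How many sites?\n> ', min_val = 1, max_val = 99)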
get_response_enter <- function(msg,
response_from_file = NULL){
#only returns if ENTER is pressed (or if anything is passed by response_from_file)
cat(msg)
if(! is.null(response_from_file)){
ch <- as.character(readLines(con = response_from_file, 1))
rsps <- readLines(con = response_from_file)
rsps <- rsps[-1]
writeLines(rsps, con = response_from_file)
} else {
ch <- as.character(readLines(con = stdin(), 1))
}
return(invisible(NULL))
}
# attribution helpers ####
match_ws_attr_attrib <- function(ws_attrib){
attr_sources_ <- ms_load_variables('ws_attr') %>%
filter(variable_code %in% ws_attrib)
attr_sources1 <- attr_sources_ %>%
filter(data_source %in% c('USGS', 'MODIS')) %>%
mutate(prodname = case_when(
data_source == 'USGS' ~ recode(
data_class,
!!!c(hydrology = 'bfi',
vegetation = 'start_season',
`parent material` = 'geochemical'
)),
data_source == 'MODIS' & data_class == 'landcover' ~ 'modis_igbp',
data_source == 'MODIS' & grepl('lai|fpar', variable_code) ~ 'lai',
grepl('^ndvi_', variable_code) ~ 'ndvi',
grepl('^evi_', variable_code) ~ 'evi',
data_source == 'MODIS' & grepl('cover', variable_code) ~ 'veg_cover',
data_source == 'MODIS' & grepl('global_500', variable_code) ~ 'gpp_global_500m',
TRUE ~ variable_code)
) %>%
dplyr::select(data_source, prodname) %>%
left_join(macrosheds::attrib_ws_data,
by = c(data_source = 'primary_source',
prodname = 'prodname')) %>%
distinct()
attr_sources <- attr_sources_ %>%
dplyr::select(data_source) %>%
filter(! is.na(data_source),
! data_source %in% c('USGS', 'MODIS')) %>%
distinct() %>%
left_join(macrosheds::attrib_ws_data,
by = c(data_source = 'primary_source')) %>%
distinct(data_source, .keep_all = TRUE) %>%
bind_rows(attr_sources1) %>%
dplyr::select(prodname, primary_source = data_source, retrieved_from_GEE,
doi, license, citation, url, addtl_info)
return(attr_sources)
}
format_acknowledgements <- function(ts_attrib, ws_attrib, all_ws_attr = FALSE){
ntw_join <- macrosheds::ms_site_data %>%
dplyr::select(domain, network_fullname, domain_fullname) %>%
distinct()
custom_acks <- ts_attrib %>%
filter(! is.na(IR_acknowledgement_text)) %>%
dplyr::select(domain, IR_acknowledgement_text) %>%
left_join(ntw_join, by = 'domain') %>%
distinct() %>%
mutate(network_fullname = ifelse(network_fullname == domain_fullname, '', network_fullname)) %>%
mutate(txt = paste0(domain_fullname, ' ', network_fullname, ': ', IR_acknowledgement_text)) %>%
pull(txt)
relevant_deets <- ts_attrib %>%
distinct(domain, funding) %>%
left_join(ntw_join, by = 'domain') %>%
distinct() %>%
# bind_rows(tibble(domain='a', domain_fullname = 'a', network_fullname='a', funding='NSF awards: 345, 3535')) %>%
mutate(network_fullname = ifelse(stringr::str_detect(domain_fullname, network_fullname), '', network_fullname)) %>%
mutate(txt = paste0(domain_fullname, ' ', network_fullname, ' (', funding, ')'),
txt = stringr::str_extract(txt, '.+?(?=(?: \\(NA\\)|$))')) %>%
pull(txt)
ndeets <- length(relevant_deets)
if(ndeets){
relevant_deets <- paste0(1:ndeets, '. ', relevant_deets)
sepr <- '.'
} else {
sepr <- ''
}
ack <- glue::glue('Primary data were provided by the following sources:\n{ack_ls}{sepr}',
ack_ls = paste(relevant_deets, collapse = '\n'))
if(length(custom_acks)){
ack <- paste(ack, paste(custom_acks, collapse = '\n'), sep = '\n')
}
if(all_ws_attr){
ack_ls2 <- paste(unique(macrosheds::attrib_ws_data$primary_source),
collapse = ', ')
} else if(length(ws_attrib)){
ack_ls2 <- ms_load_variables('ws_attr') %>%
filter(! is.na(data_class),
variable_code %in% ws_attrib) %>%
pull(data_source) %>%
unique() %>%
paste(collapse = ', ')
} else {
ack_ls2 <- ''
}
if(nchar(ack_ls2)){
ws_add <- glue::glue('Spatial summary data were derived from layers ',
'provided by:\n{ack_ls2}')
ack <- paste(ack, ws_add, sep = '\n')
}
return(ack)
}
format_bibliography <- function(ts_attrib, ws_attrib, all_ws_attr = FALSE){
#organize bibtex records
bts <- strsplit(macrosheds::ts_bib, '\n\n')[[1]]
bts[1] <- stringr::str_replace(bts[1], '\\\n', '')
bts <- grep('^(?:@misc|@article)', bts, value = TRUE)
authors <- stringr::str_match(bts, 'author = \\{(.+?)\\},\\\n')[, 2]
authors <- stringr::str_replace_all(authors, ' and ', '')
authors <- stringr::str_remove_all(authors, '[[[:punct:]] ]')
year <- stringr::str_match(bts, 'year = \\{([0-9]{4})\\},?\\\n')[, 2]
title <- stringr::str_match(bts, '\\\ttitle = \\{(.+?)(?=(?:\\.| ver |\\},\\\n|$))')[, 2]
title <- stringr::str_remove_all(title, '[[[:punct:]] ]')
title <- tolower(title)
year[is.na(year)] <- 'none'
#organize formatted citations
extracts <- ts_attrib %>%
filter(! is.na(citation)) %>%
distinct(citation) %>%
mutate(authors = stringr::str_extract(citation, '^.*(?= \\([0-9]{4}[a-z]*\\))'),
authors = stringr::str_remove_all(authors, '[ &]'),
authors = stringr::str_replace_all(authors, '\\.,', '.'),
authors = stringr::str_replace_all(authors, '[[[:punct:]] ]', ''),
pubyr = stringr::str_match(citation, '\\(([0-9]{4})[a-z]*\\)')[, 2],
title = stringr::str_extract(citation, '(?<=\\([0-9]{4}[a-z]{0,2}\\)\\. ).{1,999}?(?=(?:\\.| ver |\\},\\\n|$))'),
title = stringr::str_replace_all(title, '[[[:punct:]] ]', ''),
title = tolower(title)) %>%
dplyr::select(authors, pubyr, title)
#match records to citations
matches <- c()
for(i in seq_len(nrow(extracts))){
mch0 <- which(authors == extracts$authors[i])
mch1 <- which(title == extracts$title[i])
mch2 <- which(year == extracts$pubyr[i])
mch <- intersect(intersect(mch0, mch1), mch2)
if(length(mch) != 1) stop('error in bibliographic extract ', i)
matches <- c(matches, mch)
}
#retrieve bibtex for any matches
bibtex_out <- bts[matches]
#include the entry for MacroSheds itself, and for NWIS if applicable
ms_bib <- paste0(
"@article{vlah_etal_macrosheds_2023,\n\t",
"author = {Vlah, Michael J. and Rhea, Spencer and Bernhardt, Emily S. and Slaughter, Weston and Gubbins, Nick and DelVecchia, Amanda G. and Thellman, Audrey and Ross, Matthew R. V.},\n\t",
"title = {MacroSheds: A synthesis of long-term biogeochemical, hydroclimatic, and geospatial data from small watershed ecosystem studies},\n\t",
"journal = {Limnology and Oceanography Letters},\n\t",
"volume = {8},\n\t",
"number = {3},\n\t",
"pages = {419-452},\n\t",
"doi = {https://doi.org/10.1002/lol2.10325},\n\t",
"url = {https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325},\n\t",
"year = {2023},\n}")
usgs_borrowing_domains <- c(
'baltimore', 'santa_barbara', 'luquillo', 'bear', 'sleepers',
'trout_lake', 'loch_vale', 'streampulse'
)
if(any(ts_attrib$domain %in% usgs_borrowing_domains) &&
'discharge' %in% ts_attrib$macrosheds_prodname){
nwisyr <- macrosheds::attrib_ts_data %>%
filter(domain == 'usgs',
macrosheds_prodname == 'discharge') %>%
pull(link_download_datetime) %>%
lubridate::year()
nwis_bib <- glue::glue(
'@misc{nwis_<<nwisyr>>,\n\t',
'title = {{National} {Water} {Information} {System} data available on the {World} {Wide} {Web} ({USGS} {Water} {Data} for the {Nation})},\n\t',
'publisher = {National Water Information System},\n\t',
'author = {{U.S. Geological Survey}},\n\t',
'year = {<<nwisyr>>},\n\t',
'url = {http://waterdata.usgs.gov/nwis/}\n}',
.open = '<<', .close = '>>', .trim = FALSE
)
bibtex_out <- c(bibtex_out, nwis_bib)
}
bibtex_out <- c(ms_bib, bibtex_out)
#tack on ws attr bibtex if requested
if(all_ws_attr){
bts_w <- strsplit(macrosheds::ws_bib, '\n\n')[[1]]
bts_w[1] <- stringr::str_replace(bts_w[1], '\\\n', '')
bibtex_out <- c(bibtex_out, bts_w)
} else if(length(ws_attrib)){
bts_w <- strsplit(macrosheds::ws_bib, '\n\n')[[1]]
bts_w[1] <- stringr::str_replace(bts_w[1], '\\\n', '')
ws_cites <- match_ws_attr_attrib(ws_attrib) %>%
pull(citation) %>%
stringr::str_extract('\\([12][0-9]{3}\\). ([^\\.]+)', group = 1)
keepers <- c()
for(b in bts_w){
for(cc in ws_cites){
if(grepl(cc, gsub('[{}]', '', b), fixed = TRUE)) keepers <- c(keepers, b)
}
}
bibtex_out <- c(bibtex_out, keepers)
}
bibtex_out <- paste0(bibtex_out, '\n')
return(bibtex_out)
}
format_IR <- function(ts_attrib, ws_attrib, all_ws_attr = FALSE, abide_by){
noncomm <- ts_attrib %>%
filter(grepl('NonCommercial', license_type)) %>%
dplyr::select(network, domain, macrosheds_prodname) %>%
distinct()
sharealike <- ts_attrib %>%
filter(grepl('ShareAlike', license_type)) %>%
mutate(may_disregard_with_permission = ! is.na(license_sharealike) & license_sharealike == 'p') %>%
dplyr::select(network, domain, macrosheds_prodname, may_disregard_with_permission, contact) %>%
distinct()
if(all_ws_attr || any(grepl('tcw', ws_attrib))){
sharealike <- bind_rows(
sharealike,
tibble(network = NA, domain = NA, macrosheds_prodname = 'tcw (watershed attribute)',
may_disregard_with_permission = FALSE, contact = 'https://www.bdi.ox.ac.uk/research/malaria-atlas-project'))
}
notify_intent_s <- ts_attrib %>%
filter(IR_notify_of_intentions == 's') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
notify_intent_m <- ts_attrib %>%
filter(IR_notify_of_intentions == 'm') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
notify_dist_s <- ts_attrib %>%
filter(IR_notify_on_distribution == 's') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
notify_dist_m <- ts_attrib %>%
filter(IR_notify_on_distribution == 'm') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
provide_access_s <- ts_attrib %>%
filter(IR_provide_online_access == 's') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
provide_access_m <- ts_attrib %>%
filter(IR_provide_online_access == 'm') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
consult_s <- ts_attrib %>%
filter(IR_collaboration_consultation == 's') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
consult_m <- ts_attrib %>%
filter(IR_collaboration_consultation == 'm') %>%
dplyr::select(network, domain, macrosheds_prodname, contact) %>%
distinct()
ir <- list()
ir_explanations <- c('A noncommercial license means you cannot profit from derivative works.',
'A share-alike license means derivative works must propagate the original license terms. If may_disregard_with_permission is TRUE, you may ask the primary source contact for permission to use your own license.',
'notify_of_intent_S means the primary source has requested notice of any plans to publish derivative works that use their data.',
'notify_of_intent_M means the primary source requires notice of any plans to publish derivative works that use their data.',
'notify_on_distribution_S means the primary source has requested that they be informed of any publications resulting from their data.',
'notify_on_distribution_M means the primary source requires that they be informed of any publications resulting from their data.',
'provide_access_S means the primary source requests online access to any publications resulting from their data.',
'provide_access_M means the primary source requires online access to any publications resulting from their data.',
"consult_or_collab_S means the primary source requests consultation and/or collaboration where reasonable (e.g. if you're only using data from one or two domains).",
"consult_or_collab_S means the primary source requires opportunities for consultation and/or collaboration where reasonable (e.g. if you're only using data from one or two domains).")
ir$noncommercial_license <- noncomm
ir$sharealike_license <- sharealike
ir$notify_of_intent_S <- if(abide_by == 'suggestions') notify_intent_s else tibble()
ir$notify_of_intent_M <- notify_intent_m
ir$notify_on_distribution_S <- if(abide_by == 'suggestions') notify_dist_s else tibble()
ir$notify_on_distribution_M <- notify_dist_m
ir$provide_access_S <- if(abide_by == 'suggestions') provide_access_s else tibble()
ir$provide_access_M <- provide_access_m
ir$consult_or_collab_S <- if(abide_by == 'suggestions') consult_s else tibble()
ir$consult_or_collab_M <- consult_m
applicable <- sapply(ir, function(x) nrow(x) != 0)
ir <- ir[applicable]
ir_explanations <- ir_explanations[applicable]
ir <- list(intellectual_rights = ir,
IR_explanations = ir_explanations)
return(ir)
}
attrib_output_write <- function(attrib, write_to_dir){
write_to_dir <- file.path(write_to_dir, 'macrosheds_attribution_information')
dir.create(write_to_dir)
readr::write_lines(attrib$acknowledgements,
file.path(write_to_dir, 'acknowledgements.txt'))
if(length(attrib$intellectual_rights_explanations)){
readr::write_lines(attrib$intellectual_rights_explanations, sep = '\n\n',
file.path(write_to_dir, 'intellectual_rights_definitions.txt'))
}
if(length(attrib$intellectual_rights_notifications)){
sink(file = file.path(write_to_dir, 'intellectual_rights_notifications.txt'))
cat('----INTELLECTUAL RIGHTS NOTIFICATIONS----\n\n')
print(lapply(attrib$intellectual_rights_notifications, as.data.frame))
sink()
}
readr::write_lines(attrib$bibliography, sep = '\n',
file.path(write_to_dir, 'ms_bibliography.bib'))
}
# misc helpers ####
expo_backoff <- function(expr,
max_attempts = 10,
verbose = TRUE){
for(attempt_i in seq_len(max_attempts)){
results <- try(expr = expr,
silent = TRUE)
if(inherits(results, 'try-error')){
if(attempt_i == max_attempts){
stop(attr(results, 'condition'))
}
backoff <- runif(n = 1,
min = 0,
max = 2^attempt_i - 1)
if(verbose){
message(glue::glue("Backing off for ", round(backoff, 1), " seconds."))
}
Sys.sleep(backoff)
} else {
# if(verbose){
# print(paste0("Request succeeded after ", attempt_i, " attempt(s)."))
# }
break
}
}
return(results)
}
sd_or_0 <- function(x, na.rm = FALSE){
#Only used to bypass the tyranny of the errors package not letting
#me take the mean of an errors object of length 1 without setting the
#uncertainty to 0
x <- if(is.vector(x) || is.factor(x)) x else as.double(x)
if(length(x) == 1) return(0)
return(sqrt(var(x, na.rm = na.rm)))
}
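#e.g. sd_or_0(3.14) returns 0, whereas sd_or_0(c(1, 3)) returns sd(c(1, 3))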
populate_implicit_NAs <- function(d,
interval,
val_fill = NA,
edges_only = FALSE){
#TODO: this would be more flexible if we could pass column names as
# positional args and use them in group_by and mutate
#d: a ms tibble with at minimum datetime, site_code, and var columns
#interval: the interval along which to populate missing values (must be
# either '15 min' or '1 day').
#val_fill: character or NA. the token with which to populate missing
# elements of the `val` column. All other columns will be populated
# invariably with NA or 0. See details.
#edges_only: logical. if TRUE, only two filler rows will be inserted into each
# gap, one just after the gap begins and the other just before the gap ends.
# If FALSE (the default), the gap will be fully populated according to
# the methods outlined in the details section.
#this function makes implicit missing timeseries records explicit,
# by populating rows so that the datetime column is complete
# with respect to the sampling interval. In other words, if
# samples are taken every 15 minutes, but some samples are skipped
# (rows not present), this will create those rows. If ms_status or
# ms_interp columns are present, their new records will be populated
# with 0s. The val column will be populated with whatever is passed to
# val_fill. Any other columns will be populated with NAs.
#returns d, complete with new rows, sorted by site_code, then var, then datetime
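#illustrative example (hypothetical d): if a site has daily rows for
# 2020-01-01 and 2020-01-04, populate_implicit_NAs(d, interval = '1 day')
# inserts rows for 2020-01-02 and 2020-01-03 with val set to val_fill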
complete_d <- d %>%
mutate(fill_marker = 1) %>%
group_by(site_code, var) %>%
tidyr::complete(datetime = seq(min(datetime),
max(datetime),
by = interval)) %>%
# mutate(site_code = .$site_code[1],
# var = .$var[1]) %>%
ungroup() %>%
arrange(site_code, var, datetime) %>%
dplyr::select(datetime, site_code, var, everything())
if(! any(is.na(complete_d$fill_marker))) return(d)
if(! is.na(val_fill)){
complete_d$val[is.na(complete_d$fill_marker)] <- val_fill
}
if('ms_status' %in% colnames(complete_d)){
complete_d$ms_status[is.na(complete_d$ms_status)] <- 0
}
if('ms_interp' %in% colnames(complete_d)){
complete_d$ms_interp[is.na(complete_d$ms_interp)] <- 0
}
if(edges_only){
midgap_rows <- rle2(is.na(complete_d$fill_marker)) %>%
filter(values == TRUE) %>%
# if(nrow(fill_runs) == 0) return(d)
# midgap_rows <- fill_runs %>%
dplyr::select(starts, stops) %>%
{purrr::map2(.x = .$starts,
.y = .$stops,
~seq(.x, .y))} %>%
purrr::map(~( if(length(.x) <= 2)
{
return(NULL)
} else {
return(.x[2:(length(.x) - 1)])
}
)) %>%
unlist()
if(! is.null(midgap_rows)){
complete_d <- slice(complete_d,
-midgap_rows)
}
}
complete_d$fill_marker <- NULL
return(complete_d)
}
numeric_any <- function(num_vec){
return(as.numeric(any(as.logical(num_vec))))
}
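#e.g. numeric_any(c(0, 1, 0)) returns 1; numeric_any(c(0, 0)) returns 0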
numeric_any_v <- function(...){ #attack of the ellipses
#...: numeric vectors of equal length. should be just 0s and 1s, but
# integers other than 1 are also considered TRUE by as.logical()
#the vectorized version of numeric_any. good for stuff like:
# mutate(ms_status = numeric_any_v(ms_status_x, ms_status_flow))
#returns a single vector of the same length as the arguments
#this func could be useful in global situations
numeric_any_positional <- function(...) numeric_any(c(...))
numeric_any_elementwise <- function(...){
Map(function(...) numeric_any_positional(...), ...)
}
out <- do.call(numeric_any_elementwise,
args = list(...)) %>%
unlist()
if(is.null(out)) out <- numeric()
return(out)
}
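#e.g. numeric_any_v(c(0, 1, 0), c(0, 0, 1)) returns c(0, 1, 1)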
check_suggested_pkgs <- function(pkgs){
#pkgs: character vector of package names
loaded <- rep(NA, length(pkgs))
for(i in seq_along(pkgs)){
loaded[i] <- sw(require(pkgs[i], quietly = TRUE, character.only = TRUE))
}
if(any(! loaded)){
stop(glue::glue('Additional package(s) required to use this function. ',
'Run install.packages(c("{p}")) and try again.',
p = paste(pkgs[! loaded], collapse = '", "')))
}
}
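#e.g. check_suggested_pkgs(c('EGRET')) returns invisibly if EGRET is
# installed, and otherwise stops with an install.packages() prompt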
list_all_product_dirs <- function(macrosheds_root, prodname){
prodname_dirs <- list.dirs(path = macrosheds_root,
full.names = TRUE,
recursive = TRUE)
prodname_dirs <- grep(pattern = paste0('/', prodname),
x = prodname_dirs,
value = TRUE)
return(prodname_dirs)
}
Mode <- function(x, na.rm = TRUE){
if(na.rm){
x <- na.omit(x)
}
ux <- unique(x)
mode_out <- ux[which.max(tabulate(match(x, ux)))]
return(mode_out)
}
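#e.g. Mode(c(5, 5, 7, NA)) returns 5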
mean_or_x <- function(x, na.rm = FALSE){
# used to bypass the tyranny of the errors package not letting
# someone take the mean of an errors object of length 1. this func returns the
# original value if the group is length one, and the mean otherwise
if(length(x) == 1) return(x)
x <- mean(x, na.rm = na.rm)
return(x)
}
ms_drop_var_prefix_ <- function(x){
if(any(is.na(stringr::str_match(x, '^[IGa-z][SNa-z]_.+')))){
return(x)
}
unprefixed <- substr(x, 4, nchar(x))
return(unprefixed)
}
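#e.g. ms_drop_var_prefix_('GN_NO3_N') returns 'NO3_N'; an unprefixed
# string is returned unchanged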
ms_extract_var_prefix_ <- function(x){
if(any(is.na(stringr::str_match(x, '^[IGa-z][SNa-z]_.+')))){
stop('x is not prefixed.')
}
prefix <- substr(x, 1, 2)
return(prefix)
}
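#e.g. ms_extract_var_prefix_('GN_NO3_N') returns 'GN'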
validate_version <- function(macrosheds_root, version, warn = TRUE){
root_files <- list.files(macrosheds_root,
full.names = TRUE)
held_versions <- grep('/v[0-9]+$', root_files, value = TRUE) %>%
stringr::str_extract('/v([0-9]+)$', group = 1)
if(version == 'latest'){
if(! length(held_versions)){
stop('No data detected in macrosheds_root. This should be the directory you specified when you ran ms_download_core_data or ms_download_ws_attr.')
}
version <- max(as.numeric(held_versions))
}
v_num <- sw(as.numeric(version))
if(is.na(v_num) || v_num <= 0 || v_num %% 1 != 0){
stop('`version` must be a positive integer, or "latest".')
}
root_vsn <- file.path(macrosheds_root, paste0('v', version))
version <- as.character(version)
figshare_codes <- macrosheds::file_ids_for_r_package #loaded in R/sysdata.rda, which is written in postprocessing
avail_vsns <- figshare_codes %>%
dplyr::select(starts_with('fig_code_')) %>%
rename_with(~sub('fig_code_v', '', .)) %>%
colnames()
latest_avail <- max(as.numeric(avail_vsns))
if(warn && as.numeric(version) < latest_avail){
warning('Data from MacroSheds v', version, ' were requested, but v', latest_avail, ' is available. See ms_download_core_data() and ms_download_ws_attr().')
}
if(as.numeric(version) > latest_avail){
stop('Version ', version, ' data were requested, but ', latest_avail,
' is the highest version of the MacroSheds dataset known to this version of the macrosheds package.')
}
if(as.numeric(version) > max(as.numeric(held_versions))){
stop('Version ', version, ' data were requested, but v', version,
' is not present in macrosheds_root. see ms_download_core_data()/ms_download_ws_attr() or check macrosheds_root.')
}
return(root_vsn)
}
# flux helpers ####
wtr_yr <- function(dates, start_month = 10){
d1 <- as.Date(dates)
offset <- ifelse(as.integer(format(d1, '%m')) < start_month, 0, 1)
adj_year <- as.integer(format(d1, '%Y')) + offset
return(adj_year)
}
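#e.g. wtr_yr(as.Date('2020-10-15')) returns 2021, while
# wtr_yr(as.Date('2020-09-15')) returns 2020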
warn_sum <- function(x) {
# number of NAs in record
n_na <- sum(is.na(x))
# number of inf values in record
n_inf <- sum(is.infinite(x))
# warn user only if there is anything to ignore
if(n_na > 0 || n_inf > 0){
writeLines(paste("WARNING: NA and/or infinite values found in flux record, ignoring during SUM",
"\n count NA:", n_na,
"\n count Inf:", n_inf))
}
xsum <- sum(x[!is.infinite(x)], na.rm = TRUE)
return(xsum)
}
carry_flags <- function(raw_q_df, raw_conc_df, target_solute = NULL, target_year = NULL, period = NULL){
#### set up ratio functions #####
# interp ratio
calc_interp_ratio <- function(trimmed_df, period = NULL){
if(period == 'annual'){
no_interp <- trimmed_df %>%
filter(ms_interp == 0) %>%
count() %>%
pull(n)
interp <- trimmed_df %>%
filter(ms_interp == 1) %>%
count() %>%
pull(n)
interp_ratio <- interp/(interp+no_interp)
return(interp_ratio)
} else if(period == 'month'){
interp_ratio <- trimmed_df %>%
mutate(month = lubridate::month(date)) %>%
group_by(month) %>%
summarize(n_interp = sum(ms_interp == 1),
n_no_interp = sum(ms_interp == 0)) %>%
mutate(interp_ratio = n_interp/(n_interp+n_no_interp)) %>%
ungroup() %>%
dplyr::select(month, interp_ratio)
return(interp_ratio)
} else {
message('Specify period as month or annual.')
}
}
# status ratio
calc_status_ratio <- function(trimmed_df, period = NULL){
if(period == 'annual'){
no_status <- trimmed_df %>%
filter(ms_status == 0) %>%
count() %>%
pull(n)
status <- trimmed_df %>%
filter(ms_status == 1) %>%
count() %>%
pull(n)
status_ratio <- status/(status+no_status)
return(status_ratio)
} else if(period == 'month'){
status_ratio <- trimmed_df %>%
mutate(month = lubridate::month(date)) %>%
group_by(month) %>%
summarize(n_stat = sum(ms_status == 1),
n_no_stat = sum(ms_status == 0)) %>%
mutate(status_ratio = n_stat/(n_stat+n_no_stat)) %>%
ungroup() %>%
dplyr::select(month, status_ratio)
return(status_ratio)
} else {
message('Specify period as month or annual.')
}
}
# missing ratio
calc_missing_ratio <- function(trimmed_df, period = NULL){
if(period == 'annual'){
present <- trimmed_df %>%
dplyr::select(val) %>%
na.omit() %>%
nrow()
missing_ratio <- (365-present)/365
return(missing_ratio)
}else if(period == 'month'){
missing_ratio <- trimmed_df %>%
mutate(month = lubridate::month(date)) %>%
dplyr::select(month, date, val) %>%
group_by(month) %>%
na.omit() %>%
summarize(n = n()) %>%
mutate(full_days = lubridate::days_in_month(month),
missing_ratio = (full_days-n)/full_days) %>%
ungroup() %>%
dplyr::select(month, missing_ratio)
return(missing_ratio)
} else {
message('Specify period as month or annual.')
}
}
### run functions on data ####
if(period == 'annual'){
year_conc_df <- raw_conc_df %>%
mutate(wy = wtr_yr(date, start_month = 10)) %>%
filter(wy == target_year,
var == target_solute)
conc_tbl <- tibble(wy = target_year, var = target_solute,
ms_interp = calc_interp_ratio(trimmed_df = year_conc_df, period = period),
ms_status = calc_status_ratio(trimmed_df = year_conc_df, period = period),
ms_missing = calc_missing_ratio(trimmed_df = year_conc_df, period = period))
year_q_df <- raw_q_df %>%
mutate(wy = wtr_yr(date, start_month = 10)) %>%
filter(wy == target_year)
q_tbl <- tibble(wy = target_year, var = unique(year_q_df$var)[1],
ms_interp = calc_interp_ratio(trimmed_df = year_q_df, period = period),
ms_status = calc_status_ratio(trimmed_df = year_q_df, period = period),
ms_missing = calc_missing_ratio(trimmed_df = year_q_df, period = period)
)
out_frame <- rbind(conc_tbl, q_tbl)
return(out_frame)
} else
if(period == 'month'){
year_conc_df <- raw_conc_df %>%
mutate(wy = wtr_yr(date, start_month = 10)) %>%
filter(wy == target_year,
var == target_solute)
ms_interp = calc_interp_ratio(trimmed_df = year_conc_df, period = period)
ms_status = calc_status_ratio(trimmed_df = year_conc_df, period = period)
ms_missing = calc_missing_ratio(trimmed_df = year_conc_df, period = period)
conc_tbl <- full_join(ms_interp, ms_status, by = 'month') %>%
full_join(ms_missing, by = 'month') %>%
mutate(wy = target_year,
var = target_solute)
year_q_df <- raw_q_df %>%
mutate(wy = wtr_yr(date, start_month = 10)) %>%
filter(wy == target_year)
ms_interp = calc_interp_ratio(trimmed_df = year_q_df, period = period)
ms_status = calc_status_ratio(trimmed_df = year_q_df, period = period)
ms_missing = calc_missing_ratio(trimmed_df = year_q_df, period = period)
q_tbl <- full_join(ms_interp, ms_status, by = 'month') %>%
full_join(ms_missing, by = 'month') %>%
mutate(wy = target_year,
var = unique(year_q_df$var)[1])
out_frame <- rbind(conc_tbl, q_tbl)
return(out_frame)
}
}
prep_raw_for_riverload <- function(chem_df, q_df, datecol = 'date'){
conv_q <- q_df %>%
mutate(datetime = as.POSIXct(get(datecol), format = '%Y-%m-%d %H:%M:%S', tz = 'UTC')) %>%
mutate(flow = q_lps * 0.001) %>% # convert L/s to cubic meters per second
dplyr::select(datetime, flow) %>%
arrange(datetime) %>%
data.frame()
if(! nrow(conv_q)) return(data.frame())
if(lubridate::month(conv_q$datetime[1]) == 9 & lubridate::day(conv_q$datetime[1]) == 30){
conv_q <- conv_q[-1, ]
}
conv_c <- chem_df %>%
mutate(datetime = as.POSIXct(get(datecol), format = '%Y-%m-%d %H:%M:%S', tz = 'UTC')) %>%
dplyr::select(datetime, conc) %>%
data.frame()
if(! nrow(conv_c)) return(data.frame())
db <- full_join(conv_q, conv_c, by = 'datetime') %>%
#filter(!is.na(flow)) %>%
arrange(datetime)
return(db)
}
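#illustrative usage (hypothetical objects), assuming chem_df has columns
# `date` and `conc`, and q_df has columns `date` and `q_lps`:
# rl_data <- prep_raw_for_riverload(chem_df, q_df, datecol = 'date')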
calculate_pw <- function(chem_df, q_df, datecol = 'date', area = 1, period = NULL){
if(is.null(period)){
warning('parameter "period" unspecified; calculating annual flux.')
period <- 'annual'
}
rl_data <- prep_raw_for_riverload(chem_df = chem_df,
q_df = q_df,
datecol = datecol)
if(nrow(rl_data) %in% 0:1){
return(NA)
}
if(is.na(rl_data[1, 2])){
rl_data <- rl_data[-1, ]
}
if(period == 'annual'){
flux_from_pw <- sm(RiverLoad::method6(rl_data, ncomp = 1)) %>%
sum(.) / (1000 * area)
} else if(period == 'month'){
# a monthly adaptation of RiverLoad::method6 (period-weighted load),
# aggregating interpolated daily loads by year-month
method6_month <- function(db, ncomp, period){
n <- nrow(db)
interpolation <- data.frame(imputeTS::na_interpolation(db, 'linear'))
load <- data.frame(interpolation[, 2] * interpolation[, -c(1:2)])
difference <- matrix(nrow = (nrow(db) - 1), ncol = 1)
for(i in 1:(nrow(db) - 1)){
difference[i] <- difftime(db[i + 1, 1], db[i, 1], units = 'days')
}
loadtot <- cbind.data.frame(interpolation$datetime, load)
colnames(loadtot)[1] <- c('datetime')
loadtot[, 1] <- format(as.POSIXct(loadtot[, 1]), format = '%Y-%m')
forrow <- aggregate(loadtot[, 2] ~ datetime, loadtot, sum)
agg.dataC <- matrix(nrow = nrow(forrow), ncol = (ncomp))
for(i in 1:ncomp){
agg.data <- aggregate(loadtot[, i + 1] ~ datetime,
loadtot, sum)
agg.dataC[, i] <- as.matrix(agg.data[, 2])
}
colnames(agg.dataC) <- c(names(db)[3:(ncomp + 2)])
rownames(agg.dataC) <- forrow$datetime
return(agg.dataC)
}
##### apply #####
flux_from_pw <- method6_month(rl_data, ncomp = 1, period = period)
flux_from_pw <- tibble(date = rownames(flux_from_pw),
flux = (flux_from_pw[, 1] / (1000 * area)))
}
return(flux_from_pw)
}
calculate_beale <- function(chem_df, q_df, datecol = 'date', period = NULL, area = 1){
if(is.null(period)) {
warning('no period supplied, calculating annual flux')
period <- 'annual'
}
rl_data <- prep_raw_for_riverload(chem_df = chem_df,
q_df = q_df,
datecol = datecol)
if(nrow(rl_data) %in% 0:1){
return(NA)
}
if(is.na(rl_data[1, 2])){
rl_data <- rl_data[-1, ]
}
if(period == 'annual'){
flux_from_beale <- RiverLoad::beale.ratio(rl_data, ncomp = 1) %>%
sum(.) / (1000 * area)
} else if(period == 'month'){
flux_from_beale <- RiverLoad::beale.ratio(rl_data, ncomp = 1, period = period)
flux_from_beale <- tibble(date = rownames(flux_from_beale),
flux = (flux_from_beale[, 1] / (1000 * area)))
}
return(flux_from_beale)
}
calculate_rating <- function(chem_df, q_df, datecol = 'date', period = NULL, area = 1){
if(is.null(period)) {
warning('no period supplied, calculating annual flux')
period <- 'annual'
}
rl_data <- prep_raw_for_riverload(chem_df = chem_df, q_df = q_df, datecol = datecol)
out <- try({
if(period == 'annual'){
flux_from_reg <- RiverLoad::rating(rl_data, ncomp = 1) %>%
sum(.)/(1000*area)
} else {
if(period == 'month'){
flux_from_reg <- RiverLoad::rating(rl_data, ncomp = 1, period = period)
flux_from_reg <- tibble(date = rownames(flux_from_reg),
flux = (flux_from_reg[,1]/(1000*area)))
}
}
}, silent = TRUE)
if(inherits(out, 'try-error')) out <- NA
return(out)
}
calculate_wrtds <- function(chem_df, q_df, ws_size, lat, long, datecol = 'date',
agg = 'default', minNumObs = 2, minNumUncen = 2, gap_period = 730){
flux_from_egret <- tryCatch(
expr = {
# default sums all daily flux values in df
egret_results <- adapt_ms_egret(chem_df, q_df, ws_size,
lat, long,
datecol = datecol,
minNumObs = minNumObs,
minNumUncen = minNumUncen,
gap_period = gap_period)
if(agg == 'default') {
flux_from_egret <- egret_results$Daily$FluxDay %>%
warn_sum(.)/(ws_size) #scale by watershed area
} else if(agg == 'annual') {
flux_from_egret <- egret_results$Daily %>%
mutate(
wy = water_year(Date)
) %>%
group_by(wy) %>%
summarize(
flux = warn_sum(FluxDay)/(ws_size)
)
} else if(agg == 'monthly') {
flux_from_egret <- egret_results$Daily %>%
mutate(
wy = water_year(Date),
month = lubridate::month(Date)
) %>%
group_by(wy, month) %>%
summarize(
flux = warn_sum(FluxDay)/(ws_size)
)
}
},
error = function(e){
message('ERROR: WRTDS failed to run')
NA
})
return(flux_from_egret)
}
generate_residual_corrected_conc <- function(chem_df, q_df, datecol = 'date', sitecol = 'site_no'){
# first make c:q rating
paired_df <- q_df %>%
full_join(chem_df, by = c(datecol, sitecol, 'wy')) %>%
na.omit() %>%
filter(q_lps > 0,
is.finite(q_lps))
if(nrow(paired_df) <= 2) return(NA)
q_log <- log10(paired_df$q_lps)
c_log <- log10(paired_df$conc)
model_data <- tibble(c_log, q_log) %>%
filter(is.finite(c_log),
is.finite(q_log)) %>%
na.omit()
rating <- sw(summary(lm(model_data$c_log ~ model_data$q_log,
singular.ok = TRUE)))
# extract model info
intercept <- rating$coefficients[1]
slope <- rating$coefficients[2]
# create modeled c, calc residuals, adjust modeled c by interpolated residuals
site_code <- paired_df$site_code[[1]]
rating_filled_df <- q_df %>%
mutate(conc_reg = 10^(intercept + (slope * log10(q_lps)))) %>%
dplyr::select(all_of(datecol), conc_reg, q_lps) %>%
full_join(chem_df, by = datecol) %>%
dplyr::select(site_code, all_of(datecol), conc, conc_reg, q_lps, wy) %>%
mutate(res = conc_reg - conc,
res = imputeTS::na_interpolation(res),
conc_com = conc_reg - res,
site_code = !!site_code,
wy = wtr_yr(get(datecol), start_month = 10))
rating_filled_df$conc_com[! is.finite(rating_filled_df$conc_com)] <- 0
return(rating_filled_df)
}
calculate_composite_from_resid_corrected <- function(rating_filled_df,
site_no = 'site_no',
period = NULL,
area = 1){
if(is.null(period)){
warning('no period supplied, calculating annual flux')
period <- 'annual'
}
if(period == 'annual'){
flux_from_comp <- rating_filled_df %>%
dplyr::select(date, conc_com, q_lps, wy) %>%
tidyr::drop_na() %>%
mutate(flux = conc_com * q_lps * 86400 * (1 / area) * 1e-6) %>%
group_by(wy) %>%
summarize(flux = sum(flux),
.groups = 'drop') %>%
mutate(site_code = site_no)
} else if(period == 'month'){
flux_from_comp <- rating_filled_df %>%
mutate(month = lubridate::month(date),
flux = conc_com * q_lps * 86400 * (1 / area) * 1e-6) %>%
group_by(wy, month) %>%
summarize(date = max(date),
flux = sum(flux),
.groups = 'drop')
}
return(flux_from_comp)
}
detect_record_break <- function(data){
data_time <- data %>%
dplyr::select(Date, Julian) %>%
# how many days between this day and the next
mutate(days_gap = lead(Julian, 1) - Julian)
return(data_time)
}
get_break_dates <- function(data, gap_period = 730){
start = list()
end = list()
index = 1
# default 730 because USGS guidance suggests gaps shorter than 2 years are probably fine
for(i in 1:nrow(data)) {
gap <- data$days_gap[i]
if(!is.na(gap)) {
if(gap > gap_period) {
start[[index]] = c(data$Date[i])
end[[index]] = c(data$Date[i+1])
index = index + 1
}
}
}
break_dates = list('start' = start, 'end'= end)
return(break_dates)
}
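#e.g. with the default gap_period of 730 days, a record that jumps from
# 2010-01-01 to 2013-01-01 yields start = 2010-01-01 and end = 2013-01-01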
adapt_ms_egret <- function(chem_df, q_df, ws_size, lat, long,
site_data = NULL, kalman = FALSE,
datecol = 'date', minNumObs = 2, minNumUncen = 2, gap_period = 730){
check_suggested_pkgs(c('EGRET'))
# TODO: reorder site data args to make fully optional
get_MonthSeq <- function(dates){
## dates <- Sample_file$Date
years <- lubridate::year(dates)-1850
MonthSeq <- years*12
MonthSeq <- MonthSeq + lubridate::month(dates)
return(MonthSeq)
}
decimalDate <- function(rawData){
dateTime <- as.POSIXlt(rawData)
year <- dateTime$year + 1900
startYear <- as.POSIXct(paste0(year,"-01-01 00:00"))
endYear <- as.POSIXct(paste0(year+1,"-01-01 00:00"))
DecYear <- year + as.numeric(difftime(dateTime, startYear, units = "secs"))/as.numeric(difftime(endYear, startYear, units = "secs"))
return(DecYear)
}
wyday <- function(dates, wy_type = 'usgs') {
# get earliest and latest date from series
start_date <- min(dates)
end_date <- max(dates)
# get the normal year, and make the start of WY date from that year
year_firsthalf <- year(start_date)
wy_start <- paste0(year_firsthalf, '-10-01')
wy_end <- paste0(year_firsthalf+1, '-09-30')
# gap between start WY and end of normal year, always 91 days
correction_val <- yday('2021-12-31') - yday('2021-10-01')
# adjust yday from start of gregorian year to (usgs) WY year
wy_firsthalf <- yday(dates[lubridate::month(dates) %in% c(10, 11, 12)]) - yday(wy_start) + 1
wy_secondhalf <- yday(dates[!lubridate::month(dates) %in% c(10, 11, 12)]) + correction_val + 1
wydays <- c(wy_firsthalf, wy_secondhalf)
return(wydays)
}
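#e.g. wyday(as.Date('2020-10-01')) returns 1, the first day of the water
# year; note that outputs are grouped by water-year half, not returned in
# input order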
decimalDateWY <- function(dates, wy_type = 'usgs') {
# get earliest and latest date from series
start_date <- min(dates)
end_date <- max(dates)
# get the normal year, and make the start of WY date from that year
year_firsthalf <- year(start_date)
wy_start <- paste0(year_firsthalf, '-10-01')
wy_end <- paste0(year_firsthalf+1, '-09-30')
wy_seq <- seq(as.Date(wy_start), as.Date(wy_end), by = 1)
wydays <- wyday(dates) - 1
wylength <- length(wy_seq)
ydec <- as.integer(as.character(water_year(dates, wy_type))) + wydays/wylength
return(ydec)
}
# TODO: rename with real descriptive name
enlightened_yday <- function(dates, wy_type = 'usgs') {
# get earliest and latest date from series
start_date <- min(dates)
end_date <- max(dates)
# get the normal year, and make the start of WY date from that year
wy_first <- year(start_date)
wy_start <- paste0(wy_first, '-10-01')
wy_end <- paste0(wy_first+1, '-09-30')
# unenlightened yday of date vector
wy_firsthalf <- yday(dates[lubridate::month(dates) %in% c(10, 11, 12)])
wy_secondhalf <- yday(dates[!lubridate::month(dates) %in% c(10, 11, 12)])
# if first half of WY is a leap year,
if(366 %in% wy_firsthalf) {
## print('first half of WY is leap year, adjusting')
wy_firsthalf = wy_firsthalf - 1
} else if(366 %in% wy_secondhalf) {
## print('second half of WY is leap year, no action')
} else {
## print('neither side of water year is a leap year')
invisible()
}
# adjust yday from start of gregorian year to (usgs) WY year
wy_ydays <- c(wy_firsthalf, wy_secondhalf)
return(wy_ydays)
}
get_start_end <- function(d){
start_date <- min(d$datetime)
start_year <- lubridate::year(start_date)
start_wy <- ifelse(lubridate::month(start_date) %in% c(10, 11, 12), start_year+1, start_year)
filter_start_date <- lubridate::ymd(paste0(start_wy-1, '-10-01'))
end_date <- max(d$datetime)
end_year <- lubridate::year(end_date)
end_wy <- ifelse(lubridate::month(end_date) %in% c(10, 11, 12), end_year+1, end_year)
filter_end_date <- lubridate::ymd(paste0(end_wy, '-10-01'))
fin_dates <- c(filter_start_date, filter_end_date)
return(fin_dates)
}
ms_run_egret_adapt <- function(stream_chemistry, discharge, prep_data = TRUE,
run_egret = TRUE, kalman = FALSE, quiet = FALSE,
site_data = NULL, min_q_method = 'USGS', minNumObs = 2, minNumUncen = 2, gap_period = 730){
# Checks
if(any(! c('site_code', 'var', 'val', 'datetime') %in% names(stream_chemistry))){
stop('The argument to `stream_chemistry` must be in MacroSheds format (required columns: datetime, site_code, val, var).')
}
if(any(! c('site_code', 'var', 'val', 'datetime') %in% names(discharge))){
stop('The argument to `discharge` must be in MacroSheds format (required columns: datetime, site_code, val, var).')
}
if(! length(unique(ms_drop_var_prefix_(stream_chemistry$var))) == 1){
stop('Only one chemistry variable can be run at a time.')
}
if((! length(unique(stream_chemistry$site_code)) == 1) || (! length(unique(discharge$site_code)) == 1)){
stop('Only one site can be run in EGRET at a time')
}
if(! unique(stream_chemistry$site_code) == unique(discharge$site_code)){
stop('stream_chemistry and discharge must contain the same site_code')
}
# Get var and site info
if(is.null(site_data)){
site_data <- macrosheds::ms_load_sites()
if(! unique(stream_chemistry$site_code) %in% site_data$site_code){
stop('This site is not in the MacroSheds dataset, provide a site_data table with the names: site_code, ws_area_ha, latitude, longitude')
}
} else{
if(!all(names(site_data) %in% c('site_code', 'ws_area_ha', 'latitude', 'longitude', 'site_type'))){
stop('If you are not using a macrosheds site, you must supply site_data with a tibble with the names: site_code, ws_area_ha, latitude, longitude')
}
site_data <- site_data %>%
mutate(site_type = 'stream_gauge')
}
## ms_vars <- read.csv('data/ms/macrosheds_vardata.csv')
ms_vars <- macrosheds::ms_load_variables()
site_code <- unique(stream_chemistry$site_code)
#### Prep Files ####
if(prep_data){
# EGRET does not like NAs
stream_chemistry <- stream_chemistry %>%
filter(!is.na(val))
discharge <- discharge %>%
filter(!is.na(val))
# Egret can't accept 0s in the column for min val (either hack egret or do this or
# look for detection limits)
# TODO: use DL system to replace min vals, get_hdl()
# TODO: AND/OR use EGRET built in uncertainty system, ConcLow = NA, ConcHigh = DL
## min_chem <- stream_chemistry %>%
## filter(val > 0,
## !is.na(val),
## !is.infinite(val),
## !is.null(val)) %>%
## pull(val) %>%
## # NOTE: upsettingly, use of errors package makes min() now return NA instead of Inf
## min()
min_chem <- min(as.numeric(stream_chemistry[!is.infinite(stream_chemistry$val) & !is.na(stream_chemistry$val),]$val))
# NOTE: changed so now we are forced to have a real number minimum value
# NOTE: change to no conditional- min chem should never be infinite
## if(!is.infinite(min_chem)){
## }
stream_chemistry <- stream_chemistry %>%
mutate(val = ifelse(val == 0, !!min_chem, val))
# Filter so there is only Q going into the model that also has chem
stream_chemistry <- stream_chemistry %>%
mutate(year = lubridate::year(datetime),
month = lubridate::month(datetime)) %>%
mutate(waterYear = ifelse(month %in% c(10, 11, 12), year+1, year))
# TODO: filter isn't even active... and why 6?
# Get years with at least 6 chemistry samples (bi-monthly sampling is a
# reasonable requirement?)
years_with_data <- stream_chemistry %>%
group_by(waterYear) %>%
summarize(n = n()) %>%
# filter(n >= 6) %>%
pull(waterYear)
# Filter Q and chem to overlap in a water-year
chem_dates <- get_start_end(stream_chemistry)
q_dates <- get_start_end(discharge)
start_date <- if(chem_dates[1] > q_dates[1]) { chem_dates[1] } else{ q_dates[1] }
end_date <- if(chem_dates[2] < q_dates[2]) { chem_dates[2] } else{ q_dates[2] }
discharge <- discharge %>%
filter(datetime >= !!start_date,
datetime <= !!end_date)
stream_chemistry <- stream_chemistry %>%
filter(datetime >= !!start_date,
datetime <= !!end_date)
# Filter discharge to only include water years with chemistry sampling
discharge <- discharge %>%
mutate(year = lubridate::year(datetime),
month = lubridate::month(datetime)) %>%
mutate(waterYear = ifelse(month %in% c(10, 11, 12), year+1, year)) %>%
filter(waterYear %in% !!years_with_data)
# Remove times when there is a chem sample and no Q reported
samples_to_remove <- left_join(stream_chemistry, discharge, by = 'datetime') %>%
filter(is.na(val.y)) %>%
pull(datetime)
stream_chemistry <- stream_chemistry %>%
filter(!datetime %in% samples_to_remove)
}
# last redundant assurance of no NA, Inf, or 0 values
stream_chemistry <- stream_chemistry %>%
filter(!is.na(val),
!is.infinite(val),
!is.null(val),
val > 0)
n_records <- length(stream_chemistry$val)
n_years <- length(unique(stream_chemistry$waterYear))
# if n_records < 100, change minNumObs accordingly
if(n_records < 100) {
minNumObs = n_records - ceiling(n_records*0.1)
minNumUncen = ceiling(minNumObs/2)
warning(paste('number of samples less than 100, modifying WRTDS arguments',
'\n minNumObs:', minNumObs,
'\n minNumUncen', minNumUncen))
}
writeLines(paste('stream chemistry dataframe being passed into EGRET sample, \nfile has:',
n_records, 'samples over ', n_years, 'water years'))
# Set up EGRET Sample file
Sample_file <- tibble(Name = site_code,
Date = as.Date(stream_chemistry$datetime),
ConcLow = stream_chemistry$val,
ConcHigh = stream_chemistry$val,
Uncen = 1,
ConcAve = stream_chemistry$val,
Julian = as.numeric(julian(lubridate::ymd(stream_chemistry$datetime),origin=as.Date("1850-01-01"))),
Month = lubridate::month(stream_chemistry$datetime),
Day = enlightened_yday(stream_chemistry$datetime),
DecYear = decimalDate(stream_chemistry$datetime),
MonthSeq = get_MonthSeq(stream_chemistry$datetime)) %>%
mutate(SinDY = sin(2*pi*DecYear),
CosDY = cos(2*pi*DecYear)) %>%
mutate(waterYear = ifelse(Month %in% c(10, 11, 12), lubridate::year(Date) + 1, lubridate::year(Date))) %>%
dplyr::select(Name, Date, ConcLow, ConcHigh, Uncen, ConcAve, Julian, Month, Day,
DecYear, MonthSeq, waterYear, SinDY, CosDY)
# Set up EGRET Daily file
# make sure discharge is aggregated to daily values
discharge_daily <- discharge %>%
mutate(datetime = lubridate::date(datetime)) %>%
group_by(datetime, site_code, var, ms_status, ms_interp, year, month) %>%
summarize(val = mean(val))
Daily_file <- tibble(Name = site_code,
Date = as.Date(discharge_daily$datetime),
# converting lps to m^3/s
Q = discharge_daily$val/1000,
Julian = as.numeric(julian(lubridate::ymd(discharge_daily$datetime),origin=as.Date("1850-01-01"))),
Month = lubridate::month(discharge_daily$datetime),
Day = enlightened_yday(discharge_daily$datetime),
DecYear = decimalDate(discharge_daily$datetime),
MonthSeq = get_MonthSeq(discharge_daily$datetime),
Qualifier = discharge_daily$ms_status)
if(prep_data){
# Egret can't handle 0 in Q; setting 0 to the minimum Q ever reported seems reasonable
no_flow_days <- Daily_file %>%
filter(Q == 0) %>%
pull(Date)
n_nfd <- length(no_flow_days)
n_record <- length(Daily_file$Date)
percent_no_flow <- n_nfd/n_record
writeLines(paste('proportion of record with no flow:', percent_no_flow))
# trying USGS WRTDS flow min method,
# also NOTE: USGS WRTDS manual says no flow on > 0.2% of days is an issue
if(min_q_method == 'USGS'){
message('using USGS recommended no-flow replacement method')
mean_flow <- mean(Daily_file$Q[Daily_file$Q > 0], na.rm = TRUE)
Daily_file <- Daily_file %>%
mutate(Q = ifelse(Q <= 0, !!mean_flow, Q))
} else {
# NOTE: could this be where Inf shows up too? like in min_chem?
min_flow <- min(Daily_file$Q[Daily_file$Q > 0], na.rm = TRUE)
Daily_file <- Daily_file %>%
mutate(Q = ifelse(Q <= 0, !!min_flow, Q))
}
# TODO: record zero flow days, and set flux for those days to zero
# TODO: USGS WRTDS manual says method for replacing 0 and negative flow
# is to set all 0 and neg to 0, then replace with 0.1% of mean flow
# and they say final results should have this small flow increment
# subtracted from Q and flux results (pg 6, manual)
}
Daily_file <- Daily_file %>%
# 'extend' rollmean NOTE: rollmean args could cause mischief,
# though right alignment appears correct based on the EGRET docs
mutate(Q7 = zoo::rollmean(Q, 7, fill = 'extend', align = 'right'),
Q30 = zoo::rollmean(Q, 30, fill = 'extend', align = 'right'),
LogQ = log(Q))
Daily_file <- tibble::rowid_to_column(Daily_file, 'i') %>%
dplyr::select(Name, Date, Q, Julian, Month, Day, DecYear, MonthSeq, Qualifier,
i, LogQ, Q7, Q30)
# Set up INFO table
var <- ms_drop_var_prefix_(unique(stream_chemistry$var))
var_unit <- ms_vars %>%
filter(variable_code == !!var) %>%
pull(unit)
if(length(var_unit) == 0){
message("length of var_unit for this flux calc is zero, setting var_unit to NA")
var_unit <- NA
}
site_lat <- site_data %>%
filter(site_code == !!site_code) %>%
pull('latitude')
site_lon <- site_data %>%
filter(site_code == !!site_code) %>%
pull('longitude')
site_ws_area <- site_data %>%
filter(site_code == !!site_code,
site_type == 'stream_gauge') %>%
pull('ws_area_ha')
# convert watershed area from hectares to km2
site_ws_area <- site_ws_area / 100
new_point <- sf::st_sfc(sf::st_point(c(site_lon, site_lat)), crs = 4326) %>%
sf::st_transform(., crs = 4267)
# NOTE: question- should we allow interpolating back to Oct 1st from a
# record where first ever sample is Oct 26th? (as example of issue)
INFO_file <- tibble(agency_cd = 'macrosheds',
site_no = site_code,
station_nm = site_code,
site_tp_code = 'ST',
lat_va = site_lat,
long_va = site_lon,
dec_lat_va = site_lat,
dec_long_va = site_lon,
coord_meth_cd = NA,
coord_acy_cd = NA,
coord_datum_cd = NA,
dec_coord_datum_cd = NA,
district_cd = NA,
state_cd = NA,
county_cd = NA,
country_cd = NA,
land_net_ds = NA,
map_nm = NA,
map_scale_fc = NA,
alt_va = NA,
alt_meth_cd = NA,
alt_acy_va = NA,
alt_datum_cd = NA,
huc_cd = NA,
basin_cd = NA,
topo_cd = NA,
instruments_cd = NA,
construction_dt = NA,
inventory_dt = NA,
# watershed area (km2 to square miles)
drain_area_va = site_ws_area / 2.59,
contrib_drain_area_va = NA,
tz_cd = 'UTC',
local_time_fg = 'Y',
reliability_cd = NA,
gw_file_cd = NA,
nat_aqfr_cd = NA,
aqfr_cd = NA,
aqfr_type_cd = NA,
well_depth_va = NA,
hole_depth_va = NA,
depth_src_cd = NA,
project_no = NA,
shortName = site_code,
drainSqKm = site_ws_area,
staAbbrev = site_code,
param.nm = var,
param.units = var_unit,
paramShortName = var,
paramNumber = NA,
constitAbbrev = var,
paStart = 10,
paLong = 12)
eList <- EGRET::mergeReport(INFO_file,
Daily_file,
Sample_file,
verbose = TRUE)
if(! run_egret){
return(eList)
}
# TODO: print stats on the record being calculated,
# particularly what the number of obs truly is (helps contextualize absurd results)
# keeping in mind EGRET says that anything less than n=60 is "dangerous"
eList <- try(EGRET::modelEstimation(eList,
minNumObs = minNumObs,
minNumUncen = minNumUncen,
windowY = 7,
windowQ = 2,
windowS = 0.5,
verbose = TRUE))
if(inherits(eList, 'try-error')){
stop('EGRET failed while running WRTDS. See https://github.com/USGS-R/EGRET for reasons data may not be compatible with the WRTDS model.')
}
if(kalman){
eList <- EGRET::WRTDSKalman(eList, verbose = !quiet)
if(prep_data){
# Set flux and Q to 0 and conc to NA on no flow days
eList$Daily <- eList$Daily %>%
mutate(Q = ifelse(Date %in% !!no_flow_days, 0, Q),
LogQ = ifelse(Date %in% !!no_flow_days, 0, LogQ),
Q7 = ifelse(Date %in% !!no_flow_days, 0, Q7),
Q30 = ifelse(Date %in% !!no_flow_days, 0, Q30),
ConcDay = ifelse(Date %in% !!no_flow_days, NA, ConcDay),
FluxDay = ifelse(Date %in% !!no_flow_days, 0, FluxDay),
FNConc = ifelse(Date %in% !!no_flow_days, NA, FNConc),
FNFlux = ifelse(Date %in% !!no_flow_days, 0, FNFlux),
GenFlux = ifelse(Date %in% !!no_flow_days, 0, GenFlux),
GenConc = ifelse(Date %in% !!no_flow_days, 0, GenConc))
}
} else if(prep_data){
# Set flux and Q to 0 and conc to NA on no flow days
eList$Daily <- eList$Daily %>%
mutate(Q = ifelse(Date %in% !!no_flow_days, 0, Q),
LogQ = ifelse(Date %in% !!no_flow_days, 0, LogQ),
Q7 = ifelse(Date %in% !!no_flow_days, 0, Q7),
Q30 = ifelse(Date %in% !!no_flow_days, 0, Q30),
ConcDay = ifelse(Date %in% !!no_flow_days, NA, ConcDay),
FluxDay = ifelse(Date %in% !!no_flow_days, 0, FluxDay),
FNConc = ifelse(Date %in% !!no_flow_days, NA, FNConc),
FNFlux = ifelse(Date %in% !!no_flow_days, 0, FNFlux))
}
# find any 'breaks' in record
sample_rec <- detect_record_break(Sample_file)
sample_breaks <- get_break_dates(sample_rec)
# loop over all detected breaks and mask each with EGRET::blankTime()
if(length(sample_breaks$start) < 1) {
writeLines(paste('no gap in record detected larger than', gap_period, 'days'))
} else {
for(i in seq_along(sample_breaks$start)) {
# set period
startBlank = sample_breaks$start[[i]]
endBlank = sample_breaks$end[[i]]
writeLines(paste('\n\nfound gap in record greater than', gap_period, 'days\n',
'between', startBlank, 'and', endBlank, 'masking this period',
'with EGRET blankTime()'))
eList <- EGRET::blankTime(eList,
startBlank = startBlank,
endBlank = endBlank)
}
}
return(eList)
}
ms_chem <- chem_df %>%
mutate(site_code = 'none',
# TODO: variable handling
var = 'variable_chem',
ms_status = 0,
ms_interp = 0) %>%
rename(val = conc,
datetime = all_of(datecol))
ms_q <- q_df %>%
mutate(site_code = 'none',
var = 'variable_q',
ms_status = 0,
ms_interp = 0) %>%
rename(val = q_lps,
datetime = all_of(datecol))
site_data <- tibble(site_code = 'none',
ws_area_ha = ws_size,
latitude = lat,
longitude = long,
site_type = 'stream_gauge')
egret_results <- ms_run_egret_adapt(stream_chemistry = ms_chem,
discharge = ms_q,
prep_data = TRUE,
site_data = site_data,
kalman = kalman,
run_egret = TRUE,
minNumObs = minNumObs,
minNumUncen = minNumUncen,
gap_period = gap_period)
return(egret_results)
}
calc_simple_flux <- function(chem, q){
q_type <- q$var[1]
c_stts <- 'ms_status' %in% colnames(chem)
c_intp <- 'ms_interp' %in% colnames(chem)
q_stts <- 'ms_status' %in% colnames(q)
q_intp <- 'ms_interp' %in% colnames(q)
if(sum(c(c_stts, q_stts)) == 1) stop('ms_status column must be present in both or neither of `chem` and `q`.')
if(sum(c(c_intp, q_intp)) == 1) stop('ms_interp column must be present in both or neither of `chem` and `q`.')
d <- left_join(
chem,
dplyr::select(q, date, val, starts_with('ms_')),
by = 'date',
suffix = c('_x', '_y')
)
if(q_type == 'discharge'){
d <- d %>%
mutate(# kg/d = mg/L * L/s * 86400 s/d / 1e6 mg/kg
val = val_x * val_y * 86400 / 1e6)
#val = val_x * val_y * errors::as.errors(86400) / errors::as.errors(1e6),
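# e.g. 2 mg/L at 100 L/s: 2 * 100 * 86400 / 1e6 = 17.28 kg/d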
if(c_stts){
d <- mutate(d, ms_status = numeric_any_v(ms_status_x, ms_status_y))
}
if(c_intp){
d <- mutate(d, ms_interp = numeric_any_v(ms_interp_x, ms_interp_y))
}
d <- d %>%
dplyr::select(-starts_with(c('site_code_', 'var_', 'val_',
'ms_status_', 'ms_interp_'))) %>%
filter(! is.na(val)) %>% #should be redundant
arrange(date) %>%
ms_scale_flux_by_area()
} else {
d <- d %>%
mutate(# kg/ha/d = mg/L * mm/d / 100 (1 mm over 1 ha = 1e4 L; 1e4 / 1e6 mg/kg = 1/100)
val = val_x * val_y / 100)
#val = val_x * val_y / errors::as.errors(100),
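# e.g. 2 mg/L with 3 mm/d of runoff: 2 * 3 / 100 = 0.06 kg/ha/d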
if(c_stts){
d <- mutate(d, ms_status = numeric_any_v(ms_status_x, ms_status_y))
}
if(c_intp){
d <- mutate(d, ms_interp = numeric_any_v(ms_interp_x, ms_interp_y))
}
d <- d %>%
dplyr::select(-starts_with(c('site_code_', 'var_', 'val_',
'ms_status_', 'ms_interp_'))) %>%
filter(! is.na(val)) %>% #should be redundant
arrange(date)
}
return(d)
}
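# usage sketch for calc_simple_flux(), with hypothetical inputs; assumes
# daily tibbles sharing a `date` column, concentration in mg/L, and `q`
# carrying either discharge ('discharge', L/s) or runoff depth (mm/d):
# chem <- tibble(date = as.Date('2020-10-01') + 0:2, site_code = 'w1',
#                var = 'NO3_N', val = c(1.2, 1.1, 1.3),
#                ms_status = 0, ms_interp = 0)
# q <- tibble(date = as.Date('2020-10-01') + 0:2, site_code = 'w1',
#             var = 'discharge', val = c(50, 55, 60),
#             ms_status = 0, ms_interp = 0)
# flux <- calc_simple_flux(chem, q) # daily flux; discharge input is scaled
#                                   # to per-area units by ms_scale_flux_by_area()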
calc_load <- function(chem, q, site_code, area, method,
aggregation, verbose){
fvar <- chem$var[1]
if(any(na.omit(chem$val == 0))){
message('Concentration values of 0 detected. These will be removed before computing load.')
}
# sw() must wrap the whole pipeline; it cannot be a pipe stage, because the
# data frame would then become suppressWarnings' first argument
chem_filt <- sw(chem %>%
filter(val > 0) %>%
#errors::drop_errors(val) > 0) %>%
dplyr::select(date, val) %>%
tidyr::drop_na(date, val))
chunk_daterange <- sw(range(chem_filt$date))
q_filt <- filter(q,
date >= !!chunk_daterange[1],
date <= !!chunk_daterange[2])
#find water years during which Q and chem overlap
gd_q <- q_filt %>%
mutate(water_year = wtr_yr(date, start_month = 10)) %>%
pull(water_year)
gd_chm <- chem_filt %>%
mutate(water_year = wtr_yr(date, start_month = 10)) %>%
pull(water_year)
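# wtr_yr() is assumed to follow the USGS water-year convention: with
# start_month = 10, 2020-09-30 falls in WY 2020 and 2020-10-01 in WY 2021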
paired_years <- intersect(gd_q, gd_chm)
if(! length(paired_years)){
message(glue::glue('Not enough overlap between {fvar} and discharge to calculate flux.'))
return(tibble(site_code = character(),
var = character(),
wy = numeric(),
val = numeric(),
method = character(),
ms_recommended = logical()))
}
daily_data_conc <- chem_filt %>%
group_by(date) %>%
summarize(val = mean_or_x(val),
.groups = 'drop') %>%
mutate(site_code = !!site_code) %>%
dplyr::select(site_code, date, conc = val)
daily_data_q <- q_filt %>%
group_by(date) %>%
summarize(val = mean_or_x(val, na.rm = TRUE),
.groups = 'drop') %>%
mutate(site_code = !!site_code) %>%
dplyr::select(site_code, date, q_lps = val)
raw_data_full <- full_join(daily_data_conc, daily_data_q,
by = c('site_code', 'date')) %>%
mutate(wy = wtr_yr(date, start_month = 10),
month = lubridate::month(date)) %>%
filter(wy %in% paired_years)
out_frame <- out_deets <- tibble()
for(target_year in paired_years){
aggregation <- ifelse(aggregation == 'monthly', 'month', aggregation)
q_chem_yr <- raw_data_full %>%
mutate(conc = conc,
q_lps = q_lps) %>%
#mutate(conc = errors::drop_errors(conc),
# q_lps = errors::drop_errors(q_lps)) %>%
filter(wy == !!target_year)
q_yr <- q_chem_yr %>%
dplyr::select(site_code, date, q_lps, wy) %>%
tidyr::drop_na() %>%
mutate(month = lubridate::month(date))
chem_yr <- q_chem_yr %>%
dplyr::select(site_code, date, conc, wy) %>%
tidyr::drop_na() %>%
mutate(month = lubridate::month(date))
# set up default flux values to be replaced if user has chosen to run method
if(aggregation == 'annual'){
flux_annual_average <- NA
flux_annual_pw <- NA
flux_annual_beale <- NA
flux_annual_rating <- NA
flux_annual_comp <- NA
} else {
flux_monthly_average <- list(flux = rep(NA, 12))
flux_monthly_pw <- list(flux = rep(NA, 12))
flux_monthly_beale <- list(flux = rep(NA, 12))
flux_monthly_rating <- list(flux = rep(NA, 12))
flux_monthly_comp <- list(flux = rep(NA, 12))
}
#### calculate average #####
if('average' %in% method){
if(aggregation == 'month'){
flux_monthly_average <- q_chem_yr %>%
group_by(wy, month) %>%
summarize(q_lps = mean(q_lps, na.rm = TRUE),
conc = mean(conc, na.rm = TRUE),
.groups = 'drop') %>%
# multiply by seconds in a year; divide by 1e6 to convert mg to kg
mutate(flux = conc * q_lps * 31557600 * (1 / area) * 1e-6,
flux = if_else(is.nan(flux), NA_real_, flux)) %>% #convert NaN to NA
dplyr::select(month, flux)
} else {
flux_annual_average <- q_chem_yr %>%
group_by(wy) %>%
summarize(q_lps = mean(q_lps, na.rm = TRUE),
conc = mean(conc, na.rm = TRUE),
.groups = 'drop') %>%
# multiply by seconds in a year; divide by 1e6 to convert mg to kg
mutate(flux = conc * q_lps * 31557600 * (1 / area) * 1e-6,
flux = if_else(is.nan(flux), NA_real_, flux)) %>% #convert NaN to NA
pull(flux)
}
}
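# worked example of the conversion above: 1 mg/L mean conc and 10 L/s mean Q
# over a 100 ha watershed gives 1 * 10 * 31557600 * (1 / 100) * 1e-6
# = 3.16 kg/ha/yr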
#### calculate period weighted #####
if(aggregation == 'annual'){
if('pw' %in% method){
flux_annual_pw <- sw(calculate_pw(
chem_df = chem_yr,
q_df = q_yr,
datecol = 'date',
area = area,
period = aggregation
))
}
} else {
# NOTE: month ordering for all monthly outputs below is taken from the pw
# result, so pw must run even when not requested (should be decoupled)
flux_annual_pw <- sw(calculate_pw(
chem_df = chem_yr,
q_df = q_yr,
datecol = 'date',
area = area,
period = aggregation
))
}
#### calculate beale ######
if('beale' %in% method){
flux_annual_beale <- calculate_beale(
chem_df = chem_yr,
q_df = q_yr,
datecol = 'date',
area = area,
period = aggregation
)
}
#### calculate rating #####
if('rating' %in% method){
flux_annual_rating <- calculate_rating(
chem_df = chem_yr,
q_df = q_yr,
datecol = 'date',
area = area,
period = aggregation
)
}
#### calculate composite ######
comp_violation <- FALSE
if('composite' %in% method){
rating_filled_df <- generate_residual_corrected_conc(
chem_df = chem_yr,
q_df = q_yr,
datecol = 'date',
sitecol = 'site_code'
)
if(length(rating_filled_df) > 1 || ! is.na(rating_filled_df)){
flux_comp_df <- calculate_composite_from_resid_corrected(
rating_filled_df = rating_filled_df,
area = area,
period = aggregation
)
# keep the data frame for monthly aggregation (its month column is needed
# below); pull a bare flux vector only for annual aggregation
flux_annual_comp <- if(aggregation == 'month') flux_comp_df else pull(flux_comp_df, flux)
comp_violation <- any(flux_comp_df$flux < 0, na.rm = TRUE) # scalar, so later && is safe
} else {
comp_violation <- TRUE
}
}
if(aggregation == 'month'){
# NOTE: lots of code with month matching and merging below this is all
# to match everything to month order of period weighted, to account for
# possible worlds where water year rearranges month order, etc.
months <- sapply(strsplit(flux_annual_pw$date, '-'), `[`, 2)
flux_monthly_pw <- flux_annual_pw %>%
mutate(month = months) %>%
dplyr::select(month, flux)
if(length(months) != 12){
warning('pw returned fewer than 12 months; setting missing months to NA is not yet handled')
}
# beale
if('beale' %in% method){
months_beale <- sapply(strsplit(flux_annual_beale$date, '-'), `[`, 2)
flux_monthly_beale <- flux_annual_beale %>%
mutate(month = as.character(months_beale))
flux_monthly_beale <- flux_monthly_beale[match(months, flux_monthly_beale$month),]
flux_monthly_beale <- left_join(flux_monthly_pw %>% dplyr::select(month),
flux_monthly_beale, by = 'month') %>%
dplyr::select(month, flux)
}
# rating
if('rating' %in% method){
months_rating <- sapply(strsplit(flux_annual_rating$date, '-'), `[`, 2)
flux_monthly_rating <- flux_annual_rating %>%
mutate(month = as.character(months_rating))
flux_monthly_rating <- flux_monthly_rating[match(months, flux_monthly_rating$month),]
flux_monthly_rating <- left_join(flux_monthly_pw %>% dplyr::select(month),
flux_monthly_rating, by = 'month') %>%
dplyr::select(month, flux)
}
# comp
if('composite' %in% method){
flux_monthly_comp <- flux_annual_comp[match(as.double(months), flux_annual_comp$month),]
flux_monthly_comp <- flux_monthly_comp %>%
mutate(month = sprintf('%02d', month))
# make sure all months present, filled w NAs if no value
flux_monthly_comp <- left_join(flux_monthly_pw %>% dplyr::select(month),
flux_monthly_comp, by = 'month') %>%
dplyr::select(month, flux)
}
if(!'pw' %in% method){
flux_monthly_pw$flux <- rep(NA, 12)
}
}
#### select MS favored ####
if(aggregation == 'month'){
paired_df <- full_join(q_yr, chem_yr,
by = c('date', 'site_code', 'wy', 'month'))
} else {
paired_df <- q_yr %>%
dplyr::select(-month) %>%
full_join(dplyr::select(chem_yr, -month),
by = c('date', 'site_code', 'wy'))
}
paired_df <- paired_df %>%
tidyr::drop_na() %>%
filter(q_lps > 0,
is.finite(q_lps),
conc > 0,
is.finite(conc))
model_data <- tibble(c_log = log10(paired_df$conc),
q_log = log10(paired_df$q_lps))
if(nrow(model_data)){
# `model_data` is the site-variable-year dataframe of Q and concentration
# log-log rating curve of C by Q
modl <- lm(c_log ~ q_log, data = model_data, singular.ok = TRUE)
rating <- sw(summary(modl))
r_squared <- rating$r.squared
# auto-correlation of residuals of rating curve
resid_acf <- abs(acf(rating$residuals, lag.max = 1, plot = FALSE)$acf[2])
# auto-correlation of concentration data
con_acf <- abs(acf(paired_df$conc, lag.max = 1, plot = FALSE)$acf[2])
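# acf(x, lag.max = 1, plot = FALSE)$acf holds the lag-0 autocorrelation
# (always 1) followed by lag-1, so $acf[2] extracts the lag-1 value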
} else {
r_squared <- resid_acf <- con_acf <- NA_real_
}
# modified from figure 10 of Aulenbach et al. (2016)
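# the rubric below, paraphrased:
#   C-Q r^2 > 0.3 and rating-residual lag-1 ACF > 0.2  -> composite
#   C-Q r^2 > 0.3 and rating-residual lag-1 ACF <= 0.2 -> rating
#   C-Q r^2 <= 0.3 and concentration lag-1 ACF > 0.2   -> pw
#   otherwise                                          -> average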
if(nrow(model_data) && ! is.na(r_squared)){
if(r_squared > 0.3 && ! is.na(resid_acf)){
if(resid_acf > 0.2){
recommended_method <- 'composite'
} else {
recommended_method <- 'rating'
}
} else {
if(! is.na(con_acf) && con_acf > 0.20){
recommended_method <- 'pw'
} else {
recommended_method <- 'average'
}
}
} else {
if(verbose){
message(glue::glue('Invalid C-Q regression for: {site_code}, {fvar}, {target_year}. Not recommending any method.'))
# 'during application of Aulenbach et al. 2016 procedure for choosing recommended load estimation method',
# ' concentration:discharge log-log linear regression r_squared value was NaN; recommended method set to NA\n\n'))
}
recommended_method <- NA
}
if(! is.na(recommended_method) && recommended_method == 'composite' &&
! is.na(comp_violation) && comp_violation){
if(verbose){
message(glue::glue('Composite method invalid for: {site_code}, {fvar}, {target_year}. ',
'Using period-weighting instead.'))
# 'the recommended method was set to composite',
# ' using procedure from Aulenbach et al. 2016, but that method results in illegal values.',
# ' Setting recommended method to period-weighting instead.'))
}
recommended_method <- 'pw' # must match the method labels used below so ms_recommended can be set
}
#### collect fluxes ####
if(aggregation == 'month'){
target_year_out <- tibble(
wy = as.character(target_year),
month = rep(months, 5),
val = c(flux_monthly_average$flux,
flux_monthly_pw$flux,
flux_monthly_beale$flux,
flux_monthly_rating$flux,
## flux_monthly_wrtds,
flux_monthly_comp$flux),
site_code = !!site_code,
var = !!fvar,
method = c(rep('average', 12), rep('pw', 12), rep('beale', 12),
rep('rating', 12), rep('composite', 12))
) %>%
mutate(ms_recommended = method == !!recommended_method) # logical, matching the annual branch and the empty-result schema
} else {
target_year_out <- tibble(
wy = target_year,
val = c(flux_annual_average,
flux_annual_pw,
flux_annual_beale,
flux_annual_rating,
## flux_annual_wrtds,
flux_annual_comp),
site_code = site_code,
var = fvar,
method = c('average', 'pw', 'beale', 'rating', 'composite')
) %>%
mutate(ms_recommended = method == !!recommended_method)
if(all(is.na(target_year_out$val))){
target_year_out$ms_recommended <- NA
}
}
# diagnostics are computed for every water year regardless of aggregation,
# so they are assembled outside the annual-only branch above
target_year_deets <- tibble(site_code = site_code,
var = fvar,
water_year = target_year,
cq_rsquared = r_squared,
cq_resid_acf = resid_acf,
c_acf = con_acf,
n_c_obs = length(na.omit(chem_yr$conc)),
n_q_obs = length(na.omit(q_yr$q_lps)),
n_paired_cq_obs = nrow(paired_df),
unique_c_quarters = length(unique(lubridate::quarter(chem_yr$date))),
ws_area_ha = area)
target_year_deets[is.na(target_year_deets)] <- NA #coerce NaN to NA
out_frame <- bind_rows(out_frame, target_year_out)
out_deets <- bind_rows(out_deets, target_year_deets)
}
return(list(load = out_frame,
diag = out_deets))
}
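# usage sketch for calc_load(), with hypothetical inputs; assumes `chem` and
# `q` are daily tibbles with date, val, and var columns, and area is in ha:
# res <- calc_load(chem = chem, q = q, site_code = 'w1', area = 100,
#                  method = c('average', 'pw', 'beale', 'rating', 'composite'),
#                  aggregation = 'annual', verbose = TRUE)
# res$load # one row per water year x method, with ms_recommended flags
# res$diag # per-year C-Q diagnostics behind the Aulenbach et al. (2016) rubric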