R/ms_internals.R

#' Internal functions
#'
#' Not intended to be called directly by the user
#'
#' @noRd

# Function aliases
sw <- suppressWarnings
sm <- suppressMessages
`%dopar%` <- foreach::`%dopar%`
`%do%` <- foreach::`%do%`

# Unit conversion helpers ####
parse_molecular_formulae <- function(formulae){

    #`formulae` is a vector

    # formulae = c('C', 'C4', 'Cl', 'Cl2', 'CCl', 'C2Cl', 'C2Cl2', 'C2Cl2B2')
    # formulae = 'BCH10He10PLi2'
    # formulae='Mn'

    conc_vars = stringr::str_match(formulae, '^(?:OM|TM|DO|TD|UT|UTK|TK|TI|TO|DI)?([A-Za-z0-9]+)_?')[,2]
    two_let_symb_num = stringr::str_extract_all(conc_vars, '([A-Z][a-z][0-9]+)')
    conc_vars = stringr::str_remove_all(conc_vars, '([A-Z][a-z][0-9]+)')
    one_let_symb_num = stringr::str_extract_all(conc_vars, '([A-Z][0-9]+)')
    conc_vars = stringr::str_remove_all(conc_vars, '([A-Z][0-9]+)')
    two_let_symb = stringr::str_extract_all(conc_vars, '([A-Z][a-z])')
    conc_vars = stringr::str_remove_all(conc_vars, '([A-Z][a-z])')
    one_let_symb = stringr::str_extract_all(conc_vars, '([A-Z])')

    constituents = mapply(c, SIMPLIFY=FALSE,
                          two_let_symb_num, one_let_symb_num, two_let_symb, one_let_symb)

    return(constituents) # a list of vectors
}

combine_atomic_masses <- function(molecular_constituents){

    #`molecular_constituents` is a vector

    xmat = stringr::str_match(molecular_constituents,
                              '([A-Z][a-z]?)([0-9]+)?')[, -1, drop=FALSE]
    elems = xmat[,1]
    mults = as.numeric(xmat[,2])
    mults[is.na(mults)] = 1
    molecular_mass = sum(PeriodicTable::mass(elems) * mults)

    return(molecular_mass) #a scalar
}

calculate_molar_mass <- function(molecular_formula){
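
    #e.g. calculate_molar_mass('NH4') #returns ~18.04 (g/mol)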

    if(length(molecular_formula) > 1){
        stop('molecular_formula must be a string of length 1')
    }

    parsed_formula = parse_molecular_formulae(molecular_formula)[[1]]

    molar_mass = combine_atomic_masses(parsed_formula)

    return(molar_mass)
}

convert_molecule <- function(x, from, to){

    #e.g. convert_molecule(1.54, 'NH4', 'N')
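    #   = 1.54 * 14.01 / 18.04 ~= 1.20 (e.g. mg/L NH4 expressed as mg/L N)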

    from_mass <- calculate_molar_mass(from)
    to_mass <- calculate_molar_mass(to)
    converted_mass <- x * to_mass / from_mass

    return(converted_mass)
}

convert_to_gl <- function(x, input_unit, formula, ms_vars){
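
    #converts the numeric part of an eq- or mol-based value to a gram basis;
    #   the metric prefix is left untouched (see convert_unit for prefix conversion).
    #e.g. (sketch; ms_vars needs variable_code and valence columns):
    #   ms_vars_demo <- tibble(variable_code = 'Ca', valence = 2)
    #   convert_to_gl(1, 'ueq/L', 'Ca', ms_vars_demo) #1 * 40.08 / 2 = ~20.04 (ug/L)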

    if(grepl('eq', input_unit)) {
        valence = ms_vars$valence[ms_vars$variable_code %in% formula]

        if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', formula, 'unknown'))}
        x = (x * calculate_molar_mass(formula)) / valence

        return(x)
    }

    if(grepl('mol', input_unit)) {
        x = x * calculate_molar_mass(formula)

        return(x)
    }

    return(x)

}

convert_from_gl <- function(x, input_unit, output_unit, molecule, g_conver, ms_vars){

    molecule_real <- ms_vars %>%
        filter(variable_code == !!molecule) %>%
        pull(molecule)

    if(!is.na(molecule_real)) {
        formula <- molecule_real
    } else {
        formula <- molecule
    }

    if(grepl('eq', output_unit) && grepl('g', input_unit) ||
       grepl('eq', output_unit) && g_conver) {

        valence = ms_vars$valence[ms_vars$variable_code %in% molecule]
        if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', molecule, 'unknown'))}
        x = (x * valence) / calculate_molar_mass(formula)

        return(x)
    }

    if(grepl('mol', output_unit) && grepl('g', input_unit) ||
       grepl('mol', output_unit) && g_conver) {

        x = x / calculate_molar_mass(formula)

        return(x)
    }

    if(grepl('mol', output_unit) && grepl('eq', input_unit) && !g_conver) {

        valence = ms_vars$valence[ms_vars$variable_code %in% molecule]
        if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', molecule, 'unknown'))}
        x = (x * calculate_molar_mass(formula)) / valence

        x = x / calculate_molar_mass(formula)

        return(x)
    }

    if(grepl('eq', output_unit) && grepl('mol', input_unit) && !g_conver) {

        x = x * calculate_molar_mass(formula)

        valence = ms_vars$valence[ms_vars$variable_code %in% molecule]
        if(length(valence) == 0 || is.na(valence)) {stop(paste('valency of', molecule, 'unknown'))}
        x = (x * valence)/calculate_molar_mass(formula)

        return(x)
    }

    return(x)

}

convert_unit <- function(x, input_unit, output_unit){
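
    #converts between metric prefixes on fraction-style units,
    #   e.g. convert_unit(1, 'mg/L', 'ug/L') #returns 1000
    #        convert_unit(500, 'ug/L', 'mg/L') #returns 0.5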

    units <- tibble(prefix = c('n', "u", "m", "c", "d", "h", "k", "M"),
                    convert_factor = c(0.000000001, 0.000001, 0.001, 0.01, 0.1, 100,
                                       1000, 1000000))

    old_fraction <- as.vector(stringr::str_split_fixed(input_unit, "/", n = Inf))
    old_top <- as.vector(stringr::str_split_fixed(old_fraction[1], "", n = Inf))

    if(length(old_fraction) == 2) {
        old_bottom <- as.vector(stringr::str_split_fixed(old_fraction[2], "", n = Inf))
    }

    new_fraction <- as.vector(stringr::str_split_fixed(output_unit, "/", n = Inf))
    new_top <- as.vector(stringr::str_split_fixed(new_fraction[1], "", n = Inf))

    if(length(new_fraction) == 2) {
        new_bottom <- as.vector(stringr::str_split_fixed(new_fraction[2], "", n = Inf))
    }

    old_top_unit <- tolower(stringr::str_split_fixed(old_top, "", 2)[1])

    if(old_top_unit %in% c('g', 'e', 'q', 'l') || old_fraction[1] == 'mol') {
        old_top_conver <- 1
    } else {
        old_top_conver <- as.numeric(filter(units, prefix == old_top_unit)[,2])
    }

    old_bottom_unit <- tolower(stringr::str_split_fixed(old_bottom, "", 2)[1])

    if(old_bottom_unit %in% c('g', 'e', 'q', 'l') || old_fraction[2] == 'mol') {
        old_bottom_conver <- 1
    } else {
        old_bottom_conver <- as.numeric(filter(units, prefix == old_bottom_unit)[,2])
    }

    new_top_unit <- tolower(stringr::str_split_fixed(new_top, "", 2)[1])

    if(new_top_unit %in% c('g', 'e', 'q', 'l') || new_fraction[1] == 'mol') {
        new_top_conver <- 1
    } else {
        new_top_conver <- as.numeric(filter(units, prefix == new_top_unit)[,2])
    }

    new_bottom_unit <- tolower(stringr::str_split_fixed(new_bottom, "", 2)[1])

    if(new_bottom_unit %in% c('g', 'e', 'q', 'l') || new_fraction[2] == 'mol') {
        new_bottom_conver <- 1
    } else {
        new_bottom_conver <- as.numeric(filter(units, prefix == new_bottom_unit)[,2])
    }

    new_val <- x*old_top_conver
    new_val <- new_val/new_top_conver

    new_val <- new_val/old_bottom_conver
    new_val <- new_val*new_bottom_conver

    return(new_val)
}

# EGRET helpers ####
get_days_since_1850 <- function(dates){
    return_dates <- as.numeric(lubridate::as_date(dates) - lubridate::ymd('1850-01-01'))
    return(return_dates)
}

get_DecYear <- function(dates){
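
    #e.g. get_DecYear(as.Date('2000-07-01')) #2000 + 183/366 = 2000.5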

    year <- lubridate::year(dates)

    get_days <- function(year){
        if((year %% 4) == 0) {
            if((year %% 100) == 0) {
                if((year %% 400) == 0) {
                    day_years <- 366
                } else {
                    day_years <- 365
                }
            } else {
                day_years <- 366
            }
        } else {
            day_years <- 365
        }
    }

    days_in_year <- purrr::map_dbl(year, get_days)

    DecYear <- (lubridate::yday(dates)/days_in_year)+lubridate::year(dates)

    return(DecYear)
}

get_MonthSeq <- function(dates){
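
    #e.g. get_MonthSeq(as.Date('1850-01-15')) #returns 1
    #     get_MonthSeq(as.Date('2020-03-15')) #(2020 - 1850) * 12 + 3 = 2043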

    years <- lubridate::year(dates)-1850

    MonthSeq <- years*12

    MonthSeq <- MonthSeq + lubridate::month(dates)

    return(MonthSeq)
}

get_start_end <- function(d){
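
    #e.g. (hypothetical record): if d$datetime spans 1975-11-03 to 2001-04-20,
    #   this returns c('1976-10-01', '2001-10-01') as Dates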
    start_date <- min(d$datetime)
    start_year <- lubridate::year(start_date)
    start_wy <- ifelse(lubridate::month(start_date) %in% c(10, 11, 12), start_year+1, start_year)
    filter_start_date <- lubridate::ymd(paste0(start_wy, '-10-01'))

    end_date <- max(d$datetime)
    end_year <- lubridate::year(end_date)
    end_wy <- ifelse(lubridate::month(end_date) %in% c(10, 11, 12), end_year+1, end_year)
    filter_end_date <- lubridate::ymd(paste0(end_wy, '-10-01'))

    fin_dates <- c(filter_start_date, filter_end_date)
    return(fin_dates)

}

drop_var_prefix <- function(x){
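
    #e.g. drop_var_prefix('GN_NO3_N') #returns 'NO3_N'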

    unprefixed <- substr(x, 4, nchar(x))

    return(unprefixed)
}

decimalDate <- function(rawData){
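
    #e.g. decimalDate('2010-07-02 12:00:00') #returns ~2010.5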

    dateTime <- as.POSIXlt(rawData)
    year <- dateTime$year + 1900

    startYear <- as.POSIXct(paste0(year,"-01-01 00:00"))
    endYear <- as.POSIXct(paste0(year+1,"-01-01 00:00"))

    DecYear <- year + as.numeric(difftime(dateTime, startYear, units = "secs"))/as.numeric(difftime(endYear, startYear, units = "secs"))
    return(DecYear)
}

# precip interpolation helpers ####
read_combine_shapefiles <- function(network, domain, prodname_ms){

    #TODO: sometimes multiple locations are listed for the same rain gauge.
    #   if there's a date associated with those locations, we should take that
    #   into account when performing IDW, and when plotting gauges on the map.
    #   (maybe previous locations could show up semitransparent). for now,
    #   we're just grabbing the last location listed (by order). We don't store
    #   date columns with our raingauge shapes yet, so that's the place to start.

    #TODO: also, this may be a problem for other spatial stuff.

    level <- ifelse(is_derived_product(prodname_ms),
                    'derived',
                    'munged')

    prodpaths <- list.files(glue::glue('data/{n}/{d}/{l}/{p}',
                                       n = network,
                                       d = domain,
                                       l = level,
                                       p = prodname_ms),
                            recursive = TRUE,
                            full.names = TRUE,
                            pattern = '*.shp')

    shapes <- lapply(prodpaths,
                     function(x){
                         sf::st_read(x,
                                     stringsAsFactors = FALSE,
                                     quiet = TRUE) %>%
                             slice_tail()
                     })

    # wb <- sw(Reduce(sf::st_union, wbs)) %>%
    combined <- sw(Reduce(bind_rows, shapes))
    # sf::st_transform(projstring)

    return(combined)
}

read_combine_feathers <- function(network,
                                  domain,
                                  prodname_ms){

    #read all data feathers associated with a network-domain-product,
    #row bind them, and arrange by site_code, var, datetime. the val_err column
    #is folded into the val column as an errors attribute and then removed
    #(error/uncertainty is handled by the errors package as an attribute,
    #so it must be written/read as a separate column).

    #the processing level is determined automatically from prodname_ms.
    #   If the product code is "msXXX" where X is a numeral, the processing
    #   level is assumed to be "derived". otherwise "munged"

    #handled in ms_list_files
    # level <- ifelse(is_derived_product(prodname_ms),
    #                 'derived',
    #                 'munged')

    prodpaths <- ms_list_files(network = network,
                               domain = domain,
                               prodname_ms = prodname_ms)

    combined <- tibble()
    for(i in seq_along(prodpaths)){
        part <- feather::read_feather(prodpaths[i])
        combined <- bind_rows(combined, part)
    }

    combined <- combined %>%
        mutate(val = errors::set_errors(val, val_err)) %>%
        dplyr::select(-val_err) %>%
        arrange(site_code, var, datetime)

    return(combined)
}

datetimes_to_durations <- function(datetime_vec,
                                   variable_prefix_vec = NULL,
                                   unit,
                                   sensor_maxgap = Inf,
                                   nonsensor_maxgap = Inf,
                                   grab_maxgap = Inf,
                                   installed_maxgap = Inf){

    #datetime_vec: POSIXct. a vector of datetimes
    #variable_prefix_vec: character. a vector of macrosheds variable prefixes,
    #   e.g. 'IS'. This vector is generated by calling extract_var_prefix
    #   on the var column of a macrosheds data.frame. Only required if you
    #   supply one or more of the maxgap parameters.
    #unit: string. The desired datetime unit. Must be expressed in a form
    #   that's readable by base::difftime
    #sensor_maxgap: numeric. the largest data gap that should return
    #   a value. Durations longer than this gap will return NA. Expressed in
    #   the same units as unit (see above). This applies
    #   to sensor data only (IS and GS).
    #nonsensor_maxgap: numeric. the largest data gap that should return
    #   a value. Durations longer than this gap will return NA. Expressed in
    #   the same units as unit (see above). This applies
    #   to nonsensor data only (IN and GN).
    #grab_maxgap: numeric. the largest data gap that should return
    #   a value. Durations longer than this gap will return NA. Expressed in
    #   the same units as unit (see above). This applies
    #   to grab data only (GS and GN).
    #installed_maxgap: numeric. the largest data gap that should return
    #   a value. Durations longer than this gap will return NA. Expressed in
    #   the same units as unit (see above). This applies
    #   only to data from installed units (IS and IN).

    #an NA is prepended to the output, so that its length is the same as
    #   datetime_vec
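
    #e.g. (hypothetical values):
    # dt <- as.POSIXct(c('2020-01-01', '2020-01-02', '2020-01-10'), tz = 'UTC')
    # datetimes_to_durations(dt, unit = 'days') #returns c(NA, 1, 8)
    # datetimes_to_durations(dt, variable_prefix_vec = rep('IS', 3),
    #                        unit = 'days', sensor_maxgap = 5) #returns c(NA, 1, NA)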

    if(! is.null(variable_prefix_vec) &&
       ! all(variable_prefix_vec %in% c('GN', 'GS', 'IN', 'IS'))){
        stop(paste('all elements of variable_prefix_vec must be one of "GN",',
                   '"GS", "IN", "IS"'))
    }

    durs <- diff(datetime_vec)
    units(durs) <- unit
    durs <- c(NA_real_, as.numeric(durs))

    if(! is.null(variable_prefix_vec)){
        is_sensor_data <- grepl('^.S', variable_prefix_vec)
        is_installed_data <- grepl('^I', variable_prefix_vec)
    }

    if(! is.infinite(sensor_maxgap)){
        durs[is_sensor_data & durs > sensor_maxgap] <- NA_real_
    }
    if(! is.infinite(nonsensor_maxgap)){
        durs[! is_sensor_data & durs > nonsensor_maxgap] <- NA_real_
    }
    if(! is.infinite(grab_maxgap)){
        durs[! is_installed_data & durs > grab_maxgap] <- NA_real_
    }
    if(! is.infinite(installed_maxgap)){
        durs[is_installed_data & durs > installed_maxgap] <- NA_real_
    }

    return(durs)
}

chunk_df <- function(d, nchunks, create_index_column = FALSE){
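
    #splits d into nchunks groups of roughly equal size,
    #   e.g. chunk_df(tibble(x = 1:10), 3) #returns a list of 3 tibbles with 4, 3, and 3 rows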

    nr <- nrow(d)
    chunksize <- nr/nchunks

    if(nr > 0){

        if(create_index_column) d <- mutate(d, ind = 1:n())

        chunklist <- split(d,
                           0:(nr - 1) %/% chunksize)

        return(chunklist)

    } else {

        logwarn(msg = 'Trying to chunk an empty tibble. Something is probably wrong',
                logger = logger_module)

        return(d)
    }
}

idw_log_var <- function(verbose,
                        site_code,
                        v,
                        j,
                        nvars,
                        ntimesteps,
                        is_fluxable = NA,
                        note = ''){

    if(! verbose) return()

    flux_calc_msg <- case_when(is_fluxable == FALSE ~ '[NO FLUX]',
                               is_fluxable == TRUE ~ '[yes flux]',
                               is.na(is_fluxable) ~ '')

    note <- ifelse(note == '',
                   note,
                   paste0('[', note, ']'))

    msg <- glue::glue('site: {s}; var: {vv} ({jj}/{nv}) {f}; timesteps: {nt}; {n}',
                      s = site_code,
                      vv = v,
                      jj = j,
                      nv = nvars,
                      nt = ntimesteps,
                      f = flux_calc_msg,
                      n = note)

    message(msg)
}

ms_parallelize <- function(maxcores = Inf){

    #maxcores is the maximum number of processor cores to use for R tasks.
    #   you may want to leave a few aside for other processes.
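    #e.g. (sketch of the register/compute/clean-up pattern):
    #   clst <- ms_parallelize(maxcores = 4)
    #   squares <- foreach::foreach(i = 1:10, .combine = c) %dopar% i^2
    #   ms_unparallelize(clst)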

    check_suggested_pkgs(c('parallel', 'doParallel'))

    clst <- NULL
    ncores <- min(parallel::detectCores(), maxcores)

    if(Sys.info()['sysname'] == 'Windows'){
        clst <- parallel::makeCluster(ncores, type = 'PSOCK')
        doParallel::registerDoParallel(clst)
    } else{
        clst <- parallel::makeCluster(ncores, type = 'FORK')
        doParallel::registerDoParallel(clst)
    }

    return(clst)
}

idw_parallel_combine <- function(d1, d2){

    #this is for use with foreach loops inside the 4 idw prep functions
    #   (precip_idw, pchem_idw, flux_idw, precip_pchem_pflux_idw)
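    #e.g. (sketch; the 'first iter' sentinel is assumed to be supplied via .init):
    #   out <- foreach::foreach(i = 1:3,
    #                           .combine = idw_parallel_combine,
    #                           .init = 'first iter') %do% tibble(chunk = i, val = i * 1.5)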

    if(is.character(d1) && d1 == 'first iter') return(d2)

    d_comb <- bind_rows(d1, d2)

    return(d_comb)
}

shortcut_idw <- function(encompassing_dem,
                         wshd_bnd,
                         data_locations,
                         data_values,
                         durations_between_samples = NULL,
                         stream_site_code,
                         output_varname,
                         save_precip_quickref = FALSE,
                         elev_agnostic = FALSE,
                         p_var = NULL,
                         verbose = FALSE,
                         macrosheds_root){

    #encompassing_dem: RasterLayer; must cover the area of wshd_bnd and data_locations
    #wshd_bnd: sf polygon with columns site_code and geometry
    #   it represents a single watershed boundary
    #data_locations: sf point(s) with columns site_code, geometry, and (if
    #   elev_agnostic is FALSE) elevation. it represents all sites (e.g. rain
    #   gauges) that will be used in the interpolation
    #data_values: data.frame with one column each for datetime, ms_status, and
    #   ms_interp, plus an additional named column of data values for each
    #   data location.
    #durations_between_samples: numeric vector representing the time differences
    #   between rows of data_values. Must be expressed in days. Only used if
    #   output_varname == 'SPECIAL CASE PRECIP' and save_precip_quickref == TRUE,
    #   so that quickref data can be expressed in mm/day, which is the unit
    #   required to calculate precip flux.
    #   NOTE: this could be calculated internally, but the duration of measurement
    #   preceding the first value can't be known. Because shortcut_idw is
    #   often run iteratively on chunks of a dataset, we require that
    #   durations_between_samples be passed as an input, so as to minimize
    #   the number of NAs generated.
    #output_varname: character; a prodname_ms, unless you're interpolating
    #   precipitation, in which case it must be "SPECIAL CASE PRECIP", because
    #   prefix information for precip is lost during the widen-by-site step
    #save_precip_quickref: logical. should interpolated precip for all DEM cells
    #   be saved for later use? Should only be TRUE when precip chem will be
    #   interpolated too. only usable when output_varname == 'SPECIAL CASE PRECIP'
    #elev_agnostic: logical. if FALSE (and at least 3 gauges have data at a
    #   timestep), elevation is included as a predictor of the variable
    #   being interpolated

    check_suggested_pkgs(c('parallel'))

    if(output_varname != 'SPECIAL CASE PRECIP' && save_precip_quickref){
        stop(paste('save_precip_quickref can only be TRUE if output_varname',
                   '== "SPECIAL CASE PRECIP"'))
    }
    if(save_precip_quickref && is.null(durations_between_samples)){
        stop(paste('save_precip_quickref can only be TRUE if',
                   'durations_between_samples is supplied.'))
    }
    if(output_varname != 'SPECIAL CASE PRECIP' && ! is.null(durations_between_samples)){
        logwarn(msg = paste('In shortcut_idw: ignoring durations_between_samples because',
                            'output_varname != "SPECIAL CASE PRECIP".'),
                logger = logger_module)
    }

    if('ind' %in% colnames(data_values)){
        timestep_indices <- data_values$ind
        data_values$ind <- NULL
    }

    #matrixify input data so we can use matrix operations
    d_status <- data_values$ms_status
    d_interp <- data_values$ms_interp
    d_dt <- data_values$datetime
    data_matrix <- dplyr::select(data_values,
                                 -ms_status,
                                 -datetime,
                                 -ms_interp) %>%
        err_df_to_matrix()

    #clean dem and get elevation values
    dem_wb <- terra::crop(encompassing_dem, wshd_bnd)
    dem_wb <- terra::mask(dem_wb, wshd_bnd)

    wb_is_linear <- terra::nrow(dem_wb) == 1 || terra::ncol(dem_wb) == 1
    wb_all_na <- all(is.na(terra::values(dem_wb)))

    if(wb_all_na){

        if(wb_is_linear){

            #masking will cause trouble here (only known for niwot-MARTINELLI)
            dem_wb <- terra::crop(encompassing_dem, wshd_bnd)

        } else {
            stop('some kind of crop/mask issue with small watersheds?')
        }
    }

    elevs <- terra::values(dem_wb)
    elevs_masked <- elevs[! is.na(elevs)]

    #compute distances from all dem cells to all data locations
    inv_distmat <- matrix(NA,
                          nrow = length(elevs_masked),
                          ncol = ncol(data_matrix),
                          dimnames = list(NULL,
                                          colnames(data_matrix)))

    dem_wb_all_na <- dem_wb
    terra::values(dem_wb_all_na) <- NA
    dem_wb_all_na <- terra::rast(dem_wb_all_na)
    for(k in 1:ncol(data_matrix)){

        dk <- filter(data_locations,
                     site_code == colnames(data_matrix)[k])

        inv_dists_site <- 1 / terra::distance(terra::rast(dem_wb_all_na), terra::vect(dk))^2 %>%
            terra::values(.)

        inv_dists_site <- inv_dists_site[! is.na(elevs)] #drop elevs not included in mask
        inv_distmat[, k] <- inv_dists_site
    }

    #calculate watershed mean at every timestep
    if(save_precip_quickref) precip_quickref <- list()
    ptm <- proc.time()
    ws_mean <- rep(NA, nrow(data_matrix))
    ntimesteps <- nrow(data_matrix)
    for(k in 1:ntimesteps){

        #assign cell weights as normalized inverse squared distances
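        #   e.g. gauges 1 km and 2 km from a cell get inverse-square weights
        #   1 and 0.25, which normalize to 0.8 and 0.2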
        dk <- t(data_matrix[k, , drop = FALSE])
        inv_distmat_sub <- inv_distmat[, ! is.na(dk), drop = FALSE]
        dk <- dk[! is.na(dk), , drop = FALSE]
        weightmat <- do.call(rbind, #avoids matrix transposition
                             unlist(apply(inv_distmat_sub, #normalize by row
                                          1,
                                          function(x) list(x / sum(x))),
                                    recursive = FALSE))

        #perform vectorized idw
        dk[is.na(dk)] <- 0 #allows matrix multiplication
        d_idw <- weightmat %*% dk

        if(nrow(dk) == 0){
            ws_mean[k] <- errors::set_errors(NA_real_, NA)
            if(save_precip_quickref){
                precip_quickref[[k]] <- matrix(NA,
                                               nrow = nrow(d_idw),
                                               ncol = ncol(d_idw))
            }
            next
        }

        #reapply uncertainty dropped by `%*%`
        errors::errors(d_idw) <- weightmat %*% matrix(errors::errors(dk),
                                                      nrow = nrow(dk))

        #determine data-elevation relationship for interp weighting
        if(! elev_agnostic && nrow(dk) >= 3){
            d_elev <- tibble(site_code = rownames(dk),
                             d = dk[,1]) %>%
                left_join(data_locations,
                          by = 'site_code')
            mod <- lm(d ~ elevation, data = d_elev)
            ab <- as.list(mod$coefficients)

            #estimate raster values from elevation alone
            d_from_elev <- ab$elevation * elevs_masked + ab$`(Intercept)`

            # Set all negative values to 0
            d_from_elev[d_from_elev < 0] <- 0

            #get weighted mean of both approaches:
            #weight on idw is 1; weight on elev-predicted is |R^2|
            abs_r2 <- abs(cor(d_elev$d, mod$fitted.values)^2)
            d_idw <- (d_idw + d_from_elev * abs_r2) / (1 + abs_r2)
        }

        ws_mean[k] <- mean(d_idw, na.rm=TRUE)
        errors::errors(ws_mean)[k] <- mean(errors::errors(d_idw), na.rm=TRUE)

        if(save_precip_quickref) precip_quickref[[k]] <- d_idw
    }

    if(save_precip_quickref){

        #convert to mm/d
        precip_quickref <- base::Map(
            f = function(millimeters, days){
                return(millimeters/days)
            },
            millimeters = precip_quickref,
            days = durations_between_samples)

        names(precip_quickref) <- as.character(timestep_indices)
        write_precip_quickref(precip_idw_list = precip_quickref,
                              site_code = stream_site_code,
                              chunkdtrange = range(d_dt),
                              macrosheds_root = macrosheds_root)
    }

    if(output_varname == 'SPECIAL CASE PRECIP'){

        ws_mean <- tibble(datetime = d_dt,
                          site_code = stream_site_code,
                          var = p_var,
                          val = ws_mean,
                          ms_status = d_status,
                          ms_interp = d_interp)

    } else {

        ws_mean <- tibble(datetime = d_dt,
                          site_code = stream_site_code,
                          var = output_varname,
                          concentration = ws_mean,
                          ms_status = d_status,
                          ms_interp = d_interp)
    }

    return(ws_mean)
}

ms_unparallelize <- function(cluster_object){

    #if cluster_object is NULL, nothing will happen

    check_suggested_pkgs(c('parallel'))

    # tryCatch({print(site_code)},
    #         error=function(e) print('nope'))

    if(is.null(cluster_object)){
        # future::plan(future::sequential)
        return()
    }

    parallel::stopCluster(cluster_object)

    #remove foreach clutter that might compromise the next parallel run
    fe_junk <- foreach:::.foreachGlobals

    rm(list = ls(name = fe_junk),
       pos = fe_junk)

    # #remove any unneeded globals that were created during parallelization
    # unneeded_globals <- c('pchem_vars', 'pchem_vars_fluxable',
    #                       'dem', 'wbi', 'rg', 'precip',
    #                       'pchem_setlist', 'first_fluxvar_ind', 'i', 'j')
    # sw(rm(list = unneeded_globals,
    #       envir = .GlobalEnv))

    # #restore globals that were overwritten during parallelization
    # protected_vars <- mget('protected_vars',
    #                           envir = protected_environment)
    #
    # for(i in 1:length(protected_vars)){
    #
    #     nm <- names(protected_vars)[i]
    #     val <- protected_vars[[i]]
    #
    #     if(! is.null(val)){
    #         assign(nm,
    #                value = val,
    #                envir = .GlobalEnv)
    #     } else {
    #
    #         #or remove them if they didn't exist before parallelization
    #         sw(rm(list = nm,
    #               envir = .GlobalEnv))
    #     }
    # }
}

write_ms_file <- function(d,
                          prodname_ms,
                          site_code,
                          shapefile = FALSE,
                          sep_errors = TRUE,
                          macrosheds_root){

    #write an ms tibble or shapefile to its appropriate destination based on
    #network, domain, prodname_ms, site_code, and processing level. If a tibble,
    #write as a feather file (site_code.feather). Uncertainty (error) associated
    #with the val column will be extracted into a separate column called
    #val_err. Write the file to the appropriate location within the data
    #acquisition repository.

    #deprecated:
    #if link_to_portal == TRUE, create a hard link to the
    #file from the portal repository, which is assumed to be a sibling of the
    #data_acquisition directory and to be named "portal".

    if(shapefile){

        site_dir = glue::glue('{ms}/{p}/{s}',
                              ms = macrosheds_root,
                              p = prodname_ms,
                              s = site_code)

        dir.create(site_dir,
                   showWarnings = FALSE,
                   recursive = TRUE)

        if(any(! sf::st_is_valid(d))) {
            d <- sf::st_make_valid(d)
        }

        sw(sf::st_write(obj = d,
                        dsn = glue::glue(site_dir, '/', site_code, '.shp'),
                        delete_dsn = TRUE,
                        quiet = TRUE))

    } else {

        prod_dir = glue::glue('{ms}/{p}',
                              ms = macrosheds_root,
                              p = prodname_ms)

        dir.create(prod_dir,
                   showWarnings = FALSE,
                   recursive = TRUE)

        site_file = glue::glue('{pd}/{s}.feather',
                               pd = prod_dir,
                               s = site_code)

        if(sep_errors) {

            #separate uncertainty into a new column.
            #remove errors attribute from val column if it exists (it always should)
            d$val_err <- errors::errors(d$val)
            if('errors' %in% class(d$val)){
                d$val <- errors::drop_errors(d$val)
            } else {
                warning(glue::glue('Uncertainty missing from val column ({n}-{d}-{s}-{p}). ',
                                   'That means this dataset has not passed through ',
                                   'carry_uncertainty yet. it should have.',
                                   n = network,
                                   d = domain,
                                   s = site_code,
                                   p = prodname_ms))
            }
        }
        #make sure feather::write_feather will omit attrib by def (with no artifacts)
        feather::write_feather(d, site_file)
    }

    #return()
}

shortcut_idw_concflux_v2 <- function(encompassing_dem,
                                     wshd_bnd,
                                     ws_area,
                                     data_locations,
                                     precip_values,
                                     chem_values,
                                     stream_site_code,
                                     output_varname,
                                     out_path,
                                     # dump_idw_precip,
                                     verbose = FALSE){

    #This replaces shortcut_idw_concflux! shortcut_idw is still used for
    #variables that can't be flux-converted, and for precipitation

    #this function is similar to shortcut_idw_concflux.
    #if the variable represented by chem_values and output_varname is
    #flux-convertible, it multiplies precip chem
    #by precip volume to calculate flux for each cell and returns
    #the means of IDW-interpolated precipitation, precip chem, and precip flux
    #for each sample timepoint. If that variable is not flux-convertible,
    #It only returns precipitation and precip chem. All interpolated products are
    #returned as standard macrosheds timeseries tibbles in a single list.

    #encompassing_dem: RasterLayer; must cover the area of wshd_bnd and
    #   data_locations
    #wshd_bnd: sf polygon with columns site_code and geometry.
    #   it represents a single watershed boundary.
    #ws_area: numeric scalar representing watershed area in hectares. This is
    #   passed so that it doesn't have to be calculated repeatedly (if
    #   shortcut_idw_concflux_v2 is running iteratively).
    #data_locations: sf point(s) with columns site_code and geometry.
    #   represents all sites (e.g. rain gauges) that will be used in
    #   the interpolation.
    #precip_values: a data.frame with datetime, ms_status, ms_interp,
    #   and a column of data values for each precip location.
    #chem_values: a data.frame with datetime, ms_status, ms_interp,
    #   and a column of data values for each precip chemistry location.
    #stream_site_code: character; the name of the watershed/stream, not the
    #   name of a precip gauge
    #output_varname: character; the prodname_ms used to populate the var
    #   column in the returned tibble
    #dump_idw_precip: logical; if TRUE, IDW-interpolated precipitation will
    #   be dumped to disk (data/<network>/<domain>/precip_idw_dumps/<site_code>.rds).
    #   this file will then be read by precip_pchem_pflux_idw, in order to
    #   properly build data/<network>/<domain>/derived/<precipitation_msXXX>.
    #   the precip_idw_dumps directory is automatically removed after it's used.
    #   This should only be set to TRUE for one iteration of the calling loop,
    #   or else time will be wasted rewriting the files.
    #   REMOVED; OBSOLETE

    precip_quickref <- read_precip_quickref(out_path = out_path,
                                            site_code = stream_site_code,
                                            dtrange = range(chem_values$datetime))

    if(length(precip_quickref) == 1){

        just_checkin <- precip_quickref[[1]]

        if(is.character(just_checkin) &&
           just_checkin == 'NO QUICKREF AVAILABLE'){

            return(tibble())
        }
    }

    precip_is_highres <- Mode(diff(as.numeric(precip_values$datetime))) <= 15 * 60
    if(is.na(precip_is_highres)) precip_is_highres <- FALSE
    chem_is_highres <- Mode(diff(as.numeric(chem_values$datetime))) <= 15 * 60
    if(is.na(chem_is_highres)) chem_is_highres <- FALSE

    #if both chem and precip data are low resolution (grab samples),
    #   let approxjoin_datetime match up samples with a 12-hour gap. otherwise the
    #   gap should be 7.5 mins so that there isn't enormous duplication of
    #   timestamps where multiple high-res values can be snapped to the
    #   same low-res value
    if(! chem_is_highres && ! precip_is_highres){
        join_distance <- c('12:00:00')
    } else {
        join_distance <- c('7:30')
    }

    dt_match_inds <- approxjoin_datetime(x = chem_values,
                                         y = precip_values,
                                         rollmax = join_distance,
                                         indices_only = TRUE)

    chem_values <- chem_values[dt_match_inds$x, ]
    common_datetimes <- chem_values$datetime

    if(length(common_datetimes) == 0){
        pchem_range <- range(chem_values$datetime)
        test <- filter(precip_values,
                       datetime > pchem_range[1],
                       datetime < pchem_range[2])
        if(nrow(test) > 0){
            logging::logerror('something is wrong with approxjoin_datetime')
        }
        return(tibble())
    }

    precip_values <- precip_values %>%
        mutate(ind = 1:n()) %>%
        slice(dt_match_inds$y) %>%
        mutate(datetime = !!common_datetimes)

    quickref_inds <- precip_values$ind
    precip_values$ind <- NULL

    #matrixify input data so we can use matrix operations
    p_status <- precip_values$ms_status
    p_interp <- precip_values$ms_interp
    p_matrix <- dplyr::select(precip_values,
                              -ms_status,
                              -datetime,
                              -ms_interp) %>%
        err_df_to_matrix()

    c_status <- chem_values$ms_status
    c_interp <- chem_values$ms_interp
    c_matrix <- dplyr::select(chem_values,
                              -ms_status,
                              -datetime,
                              -ms_interp) %>%
        err_df_to_matrix()

    rm(precip_values, chem_values); gc()

    d_status <- bitwOr(p_status, c_status)
    d_interp <- bitwOr(p_interp, c_interp)

    #clean dem and get elevation values
    dem_wb <- terra::crop(encompassing_dem, wshd_bnd)
    dem_wb <- terra::mask(dem_wb, wshd_bnd)
    elevs <- terra::values(dem_wb)
    elevs_masked <- elevs[! is.na(elevs)]

    #compute distances from all dem cells to all chemistry locations
    inv_distmat_c <- matrix(NA,
                            nrow = length(elevs_masked),
                            ncol = ncol(c_matrix), #ngauges
                            dimnames = list(NULL,
                                            colnames(c_matrix)))

    dem_wb_all_na <- dem_wb
    terra::values(dem_wb_all_na) <- NA
    dem_wb_all_na <- terra::rast(dem_wb_all_na)
    for(k in 1:ncol(c_matrix)){
        dk <- filter(data_locations,
                     site_code == colnames(c_matrix)[k])

        inv_dists_site <- 1 / terra::distance(dem_wb_all_na, terra::vect(dk))^2 %>%
            terra::values(.)
        inv_dists_site <- inv_dists_site[! is.na(elevs)] #drop elevs not included in mask
        inv_distmat_c[, k] <- inv_dists_site
    }

    #calculate watershed mean concentration and flux at every timestep
    ptm <- proc.time()
    ntimesteps <- nrow(c_matrix)
    ws_mean_conc <- ws_mean_flux <- rep(NA, ntimesteps)

    for(k in 1:ntimesteps){

        ## GET CHEMISTRY FOR ALL CELLS IN TIMESTEP k

        #assign cell weights as normalized inverse squared distances (c)
        ck <- t(c_matrix[k, , drop = FALSE])
        inv_distmat_c_sub <- inv_distmat_c[, ! is.na(ck), drop=FALSE]
        ck <- ck[! is.na(ck), , drop=FALSE]
        weightmat_c <- do.call(rbind,
                               unlist(apply(inv_distmat_c_sub,
                                            1,
                                            function(x) list(x / sum(x))),
                                      recursive = FALSE))

        if(ncol(weightmat_c) == 0){
            ws_mean_conc[k] <- NA_real_ %>% errors::set_errors(NA)
            ws_mean_flux[k] <- NA_real_ %>% errors::set_errors(NA)
            next
        }

        #perform vectorized idw (c)
        ck[is.na(ck)] <- 0
        c_idw <- weightmat_c %*% ck

        #reapply uncertainty dropped by `%*%`
        errors::errors(c_idw) <- weightmat_c %*% matrix(errors::errors(ck),
                                                        nrow = nrow(ck))

        ## GET FLUX FOR ALL CELLS; THEN AVERAGE CELLS FOR PCHEM, PFLUX, PRECIP

        #calculate flux for every cell:
        #   This is how we'd calculate flux as kg/(ha * d):
        #       mm/d * mg/L * m/1000mm * kg/1,000,000mg * 1000L/m^(2 + 1) * 10,000m^2/ha
        #       therefore, kg/(ha * d) = mm * mg/L / d / 100 = (mm * mg) / (d * L * 100)
        #   But stream_flux_inst is not scaled by area when it's derived (that
        #   happens later), so we're going to derive precip_flux_inst
        #   as unscaled here, and then both can be scaled the same way in
        #   scale_flux_by_area. So we calculate flux in kg/(ha * d) as above,
        #   then multiply by watershed area in hectares.
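        #   e.g. (hypothetical numbers): 2 mg/L * 10 mm/d / 100 = 0.2 kg/(ha * d);
        #   for a 100 ha watershed that's 0.2 * 100 = 20 kg/d of unscaled flux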

        quickref_ind <- as.character(quickref_inds[k])
        #              mg/L        mm/day                          ha
        flux_interp <- c_idw * precip_quickref[[quickref_ind]] * ws_area / 100

        #calculate watershed averages (work around error drop)
        ws_mean_conc[k] <- mean(c_idw, na.rm=TRUE)
        ws_mean_flux[k] <- mean(flux_interp, na.rm=TRUE)
        errors::errors(ws_mean_conc)[k] <- mean(errors::errors(c_idw), na.rm=TRUE)
        errors::errors(ws_mean_flux)[k] <- mean(errors::errors(flux_interp), na.rm=TRUE)
    }

    # compare_interp_methods()

    ws_means <- tibble(datetime = common_datetimes,
                       site_code = stream_site_code,
                       var = output_varname,
                       concentration = ws_mean_conc,
                       flux = ws_mean_flux,
                       ms_status = d_status,
                       ms_interp = d_interp)

    return(ws_means)
}

idw_log_wb <- function(verbose, site_code, i, nw){

    if(! verbose) return()

    msg <- glue::glue('site: {s} ({ii}/{w})',
                      s = site_code,
                      ii = i,
                      w = nw)

    message(msg)
}

err_df_to_matrix <- function(df){
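
    #convert a data.frame whose columns are numeric or "errors" vectors into a
    #   matrix, carrying the errors attribute along.
    #e.g. (hypothetical two-gauge data frame):
    # df <- tibble(RG1 = errors::set_errors(c(1, 2), 0.1),
    #              RG2 = errors::set_errors(c(3, 4), 0.2))
    # M <- err_df_to_matrix(df)
    # errors::errors(M) #the 0.1 and 0.2 uncertainties, cell by cell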

    if(! all(sapply(df, class) %in% c('errors', 'numeric'))){
        stop('all columns of df must be of class "errors" or "numeric"')
    }

    errmat <- as.matrix(as.data.frame(lapply(df, errors::errors)))
    M <- as.matrix(df)
    errors::errors(M) <- errmat

    return(M)
}

write_precip_quickref <- function(precip_idw_list,
                                  macrosheds_root,
                                  site_code,
                                  chunkdtrange){
    # timestep){

    #allows precip values computed by shortcut_idw for each watershed
    #   raster cell to be reused by shortcut_idw_concflux_v2

    quickref_dir <- glue::glue('{mr}/precip_idw_quickref/',
                               mr = macrosheds_root)

    dir.create(path = quickref_dir,
               showWarnings = FALSE,
               recursive = TRUE)

    chunkfile <- paste(strftime(chunkdtrange[1],
                                format = '%Y-%m-%d %H:%M:%S',
                                tz = 'UTC'),
                       strftime(chunkdtrange[2],
                                format = '%Y-%m-%d %H:%M:%S',
                                tz = 'UTC'),
                       sep = '_')

    chunkfile <- stringr::str_replace_all(chunkfile, ':', '-')

    saveRDS(object = precip_idw_list,
            file = glue::glue('{qd}/{cf}', #omitting extension for easier parsing
                              qd = quickref_dir,
                              cf = chunkfile))
}

choose_projection <- function(lat = NULL,
                              long = NULL,
                              unprojected = FALSE){

    #TODO: CHOOSE PROJECTIONS MORE CAREFULLY
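
    #e.g. (hypothetical coordinates):
    # choose_projection(lat = 44.2, long = -122.2)
    # #returns '+proj=cea +lon_0=-122.2 +lat_ts=0 +x_0=0 +y_0=0 +ellps=WGS84 +datum=WGS84 +units=m +no_defs'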

    if(unprojected){
        PROJ4 <- glue::glue('+proj=longlat +datum=WGS84 +no_defs ',
                            '+ellps=WGS84 +towgs84=0,0,0')
        return(PROJ4)
    }

    if(is.null(lat) || is.null(long)){
        stop('If projecting, lat and long are required.')
    }

    abslat <- abs(lat)

    # if(abslat < 23){ #tropical
    #     PROJ4 = glue::glue('+proj=laea +lon_0=', long)
    #              # ' +datum=WGS84 +units=m +no_defs')
    # } else { #temperate or polar
    #     PROJ4 = glue::glue('+proj=laea +lat_0=', lat, ' +lon_0=', long)
    # }

    #this is what the makers of https://projectionwizard.org/# use to choose
    #a suitable projection: https://rdrr.io/cran/rCAT/man/simProjWiz.html
    # THIS WORKS (PROJECTS STUFF), BUT CAN'T BE READ AUTOMATICALLY BY sf::st_read
    if(abslat < 70){ #tropical or temperate
        PROJ4 <- glue::glue('+proj=cea +lon_0={lng} +lat_ts=0 +x_0=0 +y_0=0 ',
                            '+ellps=WGS84 +datum=WGS84 +units=m +no_defs',
                            lng = long)
    } else { #polar
        PROJ4 <- glue::glue('+proj=laea +lat_0={lt} +lon_0={lng} +x_0=0 +y_0=0 ',
                            '+ellps=WGS84 +datum=WGS84 +units=m +no_defs',
                            lt = lat,
                            lng = long)
    }

    ## UTM/UPS would be nice for watersheds that don't fall on more than two zones
    ## (incomplete)
    # if(lat > 84 || lat < -80){ #polar; use Universal Polar Stereographic (UPS)
    #     PROJ4 <- glue::glue('+proj=ups +lon_0=', long)
    #              # ' +datum=WGS84 +units=m +no_defs')
    # } else { #not polar; use UTM
    #     PROJ4 <- glue::glue('+proj=utm +lat_0=', lat, ' +lon_0=', long)
    # }

    ## EXTRA CODE FOR CHOOSING PROJECTION BY LATITUDE ONLY
    # if(abslat < 23){ #tropical
    #     PROJ4 <- 9835 #Lambert cylindrical equal area (ellipsoidal; should spherical 9834 be used instead?)
    # } else if(abslat > 23 && abslat < 66){ # middle latitudes
    #     PROJ4 <- 5070 #albers equal area conic
    # } else { #polar (abslat >= 66)
    #     PROJ4 <- 9820 #lambert equal area azimuthal
    #     # PROJ4 <- 1027 #lambert equal area azimuthal (spherical)
    # }
    # PROJ4 <- 3857 #WGS 84 / Pseudo-Mercator
    # PROJ4 <- 2163

    return(PROJ4)
}

read_precip_quickref <- function(out_path,
                                 site_code,
                                 dtrange){
    # timestep){

    #allows precip values computed by shortcut_idw for each watershed
    #   raster cell to be reused by shortcut_idw_concflux_v2. These values are
    #   in mm/day

    quickref_dir <- glue::glue('{ms}/precip_idw_quickref',
                               ms = out_path)

    quickref_chunks <- list.files(quickref_dir)

    refranges <- lapply(quickref_chunks,
                        function(x){
                            as.POSIXct(strsplit(x, '_')[[1]],
                                       tz = 'UTC')
                        }) %>%
        plyr::ldply(function(y){
            data.frame(startdt = y[1],
                       enddt = y[2])
        }) %>%
        mutate(ref_ind = 1:n())

    refranges_sel <- refranges %>%
        filter((startdt >= dtrange[1] & enddt <= dtrange[2]) |
                   (startdt < dtrange[1] & enddt >= dtrange[1]) |
                   (enddt > dtrange[2] & startdt <= dtrange[2]))
    #redundant?
    # (startdt > dtrange[1] & startdt <= dtrange[2] & enddt > dtrange[2]) |
    # (startdt < dtrange[1] & enddt < dtrange[2] & enddt >= dtrange[1]))

    if(nrow(refranges_sel) == 0){
        return(list('0' = 'NO QUICKREF AVAILABLE'))
    }

    #handle the case where an end of dtrange falls right between the start and
    #end dates of a quickref file. this is possible because precip dates can
    #be shifted (replaced with pchem dates) inside precip_pchem_pflux_idw2
    ref_ind_range <- range(refranges_sel$ref_ind)

    if(dtrange[1] < refranges_sel$startdt[1] && ref_ind_range[1] > 1){

        refranges_sel <- bind_rows(refranges[ref_ind_range[1] - 1, ],
                                   refranges_sel)
    }

    if(dtrange[2] > refranges_sel$enddt[nrow(refranges_sel)] &&
       ref_ind_range[2] < nrow(refranges)){

        refranges_sel <- bind_rows(refranges_sel,
                                   refranges[ref_ind_range[2] + 1, ])
    }

    quickref <- list()
    for(i in 1:nrow(refranges_sel)){

        fn <- paste(strftime(refranges_sel$startdt[i],
                             format = '%Y-%m-%d %H-%M-%S',
                             tz = 'UTC'),
                    strftime(refranges_sel$enddt[i],
                             format = '%Y-%m-%d %H-%M-%S',
                             tz = 'UTC'),
                    sep = '_')

        qf <- readRDS(glue::glue('{qd}/{f}',
                                 qd = quickref_dir,
                                 f = fn))

        quickref <- append(quickref, qf)
    }

    #for some reason the first ref sometimes gets duplicated?
    quickref <- quickref[! duplicated(names(quickref))]

    return(quickref)
}

approxjoin_datetime <- function(x,
                                y,
                                rollmax = '7:30',
                                keep_datetimes_from = 'x',
                                indices_only = FALSE){
    #direction = 'forward'){

    #x and y: macrosheds standard tibbles with only one site_code,
    #   which must be the same in x and y. Nonstandard tibbles may also work,
    #   so long as they have datetime columns, but the only case where we need
    #   this for other tibbles is inside precip_pchem_pflux_idw, in which case
    #   indices_only == TRUE, so it's not really set up for general-purpose joining
    #rollmax: the maximum snap time for matching elements of x and y.
    #   either '7:30' for continuous data or '12:00:00' for grab data
    #direction [REMOVED]: either 'forward', meaning elements of x will be rolled forward
    #   in time to match the next y, or 'backward', meaning elements of
    #   x will be rolled back in time to reach the previous y
    #keep_datetimes_from: string. either 'x' or 'y'. the datetime column from
    #   the corresponding tibble will be kept, and the other will be dropped
    #indices_only: logical. if TRUE, a join is not performed. rather,
    #   the matching indices from each tibble are returned as a named list of vectors.

    #good datasets for testing this function:
    # x <- tribble(
    #     ~datetime, ~site_code, ~var, ~val, ~ms_status, ~ms_interp,
    #     '1968-10-09 04:42:00', 'GSWS10', 'GN_alk', set_errors(27.75, 1), 0, 0,
    #     '1968-10-09 04:44:00', 'GSWS10', 'GN_alk', set_errors(21.29, 1), 0, 0,
    #     '1968-10-09 04:47:00', 'GSWS10', 'GN_alk', set_errors(21.29, 1), 0, 0,
    #     '1968-10-09 04:59:59', 'GSWS10', 'GN_alk', set_errors(16.04, 1), 0, 0,
    #     '1968-10-09 05:15:01', 'GSWS10', 'GN_alk', set_errors(17.21, 1), 1, 0,
    #     '1968-10-09 05:30:59', 'GSWS10', 'GN_alk', set_errors(16.50, 1), 0, 0) %>%
    # mutate(datetime = as.POSIXct(datetime, tz = 'UTC'))
    # y <- tribble(
    #     ~datetime, ~site_code, ~var, ~val, ~ms_status, ~ms_interp,
    #     '1968-10-09 04:00:00', 'GSWS10', 'GN_alk', set_errors(1.009, 1), 1, 0,
    #     '1968-10-09 04:15:00', 'GSWS10', 'GN_alk', set_errors(2.009, 1), 1, 1,
    #     '1968-10-09 04:30:00', 'GSWS10', 'GN_alk', set_errors(3.009, 1), 1, 1,
    #     '1968-10-09 04:45:00', 'GSWS10', 'GN_alk', set_errors(4.009, 1), 1, 1,
    #     '1968-10-09 05:00:00', 'GSWS10', 'GN_alk', set_errors(5.009, 1), 1, 1,
    #     '1968-10-09 05:15:00', 'GSWS10', 'GN_alk', set_errors(6.009, 1), 1, 1) %>%
    #     mutate(datetime = as.POSIXct(datetime, tz = 'UTC'))
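    # joined <- approxjoin_datetime(x, y, rollmax = '7:30', keep_datetimes_from = 'x')
    # #snaps each x row to the nearest y row within 7.5 minutes (one-to-one);
    # #unmatched rows are dropped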

    check_suggested_pkgs(c('data.table'))

    #tests
    if('site_code' %in% colnames(x) && length(unique(x$site_code)) > 1){
        stop('Only one site_code allowed in x at the moment')
    }
    if('var' %in% colnames(x) && length(unique(drop_var_prefix(x$var))) > 1){
        stop('Only one var allowed in x at the moment (not including prefix)')
    }
    if('site_code' %in% colnames(y) && length(unique(y$site_code)) > 1){
        stop('Only one site_code allowed in y at the moment')
    }
    if('var' %in% colnames(y) && length(unique(drop_var_prefix(y$var))) > 1){
        stop('Only one var allowed in y at the moment (not including prefix)')
    }
    if('site_code' %in% colnames(x) &&
       'site_code' %in% colnames(y) &&
       x$site_code[1] != y$site_code[1]) stop('x and y site_code must be the same')
    if(! rollmax %in% c('7:30', '12:00:00')) stop('rollmax must be "7:30" or "12:00:00"')
    # if(! direction %in% c('forward', 'backward')) stop('direction must be "forward" or "backward"')
    if(! keep_datetimes_from %in% c('x', 'y')) stop('keep_datetimes_from must be "x" or "y"')
    if(! 'datetime' %in% colnames(x) || ! 'datetime' %in% colnames(y)){
        stop('both x and y must have "datetime" columns containing POSIXct values')
    }
    if(! is.logical(indices_only)) stop('indices_only must be a logical')

    #deal with the case of x or y being a specialized "flow" tibble
    # x_is_flowtibble <- y_is_flowtibble <- FALSE
    # if('flow' %in% colnames(x)) x_is_flowtibble <- TRUE
    # if('flow' %in% colnames(y)) y_is_flowtibble <- TRUE
    # if(x_is_flowtibble && ! y_is_flowtibble){
    #     varname <- y$var[1]
    #     y$var = NULL
    # } else if(y_is_flowtibble && ! x_is_flowtibble){
    #     varname <- x$var[1]
    #     x$var = NULL
    # } else if(! x_is_flowtibble && ! y_is_flowtibble){
    #     varname <- x$var[1]
    #     x$var = NULL
    #     y$var = NULL
    # } else {
    #     stop('x and y are both "flow" tibbles. There should be no need for this')
    # }
    # if(x_is_flowtibble) x <- rename(x, val = flow)
    # if(y_is_flowtibble) y <- rename(y, val = flow)

    #data.table doesn't work with the errors package, so error needs
    #to be separated into its own column. also give same-name columns suffixes

    if('val' %in% colnames(x)){

        x <- x %>%
            mutate(err = errors::errors(val),
                   val = errors::drop_errors(val)) %>%
            rename_with(.fn = ~paste0(., '_x'),
                        .cols = everything()) %>%
            data.table::as.data.table()

        y <- y %>%
            mutate(err = errors::errors(val),
                   val = errors::drop_errors(val)) %>%
            rename_with(.fn = ~paste0(., '_y'),
                        .cols = everything()) %>%
            data.table::as.data.table()

    } else {

        if(indices_only){
            x <- rename(x, datetime_x = datetime) %>%
                mutate(across(where(~inherits(., 'errors')),
                              ~errors::drop_errors(.))) %>%
                data.table::as.data.table()

            y <- rename(y, datetime_y = datetime) %>%
                mutate(across(where(~inherits(., 'errors')),
                              ~errors::drop_errors(.))) %>%
                data.table::as.data.table()
        } else {
            stop('this case not yet handled')
        }

    }

    #alternative implementation of the "on" argument in data.table joins...
    #probably more flexible, so leaving it here in case we need to do something crazy
    # data.table::setkeyv(x, 'datetime')
    # data.table::setkeyv(y, 'datetime')

    #convert the desired maximum roll distance from string to integer seconds
    rollmax <- ifelse(test = rollmax == '7:30',
                      yes = 7 * 60 + 30,
                      no = 12 * 60 * 60)

    #leaving this here in case the nearest neighbor join implemented below is too
    #slow. then we can fall back to a basic rolling join with a maximum distance
    # rollmax <- ifelse(test = direction == 'forward',
    #                   yes = -rollmax,
    #                   no = rollmax)
    #rollends will move the first/last value of x in the opposite `direction` if necessary
    # joined <- y[x, on = 'datetime', roll = rollmax, rollends = c(TRUE, TRUE)]

    #create columns in x that represent the snapping window around each datetime
    x[, `:=` (datetime_min = datetime_x - rollmax,
              datetime_max = datetime_x + rollmax)]
    y[, `:=` (datetime_y_orig = datetime_y)] #datetime col will be dropped from y

    # if(indices_only){
    #     y_indices <- y[x,
    #                    on = .(datetime_y <= datetime_max,
    #                           datetime_y >= datetime_min),
    #                    which = TRUE]
    #     return(y_indices)
    # }

    #join x rows to y if y's datetime falls within the x range
    joined <- y[x, on = .(datetime_y <= datetime_max,
                          datetime_y >= datetime_min)]
    joined <- na.omit(joined, cols = 'datetime_y_orig') #drop rows without matches

    #for any datetimes in x or y that were matched more than once, keep only
    #the nearest match
    joined[, `:=` (datetime_match_diff = abs(datetime_x - datetime_y_orig))]
    joined <- joined[, .SD[which.min(datetime_match_diff)], by = datetime_x]
    joined <- joined[, .SD[which.min(datetime_match_diff)], by = datetime_y_orig]

    if(indices_only){
        y_indices <- which(y$datetime_y %in% joined$datetime_y_orig)
        x_indices <- which(x$datetime_x %in% joined$datetime_x)
        return(list(x = x_indices, y = y_indices))
    }

    #drop and rename columns (data.table makes weird name modifications)
    if(keep_datetimes_from == 'x'){
        joined[, c('datetime_y', 'datetime_y.1', 'datetime_y_orig', 'datetime_match_diff') := NULL]
        data.table::setnames(joined, 'datetime_x', 'datetime')
    } else {
        joined[, c('datetime_x', 'datetime_y.1', 'datetime_y', 'datetime_match_diff') := NULL]
        data.table::setnames(joined, 'datetime_y_orig', 'datetime')
    }

    #restore error objects, var column, original column names (with suffixes).
    #original column order
    joined <- tibble::as_tibble(joined) %>%
        mutate(val_x = errors::set_errors(val_x, err_x),
               val_y = errors::set_errors(val_y, err_y)) %>%
        dplyr::select(-err_x, -err_y)
    # mutate(var = !!varname)

    # if(x_is_flowtibble) joined <- rename(joined,
    #                                      flow = val_x,
    #                                      ms_status_flow = ms_status_x,
    #                                      ms_interp_flow = ms_interp_x)
    # if(y_is_flowtibble) joined <- rename(joined,
    #                                      flow = val_y,
    #                                      ms_status_flow = ms_status_y,
    #                                      ms_interp_flow = ms_interp_y)

    # if(! sum(grepl('^val_[xy]$', colnames(joined))) > 1){
    #     joined <- rename(joined, val = matches('^val_[xy]$'))
    # }

    joined <- dplyr::select(joined,
                            datetime,
                            # matches('^val_?[xy]?$'),
                            # any_of('flow'),
                            starts_with('site_code'),
                            any_of(c(starts_with('var_'), matches('^var$'))),
                            any_of(c(starts_with('val_'), matches('^val$'))),
                            starts_with('ms_status_'),
                            starts_with('ms_interp_'))

    return(joined)
}

# general interp helpers ####

ms_linear_interpolate <- function(d, interval, max_samples_to_impute){

    #d: a ms tibble with no ms_interp column (this will be created)

    #TODO: prefer imputeTS::na_seadec when there are >=2 non-NA datapoints.
    #   There are commented sections that begin this work, but we still would
    #   need to calculate start and end when creating a ts() object. we'd
    #   also need to separate uncertainty from the val column before converting
    #   to ts. here is the line that could be added to this documentation
    #   if we ever implement na_seadec:
    #For linear interpolation with
    #   seasonal decomposition, interval will also be used to determine
    #   the fraction of the sampling period between samples.
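
    #illustrative example (not from the original source): with
    #   max_samples_to_impute >= 2, a val series of 2, NA, NA, 8 becomes
    #   2, 4, 6, 8 after linear interpolation; gaps longer than
    #   max_samples_to_impute are left as NA and dropped below.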

    if(length(unique(d$site_code)) > 1){
        stop(paste('ms_linear_interpolate is not designed to handle datasets',
                   'with more than one site.'))
    }

    if(length(unique(d$var)) > 1){
        stop(paste('ms_linear_interpolate is not designed to handle datasets',
                   'with more than one variable'))
    }

    var <- ms_drop_var_prefix_(d$var[1])

    d <- arrange(d, datetime)
    ms_interp_column <- is.na(d$val)

    d_interp <- d %>%
        mutate(
            #carry ms_status to any rows that have just been populated (probably
            #redundant now, but can't hurt)
            ms_status = imputeTS::na_locf(ms_status,
                                          na_remaining = 'rev',
                                          maxgap = max_samples_to_impute),
            val = if(sum(! is.na(val)) > 1){
                #linear interp NA vals
                imputeTS::na_interpolation(val,
                                           maxgap = max_samples_to_impute)
                #unless not enough data in group; then do nothing
            } else val,
            val_err = if(sum(! is.na(val_err)) > 1){
                #linear interp NA vals
                imputeTS::na_interpolation(val_err,
                                           maxgap = max_samples_to_impute)
                #unless not enough data in group; then do nothing
            } else val_err
        ) %>%
        dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
        arrange(site_code, var, datetime)

    d_interp$ms_status[is.na(d_interp$ms_status)] = 0
    ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
    d_interp$ms_interp <- as.numeric(ms_interp_column)
    d_interp <- filter(d_interp,
                       ! is.na(val))

    return(d_interp)
}

ms_zero_interpolate <- function(d, interval, max_samples_to_impute){

    #d: a ms tibble with no ms_interp column (this will be created)
    #interval: the sampling interval (either '15 min' or '1 day').

    #for precip only, and only relevant at konza (so far)

    #fills gaps up to maxgap (determined automatically), then removes missing values
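    #illustrative example (not from the original source): with
    #   max_samples_to_impute >= 2, a precip series of 0.2, NA, NA, 0.5 becomes
    #   0.2, 0, 0, 0.5 (missing precip treated as zero); gaps longer than
    #   max_samples_to_impute stay NA and are removed below.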

    if(length(unique(d$site_code)) > 1){
        stop(paste('ms_zero_interpolate is not designed to handle datasets',
                   'with more than one site.'))
    }

    if(length(unique(d$var)) > 1){
        stop(paste('ms_zero_interpolate is not designed to handle datasets',
                   'with more than one variable'))
    }

    var <- ms_drop_var_prefix_(d$var[1])

    d <- arrange(d, datetime)
    ms_interp_column <- is.na(d$val)

    d_interp <- d %>%
        mutate(

            ms_status = imputeTS::na_replace(ms_status,
                                             fill = 1,
                                             maxgap = max_samples_to_impute),

            val = if(sum(! is.na(val)) > 1){

                #replace NA vals with 0
                imputeTS::na_replace(val,
                                     fill = 0,
                                     maxgap = max_samples_to_impute)

                #unless not enough data in group; then do nothing
            } else val
        ) %>%
        dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
        arrange(site_code, var, datetime)

    d_interp$ms_status[is.na(d_interp$ms_status)] = 0
    ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
    d_interp$ms_interp <- as.numeric(ms_interp_column)
    d_interp <- filter(d_interp,
                       ! is.na(val))

    return(d_interp)
}

ms_nocb_interpolate <- function(d, interval, max_samples_to_impute){

    #d: a ms tibble with no ms_interp column (this will be created)
    #interval: the sampling interval (either '15 min' or '1 day').

    #for pchem only, where measured concentrations represent aggregated
    #concentration over the measurement period.

    #fills gaps up to maxgap (determined automatically), then removes missing values
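    #illustrative example (not from the original source): with
    #   max_samples_to_impute >= 2, a val series of NA, NA, 4, 7 becomes
    #   4, 4, 4, 7 (next observation carried backward); gaps longer than
    #   max_samples_to_impute stay NA and are removed below.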


    if(length(unique(d$site_code)) > 1){
        stop(paste('ms_nocb_interpolate is not designed to handle datasets',
                   'with more than one site.'))
    }

    if(length(unique(d$var)) > 1){
        stop(paste('ms_nocb_interpolate is not designed to handle datasets',
                   'with more than one variable'))
    }

    var <- ms_drop_var_prefix_(d$var[1])

    d <- arrange(d, datetime)
    ms_interp_column <- is.na(d$val)

    d_interp <- d %>%
        mutate(
            #carry ms_status to any rows that have just been populated (probably
            #redundant now, but can't hurt)
            ms_status = imputeTS::na_locf(ms_status,
                                          option = 'nocb',
                                          na_remaining = 'rev',
                                          maxgap = max_samples_to_impute),
            val = if(sum(! is.na(val)) > 1){
                #nocb interp NA vals
                imputeTS::na_locf(val,
                                  option = 'nocb',
                                  na_remaining = 'keep',
                                  maxgap = max_samples_to_impute)
                #unless not enough data in group; then do nothing
            } else val,
            val_err = if(sum(! is.na(val_err)) > 1){
                #do the same for uncertainty
                imputeTS::na_locf(val_err,
                                  option = 'nocb',
                                  na_remaining = 'keep',
                                  maxgap = max_samples_to_impute)
            } else val_err
        ) %>%
        dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
        arrange(site_code, var, datetime)

    d_interp$ms_status[is.na(d_interp$ms_status)] = 0
    ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
    d_interp$ms_interp <- as.numeric(ms_interp_column)
    d_interp <- filter(d_interp,
                       ! is.na(val))

    return(d_interp)
}

ms_nocb_mean_interpolate <- function(d, interval, max_samples_to_impute){

    #d: a ms tibble with no ms_interp column (this will be created)
    #interval: the sampling interval (either '15 min' or '1 day').

    #for pchem only, where measured concentrations represent aggregated
    #concentration over the measurement period.

    #fills gaps up to maxgap (determined automatically), then removes missing values


    if(length(unique(d$site_code)) > 1){
        stop(paste('ms_nocb_mean_interpolate is not designed to handle datasets',
                   'with more than one site.'))
    }

    if(length(unique(d$var)) > 1){
        stop(paste('ms_nocb_mean_interpolate is not designed to handle datasets',
                   'with more than one variable'))
    }

    var <- ms_drop_var_prefix_(d$var[1])

    d <- arrange(d, datetime)
    ms_interp_column <- is.na(d$val)

    d_interp <- d %>%
        mutate(
            #carry ms_status to any rows that have just been populated (probably
            #redundant now, but can't hurt)
            ms_status = imputeTS::na_locf(ms_status,
                                          option = 'nocb',
                                          na_remaining = 'rev',
                                          maxgap = max_samples_to_impute),
            val = if(sum(! is.na(val)) > 1){
                #nocb interp NA vals
                imputeTS::na_locf(val,
                                  option = 'nocb',
                                  na_remaining = 'keep',
                                  maxgap = max_samples_to_impute)
                #unless not enough data in group; then do nothing
            } else val,
            val_err = if(sum(! is.na(val_err)) > 1){
                #do the same for uncertainty
                imputeTS::na_locf(val_err,
                                  option = 'nocb',
                                  na_remaining = 'keep',
                                  maxgap = max_samples_to_impute)
            } else val_err
        ) %>%
        dplyr::select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp', 'val_err'))) %>%
        arrange(site_code, var, datetime)

    #flag NOCB-filled rows (as in the other *_interpolate helpers); needed for the
    #   run-length logic below
    ms_interp_column <- ms_interp_column & ! is.na(d_interp$val)
    d_interp$ms_interp <- as.numeric(ms_interp_column)

    #identify series of records that need to be divided by their n
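    #illustrative example (not from the original source): if a measured value of 6
    #   was carried backward over two previously missing rows, those three rows now
    #   all hold 6; the run-length division below turns them into 6/3 = 2 each, so
    #   the run sums back to the original measurement.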
    laginterp <- lag(d_interp$ms_interp)
    laginterp[1] <- d_interp$ms_interp[1]
    laginterp <- as.numeric(laginterp | d_interp$ms_interp)

    # err_ <- errors::errors(d_interp$val)
    # d_interp$val <- d_interp$val
    vals_interped <- d_interp$val * laginterp
    err_interped <- d_interp$val_err * laginterp

    #use run length encoding to do the division quickly
    vals_new <- rle2(vals_interped) %>%
        mutate(values = values / lengths) %>%
        dplyr::select(lengths, values) %>%
        as.list()
    class(vals_new) <- 'rle'
    vals_new <- inverse.rle(vals_new)

    #same for uncertainty
    err_new <- rle2(err_interped) %>%
        mutate(values = values / lengths) %>%
        dplyr::select(lengths, values) %>%
        as.list()
    class(err_new) <- 'rle'
    err_new <- inverse.rle(err_new)

    real_vals_new <- vals_new != 0
    d_interp$val[real_vals_new] <- vals_new[real_vals_new]
    d_interp$val_err <- err_new

    d_interp$ms_status[is.na(d_interp$ms_status)] = 0

    return(d_interp)
}

# ms_read_csv helpers ####
gsub_v <- function(pattern, replacement_vec, x){

    #just like the first three arguments to gsub, except that
    #   replacement is now a vector of replacements.
    #return a vector of the same length as replacement_vec, where
    #   each element in replacement_vec has been used once
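    #illustrative example: gsub_v('#', c('a', 'b'), 'x_#') would return
    #   c('x_a', 'x_b').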

    subbed <- sapply(replacement_vec,
                     function(v) gsub(pattern = pattern,
                                      replacement = v,
                                      x = x),
                     USE.NAMES = FALSE)

    return(subbed)
}

resolve_datetime <- function(d, #needs updates if we ever re-implement!
                             datetime_colnames,
                             datetime_formats,
                             datetime_tz,
                             optional){

    #d: a data.frame or tibble with at least one date or time column
    #   (all date and/or time columns must contain character strings,
    #   not parsed date/time/datetime objects).
    #datetime_colnames: character vector; column names that contain
    #   relevant datetime information.
    #datetime_formats: character vector; datetime parsing tokens
    #   (like '%A, %Y-%m-%d %I:%M:%S %p' or '%j') corresponding to the
    #   elements of datetime_colnames.
    #datetime_tz: character; time zone of the returned datetime column.
    #optional: character vector; see dt_format_to_regex.

    #return value: d, but with a "datetime" column containing POSIXct datetimes
    #   and without the input datetime columns
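
    #illustrative example (hypothetical column names): given columns
    #   date = '2019-07-04' and time = '13:30', with
    #   datetime_formats = c('%Y-%m-%d', '%H:%M') and datetime_tz = 'US/Eastern',
    #   the returned d would have a single POSIXct datetime column
    #   (2019-07-04 17:30:00 UTC) in place of the two input columns.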

    dt_tb <- tibble(basecol = rep(NA, nrow(d)))
    for(i in 1:length(datetime_colnames)){
        dt_comps <- stringr::str_match_all(string = datetime_formats[i],
                                           pattern = '%([a-zA-Z])')[[1]][,2]
        dt_regex <- dt_format_to_regex(datetime_formats[i],
                                       optional = optional)

        # pad 1- or 3-digit numeric date/time components with a leading zero so
        # they match the fixed-width parsing regex
        for(match in grepl("m|e|d|H|I|M|S", dt_comps)) {
            if(match){
                for (dt_entry in 1:nrow(d[datetime_colnames[i]])) {
                    if(! is.na(d[datetime_colnames[i]][dt_entry,])){
                        if(numbers_only(d[datetime_colnames[i]][dt_entry,])){
                            if(nchar(d[datetime_colnames[i]][dt_entry,]) == 1) {
                                d[datetime_colnames[i]][dt_entry,] <- paste0(0, d[datetime_colnames[i]][dt_entry,])
                            } else if(nchar(d[datetime_colnames[i]][dt_entry,]) ==3){
                                d[datetime_colnames[i]][dt_entry,] <- paste0(0, d[datetime_colnames[i]][dt_entry,])
                            }
                        }
                    }
                }
            }
        }

        dt_tb <- d %>%
            dplyr::select(one_of(datetime_colnames[i])) %>%
            tidyr::extract(col = !!datetime_colnames[i],
                           into = dt_comps,
                           regex = dt_regex,
                           remove = TRUE,
                           convert = FALSE) %>%
            bind_cols(dt_tb)
    }

    dt_tb$basecol = NULL

    #fill in defaults if applicable:
    #(12 for hour, 00 for minute and second, PM for AM/PM)
    dt_tb <- dt_tb %>%
        mutate(
            # across(any_of(c('H', 'I')), ~ifelse(nchar(.x) < 2, paste0(0, .x), .x)),
            across(any_of(c('H', 'I')), ~ifelse(is.na(.x), '12', .x)),
            # across(any_of(c('M', 'S')), ~ifelse(nchar(.x) < 2, paste0(0, .x), .x)),
            across(any_of(c('M', 'S')), ~ifelse(is.na(.x), '00', .x)),
            across(any_of('p'), ~ifelse(is.na(.x), 'PM', .x)))

    #resolve datetime structure into POSIXct
    datetime_formats_split <- stringr::str_extract_all(datetime_formats,
                                                       '%[a-zA-Z]') %>%
        unlist()

    dt_col_order <- match(paste0('%',
                                 colnames(dt_tb)),
                          datetime_formats_split)

    if('H' %in% colnames(dt_tb)){
        dt_tb$H[dt_tb$H == ''] <- '00'
    }
    if('M' %in% colnames(dt_tb)){
        dt_tb$M[dt_tb$M == ''] <- '00'
    }
    if('S' %in% colnames(dt_tb)){
        dt_tb$S[dt_tb$S == ''] <- '00'
    }
    if('I' %in% colnames(dt_tb)){
        dt_tb$I[dt_tb$I == ''] <- '00'
    }
    if('P' %in% colnames(dt_tb)){
        dt_tb$P[dt_tb$P == ''] <- 'AM'
    }
    dt_tb <- dt_tb %>%
        tidyr::unite(col = 'datetime',
                     everything(),
                     sep = ' ',
                     remove = TRUE) %>%
        mutate(datetime = as_datetime(datetime,
                                      format = paste(datetime_formats_split[dt_col_order],
                                                     collapse = ' '),
                                      tz = datetime_tz) %>%
                   with_tz(tz = 'UTC'))
    d <- d %>%
        bind_cols(dt_tb) %>%
        dplyr::select(-one_of(datetime_colnames), datetime) %>% #in case 'datetime' is in datetime_colnames
        relocate(datetime)

    return(d)
}


identify_sampling <- function(df,
                              is_sensor,
                              date_col = 'datetime',
                              network,
                              domain,
                              prodname_ms,
                              sampling_type){

    #TODO: for hbef, identify_sampling is writing sites names as 1 not w1

    #is_sensor: named logical vector. see documentation for
    #   ms_read_raw_csv, but note that an unnamed logical vector of length one
    #   cannot be used here. also note that the original variable/flag column names
    #   from the raw file are converted to canonical macrosheds names by
    #   ms_read_raw_csv before it passes is_sensor to identify_sampling.

    #checks
    if(any(! is.logical(is_sensor))){
        stop('all values in is_sensor must be logical.')
    }

    svh_names <- names(is_sensor)
    if(is.null(svh_names) || any(is.na(svh_names))){
        stop('all elements of is_sensor must be named.')
    }

    #parse is_sensor into a character vector of sample regimen codes
    is_sensor <- ifelse(is_sensor, 'S', 'N')
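    #e.g. TRUE -> 'S' (sensor) and FALSE -> 'N' (non-sensor). combined with the
    #   'I' (installed) / 'G' (grab) sampling type assigned below, this yields
    #   two-character variable prefixes like IS, IN, GS, and GN.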

    #set up directory system to store sample regimen metadata
    sampling_dir <- glue::glue('data/{n}/{d}',
                               n = network,
                               d = domain)

    sampling_file <- glue::glue('data/{n}/{d}/sampling_type.json',
                                n = network,
                                d = domain)

    master <- try(jsonlite::fromJSON(readr::read_file(sampling_file)),
                  silent = TRUE)

    if('try-error' %in% class(master)){
        dir.create(sampling_dir, recursive = TRUE)
        file.create(sampling_file)
        master <- list()
    }

    #determine and record sample regimen for each variable
    col_names <- colnames(df)

    data_cols <- grep(pattern = '__[|]dat',
                      col_names,
                      value = TRUE)

    flg_cols <- grep(pattern = '__[|]flg',
                     col_names,
                     value = TRUE)

    site_codes <- unique(df$site_code)

    for(p in 1:length(data_cols)){

        # var_name <- stringr::str_split_fixed(data_cols[p], '__', 2)[1]

        # df_var <- df %>%
        #     dplyr::select(datetime, !!var_name := .data[[data_cols[p]]], site_code)

        all_sites <- tibble()
        for(i in 1:length(site_codes)){

            # df_site <- df_var %>%
            df_site <- df %>%
                filter(site_code == !!site_codes[i]) %>%
                arrange(datetime)
            # ! is.na(.data[[date_col]]), #NAs here are indicative of bugs we want to fix, so let's let them through
            # ! is.na(.data[[var_name]])) #NAs here are indicative of bugs we want to fix, so let's let them through

            dates <- df_site[[date_col]]
            dif <- diff(dates)
            unit <- attr(dif, 'units')

            conver_mins <- case_when(
                unit %in% c('seconds', 'secs') ~ 0.01666667,
                unit %in% c('minutes', 'mins') ~ 1,
                unit == 'hours' ~ 60,
                unit == 'days' ~ 1440,
                TRUE ~ NA_real_)

            if(is.na(conver_mins)) stop('Unexpected time unit encountered; address this.')

            dif_mins <- as.numeric(dif) * conver_mins
            dif_mins <- round(dif_mins)

            mode_mins <- Mode(dif_mins)
            mean_mins <- mean(dif_mins, na.rm = T)
            prop_mode_min <- length(dif_mins[dif_mins == mode_mins])/length(dif_mins)

            # remove gaps larger than 90 days (for seasonal sampling)
            dif_mins <- dif_mins[dif_mins < 129600]

            if(length(dif_mins) == 0){
                # This is grab
                g_a <- tibble('site_code' = site_codes[i],
                              'type' = 'G',
                              'starts' = min(dates, na.rm = TRUE),
                              'interval' = mode_mins)
            } else{
                if(prop_mode_min >= 0.3 && mode_mins <= 1440){
                    # This is installed
                    g_a <- tibble('site_code' = site_codes[i],
                                  'type' = 'I',
                                  'starts' = min(dates, na.rm = TRUE),
                                  'interval' = mode_mins)
                } else{
                    if(mean_mins <= 1440){
                        # This is installed (non standard interval like HBEF)
                        g_a <- tibble('site_code' = site_codes[i],
                                      'type' = 'I',
                                      'starts' = min(dates, na.rm = TRUE),
                                      'interval' = mean_mins)
                    } else{
                        # This is grab
                        g_a <- tibble('site_code' = site_codes[i],
                                      'type' = 'G',
                                      'starts' = min(dates, na.rm = TRUE),
                                      'interval' = mean_mins)
                    }
                }
            }

            if(! is.null(sampling_type)){
                g_a <- g_a %>%
                    mutate(type = sampling_type)
            }

            var_name_base <- stringr::str_split(string = data_cols[p],
                                                pattern = '__\\|')[[1]][1]

            g_a <- g_a %>%
                mutate(
                    type = paste0(type,
                                  !!is_sensor[var_name_base]),
                    var = as.character(glue::glue('{ty}_{vb}',
                                                  ty = type,
                                                  vb = var_name_base)))

            master[[prodname_ms]][[var_name_base]][[site_codes[i]]] <-
                list('startdt' = g_a$starts,
                     'type' = g_a$type,
                     'interval' = g_a$interval)

            g_a <- g_a %>%
                mutate(interval = as.character(interval))

            all_sites <- bind_rows(all_sites, g_a)
        }

        #include new prefixes in df column names
        prefixed_varname <- all_sites$var[1]

        dat_colname <- paste0(drop_var_prefix(prefixed_varname),
                              '__|dat')
        flg_colname <- paste0(drop_var_prefix(prefixed_varname),
                              '__|flg')

        data_col_ind <- match(dat_colname,
                              colnames(df))
        flag_col_ind <- match(flg_colname,
                              colnames(df))

        colnames(df)[data_col_ind] <- paste0(prefixed_varname,
                                             '__|dat')
        colnames(df)[flag_col_ind] <- paste0(prefixed_varname,
                                             '__|flg')
    }

    readr::write_file(jsonlite::toJSON(master), sampling_file)

    return(df)
}

# user response helpers ####

get_response_1char <- function(msg,
                               possible_chars,
                               subsequent_prompt = FALSE,
                               response_from_file = NULL){

    #msg: character. a message that will be used to prompt the user
    #possible_chars: character vector of acceptable single-character responses
    #subsequent_prompt: not to be set directly. This is handled by
    #   get_response_1char during recursion.

    if(subsequent_prompt){
        cat(paste('Please choose one of:',
                  paste(possible_chars,
                        collapse = ', '),
                  '\n> '))
    } else {
        cat(msg)
    }

    if(! is.null(response_from_file)){
        ch <- as.character(readLines(con = response_from_file, 1))
        rsps <- readLines(con = response_from_file)
        rsps <- rsps[2:length(rsps)]
        writeLines(rsps, con = response_from_file)
    } else {
        ch <- as.character(readLines(con = stdin(), 1))
    }

    if(length(ch) == 1 && ch %in% possible_chars){
        return(ch)
    } else {
        get_response_1char(msg = msg,
                           possible_chars = possible_chars,
                           subsequent_prompt = TRUE)
    }
}

get_response_mchar <- function(msg,
                               possible_resps,
                               allow_alphanumeric_response = TRUE,
                               subsequent_prompt = FALSE,
                               response_from_file = NULL){

    #msg: character. a message that will be used to prompt the user
    #possible_resps: character vector. If length 1, each character in the response
    #   must match a character in possible_resps, and the return value will be a
    #   character vector of the single-character tokens in the response. If
    #   length > 1, the response must match an element of possible_resps exactly,
    #   and it will be returned as-is.
    #allow_alphanumeric_response: logical. If FALSE, the response may not
    #   include both numerals and letters. Only applies when possible_resps
    #   has length 1.
    #subsequent prompt: not to be set directly. This is handled by
    #   get_response_mchar during recursion.
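
    #illustrative examples: get_response_mchar('pick letters: ', 'abc') with a
    #   typed response of "ab" returns c('a', 'b');
    #   get_response_mchar('pick one: ', c('yes', 'no')) requires the response to
    #   be exactly "yes" or "no" and returns it unchanged.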

    split_by_character <- ifelse(length(possible_resps) == 1, TRUE, FALSE)

    if(subsequent_prompt){

        if(split_by_character){
            pr <- strsplit(possible_resps, split = '')[[1]]
        } else {
            pr <- possible_resps
        }

        cat(paste('Your options are:',
                  paste(pr,
                        collapse = ', '),
                  '\n> '))
    } else {
        cat(msg)
    }

    if(! is.null(response_from_file)){
        chs <- as.character(readLines(con = response_from_file, 1))
        rsps <- readLines(con = response_from_file)
        rsps <- rsps[2:length(rsps)]
        writeLines(rsps, con = response_from_file)
    } else {
        chs <- as.character(readLines(con = stdin(), 1))
    }

    if(! allow_alphanumeric_response &&
       split_by_character &&
       grepl('[0-9]', chs) &&
       grepl('[a-zA-Z]', chs)){

        cat('Response may not include both letters and numbers.\n> ')
        resp <- get_response_mchar(
            msg = msg,
            possible_resps = possible_resps,
            allow_alphanumeric_response = allow_alphanumeric_response,
            subsequent_prompt = FALSE)

        return(resp)
    }

    if(length(chs)){
        if(split_by_character){

            if(length(possible_resps) != 1){
                stop('possible_resps must be length 1 if split_by_character is TRUE')
            }

            chs <- strsplit(chs, split = '')[[1]]
            possible_resps_split <- strsplit(possible_resps, split = '')[[1]]

            if(all(chs %in% possible_resps_split)){
                return(chs)
            }

        } else {

            if(length(possible_resps) < 2){
                stop('possible_resps must have length > 1 if split_by_character is FALSE')
            }

            if(any(possible_resps == chs)){
                return(chs)
            }
        }
    }

    resp <- get_response_mchar(
        msg = msg,
        possible_resps = possible_resps,
        allow_alphanumeric_response = allow_alphanumeric_response,
        subsequent_prompt = TRUE)

    return(resp)
}

get_response_int <- function(msg,
                             min_val,
                             max_val,
                             subsequent_prompt = FALSE,
                             response_from_file = NULL){

    #msg: character. a message that will be used to prompt the user
    #min_val: int. minimum allowable value, inclusive
    #max_val: int. maximum allowable value, inclusive
    #subsequent prompt: not to be set directly. This is handled by
    #   get_response_int during recursion.

    if(subsequent_prompt){
        cat(glue::glue('Please choose an integer in the range [{minv}, {maxv}].',
                       minv = min_val,
                       maxv = max_val))
    } else {
        cat(msg)
    }

    if(! is.null(response_from_file)){
        nm <- as.numeric(as.character(readLines(con = response_from_file, 1)))
        rsps <- readLines(con = response_from_file)
        rsps <- rsps[2:length(rsps)]
        writeLines(rsps, con = response_from_file)
    } else {
        nm <- as.numeric(as.character(readLines(con = stdin(), 1)))
    }

    if(! is.na(nm) && nm %% 1 == 0 && nm >= min_val && nm <= max_val){
        return(nm)
    } else {
        get_response_int(msg = msg,
                         min_val = min_val,
                         max_val = max_val,
                         subsequent_prompt = TRUE)
    }
}

get_response_enter <- function(msg,
                               response_from_file = NULL){

    #only returns if ENTER is pressed (or if anything is passed by response_from_file)

    cat(msg)

    if(! is.null(response_from_file)){
        ch <- as.character(readLines(con = response_from_file, 1))
        rsps <- readLines(con = response_from_file)
        rsps <- rsps[2:length(rsps)]
        writeLines(rsps, con = response_from_file)
    } else {
        ch <- as.character(readLines(con = stdin(), 1))
    }

    return(invisible(NULL))
}

# attribution helpers ####

match_ws_attr_attrib <- function(ws_attrib){

    attr_sources_ <- ms_load_variables('ws_attr') %>%
        filter(variable_code %in% ws_attrib)

    attr_sources1 <- attr_sources_ %>%
        filter(data_source %in% c('USGS', 'MODIS')) %>%
        mutate(prodname = case_when(
            data_source == 'USGS' ~ recode(
                data_class,
                !!!c(hydrology = 'bfi',
                     vegetation = 'start_season',
                     `parent material` = 'geochemical'
                )),
            data_source == 'MODIS' & data_class == 'landcover' ~ 'modis_igbp',
            data_source == 'MODIS' & grepl('lai|fpar', variable_code) ~ 'lai',
            grepl('^ndvi_', variable_code) ~ 'ndvi',
            grepl('^evi_', variable_code) ~ 'evi',
            data_source == 'MODIS' & grepl('cover', variable_code) ~ 'veg_cover',
            data_source == 'MODIS' & grepl('global_500', variable_code) ~ 'gpp_global_500m',
            TRUE ~ variable_code)
        ) %>%
        dplyr::select(data_source, prodname) %>%
        left_join(macrosheds::attrib_ws_data,
                  by = c(data_source = 'primary_source',
                         prodname = 'prodname')) %>%
        distinct()

    attr_sources <- attr_sources_ %>%
        dplyr::select(data_source) %>%
        filter(! is.na(data_source),
               ! data_source %in% c('USGS', 'MODIS')) %>%
        distinct() %>%
        left_join(macrosheds::attrib_ws_data,
                  by = c(data_source = 'primary_source')) %>%
        distinct(data_source, .keep_all = TRUE) %>%
        bind_rows(attr_sources1) %>%
        dplyr::select(prodname, primary_source = data_source, retrieved_from_GEE,
                      doi, license, citation, url, addtl_info)

    return(attr_sources)
}

format_acknowledgements <- function(ts_attrib, ws_attrib, all_ws_attr = FALSE){

    ntw_join <- macrosheds::ms_site_data %>%
        dplyr::select(domain, network_fullname, domain_fullname) %>%
        distinct()

    custom_acks <- ts_attrib %>%
        filter(! is.na(IR_acknowledgement_text)) %>%
        dplyr::select(domain, IR_acknowledgement_text) %>%
        left_join(ntw_join, by = 'domain') %>%
        distinct() %>%
        mutate(network_fullname = ifelse(network_fullname == domain_fullname, '', network_fullname)) %>%
        mutate(txt = paste0(domain_fullname, ' ', network_fullname, ': ', IR_acknowledgement_text)) %>%
        pull(txt)

    relevant_deets <- ts_attrib %>%
        distinct(domain, funding) %>%
        left_join(ntw_join, by = 'domain') %>%
        distinct() %>%
        # bind_rows(tibble(domain='a', domain_fullname = 'a', network_fullname='a', funding='NSF awards: 345, 3535')) %>%
        mutate(network_fullname = ifelse(stringr::str_detect(domain_fullname, network_fullname), '', network_fullname)) %>%
        mutate(txt = paste0(domain_fullname, ' ', network_fullname, ' (', funding, ')'),
               txt = stringr::str_extract(txt, '.+?(?=(?: \\(NA\\)|$))')) %>%
        pull(txt)

    ndeets <- length(relevant_deets)

    if(ndeets){
        relevant_deets <- paste0(1:ndeets, '. ', relevant_deets)
        sepr <- '.'
    } else {
        sepr <- ''
    }

    ack <- glue::glue('Primary data were provided by the following sources:\n{ack_ls}{sepr}',
                      ack_ls = paste(relevant_deets, collapse = '\n'))

    if(length(custom_acks)){
        ack <- paste(ack, paste(custom_acks, collapse = '\n'), sep = '\n')
    }

    if(all_ws_attr){

        ack_ls2 <- paste(unique(macrosheds::attrib_ws_data$primary_source),
                         collapse = ', ')

    } else if(length(ws_attrib)){

        ack_ls2 <- ms_load_variables('ws_attr') %>%
            filter(! is.na(data_class),
                   variable_code %in% ws_attrib) %>%
            pull(data_source) %>%
            unique() %>%
            paste(collapse = ', ')

    } else {
        ack_ls2 <- ''
    }

    if(nchar(ack_ls2)){
        ws_add <- glue::glue('Spatial summary data were derived from layers ',
                             'provided by:\n{ack_ls2}')
        ack <- paste(ack, ws_add, sep = '\n')
    }

    return(ack)
}

format_bibliography <- function(ts_attrib, ws_attrib, all_ws_attr = FALSE){

    #organize bibtex records
    bts <- strsplit(macrosheds::ts_bib, '\n\n')[[1]]
    bts[1] <- stringr::str_replace(bts[1], '\\\n', '')
    bts <- grep('^(?:@misc|@article)', bts, value = TRUE)
    authors <- stringr::str_match(bts, 'author = \\{(.+?)\\},\\\n')[, 2]
    authors <- stringr::str_replace_all(authors, ' and ', '')
    authors <- stringr::str_remove_all(authors, '[[[:punct:]] ]')
    year <- stringr::str_match(bts, 'year = \\{([0-9]{4})\\},?\\\n')[, 2]
    title <- stringr::str_match(bts, '\\\ttitle = \\{(.+?)(?=(?:\\.| ver |\\},\\\n|$))')[, 2]
    title <- stringr::str_remove_all(title, '[[[:punct:]] ]')
    title <- tolower(title)
    year[is.na(year)] <- 'none'

    #organize formatted citations
    extracts <- ts_attrib %>%
        filter(! is.na(citation)) %>%
        distinct(citation) %>%
        mutate(authors = stringr::str_extract(citation, '^.*(?= \\([0-9]{4}[a-z]*\\))'),
               authors = stringr::str_remove_all(authors, '[ &]'),
               authors = stringr::str_replace_all(authors, '\\.,', '.'),
               authors = stringr::str_replace_all(authors, '[[[:punct:]] ]', ''),
               pubyr = stringr::str_match(citation, '\\(([0-9]{4})[a-z]*\\)')[, 2],
               title = stringr::str_extract(citation, '(?<=\\([0-9]{4}[a-z]{0,2}\\)\\. ).{1,999}?(?=(?:\\.| ver |\\},\\\n|$))'),
               title = stringr::str_replace_all(title, '[[[:punct:]] ]', ''),
               title = tolower(title)) %>%
        dplyr::select(authors, pubyr, title)

    #match records to citations
    matches <- c()
    for(i in seq_len(nrow(extracts))){
        mch0 <- which(authors == extracts$authors[i])
        mch1 <- which(title == extracts$title[i])
        mch2 <- which(year == extracts$pubyr[i])
        mch <- intersect(intersect(mch0, mch1), mch2)
        if(length(mch) != 1) stop('error in bibliographic extract ', i)

        matches <- c(matches, mch)
    }

    #retrieve bibtex for any matches
    bibtex_out <- bts[matches]

    #include the entry for MacroSheds itself, and for NWIS if applicable
    ms_bib <- paste0(
        "@article{vlah_etal_macrosheds_2023,\n\t",
        "author = {Vlah, Michael J. and Rhea, Spencer and Bernhardt, Emily S. and Slaughter, Weston and Gubbins, Nick and DelVecchia, Amanda G. and Thellman, Audrey and Ross, Matthew R. V.},\n\t",
        "title = {MacroSheds: A synthesis of long-term biogeochemical, hydroclimatic, and geospatial data from small watershed ecosystem studies},\n\t",
        "journal = {Limnology and Oceanography Letters},\n\t",
        "volume = {8},\n\t",
        "number = {3},\n\t",
        "pages = {419-452},\n\t",
        "doi = {https://doi.org/10.1002/lol2.10325},\n\t",
        "url = {https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325},\n\t",
        "year = {2023},\n}")

    usgs_borrowing_domains <- c(
        'baltimore', 'santa_barbara', 'luquillo', 'bear', 'sleepers',
        'trout_lake', 'loch_vale', 'streampulse'
    )

    if(any(ts_attrib$domain %in% usgs_borrowing_domains) &&
       'discharge' %in% ts_attrib$macrosheds_prodname){

        nwisyr <- macrosheds::attrib_ts_data %>%
            filter(domain == 'usgs',
                   macrosheds_prodname == 'discharge') %>%
            pull(link_download_datetime) %>%
            lubridate::year()

        nwis_bib <- glue::glue(
            '@misc{nwis_<<nwisyr>>,\n\t',
            'title = {{National} {Water} {Information} {System} data available on the {World} {Wide} {Web} ({USGS} {Water} {Data} for the {Nation})},\n\t',
            'publisher = {National Water Information System},\n\t',
            'author = {{U.S. Geological Survey}},\n\t',
            'year = {<<nwisyr>>},\n\t',
            'url = {http://waterdata.usgs.gov/nwis/}\n}',
            .open = '<<', .close = '>>', .trim = FALSE
        )

        bibtex_out <- c(bibtex_out, nwis_bib)
    }

    bibtex_out <- c(ms_bib, bibtex_out)

    #tack on ws attr bibtex if requested
    if(all_ws_attr){

        bts_w <- strsplit(macrosheds::ws_bib, '\n\n')[[1]]
        bts_w[1] <- stringr::str_replace(bts_w[1], '\\\n', '')
        bibtex_out <- c(bibtex_out, bts_w)

    } else if(length(ws_attrib)){

        bts_w <- strsplit(macrosheds::ws_bib, '\n\n')[[1]]
        bts_w[1] <- stringr::str_replace(bts_w[1], '\\\n', '')

        ws_cites <- match_ws_attr_attrib(ws_attrib) %>%
            pull(citation) %>%
            stringr::str_extract('\\([12][0-9]{3}\\). ([^\\.]+)', group = 1)

        keepers <- c()
        for(b in bts_w){
            for(cc in ws_cites){
                if(grepl(cc, gsub('[{}]', '', b), fixed = TRUE)) keepers <- c(keepers, b)
            }
        }
        bibtex_out <- c(bibtex_out, keepers)
    }

    bibtex_out <- paste0(bibtex_out, '\n')

    return(bibtex_out)
}

format_IR <- function(ts_attrib, ws_attrib, all_ws_attr = FALSE, abide_by){

    noncomm <- ts_attrib %>%
        filter(grepl('NonCommercial', license_type)) %>%
        dplyr::select(network, domain, macrosheds_prodname) %>%
        distinct()

    sharealike <- ts_attrib %>%
        filter(grepl('ShareAlike', license_type)) %>%
        mutate(may_disregard_with_permission = ! is.na(license_sharealike) & license_sharealike == 'p') %>%
        dplyr::select(network, domain, macrosheds_prodname, may_disregard_with_permission, contact) %>%
        distinct()

    if(all_ws_attr || any(grepl('tcw', ws_attrib))){
        sharealike <- bind_rows(
            sharealike,
            tibble(network = NA, domain = NA, macrosheds_prodname = 'tcw (watershed attribute)',
                   may_disregard_with_permission = FALSE, contact = 'https://www.bdi.ox.ac.uk/research/malaria-atlas-project'))
    }

    notify_intent_s <- ts_attrib %>%
        filter(IR_notify_of_intentions == 's') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    notify_intent_m <- ts_attrib %>%
        filter(IR_notify_of_intentions == 'm') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    notify_dist_s <- ts_attrib %>%
        filter(IR_notify_on_distribution == 's') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    notify_dist_m <- ts_attrib %>%
        filter(IR_notify_on_distribution == 'm') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    provide_access_s <- ts_attrib %>%
        filter(IR_provide_online_access == 's') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    provide_access_m <- ts_attrib %>%
        filter(IR_provide_online_access == 'm') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    consult_s <- ts_attrib %>%
        filter(IR_collaboration_consultation == 's') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    consult_m <- ts_attrib %>%
        filter(IR_collaboration_consultation == 'm') %>%
        dplyr::select(network, domain, macrosheds_prodname, contact) %>%
        distinct()

    ir <- list()
    ir_explanations <- c('A noncommercial license means you cannot profit from derivative works.',
                         'A share-alike license means derivative works must propagate the original license terms. If may_disregard_with_permission is TRUE, you may ask the primary source contact for permission to use your own license.',
                         'notify_of_intent_S means the primary source has requested notice of any plans to publish derivative works that use their data.',
                         'notify_of_intent_M means the primary source requires notice of any plans to publish derivative works that use their data.',
                         'notify_on_distribution_S means the primary source has requested that they be informed of any publications resulting from their data.',
                         'notify_on_distribution_M means the primary source requires that they be informed of any publications resulting from their data.',
                         'provide_access_S means the primary source requests online access to any publications resulting from their data.',
                         'provide_access_M means the primary source requires online access to any publications resulting from their data.',
                         "consult_or_collab_S means the primary source requests consultation and/or collaboration where reasonable (e.g. if you're only using data from one or two domains).",
                         "consult_or_collab_S means the primary source requires opportunities for consultation and/or collaboration where reasonable (e.g. if you're only using data from one or two domains).")

    ir$noncommercial_license <- noncomm
    ir$sharealike_license <- sharealike
    ir$notify_of_intent_S <- if(abide_by == 'suggestions') notify_intent_s else tibble()
    ir$notify_of_intent_M <- notify_intent_m
    ir$notify_on_distribution_S <- if(abide_by == 'suggestions') notify_dist_s else tibble()
    ir$notify_on_distribution_M <- notify_dist_m
    ir$provide_access_S <- if(abide_by == 'suggestions') provide_access_s else tibble()
    ir$provide_access_M <- provide_access_m
    ir$consult_or_collab_S <- if(abide_by == 'suggestions') consult_s else tibble()
    ir$consult_or_collab_M <- consult_m

    applicable <- sapply(ir, function(x) nrow(x) != 0)
    ir <- ir[applicable]
    ir_explanations <- ir_explanations[applicable]

    ir <- list(intellectual_rights = ir,
               IR_explanations = ir_explanations)

    return(ir)
}

attrib_output_write <- function(attrib, write_to_dir){

    write_to_dir <- file.path(write_to_dir, 'macrosheds_attribution_information')
    dir.create(write_to_dir)

    readr::write_lines(attrib$acknowledgements,
                       file.path(write_to_dir, 'acknowledgements.txt'))

    if(length(attrib$intellectual_rights_explanations)){
        readr::write_lines(attrib$intellectual_rights_explanations, sep = '\n\n',
                           file.path(write_to_dir, 'intellectual_rights_definitions.txt'))
    }

    if(length(attrib$intellectual_rights_notifications)){
        sink(file = file.path(write_to_dir, 'intellectual_rights_notifications.txt'))
        cat('----INTELLECTUAL RIGHTS NOTIFICATIONS----\n\n')
        print(lapply(attrib$intellectual_rights_notifications, as.data.frame))
        sink()
    }

    readr::write_lines(attrib$bibliography, sep = '\n',
                       file.path(write_to_dir, 'ms_bibliography.bib'))
}


# misc helpers ####
expo_backoff <- function(expr,
                         max_attempts = 10,
                         verbose = TRUE){
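
    #retries `expr` with a randomized wait between attempts whose upper bound grows
    #   exponentially. illustrative (hypothetical) usage:
    #   expo_backoff(httr::GET('https://example.com')) would retry a flaky request
    #   up to max_attempts times before rethrowing the error.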

    for(attempt_i in seq_len(max_attempts)){

        results <- try(expr = expr,
                       silent = TRUE)

        if(inherits(results, 'try-error')){

            if(attempt_i == max_attempts){
                stop(attr(results, 'condition'))
            }

            backoff <- runif(n = 1,
                             min = 0,
                             max = 2^attempt_i - 1)

            if(verbose){
                message(glue::glue("Backing off for ", round(backoff, 1), " seconds."))
            }

            Sys.sleep(backoff)

        } else {

            # if(verbose){
            #     print(paste0("Request succeeded after ", attempt_i, " attempt(s)."))
            # }

            break
        }
    }

    return(results)
}

sd_or_0 <- function(x, na.rm = FALSE){

    #Only used to bypass the tyranny of the errors package not letting
    #me take the standard deviation of an errors object of length 1 without
    #setting the uncertainty to 0

    x <- if(is.vector(x) || is.factor(x)) x else as.double(x)

    if(length(x) == 1) return(0)

    return(sqrt(var(x, na.rm = na.rm)))
}

populate_implicit_NAs <- function(d,
                                  interval,
                                  val_fill = NA,
                                  edges_only = FALSE){

    #TODO: this would be more flexible if we could pass column names as
    #   positional args and use them in group_by and mutate

    #d: a ms tibble with at minimum datetime, site_code, and var columns
    #interval: the interval along which to populate missing values (must be
    #   either '15 min' or '1 day').
    #val_fill: character or NA. the token with which to populate missing
    #   elements of the `val` column. All other columns will be populated
    #   invariably with NA or 0. See details.
    #edges_only: logical. if TRUE, only two filler rows will be inserted into each
    #   gap, one just after the gap begins and the other just before the gap ends.
    #   If FALSE (the default), the gap will be fully populated according to
    #   the methods outlined in the details section.

    #this function makes implicit missing timeseries records explicit,
    #   by populating rows so that the datetime column is complete
    #   with respect to the sampling interval. In other words, if
    #   samples are taken every 15 minutes, but some samples are skipped
    #   (rows not present), this will create those rows. If ms_status or
    #   ms_interp columns are present, their new records will be populated
    #   with 0s. The val column will be populated with whatever is passed to
    #   val_fill. Any other columns will be populated with NAs.

    #returns d, complete with new rows, sorted by site_code, then var, then datetime
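
    #illustrative example: populate_implicit_NAs(d, interval = '1 day') on a daily
    #   series missing the row for 2020-01-02 would insert that row with val = NA
    #   (or val_fill), ms_status/ms_interp = 0, and NA for any other columns.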


    complete_d <- d %>%
        mutate(fill_marker = 1) %>%
        group_by(site_code, var) %>%
        tidyr::complete(datetime = seq(min(datetime),
                                       max(datetime),
                                       by = interval)) %>%
        # mutate(site_code = .$site_code[1],
        #        var = .$var[1]) %>%
        ungroup() %>%
        arrange(site_code, var, datetime) %>%
        dplyr::select(datetime, site_code, var, everything())

    if(! any(is.na(complete_d$fill_marker))) return(d)

    if(! is.na(val_fill)){
        complete_d$val[is.na(complete_d$fill_marker)] <- val_fill
    }

    if('ms_status' %in% colnames(complete_d)){
        complete_d$ms_status[is.na(complete_d$ms_status)] <- 0
    }

    if('ms_interp' %in% colnames(complete_d)){
        complete_d$ms_interp[is.na(complete_d$ms_interp)] <- 0
    }

    if(edges_only){

        midgap_rows <- rle2(is.na(complete_d$fill_marker)) %>%
            filter(values == TRUE) %>%
            # if(nrow(fill_runs) == 0) return(d)
            # midgap_rows <- fill_runs %>%
            dplyr::select(starts, stops) %>%
            {purrr::map2(.x = .$starts,
                         .y = .$stops,
                         ~seq(.x, .y))} %>%
            purrr::map(~( if(length(.x) <= 2)
            {
                return(NULL)
            } else {
                return(.x[2:(length(.x) - 1)])
            }
            )) %>%
            unlist()
        if(! is.null(midgap_rows)){
            complete_d <- slice(complete_d,
                                -midgap_rows)
        }
    }

    complete_d$fill_marker <- NULL

    return(complete_d)
}

numeric_any <- function(num_vec){
    return(as.numeric(any(as.logical(num_vec))))
}

numeric_any_v <- function(...){ #attack of the ellipses

    #...: numeric vectors of equal length. should be just 0s and 1s, but
    #   integers other than 1 are also considered TRUE by as.logical()

    #the vectorized version of numeric_any. good for stuff like:
    #    mutate(ms_status = numeric_any_v(ms_status_x, ms_status_flow))

    #returns a single vector of the same length as arguments

    #this func could be useful in global situations
    numeric_any_positional <- function(...) numeric_any(c(...))

    numeric_any_elementwise <- function(...){
        Map(function(...) numeric_any_positional(...), ...)
    }

    out <- do.call(numeric_any_elementwise,
                   args = list(...)) %>%
        unlist()

    if(is.null(out)) out <- numeric()

    return(out)
}

check_suggested_pkgs <- function(pkgs){

    #pkgs: character vector of package names

    loaded <- rep(NA, length(pkgs))
    for(i in seq_along(pkgs)){
        loaded[i] <- sw(require(pkgs[i], quietly = TRUE, character.only = TRUE))
    }

    if(any(! loaded)){
        stop(glue::glue('Additional package(s) required to use this function. ',
                        'Run install.packages(c("{p}")) and try again.',
                        p = paste(pkgs[! loaded], collapse = '", "')))
    }
}

list_all_product_dirs <- function(macrosheds_root, prodname){

    prodname_dirs <- list.dirs(path = macrosheds_root,
                               full.names = TRUE,
                               recursive = TRUE)

    prodname_dirs <- grep(pattern = paste0('/', prodname),
                          x = prodname_dirs,
                          value = TRUE)

    return(prodname_dirs)
}

Mode <- function(x, na.rm = TRUE){
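
    #illustrative example: Mode(c(1, 2, 2, 3)) returns 2. ties go to the value
    #   that appears first in x.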

    if(na.rm){
        x <- na.omit(x)
    }

    ux <- unique(x)
    mode_out <- ux[which.max(tabulate(match(x, ux)))]
    return(mode_out)

}

mean_or_x <- function(x, na.rm = FALSE){

    # used to bypass the tyranny of the errors package not letting
    # someone take the mean of an errors object of length 1. this func returns the
    # original value if the group is length one, and the mean otherwise

    if(length(x) == 1) return(x)

    x <- mean(x, na.rm = na.rm)
    return(x)
}

ms_drop_var_prefix_ <- function(x){
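
    #illustrative example: a prefixed variable code like 'GN_NO3_N' becomes
    #   'NO3_N'; unprefixed input is returned unchanged.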

    if(any(is.na(stringr::str_match(x, '^[IGa-z][SNa-z]_.+')))){
        return(x)
    }

    unprefixed <- substr(x, 4, nchar(x))

    return(unprefixed)
}

ms_extract_var_prefix_ <- function(x){

    if(any(is.na(stringr::str_match(x, '^[IGa-z][SNa-z]_.+')))){
        stop('x is not prefixed.')
    }

    prefix <- substr(x, 1, 2)

    return(prefix)
}

validate_version <- function(macrosheds_root, version, warn = TRUE){

    root_files <- list.files(macrosheds_root,
                             full.names = TRUE)
    held_versions <- grep('/v[0-9]+$', root_files, value = TRUE) %>%
        stringr::str_extract('/v([0-9]+)$', group = 1)

    if(version == 'latest'){
        if(! length(held_versions)){
            stop('No data detected in macrosheds_root. This should be the directory you specified when you ran ms_download_core_data or ms_download_ws_attr.')
        }
        version <- max(as.numeric(held_versions))
    }

    v_num <- sw(as.numeric(version))
    if(is.na(v_num) || v_num <= 0 || v_num %% 1 != 0){
        stop('`version` must be a positive integer, or "latest".')
    }
    root_vsn <- file.path(macrosheds_root, paste0('v', version))

    version <- as.character(version)
    figshare_codes <- macrosheds::file_ids_for_r_package #loaded in R/sysdata.rda, which is written in postprocessing
    avail_vsns <- figshare_codes %>%
        dplyr::select(starts_with('fig_code_')) %>%
        rename_with(~sub('fig_code_v', '', .)) %>%
        colnames()

    latest_avail <- max(as.numeric(avail_vsns))
    if(warn && as.numeric(version) < latest_avail){
        warning('Data from MacroSheds v', version, ' were requested, but v', latest_avail, ' is available. See ms_download_core_data() and ms_download_ws_attr().')
    }

    if(as.numeric(version) > latest_avail){
        stop('Version ', version, ' data were requested, but ', latest_avail,
             ' is the highest version of the MacroSheds dataset known to this version of the macrosheds package.')
    }

    if(as.numeric(version) > max(as.numeric(held_versions))){
        stop('Version ', version, ' data were requested, but v', version,
             ' is not present in macrosheds_root. see ms_download_core_data()/ms_download_ws_attr() or check macrosheds_root.')
    }

    return(root_vsn)
}


# flux helpers ####

wtr_yr <- function(dates, start_month = 10){
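
    #returns the water year of each date, e.g. with the default October start,
    #   2020-09-30 -> 2020 and 2020-10-01 -> 2021.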

    d1 <- as.Date(dates)
    offset <- ifelse(as.integer(format(d1, '%m')) < start_month, 0, 1)
    adj_year <- as.integer(format(d1, '%Y')) + offset
    return(adj_year)
}

warn_sum <- function(x) {
    # length of record
    n_x <- length(x)

    # number of NAs in record
    n_na <- length(x[is.na(x)])
    # number of inf values in record
    n_inf <- length(x[is.infinite(x)])

    # warn user
    writeLines(paste("WARNING: infinite values found in flux record, ignoring during SUM",
                     "\n count NA:", n_na,
                     "\n count Inf:", n_inf))


    xsum <- sum(x[!is.infinite(x)], na.rm = TRUE)
    return(xsum)
}

carry_flags <- function(raw_q_df, raw_conc_df, target_solute = NULL, target_year = NULL, period = NULL){
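    #computes, per water year (or per month within a water year), the fraction of
    #   records that were interpolated, flagged (ms_status), or missing in the
    #   concentration and discharge inputs. illustrative (hypothetical) call:
    #   carry_flags(raw_q_df = q, raw_conc_df = chem, target_solute = 'NO3_N',
    #               target_year = 2015, period = 'annual')
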
    #### set up ratio functions #####
    # interp ratio
    calc_interp_ratio <- function(trimmed_df, period = NULL){

        if(period == 'annual'){
            no_interp <- trimmed_df %>%
                filter(ms_interp == 0) %>%
                count() %>%
                pull(n)

            interp <- trimmed_df %>%
                filter(ms_interp ==1) %>%
                count() %>%
                pull(n)

            interp_ratio <- interp/(interp+no_interp)

            return(interp_ratio)

        } else if(period == 'month'){
            interp_ratio <- trimmed_df %>%
                mutate(month = lubridate::month(date)) %>%
                group_by(month) %>%
                summarize(n_interp = sum(ms_interp == 1),
                          n_no_interp = sum(ms_interp == 0)) %>%
                mutate(interp_ratio = n_interp/(n_interp+n_no_interp)) %>%
                ungroup() %>%
                dplyr::select(month, interp_ratio)

            return(interp_ratio)

        } else {
            message('Specify period as month or annual.')
        }

    }

    # status ratio
    calc_status_ratio <- function(trimmed_df, period = NULL){

        if(period == 'annual'){
            no_status <- trimmed_df %>%
                filter(ms_status == 0) %>%
                count() %>%
                pull(n)

            status <- trimmed_df %>%
                filter(ms_status == 1) %>%
                count() %>%
                pull(n)

            status_ratio <- status/(status+no_status)

            return(status_ratio)

        } else if(period == 'month'){
            status_ratio <- trimmed_df %>%
                mutate(month = lubridate::month(date)) %>%
                group_by(month) %>%
                summarize(n_stat = sum(ms_status == 1),
                          n_no_stat = sum(ms_status == 0)) %>%
                mutate(status_ratio = n_stat/(n_stat+n_no_stat)) %>%
                ungroup() %>%
                dplyr::select(month, status_ratio)

            return(status_ratio)

        } else {
            message('Specify period as month or annual.')
        }
    }

    # missing ratio
    calc_missing_ratio <- function(trimmed_df, period = NULL){

        if(period == 'annual'){
            present <- trimmed_df %>%
                dplyr::select(val) %>%
                na.omit() %>%
                nrow()

            missing_ratio <- (365-present)/365

            return(missing_ratio)

        }else if(period == 'month'){
            missing_ratio <- trimmed_df %>%
                mutate(month = lubridate::month(date)) %>%
                dplyr::select(month, date, val) %>%
                group_by(month) %>%
                na.omit() %>%
                summarize(n = n()) %>%
                mutate(full_days = lubridate::days_in_month(month),
                       missing_ratio = (full_days-n)/full_days) %>%
                ungroup() %>%
                dplyr::select(month, missing_ratio)

            return(missing_ratio)

        } else {
            message('Specify period as month or annual.')
        }

    }
    ### run functions on data ####
    if(period == 'annual'){

        year_conc_df <- raw_conc_df %>%
            mutate(wy = wtr_yr(date, start_month = 10)) %>%
            # keep only the target solute's records in the target water year
            filter(wy == target_year,
                   var == target_solute)

        conc_tbl <- tibble(wy = target_year, var = target_solute,
                           ms_interp = calc_interp_ratio(trimmed_df = year_conc_df, period = period),
                           ms_status = calc_status_ratio(trimmed_df = year_conc_df, period = period),
                           ms_missing = calc_missing_ratio(trimmed_df = year_conc_df, period = period))

        year_q_df <- raw_q_df %>%
            mutate(wy = wtr_yr(date, start_month = 10)) %>%
            filter(wy == target_year)

        q_tbl <- tibble(wy = target_year, var = unique(year_q_df$var)[1],
                        ms_interp = calc_interp_ratio(trimmed_df = year_q_df, period = period),
                        ms_status = calc_status_ratio(trimmed_df = year_q_df, period = period),
                        ms_missing = calc_missing_ratio(trimmed_df = year_q_df, period = period)
        )

        out_frame <- rbind(conc_tbl, q_tbl)
        return(out_frame)
    } else if(period == 'month'){

            year_conc_df <- raw_conc_df %>%
                mutate(wy = wtr_yr(date, start_month = 10)) %>%
                # keep only the target solute's records in the target water year
                filter(wy == target_year,
                       var == target_solute)

            ms_interp = calc_interp_ratio(trimmed_df = year_conc_df, period = period)
            ms_status = calc_status_ratio(trimmed_df = year_conc_df, period = period)
            ms_missing = calc_missing_ratio(trimmed_df = year_conc_df, period = period)

            conc_tbl <- full_join(ms_interp, ms_status, by = 'month') %>%
                full_join(ms_missing, by = 'month') %>%
                mutate(wy = target_year,
                       var = target_solute)

            year_q_df <- raw_q_df %>%
                mutate(wy = wtr_yr(date, start_month = 10)) %>%
                filter(wy == target_year)

            ms_interp = calc_interp_ratio(trimmed_df = year_q_df, period = period)
            ms_status = calc_status_ratio(trimmed_df = year_q_df, period = period)
            ms_missing = calc_missing_ratio(trimmed_df = year_q_df, period = period)

            q_tbl <- full_join(ms_interp, ms_status, by = 'month') %>%
                full_join(ms_missing, by = 'month') %>%
                mutate(wy = target_year,
                       var = unique(year_q_df$var)[1])

            out_frame <- rbind(conc_tbl, q_tbl)
            return(out_frame)
        }

}
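
# illustrative call (hypothetical inputs): `raw_q_df` and `raw_conc_df` are daily
# MacroSheds-style tibbles with date, var, val, ms_status, and ms_interp columns
# carry_flags(raw_q_df = q_daily, raw_conc_df = no3_daily,
#             target_solute = 'NO3_N', target_year = 2015, period = 'annual')
# returns one row per input table (conc and Q) giving the interpolated, flagged,
# and missing fractions of that water year's record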

prep_raw_for_riverload <- function(chem_df, q_df, datecol = 'date'){

    conv_q <- q_df %>%
        mutate(datetime = as.POSIXct(get(datecol), format = '%Y-%m-%d %H:%M:%S', tz = 'UTC')) %>%
        mutate(flow = q_lps * 0.001) %>% # convert L/s to cubic meters per second
        dplyr::select(datetime, flow) %>%
        arrange(datetime) %>%
        data.frame()

    if(! nrow(conv_q)) return(data.frame())

    if(lubridate::month(conv_q$datetime[1]) == 9 & lubridate::day(conv_q$datetime[1]) == 30){
        conv_q <- conv_q[-1, ]
    }

    conv_c <- chem_df %>%
        mutate(datetime = as.POSIXct(get(datecol), format = '%Y-%m-%d %H:%M:%S', tz = 'UTC')) %>%
        dplyr::select(datetime, conc) %>%
        data.frame()

    if(! nrow(conv_c)) return(data.frame())

    db <- full_join(conv_q, conv_c, by = 'datetime') %>%
        #filter(!is.na(flow)) %>%
        arrange(datetime)

    return(db)
}
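
# illustrative example (hypothetical values): discharge in L/s is converted to
# m^3/s and outer-joined to concentration by datetime, e.g.
# q_df    <- tibble(date = as.Date('2015-01-01') + 0:2, q_lps = c(100, 120, 90))
# chem_df <- tibble(date = as.Date('2015-01-02'), conc = 1.5)
# prep_raw_for_riverload(chem_df, q_df)  # flow column: 0.10, 0.12, 0.09 m^3/s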

calculate_pw <- function(chem_df, q_df, datecol = 'date', area = 1, period = NULL){

    if(is.null(period)){
        warning('parameter "period" unspecified; calculating annual flux.')
        period <- 'annual'
    }

    rl_data <- prep_raw_for_riverload(chem_df = chem_df,
                                      q_df = q_df,
                                      datecol = datecol)

    if(nrow(rl_data) %in% 0:1){
        return(NA)
    }

    if(is.na(rl_data[1, 2])){
        rl_data <- rl_data[-1, ]
    }

    if(period == 'annual'){

        flux_from_pw <- sm(RiverLoad::method6(rl_data, ncomp = 1)) %>%
            sum(.) / (1000 * area)

    } else if(period == 'month'){

        # monthly adaptation of RiverLoad::method6 (linear interpolation of
        # concentration), summing daily loads within each year-month
        method6_month <- function(db, ncomp, period){

            n <- nrow(db)
            interpolation <- data.frame(imputeTS::na_interpolation(db, 'linear'))
            load <- data.frame(interpolation[, 2] * interpolation[, -c(1:2)])

            # time step between observations, in days (retained from method6;
            # not used in the monthly aggregation below)
            difference <- matrix(nrow = (nrow(db) - 1), ncol = 1)

            for(i in 1:(nrow(db) - 1)){
                difference[i] <- difftime(db[i + 1, 1], db[i, 1], units = 'days')
            }

            loadtot <- cbind.data.frame(interpolation$datetime, load)
            colnames(loadtot)[1] <- c('datetime')
            loadtot[, 1] <- format(as.POSIXct(loadtot[, 1]), format = '%Y-%m')
            forrow <- aggregate(loadtot[, 2] ~ datetime, loadtot, sum)
            agg.dataC <- matrix(nrow = nrow(forrow), ncol = (ncomp))

            for(i in 1:ncomp){
                agg.data <- aggregate(loadtot[, i + 1] ~ datetime,
                                      loadtot, sum)
                agg.dataC[, i] <- as.matrix(agg.data[, 2])
            }

            colnames(agg.dataC) <- c(names(db)[3:(ncomp + 2)])
            rownames(agg.dataC) <- forrow$datetime

            return(agg.dataC)
        }

        ##### apply #####

        flux_from_pw <- method6_month(rl_data, ncomp = 1, period = period)

        flux_from_pw <- tibble(date = rownames(flux_from_pw),
                               flux = (flux_from_pw[, 1] / (1000 * area)))
    }

    return(flux_from_pw)
}
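
# illustrative call (hypothetical inputs): chem_df has date + conc (mg/L) and q_df
# has date + q_lps for a single water year; area is watershed area in hectares
# calculate_pw(chem_df = chem_yr, q_df = q_yr, datecol = 'date',
#              area = 42, period = 'annual')
# returns a single annual load scaled by 1/(1000 * area), or a monthly tibble
# when period = 'month'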

calculate_beale <- function(chem_df, q_df, datecol = 'date', period = NULL, area = 1){
    if(is.null(period)) {
        warning('no period supplied, calculating annual flux')
        period <- 'annual'
    }

    rl_data <- prep_raw_for_riverload(chem_df = chem_df,
                                      q_df = q_df,
                                      datecol = datecol)

    if(nrow(rl_data) %in% 0:1){
        return(NA)
    }

    if(is.na(rl_data[1, 2])){
        rl_data <- rl_data[-1, ]
    }

    if(period == 'annual'){

        flux_from_beale <- RiverLoad::beale.ratio(rl_data, ncomp = 1) %>%
            sum(.) / (1000 * area)

    } else if(period == 'month'){

        flux_from_beale <- RiverLoad::beale.ratio(rl_data, ncomp = 1, period = period)

        flux_from_beale <- tibble(date = rownames(flux_from_beale),
                                  flux = (flux_from_beale[, 1] / (1000 * area)))
    }

    return(flux_from_beale)
}

calculate_rating <- function(chem_df, q_df, datecol = 'date', period = NULL, area = 1){

    if(is.null(period)) {
        warning('no period supplied, calculating annual flux')
        period <- 'annual'
    }

    rl_data <- prep_raw_for_riverload(chem_df = chem_df, q_df = q_df, datecol = datecol)

    out <- try({
        if(period == 'annual'){

            flux_from_reg <- RiverLoad::rating(rl_data, ncomp = 1) %>%
                sum(.)/(1000*area)

        } else {

            if(period == 'month'){
                flux_from_reg <- RiverLoad::rating(rl_data, ncomp = 1, period = period)

                flux_from_reg <- tibble(date = rownames(flux_from_reg),
                                        flux = (flux_from_reg[,1]/(1000*area)))
            }
        }
    }, silent = TRUE)

    if(inherits(out, 'try-error')) out <- NA

    return(out)
}

calculate_wrtds <- function(chem_df, q_df, ws_size, lat, long, datecol = 'date',
                            agg = 'default', minNumObs = 2, minNumUncen = 2, gap_period = 730){
    # NOTE: assuming watershed area in hectares (ws_size) is the intended area for
    # scaling flux here
    area <- ws_size

    flux_from_egret <- tryCatch(
        expr = {
            # default sums all daily flux values in df
            egret_results <- adapt_ms_egret(chem_df, q_df, ws_size,
                                            lat, long,
                                            datecol = datecol,
                                            minNumObs = minNumObs,
                                            minNumUncen = minNumUncen,
                                            gap_period = gap_period)

            if(agg == 'default') {
                warn_sum(egret_results$Daily$FluxDay) / area
            } else if(agg == 'annual') {
                egret_results$Daily %>%
                    mutate(wy = water_year(Date)) %>%
                    group_by(wy) %>%
                    summarize(flux = warn_sum(FluxDay) / area,
                              .groups = 'drop')
            } else if(agg == 'monthly') {
                egret_results$Daily %>%
                    mutate(wy = water_year(Date),
                           month = lubridate::month(Date)) %>%
                    group_by(wy, month) %>%
                    summarize(flux = warn_sum(FluxDay) / area,
                              .groups = 'drop')
            }
        },
        # return NA (rather than failing on an undefined object) if WRTDS errors
        error = function(e){
            message('ERROR: WRTDS failed to run')
            NA
        })

    return(flux_from_egret)
}

generate_residual_corrected_conc <- function(chem_df, q_df, datecol = 'date', sitecol = 'site_no'){

    # first make c:q rating
    paired_df <- q_df %>%
        full_join(chem_df, by = c(datecol, sitecol, 'wy')) %>%
        na.omit() %>%
        filter(q_lps > 0,
               is.finite(q_lps))

    if(nrow(paired_df) <= 2) return(NA)

    q_log <- log10(paired_df$q_lps)
    c_log <- log10(paired_df$conc)
    model_data <- tibble(c_log, q_log) %>%
        filter(is.finite(c_log),
               is.finite(q_log)) %>%
        na.omit()

    rating <- sw(summary(lm(model_data$c_log ~ model_data$q_log,
                            singular.ok = TRUE)))

    # extract model info
    intercept <- rating$coefficients[1]
    slope <- rating$coefficients[2]

    # create modeled c, calc residuals, adjust modeled c by interpolated residuals
    site_code <- paired_df$site_code[[1]]

    rating_filled_df <- q_df %>%
        mutate(conc_reg = 10^(intercept + (slope * log10(q_lps)))) %>%
        dplyr::select(all_of(datecol), conc_reg, q_lps) %>%
        full_join(chem_df, by = datecol) %>%
        dplyr::select(site_code, all_of(datecol), conc, conc_reg, q_lps, wy)  %>%
        mutate(res = conc_reg - conc,
               res = imputeTS::na_interpolation(res),
               conc_com = conc_reg - res,
               site_code = !!site_code,
               wy = wtr_yr(get(datecol), start_month = 10))

    rating_filled_df$conc_com[! is.finite(rating_filled_df$conc_com)] <- 0
    return(rating_filled_df)
}
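
# worked sketch of the residual correction (hypothetical numbers): with a fitted
# log-log rating of intercept = -1 and slope = 0.5, a day with q_lps = 100 gives
# conc_reg = 10^(-1 + 0.5 * log10(100)) = 1. If an observed conc of 0.8 exists that
# day, res = 1 - 0.8 = 0.2 and conc_com = 1 - 0.2 = 0.8 (observed days are recovered
# exactly); on unobserved days, res is linearly interpolated before subtracting.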

calculate_composite_from_resid_corrected <- function(rating_filled_df,
                                                     site_no = 'site_no',
                                                     period = NULL,
                                                     area = 1){

    if(is.null(period)){
        warning('no period supplied, calculating annual flux')
        period <- 'annual'
    }

    if(period == 'annual'){

        flux_from_comp <- rating_filled_df %>%
            dplyr::select(date, conc_com, q_lps, wy) %>%
            tidyr::drop_na() %>%
            mutate(flux = conc_com * q_lps * 86400 * (1 / area) * 1e-6) %>%
            group_by(wy) %>%
            summarize(flux = sum(flux),
                      .groups = 'drop') %>%
            mutate(site_code = site_no)

    } else if(period == 'month'){

        flux_from_comp <- rating_filled_df %>%
            mutate(month = lubridate::month(date),
                   flux = conc_com * q_lps * 86400 * (1 / area) * 1e-6) %>%
            group_by(wy, month) %>%
            summarize(date = max(date),
                      flux = sum(flux),
                      .groups = 'drop')
    }

    return(flux_from_comp)
}
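
# worked sketch of the daily flux term (hypothetical numbers):
# conc_com = 2 mg/L, q_lps = 50 L/s, area = 10 ha gives
# flux = 2 * 50 * 86400 * (1 / 10) * 1e-6 = 0.864 kg/ha/day,
# which is then summed within each water year (or water year x month)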

detect_record_break <- function(data){
    data_time <- data %>%
        dplyr::select(Date, Julian) %>%
        # how many days between this day and the next
        mutate(days_gap = lead(Julian, 1) - Julian)
    return(data_time)
}

get_break_dates <- function(data, gap_period = 730){
    start = list()
    end = list()

    index = 1
    # default 730 days: USGS guidance suggests gaps shorter than ~2 years are generally acceptable
    for(i in 1:nrow(data)) {
        gap <- data$days_gap[i]

        if(!is.na(gap)) {

            if(gap > gap_period) {
                start[[index]] = c(data$Date[i])
                end[[index]] = c(data$Date[i+1])
                index = index + 1
            }
        }
    }

    break_dates = list('start' = start, 'end'= end)
    return(break_dates)
}
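
# illustrative use (hypothetical record): if the Sample file has one gap of ~1000 days,
# rec    <- detect_record_break(Sample_file)
# breaks <- get_break_dates(rec)      # default gap_period = 730 days
# breaks$start[[1]]; breaks$end[[1]]  # last date before and first date after the gap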

adapt_ms_egret <- function(chem_df, q_df, ws_size, lat, long,
                           site_data = NULL, kalman = FALSE,
                           datecol = 'date', minNumObs = 2, minNumUncen = 2, gap_period = 730){

    check_suggested_pkgs(c('EGRET'))

    # TODO:  reorder site data args to make fully optional

    get_MonthSeq <- function(dates){
        ## dates <- Sample_file$Date
        years <- lubridate::year(dates)-1850

        MonthSeq <- years*12

        MonthSeq <- MonthSeq + lubridate::month(dates)

        return(MonthSeq)
    }
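
    # worked example: MonthSeq counts months since January 1850, so
    # get_MonthSeq(as.Date('1851-01-15')) = (1851 - 1850) * 12 + 1 = 13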

    decimalDate <- function(rawData){

        dateTime <- as.POSIXlt(rawData)
        year <- dateTime$year + 1900

        startYear <- as.POSIXct(paste0(year,"-01-01 00:00"))
        endYear <- as.POSIXct(paste0(year+1,"-01-01 00:00"))

        DecYear <- year + as.numeric(difftime(dateTime, startYear, units = "secs"))/as.numeric(difftime(endYear, startYear, units = "secs"))
        return(DecYear)
    }

    wyday <- function(dates, wy_type = 'usgs') {
        # get earliest and latest date from series
        start_date <- min(dates)
        end_date <- max(dates)

        # get the normal year, and make the start of WY date from that year
        year_firsthalf <- year(start_date)
        wy_start <- paste0(year_firsthalf, '-10-01')
        wy_end <- paste0(year_firsthalf+1, '-09-30')

        # gap between start WY and end of normal year, always 91 days
        correction_val <- yday('2021-12-31') - yday('2021-10-01')


        # adjust yday from start of gregorian year to (usgs) WY year
        wy_firsthalf <- yday(dates[lubridate::month(dates) %in% c(10, 11, 12)]) - yday(wy_start) + 1
        wy_secondhalf <- yday(dates[!lubridate::month(dates) %in% c(10, 11, 12)]) + correction_val + 1

        wydays <- c(wy_firsthalf, wy_secondhalf)

        return(wydays)
    }
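
    # worked example (USGS water year starting Oct 1): for a first-half date,
    # wyday(as.Date('2021-10-15')) = yday(Oct 15) - yday(Oct 1) + 1 = 15;
    # for a second-half date like Jan 1, the result is 1 + 91 + 1 = 93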

    decimalDateWY <- function(dates, wy_type = 'usgs') {
        # get earliest and latest date from series
        start_date <- min(dates)
        end_date <- max(dates)

        # get the normal year, and make the start of WY date from that year
        year_firsthalf <- year(start_date)
        wy_start <- paste0(year_firsthalf, '-10-01')
        wy_end <- paste0(year_firsthalf+1, '-09-30')
        wy_seq <- seq(as.Date(wy_start), as.Date(wy_end), by = 1)

        wydays <- wyday(dates) - 1
        wylength <- length(wy_seq)

        ydec <- as.integer(as.character(water_year(dates, wy_type))) + wydays/wylength

        return(ydec)
    }

    # TODO: rename with real descriptive name
    enlightened_yday <- function(dates, wy_type = 'usgs') {
        # get earliest and latest date from series
        start_date <- min(dates)
        end_date <- max(dates)

        # get the normal year, and make the start of WY date from that year
        wy_first <- year(start_date)
        wy_start <- paste0(wy_first, '-10-01')
        wy_end <- paste0(wy_first+1, '-09-30')

        # unenlightened yday of date vector
        wy_firsthalf <- yday(dates[lubridate::month(dates) %in% c(10, 11, 12)])
        wy_secondhalf <- yday(dates[!lubridate::month(dates) %in% c(10, 11, 12)])

        # if first half of WY is a leap year,
        if(366 %in% wy_firsthalf) {
            ## print('first half of WY is leap year, adjusting')
            wy_firsthalf = wy_firsthalf - 1
        } else if(366 %in% wy_secondhalf) {
            ## print('second half of WY is leap year, no action')
        } else {
            ## print('neither side of water year is a leap year')
            invisible()
        }

        # adjust yday from start of gregorian year to (usgs) WY year
        wy_ydays <- c(wy_firsthalf, wy_secondhalf)

        return(wy_ydays)
    }

    get_start_end <- function(d){
        start_date <- min(d$datetime)
        start_year <- lubridate::year(start_date)
        start_wy <- ifelse(lubridate::month(start_date) %in% c(10, 11, 12), start_year+1, start_year)
        filter_start_date <- lubridate::ymd(paste0(start_wy-1, '-10-01'))

        end_date <- max(d$datetime)
        end_year <- lubridate::year(end_date)
        end_wy <- ifelse(lubridate::month(end_date) %in% c(10, 11, 12), end_year+1, end_year)
        filter_end_date <- lubridate::ymd(paste0(end_wy, '-10-01'))

        fin_dates <- c(filter_start_date, filter_end_date)
        return(fin_dates)

    }
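
    # illustrative example (hypothetical datetimes): a record spanning
    # 2015-03-02 through 2018-11-20 yields c('2014-10-01', '2019-10-01'),
    # i.e. the bounds of the water years that fully contain the record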

    ms_run_egret_adapt <- function(stream_chemistry, discharge, prep_data = TRUE,
                                   run_egret = TRUE, kalman = FALSE, quiet = FALSE,
                                   site_data = NULL, min_q_method = 'USGS', minNumObs = 2, minNumUncen = 2, gap_period = 730){

        # Checks
        if(any(! c('site_code', 'var', 'val', 'date') %in% names(stream_chemistry))){
            stop('The argument to `stream_chemistry` must be in MacroSheds format (required columns: date, site_code, val, var).')
        }

        if(any(! c('site_code', 'var', 'val', 'date') %in% names(discharge))){
            stop('The argument to `discharge` must be in MacroSheds format (required columns: date, site_code, val, var).')
        }

        if(! length(unique(ms_drop_var_prefix_(stream_chemistry$var))) == 1){
            stop('Only one chemistry variable can be run at a time.')
        }

        if((! length(unique(stream_chemistry$site_code)) == 1) || (! length(unique(discharge$site_code)) == 1)){
            stop('Only one site can be run in EGRET at a time')
        }

        if(! unique(stream_chemistry$site_code) == unique(discharge$site_code)){
            stop('stream_chemistry and discharge must contain the same site_code')
        }

        # Get var and site info
        if(is.null(site_data)){
            site_data <- macrosheds::ms_load_sites()

            if(! unique(stream_chemistry$site_code) %in% site_data$site_code){
                stop('This site is not in the MacroSheds dataset, provide a site_data table with the names: site_code, ws_area_ha, latitude, longitude')
            }
        } else{
            if(!all(names(site_data) %in% c('site_code', 'ws_area_ha', 'latitude', 'longitude', 'site_type'))){
                stop('If you are not using a MacroSheds site, you must supply a site_data tibble with the columns: site_code, ws_area_ha, latitude, longitude')
            }
            site_data <- site_data %>%
                mutate(site_type = 'stream_gauge')
        }

        ## ms_vars <- read.csv('data/ms/macrosheds_vardata.csv')
        ms_vars <- macrosheds::ms_load_variables()
        site_code <- unique(stream_chemistry$site_code)

        #### Prep Files ####

        if(prep_data){
            # EGRET does not like NAs
            stream_chemistry <- stream_chemistry %>%
                filter(!is.na(val))
            discharge <- discharge %>%
                filter(!is.na(val))

            # EGRET can't accept 0s in the concentration column, so replace them with the
            # minimum observed value (alternatives: hack EGRET or use detection limits)
            # TODO: use DL system to replace min vals, get_hdl()
            # TODO: AND/OR use EGRET built in uncertainty system, ConcLow = NA, ConcHigh = DL
            ## min_chem <- stream_chemistry %>%
            ##   filter(val > 0,
            ##          !is.na(val),
            ##          !is.infinite(val),
            ##          !is.null(val)) %>%
            ##   pull(val) %>%
            ##     # NOTE: upsettingly, use of errors package makes min() now return NA instead of Inf
            ##   min()

            min_chem <- min(as.numeric(stream_chemistry[!is.infinite(stream_chemistry$val) & !is.na(stream_chemistry$val),]$val))

            # NOTE: changed so now we are forced to have a real number minimum value
            # NOTE: change to no conditional- min chem should never be infinite
            ## if(!is.infinite(min_chem)){
            ## }
            stream_chemistry <- stream_chemistry %>%
                mutate(val = ifelse(val == 0, !!min_chem, val))

            # Filter so there is only Q going into the model that also has chem
            stream_chemistry <- stream_chemistry %>%
                mutate(year = lubridate::year(datetime),
                       month = lubridate::month(datetime)) %>%
                mutate(waterYear = ifelse(month %in% c(10, 11, 12), year+1, year))

            # TODO: the n >= 6 filter below is commented out; reinstate or remove it
            # (intent: keep only years with at least 6 chemistry samples, i.e. roughly
            # bi-monthly sampling)
            years_with_data <- stream_chemistry %>%
                group_by(waterYear) %>%
                summarize(n = n()) %>%
                # filter(n >= 6) %>%
                pull(waterYear)

            # Filter Q and chem to overlap in a water-year
            chem_dates <- get_start_end(stream_chemistry)
            q_dates <- get_start_end(discharge)

            start_date <- if(chem_dates[1] > q_dates[1]) { chem_dates[1] } else{ q_dates[1] }
            end_date <- if(chem_dates[2] < q_dates[2]) { chem_dates[2] } else{ q_dates[2] }

            discharge <- discharge %>%
                filter(datetime >= !!start_date,
                       datetime <= !!end_date)
            stream_chemistry <- stream_chemistry %>%
                filter(datetime >= !!start_date,
                       datetime <= !!end_date)

            # Filter discharge to only include water years with chemistry sampling
            discharge <- discharge %>%
                mutate(year = lubridate::year(datetime),
                       month = lubridate::month(datetime)) %>%
                mutate(waterYear = ifelse(month %in% c(10, 11, 12), year+1, year)) %>%
                filter(waterYear %in% !!years_with_data)

            # Remove times when there is a chem sample and no Q reported
            samples_to_remove <- left_join(stream_chemistry, discharge, by = 'datetime') %>%
                filter(is.na(val.y)) %>%
                pull(datetime)

            stream_chemistry <- stream_chemistry %>%
                filter(!datetime %in% samples_to_remove)
        }

        # last redundant assurance of no NA, Inf, or 0 values
        stream_chemistry <- stream_chemistry %>%
            filter(!is.na(val),
                   !is.infinite(val),
                   !is.null(val),
                   val > 0)

        n_records <- length(stream_chemistry$val)
        n_years <- length(unique(stream_chemistry$waterYear))

        # if n_records < 100, change minNumObs accordingly
        if(n_records < 100) {
            minNumObs = n_records - ceiling(n_records*0.1)
            minNumUncen = ceiling(minNumObs/2)
            warning(paste('number of samples less than 100, modifying WRTDS arguments',
                          '\n     minNumObs:', minNumObs,
                          '\n     minNumUncen:', minNumUncen))
        }


        writeLines(paste('stream chemistry dataframe being passed into EGRET sample, \nfile has:',
                         n_records, 'samples over ', n_years, 'water years'))

        # Set up EGRET Sample file
        Sample_file <- tibble(Name = site_code,
                              Date = as.Date(stream_chemistry$datetime),
                              ConcLow = stream_chemistry$val,
                              ConcHigh = stream_chemistry$val,
                              Uncen = 1,
                              ConcAve = stream_chemistry$val,
                              Julian = as.numeric(julian(lubridate::ymd(stream_chemistry$datetime),origin=as.Date("1850-01-01"))),
                              Month = lubridate::month(stream_chemistry$datetime),
                              Day = enlightened_yday(stream_chemistry$datetime),
                              DecYear = decimalDate(stream_chemistry$datetime),
                              MonthSeq = get_MonthSeq(stream_chemistry$datetime)) %>%
            mutate(SinDY = sin(2*pi*DecYear),
                   CosDY = cos(2*pi*DecYear))  %>%
            mutate(waterYear = ifelse(Month %in% c(10, 11, 12), lubridate::year(Date) + 1, lubridate::year(Date))) %>%
            dplyr::select(Name, Date, ConcLow, ConcHigh, Uncen, ConcAve, Julian, Month, Day,
                          DecYear, MonthSeq, waterYear, SinDY, CosDY)

        # Set up EGRET Daily file
        # make sure discharge is aggregated to daily values
        discharge_daily <- discharge %>%
            mutate(datetime = lubridate::date(datetime)) %>%
            group_by(datetime, site_code, var, ms_status, ms_interp, year, month) %>%
            summarize(val = mean(val))

        Daily_file <- tibble(Name = site_code,
                             Date = as.Date(discharge_daily$datetime),
                             # converting lps to m^3/s
                             Q = discharge_daily$val/1000,
                             Julian = as.numeric(julian(lubridate::ymd(discharge_daily$datetime),origin=as.Date("1850-01-01"))),
                             Month = lubridate::month(discharge_daily$datetime),
                             Day = enlightened_yday(discharge_daily$datetime),
                             DecYear = decimalDate(discharge_daily$datetime),
                             MonthSeq = get_MonthSeq(discharge_daily$datetime),
                             Qualifier = discharge_daily$ms_status)

        if(prep_data){
            # EGRET can't handle 0 in Q; setting 0 to the minimum Q ever reported seems reasonable
            no_flow_days <- Daily_file %>%
                filter(Q == 0) %>%
                pull(Date)

            n_nfd <- length(no_flow_days)
            n_record <- length(Daily_file$Date)
            percent_no_flow <- n_nfd/n_record

            writeLines(paste('days with no flow, percent of record:', percent_no_flow))


            # trying USGS WRTDS flow-minimum replacement method
            # NOTE: USGS WRTDS manual says no flow on > 0.2% of days is an issue
            if(min_q_method == 'USGS'){
                message('using USGS recommended no flow replacement method')
                mean_flow <- mean(Daily_file$Q[Daily_file$Q > 0], na.rm = TRUE)

                Daily_file <- Daily_file %>%
                    mutate(Q = ifelse(Q <= 0, !!mean_flow, Q))
            } else {
                # NOTE: could this be where Inf shows up too? like in min_chem?
                min_flow <- min(Daily_file$Q[Daily_file$Q > 0], na.rm = TRUE)

                Daily_file <- Daily_file %>%
                    mutate(Q = ifelse(Q <= 0, !!min_flow, Q))
            }

            # TODO: record zero flow days, and set flux for those days to zero
            # TODO: USGS WRTDS manual says method for replacing 0 and negative flow
            # is to set all 0 and neg to 0, then replace with 0.1% of mean flow
            # and they say final results should have this small flow increment
            # subtracted from Q and flux results (pg 6, manual)
        }

        Daily_file <- Daily_file %>%
            # 'extend' fill for rollmean. NOTE: could rollmean args cause mischief?
            # right alignment appears correct based on the EGRET docs
            mutate(Q7 = zoo::rollmean(Q, 7, fill = 'extend', align = 'right'),
                   Q30 = zoo::rollmean(Q, 30, fill = 'extend', align = 'right'),
                   LogQ = log(Q))

        Daily_file <- tibble::rowid_to_column(Daily_file, 'i') %>%
            dplyr::select(Name, Date, Q, Julian, Month, Day, DecYear, MonthSeq, Qualifier,
                          i, LogQ, Q7, Q30)

        # Set up INFO table
        var <- ms_drop_var_prefix_(unique(stream_chemistry$var))
        var_unit <- ms_vars %>%
            filter(variable_code == !!var) %>%
            pull(unit)

        if(length(var_unit) == 0){
            message("length of var_unit for this flux calc is zero, setting var_unit to NA")
            var_unit <- NA
        }

        site_lat <- site_data %>%
            filter(site_code == !!site_code) %>%
            pull('latitude')
        site_lon <- site_data %>%
            filter(site_code == !!site_code) %>%
            pull('longitude')
        site_ws_area <- site_data %>%
            filter(site_code == !!site_code,
                   site_type == 'stream_gauge') %>%
            pull('ws_area_ha')

        # convert watershed area from hectares to square kilometers (EGRET expects drainSqKm)
        site_ws_area <- site_ws_area / 100

        new_point <- sf::st_sfc(sf::st_point(c(site_lon, site_lat)), crs = 4326) %>%
            sf::st_transform(., crs = 4267)

        # NOTE: question- should we allow interpolating back to Oct 1st from a
        # record where first ever sample is Oct 26th? (as example of issue)
        INFO_file <- tibble(agency_cd = 'macrosheds',
                            site_no = site_code,
                            station_nm = site_code,
                            site_tp_code = 'ST',
                            lat_va = site_lat,
                            long_va = site_lon,
                            dec_lat_va = site_lat,
                            dec_long_va = site_lon,
                            coord_meth_cd = NA,
                            coord_acy_cd = NA,
                            coord_datum_cd = NA,
                            dec_coord_datum_cd = NA,
                            district_cd = NA,
                            state_cd = NA,
                            county_cd = NA,
                            country_cd = NA,
                            land_net_ds = NA,
                            map_nm = NA,
                            map_scale_fc = NA,
                            alt_va = NA,
                            alt_meth_cd = NA,
                            alt_acy_va = NA,
                            alt_datum_cd = NA,
                            huc_cd = NA,
                            basin_cd = NA,
                            topo_cd = NA,
                            instruments_cd = NA,
                            construction_dt = NA,
                            inventory_dt = NA,
                            # watershed area in square miles (1 mi^2 = 2.59 km^2);
                            # assuming site_ws_area (km^2) is the intended input here
                            drain_area_va = site_ws_area / 2.59,
                            contrib_drain_area_va = NA,
                            tz_cd = 'UTC',
                            local_time_fg = 'Y',
                            reliability_cd = NA,
                            gw_file_cd = NA,
                            nat_aqfr_cd = NA,
                            aqfr_cd = NA,
                            aqfr_type_cd = NA,
                            well_depth_va = NA,
                            hole_depth_va = NA,
                            depth_src_cd = NA,
                            project_no = NA,
                            shortName = site_code,
                            drainSqKm = site_ws_area,
                            staAbbrev = site_code,
                            param.nm = var,
                            param.units = var_unit,
                            paramShortName = var,
                            paramNumber = NA,
                            constitAbbrev = var,
                            paStart = 10,
                            paLong = 12)

        eList <- EGRET::mergeReport(INFO_file,
                                    Daily_file,
                                    Sample_file,
                                    verbose = TRUE)

        if(! run_egret){
            return(eList)
        }

        # TODO: print stats on the record being calculated,
        # particularly what the number of obs truly is (helps contextualize absurd results)
        # keeping in mind EGRET says that anything less than n=60 is "dangerous"
        eList <- try(EGRET::modelEstimation(eList,
                                     minNumObs = minNumObs,
                                     minNumUncen = minNumUncen,
                                     windowY = 7,
                                     windowQ = 2,
                                     windowS = 0.5,
                                     verbose = TRUE))

        if(inherits(eList, 'try-error')){
            stop('EGRET failed while running WRTDS. See https://github.com/USGS-R/EGRET for reasons data may not be compatible with the WRTDS model.')
        }

        if(kalman){
            eList <- EGRET::WRTDSKalman(eList, verbose = !quiet)

            if(prep_data){
                # Set flux and Q to 0 and conc to NA on no flow days
                eList$Daily <- eList$Daily %>%
                    mutate(Q = ifelse(Date %in% !!no_flow_days, 0, Q),
                           LogQ = ifelse(Date %in% !!no_flow_days, 0, LogQ),
                           Q7 = ifelse(Date %in% !!no_flow_days, 0, Q7),
                           Q30 = ifelse(Date %in% !!no_flow_days, 0, Q30),
                           ConcDay = ifelse(Date %in% !!no_flow_days, NA, ConcDay),
                           FluxDay = ifelse(Date %in% !!no_flow_days, 0, FluxDay),
                           FNConc = ifelse(Date %in% !!no_flow_days, NA, FNConc),
                           FNFlux = ifelse(Date %in% !!no_flow_days, 0, FNFlux),
                           GenFlux = ifelse(Date %in% !!no_flow_days, 0, GenFlux),
                           GenConc = ifelse(Date %in% !!no_flow_days, 0, GenConc))
            }
        } else if(prep_data){
            # Set flux and Q to 0 and conc to NA on no flow days
            eList$Daily <- eList$Daily %>%
                mutate(Q = ifelse(Date %in% !!no_flow_days, 0, Q),
                       LogQ = ifelse(Date %in% !!no_flow_days, 0, LogQ),
                       Q7 = ifelse(Date %in% !!no_flow_days, 0, Q7),
                       Q30 = ifelse(Date %in% !!no_flow_days, 0, Q30),
                       ConcDay = ifelse(Date %in% !!no_flow_days, NA, ConcDay),
                       FluxDay = ifelse(Date %in% !!no_flow_days, 0, FluxDay),
                       FNConc = ifelse(Date %in% !!no_flow_days, NA, FNConc),
                       FNFlux = ifelse(Date %in% !!no_flow_days, 0, FNFlux))
        }

        # find any 'breaks' in record
        sample_rec <- detect_record_break(Sample_file)
        sample_breaks <- get_break_dates(sample_rec)

        # mask each detected gap (handles multiple long breaks)
        if(length(sample_breaks$start) < 1) {
            writeLines(paste('no gap in record detected larger than', gap_period, 'days'))
        } else {

            for(i in seq_along(sample_breaks$start)) {
                # set period
                startBlank = sample_breaks$start[[i]]
                endBlank = sample_breaks$end[[i]]

                writeLines(paste('\n\nfound gap in record greater than', gap_period, 'days\n',
                                 'between', startBlank, 'and', endBlank, 'masking this period',
                                 'with EGRET blankTime()'))

                eList <- EGRET::blankTime(eList,
                                          startBlank = startBlank,
                                          endBlank = endBlank)
            }
        }
        }

        return(eList)
    }

    ms_chem <- chem_df %>%
        mutate(site_code = 'none',
               # TODO: variable handling
               var = 'variable_chem',
               ms_status = 0,
               ms_interp = 0) %>%
        rename(val = conc,
               datetime = all_of(datecol))

    ms_q <- q_df %>%
        mutate(site_code = 'none',
               var = 'variable_q',
               ms_status = 0,
               ms_interp = 0) %>%
        rename(val = q_lps,
               datetime = all_of(datecol))

    site_data <- tibble(site_code = 'none',
                        ws_area_ha = ws_size,
                        latitude = lat,
                        longitude = long,
                        site_type = 'stream_gauge')

    egret_results <- ms_run_egret_adapt(stream_chemistry = ms_chem,
                                        discharge = ms_q,
                                        prep_data = TRUE,
                                        site_data = site_data,
                                        kalman = kalman,
                                        run_egret = TRUE,
                                        minNumObs = minNumObs,
                                        minNumUncen = minNumUncen,
                                        gap_period = gap_period)

    return(egret_results)
}
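
# illustrative call (hypothetical inputs): chem_df has date + conc, q_df has
# date + q_lps, ws_size is watershed area in hectares, lat/long in decimal degrees
# egret_out <- adapt_ms_egret(chem_df = no3_daily, q_df = q_daily,
#                             ws_size = 42, lat = 43.95, long = -71.72)
# egret_out$Daily$FluxDay  # daily WRTDS flux estimates from EGRET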

calc_simple_flux <- function(chem, q){

    q_type <- q$var[1]

    c_stts <- 'ms_status' %in% colnames(chem)
    c_intp <- 'ms_interp' %in% colnames(chem)
    q_stts <- 'ms_status' %in% colnames(q)
    q_intp <- 'ms_interp' %in% colnames(q)

    if(sum(c(c_stts, q_stts)) == 1) stop('ms_status column should be present in both OR neither of `q` and `chemistry`.')
    if(sum(c(c_intp, q_intp)) == 1) stop('ms_interp column should be present in both OR neither of `q` and `chemistry`.')

    d <- left_join(
        chem,
        dplyr::select(q, date, val, starts_with('ms_')),
        by = 'date',
        suffix = c('_x', '_y')
    )

    if(q_type == 'discharge'){

        d <- d %>%
            mutate(# kg/d = mg/L *  L/s  * s / 1e6
                val = val_x * val_y * 86400 / 1e6)
                #val = val_x * val_y * errors::as.errors(86400) / errors::as.errors(1e6),

        if(c_stts){
            d <- mutate(d, ms_status = numeric_any_v(ms_status_x, ms_status_y))
        }
        if(c_intp){
            d <- mutate(d, ms_interp = numeric_any_v(ms_interp_x, ms_interp_y))
        }

        d <- d %>%
            dplyr::select(-starts_with(c('site_code_', 'var_', 'val_',
                                         'ms_status_', 'ms_interp_'))) %>%
            filter(! is.na(val)) %>% #should be redundant
            arrange(date) %>%
            ms_scale_flux_by_area()

    } else {

        d <- d %>%
            mutate(# kg/ha/d = mg/L *  mm/d * ha/100
                val = val_x * val_y / 100)
                #val = val_x * val_y / errors::as.errors(100),

        if(c_stts){
            d <- mutate(d, ms_status = numeric_any_v(ms_status_x, ms_status_y))
        }
        if(c_intp){
            d <- mutate(d, ms_interp = numeric_any_v(ms_interp_x, ms_interp_y))
        }

        d <- d %>%
            dplyr::select(-starts_with(c('site_code_', 'var_', 'val_',
                                         'ms_status_', 'ms_interp_'))) %>%
            filter(! is.na(val)) %>% #should be redundant
            arrange(date)
    }

    return(d)
}
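
# worked sketch of the discharge branch (hypothetical numbers): conc = 2 mg/L and
# Q = 50 L/s give 2 * 50 * 86400 / 1e6 = 8.64 kg/day, which ms_scale_flux_by_area()
# then scales by watershed area; the non-discharge branch instead treats `q` as a
# depth in mm/d, so 2 mg/L * 10 mm/d / 100 = 0.2 kg/ha/day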

calc_load <- function(chem, q, site_code, area, method,
                      aggregation, verbose){

    fvar <- chem$var[1]

    if(any(na.omit(chem$val == 0))){
        message('Concentration values of 0 detected. These will be removed before computing load.')
    }

    chem_filt <- chem %>%
        filter(val > 0) %>%
               #errors::drop_errors(val) > 0) %>%
        dplyr::select(date, val) %>%
        sw(tidyr::drop_na(date, val))

    chunk_daterange <- sw(range(chem_filt$date))

    q_filt <- filter(q,
                     date >= !!chunk_daterange[1],
                     date <= !!chunk_daterange[2])

    #find water years during which Q and chem overlap
    gd_q <- q_filt %>%
        mutate(water_year = wtr_yr(date, start_month = 10)) %>%
        pull(water_year)

    gd_chm <- chem_filt %>%
        mutate(water_year = wtr_yr(date, start_month = 10)) %>%
        pull(water_year)

    paired_years <- intersect(gd_q, gd_chm)

    if(! length(paired_years)){
        message(glue::glue('Not enough overlap between {fvar} and discharge to calculate flux.'))
        return(tibble(site_code = character(),
                      var = character(),
                      wy = numeric(),
                      val = numeric(),
                      method = character(),
                      ms_recommended = logical()))
    }

    daily_data_conc <- chem_filt %>%
        group_by(date) %>%
        summarize(val = mean_or_x(val),
                  .groups = 'drop') %>%
        mutate(site_code = !!site_code) %>%
        dplyr::select(site_code, date, conc = val)

    daily_data_q <- q_filt %>%
        group_by(date) %>%
        summarize(val = mean_or_x(val, na.rm = TRUE),
                  .groups = 'drop') %>%
        mutate(site_code = !!site_code) %>%
        dplyr::select(site_code, date, q_lps = val)

    raw_data_full <- full_join(daily_data_conc, daily_data_q,
                               by = c('site_code', 'date')) %>%
        mutate(wy = wtr_yr(date, start_month = 10),
               month = lubridate::month(date)) %>%
        filter(wy %in% paired_years)

    out_frame <- out_deets <- tibble()
    for(target_year in paired_years){

        aggregation <- ifelse(aggregation == 'monthly', 'month', aggregation)

        q_chem_yr <- raw_data_full %>%
            mutate(conc = conc,
                   q_lps = q_lps) %>%
            #mutate(conc = errors::drop_errors(conc),
            #       q_lps = errors::drop_errors(q_lps)) %>%
            filter(wy == !!target_year)

        q_yr <- q_chem_yr %>%
            dplyr::select(site_code, date, q_lps, wy) %>%
            tidyr::drop_na() %>%
            mutate(month = lubridate::month(date))

        chem_yr <- q_chem_yr %>%
            dplyr::select(site_code, date, conc, wy) %>%
            tidyr::drop_na() %>%
            mutate(month = lubridate::month(date))

        # set up default flux values to be replaced if user has chosen to run method
        if(aggregation == 'annual'){

            flux_annual_average <- NA
            flux_annual_pw <- NA
            flux_annual_beale <- NA
            flux_annual_rating <- NA
            flux_annual_comp <- NA

        } else {

            flux_monthly_average <- list(flux = rep(NA, 12))
            flux_monthly_pw <- list(flux = rep(NA, 12))
            flux_monthly_beale <- list(flux = rep(NA, 12))
            flux_monthly_rating <- list(flux = rep(NA, 12))
            flux_monthly_comp <- list(flux = rep(NA, 12))
        }

        #### calculate average #####
        if('average' %in% method){
            if(aggregation == 'month'){

                flux_monthly_average <- q_chem_yr %>%
                    group_by(wy, month) %>%
                    summarize(q_lps = mean(q_lps, na.rm = TRUE),
                              conc = mean(conc, na.rm = TRUE),
                              .groups = 'drop') %>%
                    # multiply by seconds in a year, and divide by mg to kg conversion (1M)
                    mutate(flux = conc * q_lps * 31557600 * (1 / area) * 1e-6,
                           flux = ifelse(is.nan(flux), NA, flux)) %>% #convert NaN
                    dplyr::select(month, flux)

            } else {

                flux_annual_average <- q_chem_yr %>%
                    group_by(wy) %>%
                    summarize(q_lps = mean(q_lps, na.rm = TRUE),
                              conc = mean(conc, na.rm = TRUE),
                              .groups = 'drop') %>%
                    # multiply by seconds in a year, and divide by mg to kg conversion (1M)
                    mutate(flux = conc * q_lps * 31557600 * (1 / area) * 1e-6) %>%
                    pull(flux) %>%
                    if_else(is.na(.), NA, .) #convert NaN
            }
        }

        #### calculate period weighted #####

        if(aggregation == 'annual'){
            if('pw' %in% method){

                flux_annual_pw <- sw(calculate_pw(
                    chem_df = chem_yr,
                    q_df = q_yr,
                    datecol = 'date',
                    area = area,
                    period = aggregation
                ))
            }

        } else {

            # NOTE: for now, month-order is based off of pw, so this needs to run
            # for monthly flux calcs internals to run (should be fixed)
            flux_annual_pw <- sw(calculate_pw(
                chem_df = chem_yr,
                q_df = q_yr,
                datecol = 'date',
                area = area,
                period = aggregation
            ))
        }

        #### calculate beale ######

        if('beale' %in% method){
            flux_annual_beale <- calculate_beale(
                chem_df = chem_yr,
                q_df = q_yr,
                datecol = 'date',
                area = area,
                period = aggregation
            )
        }

        #### calculate rating #####

        if('rating' %in% method){
            flux_annual_rating <- calculate_rating(
                chem_df = chem_yr,
                q_df = q_yr,
                datecol = 'date',
                area = area,
                period = aggregation
            )
        }

        #### calculate composite ######

        comp_violation <- FALSE
        if('composite' %in% method){

            rating_filled_df <- generate_residual_corrected_conc(
                chem_df = chem_yr,
                q_df = q_yr,
                datecol = 'date',
                sitecol = 'site_code'
            )

            if(length(rating_filled_df) > 1 || ! is.na(rating_filled_df)){

                flux_annual_comp <- calculate_composite_from_resid_corrected(
                    rating_filled_df = rating_filled_df,
                    area = area,
                    period = aggregation
                )

                comp_violation <- any(flux_annual_comp$flux < 0, na.rm = TRUE)

                # keep the tibble (with month column) for monthly aggregation;
                # reduce to a single value for annual aggregation
                if(aggregation != 'month'){
                    flux_annual_comp <- pull(flux_annual_comp, flux)
                }
            } else {
                comp_violation <- TRUE
            }
        }

        if(aggregation == 'month'){

            # NOTE: lots of code with month matching and merging below this is all
            # to match everything to month order of period weighted, to account for
            # possible worlds where water year rearranges month order, etc.
            months <- sapply(strsplit(flux_annual_pw$date, '-'), `[`, 2)
            flux_monthly_pw <- flux_annual_pw %>%
                mutate(month = months) %>%
                dplyr::select(month, flux)

            if(length(months) != 12){
                warning('Number of months in pw not 12, need handling for setting NA to uncalc months')
            }

            # beale
            if('beale' %in% method){
                months_beale <- sapply(strsplit(flux_annual_beale$date, '-'), `[`, 2)
                flux_monthly_beale <- flux_annual_beale %>%
                    mutate(month = as.character(months_beale))
                flux_monthly_beale <- flux_monthly_beale[match(months, flux_monthly_beale$month),]
                flux_monthly_beale <- left_join(flux_monthly_pw %>% dplyr::select(month),
                                                flux_monthly_beale, by = 'month') %>%
                    dplyr::select(month, flux)
            }

            # rating
            if('rating' %in% method){
                months_rating <- sapply(strsplit(flux_annual_rating$date, '-'), `[`, 2)
                flux_monthly_rating <- flux_annual_rating %>%
                    mutate(month = as.character(months_rating))
                flux_monthly_rating <- flux_monthly_rating[match(months, flux_monthly_rating$month),]
                flux_monthly_rating <- left_join(flux_monthly_pw %>% dplyr::select(month),
                                                 flux_monthly_rating, by = 'month') %>%
                    dplyr::select(month, flux)
            }

            # comp
            if('composite' %in% method){
                flux_monthly_comp <- flux_annual_comp[match(as.double(months), flux_annual_comp$month),]
                flux_monthly_comp <- flux_monthly_comp %>%
                    mutate(month = sprintf('%02d', month))
                # make sure all months present, filled w NAs if no value
                flux_monthly_comp <- left_join(flux_monthly_pw %>% dplyr::select(month),
                                               flux_monthly_comp, by = 'month') %>%
                    dplyr::select(month, flux)
            }

            if(!'pw' %in% method){
                flux_monthly_pw$flux <- rep(NA, 12)
            }
        }

        #### select MS favored ####

        if(aggregation == 'month'){
            paired_df <- full_join(q_yr, chem_yr,
                                   by = c('date', 'site_code', 'wy', 'month'))
        } else {
            paired_df <- q_yr %>%
                dplyr::select(-month) %>%
                full_join(dplyr::select(chem_yr, -month),
                          by = c('date', 'site_code', 'wy'))
        }

        paired_df <- paired_df %>%
            tidyr::drop_na() %>%
            filter(q_lps > 0,
                   is.finite(q_lps),
                   conc > 0,
                   is.finite(conc))

        model_data <- tibble(c_log = log10(paired_df$conc),
                             q_log = log10(paired_df$q_lps))

        if(nrow(model_data)){

            # `model_data` is the site-variable-year dataframe of Q and concentration
            # log-log rating curve of C by Q
            modl <- lm(model_data$c_log ~ model_data$q_log, singular.ok = TRUE)
            rating <- sw(summary(modl))

            r_squared <- rating$r.squared
            # auto-correlation of residuals of rating curve
            resid_acf <- abs(acf(rating$residuals, lag.max = 1, plot = FALSE)$acf[2])

            # auto-correlation of concentration data
            con_acf <- abs(acf(paired_df$conc, lag.max = 1, plot = FALSE)$acf[2])
        } else {
            r_squared <- resid_acf <- con_acf <- NA_real_
        }

        # modified from figure 10 of Aulenbach et al 2016
        if(nrow(model_data) && ! is.na(r_squared)){

            if(r_squared > 0.3 && ! is.na(resid_acf)){
                if(resid_acf > 0.2){
                    recommended_method <- 'composite'
                } else {
                    recommended_method <- 'rating'
                }
            } else {
                if(! is.na(con_acf) && con_acf > 0.20){
                    recommended_method <- 'pw'
                } else {
                    recommended_method <- 'average'
                }
            }

        } else {
            if(verbose){
                message(glue::glue('Invalid C-Q regression for: {site_code}, {fvar}, {target_year}. Not recommending any method.'))
                # 'during application of Aulenbach et al. 2016 procedure for choosing recommended load estimation method',
                # ' concentration:discharge log-log linear regression r_squared value was NaN; recommended method set to NA\n\n'))
            }
            recommended_method <- NA
        }

        if(! is.na(recommended_method) && recommended_method == 'composite' &&
           ! is.na(comp_violation) && comp_violation){
            if(verbose){
                message(glue::glue('Composite method invalid for: {site_code}, {fvar}, {target_year}. ',
                                   'Using period-weighting instead.'))
                # 'the recommended method was set to composite',
                # ' using procedure from Aulenbach et al. 2016, but that method results in illegal values.',
                # ' Setting recommended method to period-weighting instead.'))
            }
            recommended_method <- 'period weighted'
        }

        #### collect fluxes ####

        if(aggregation == 'month'){

            target_year_out <- tibble(
                wy = as.character(target_year),
                month = rep(months, 5),
                val = c(flux_monthly_average$flux,
                        flux_monthly_pw$flux,
                        flux_monthly_beale$flux,
                        flux_monthly_rating$flux,
                        ## flux_monthly_wrtds,
                        flux_monthly_comp$flux),
                site_code = !!site_code,
                var = !!fvar,
                method = c(rep('average', 12), rep('pw', 12), rep('beale', 12),
                           rep('rating', 12), rep('composite', 12))
            ) %>%
                mutate(ms_recommended = ifelse(method == !!recommended_method, 1, 0))

        } else {

            target_year_out <- tibble(
                wy = target_year,
                val = c(flux_annual_average,
                        flux_annual_pw,
                        flux_annual_beale,
                        flux_annual_rating,
                        ## flux_annual_wrtds,
                        flux_annual_comp),
                site_code = site_code,
                var = fvar,
                method = c('average', 'pw', 'beale', 'rating', 'composite')
            ) %>%
                mutate(ms_recommended = ifelse(method == !!recommended_method,
                                               TRUE,
                                               FALSE))

            if(all(is.na(target_year_out$val))){
                target_year_out$ms_recommended <- NA
            }

            target_year_deets <- tibble(site_code = site_code,
                                        var = fvar,
                                        water_year = target_year,
                                        cq_rsquared = r_squared,
                                        cq_resid_acf = resid_acf,
                                        c_acf = con_acf,
                                        n_c_obs = length(na.omit(chem_yr$conc)),
                                        n_q_obs = length(na.omit(q_yr$q_lps)),
                                        n_paired_cq_obs = nrow(paired_df),
                                        unique_c_quarters = length(unique(lubridate::quarter(chem_yr$date))),
                                        ws_area_ha = area)

            target_year_deets[is.na(target_year_deets)] <- NA
        }

        out_frame <- bind_rows(out_frame, target_year_out)

        # diagnostics are only assembled for annual aggregation
        if(aggregation != 'month'){
            out_deets <- bind_rows(out_deets, target_year_deets)
        }
    }

    return(list(load = out_frame,
                diag = out_deets))
}
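
# illustrative call (hypothetical inputs): chem and q are MacroSheds-format tibbles
# (date, site_code, var, val) for one site and one solute
# calc_load(chem = no3_chem, q = site_q, site_code = 'w1', area = 42,
#           method = c('average', 'pw', 'beale', 'rating', 'composite'),
#           aggregation = 'annual', verbose = TRUE)
# #> $load: one row per water year x method, with an ms_recommended flag
# #> $diag: per-water-year C-Q regression diagnostics used to pick the recommended method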