R/ms_calc_watershed_precip.R

Defines functions ms_calc_watershed_precip

Documented in ms_calc_watershed_precip

#' Spatially interpolate precipitation gauge data to whole watersheds (i.e. basins).
#'
#' Using precipitation gauge recordings (depth and/or chemistry) and locations,
#' this function interpolates an estimated value at each cell of an automatically retrieved digital
#' elevation model (DEM), then averages cell values to generate a watershed average
#' at every time step. The local relationship between
#' elevation and precipitation depth may also be leveraged for better prediction.
#' If precipitation chemistry is supplied in addition to depth, then watershed-average
#' precipitation depth, chemistry, and chemical flux estimates will all be generated.
#' 
#' @keywords internal
#' @author Spencer Rhea 
#' @author Mike Vlah, \email{vlahm13@@gmail.com}
#' @author Wes Slaughter
#' @param precip optional \code{data.frame} in MacroSheds format (see details),
#'    containing precipitation depth time series in mm. A \code{tibble} fitting these specifications
#'    can be generated by [ms_load_product()]. If \code{precip} is
#'    supplied and \code{pchem} is not, only watershed-average precip is returned.
#'    If both are supplied, both are returned, as well as watershed-average chemical flux,
#'    computed from the cell-wise product of precip and pchem after imputation.
#' @param pchem optional \code{data.frame} in MacroSheds format (see details),
#'    containing precipitation chemistry time series in mg/L. A \code{tibble} fitting these specifications
#'    can be generated by by [ms_load_product()]. If \code{pchem} is
#'    supplied and \code{precip} is not, only watershed-average pchem is returned.
#'    If both are supplied, both are returned, as well as watershed-average precipitation chemical flux,
#'    computed from the cell-wise product of precip and pchem after imputation.
#' @param ws_boundary \code{sf} object or path to a directory \emph{containing} directories, each in turn containing individual shapefiles 
#'    of watershed boundaries. Object/files must have site_code and geometry features (columns).
#'    See [ms_delineate_watershed()].
#' @param precip_gauge \code{sf} object or path to a directory \emph{containing} directories, each in turn containing shapefiles
#'    of precipitation gauge locations. Object/files must have site_code and geometry features (columns).
#'    All supplied precip gauges will be used for imputation, even if they are far from
#'    the watershed of interest.
#' @param out_path character. Directory to which output data will be saved.
#' @param parallel logical. If TRUE (default), interpolation attempts to use \code{maxcores} CPU cores/threads.
#' @param maxcores numeric. Default \code{Inf}; the maximum number of cores (or hyperthreads)
#'    used in parallel processing.
#'    Ignored if \code{parallel} is FALSE. If greater than the available number of cores/hyperthreads,
#'    all available cores/hyperthreads will be used.
#' @param elevation_agnostic Logical. Should elevation data be ignored when interpolating
#'    precipitation data? TRUE means "ignore elevation." See details for more information.
#' @param verbose Logical. If TRUE (default), prints more information during run.
#' @return Saves watershed-average precipitation depth and/or chemistry, and chemical flux (if both precip 
#'    and pchem are supplied) to \code{out_path} as feather files in MacroSheds format (see details).
#'    Output units for flux are kg/d. See [ms_scale_flux_by_area()] to convert to kg/ha/d.
#' @details This function uses inverse distance weighted interpolation to extend gauge measurements
#'    to every cell of a watershed DEM. If \code{elevation_agnostic} is set to TRUE, the resulting
#'    watershed average will be a simple arithmetic mean of all watershed cell values, ignoring
#'    the local relationship between precipitation and elevation.
#'    If FALSE (the default), the resulting watershed average will be a weighted mean of two separate estimates: the
#'    mean of cell values (weight: 1) \emph{and} the prediction of a linear regression with elevation (weight: absolute value of adjusted R^2 of modeled data).
#'    At least 3 gauges must record at a given time step for the regression to be leveraged in generating the final prediction.
#'    Note that these computations are performed at every time step, using all available
#'    (non-missing, non-NA) gauges.
#'    
#'    This function will interpolate only precipitation depth if no
#'    pchem is supplied. It will only interpolate precipitation chemistry 
#'    if no precip data is supplied. If both precip and pchem are supplied, then 
#'    precipitation depth, chemical concentrations, and precipitation chemical fluxes
#'    will all be calculated. 
#'    
#'    In general, elevation is a relevant predictor for precipitation depth, but not necessarily
#'    concentration or flux of precip chemical components. As such, \code{elevation_agnostic}
#'    is always set to TRUE (ignore the elev-precip relationship) when determining watershed-average precipitation chemistry \emph{in the case of both}
#'    \code{precip} and \code{pchem} being supplied. If only \code{pchem} or \code{precip}is
#'    supplied and \code{elevation_agnostic} is FALSE, then the elevation-pchem relationship
#'    \emph{will} be incorporated into the final estimate of the watershed average
#'    for each time step.
#'    
#'    Note that this function does not return the imputed watershed grid for each
#'    timestep--only the watershed-average estimate(s) as tibbles.
#'    
#'    MacroSheds format (only date, site_code, var, and val are required as inputs. all are written to output files):
#' + "date" in YYYY-mm-dd format
#' + "site_code": Short name for MacroSheds site. See [ms_load_sites()].
#' + "grab_sample": Boolean integer indicating whether the observation was obtained via grab sample or installed sensor. 1 = TRUE (grab sample), 0 = FALSE (installed sensor).
#' + "var": Variable code. See [ms_load_variables()].
#' + "val": Data value. See [ms_load_variables()] for units.
#' + "ms_status": Boolean integer. 0 = clean value. 1 = questionable value. See [the MacroSheds data paper](https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325) for details.
#' + "ms_interp": Boolean integer. 0 = measured or imputed by primary source. 1 = interpolated by MacroSheds. See [the MacroSheds data paper](https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325) for details.
#' + "val_err": The combined standard uncertainty associated with the corresponding data point, or NA if unknown. See "Detection limits and propagation of uncertainty" section of See [the MacroSheds data paper](https://aslopubs.onlinelibrary.wiley.com/doi/full/10.1002/lol2.10325) for details.
#' @seealso [ms_load_product()], [ms_delineate_watershed()], [ms_scale_flux_by_area()]
#' @importFrom data.table ':='
#' @examples
#' See vignette: https://github.com/MacroSHEDS/macrosheds/blob/master/vignettes/ms_interpolate_precip.md

ms_calc_watershed_precip <- function(precip,
                                     ws_boundary,
                                     precip_gauge,
                                     pchem,
                                     parallel = TRUE,
                                     maxcores = Inf,
                                     out_path,
                                     elevation_agnostic = TRUE,
                                     verbose = TRUE){

    library("dplyr", quietly = TRUE)

    check_suggested_pkgs(c('terra', 'elevatr', 'parallel', 'data.table', 'raster'))

    precip_only <- missing(pchem)
    pchem_only <- missing(precip)

    requireNamespace('macrosheds', quietly = TRUE)
    
    ms_vars <- macrosheds::ms_vars_ts %>% 
        filter(unit != 'kg/ha/d') %>% 
        dplyr::select(variable_code, flux_convertible) %>% 
        distinct()
        
    #### Load in data if file path supplied ####
    # load watershed boundaries
    if(! inherits(ws_boundary, 'sf') && inherits(ws_boundary, 'character')){
        
        if(grepl('\\.sh[a-z]$', ws_boundary)){
            stop('ws_boundary must be a directory if it is not an sf object')
        }
        
        wb_path <- list.files(ws_boundary, full.names = TRUE)
        wb <- try(purrr::map_dfr(wb_path, sf::st_read))
        
        if(inherits(wb, 'try-error')){
            stop('ws_boundary file failed to load, check file path is correct')
        }
    } else{
        wb <- ws_boundary
    }
    
    # Load in precipitation gauge locations
    if(! inherits(precip_gauge, 'sf') && inherits(precip_gauge, 'character')){
        
        if(grepl('\\.sh[a-z]$', precip_gauge)){
            stop('precip_gauge must be a directory if it is not an sf object')
        }
        
        rg_path <- list.files(precip_gauge, full.names = TRUE)
        rg <- try(purrr::map_dfr(rg_path, sf::st_read))
        
        if(inherits(rg, 'try-error')){
            stop('precip_gauge file failed to load, check file path is correct')
        }
    } else{
        rg <- precip_gauge
    }
    
    # Load in precipitation data 
    if(pchem_only){
        if(verbose){
            print('Only interpolating precipitation chemistry')
        }
    } else {
        if(! inherits(precip, 'data.frame')){
            precip_path <- list.files(precip, full.names = TRUE)
            precip <- try(purrr::map_dfr(precip_path, feather::read_feather))
            
            if(inherits(precip, 'try-error')){
                stop('precip files failed to load; check file path is correct')
            }
        }
    }
    
    # Load in precip chemistry
    if(precip_only){
        if(verbose){
            print('Only interpolating precipitation depth')
        }
    } else {
        if(! inherits(pchem, 'data.frame')){
            pchem_path <- list.files(pchem, full.names = TRUE)
            pchem <- try(purrr::map_dfr(pchem_path, feather::read_feather))
            
            if(inherits(pchem, 'try-error')){
                stop('pchem files failed to load; check file path is correct')
            }
        }
    }
    
    
    #### Checks ####
    if(!inherits(rg, 'sf')){
        stop('precip_gauge file must be an sf object.')
    }
    if(!inherits(wb, 'sf')){
        stop('ws_boundary file must be an sf object.')
    }
    if(!missing(precip) && !all(c('datetime', 'val', 'var', 'site_code', 'val_err') %in% names(precip))){
        stop('precip must be in MacroSheds format (required columns: datetime, site_code, var, val, val_err)')
    }
    if(!missing(pchem) && !all(c('datetime', 'val', 'var', 'site_code', 'val_err') %in% names(pchem))){
        stop('precip_chem must be in MacroSheds format (required columns: datetime, site_code, var, val, val_err)')
    }
    if(! length(unique(rg$site_code)) == length(rg$site_code)){
        stop('precip_gauge contains duplicate entries for the same gauge')
    }
    
    if(! pchem_only){
        if(! all(rg$site_code %in% unique(precip$site_code))){
            missing_gauge <- rg$site_code[! rg$site_code %in% unique(precip$site_code)]
            stop(paste0('a precip gauge location exists in precip_gauge for the gauge(s): ',
                              paste0(missing_gauge, collapse = ', '),
                              ', but no corresponding data exist in precip.',
                              ' Either add data to precip for these gauges or',
                              ' remove them from precip_gauge.'))
        }
        if(! all(unique(precip$site_code) %in% rg$site_code)){
            missing_gauge <- unique(precip$site_code)[!unique(precip$site_code) %in% rg$site_code]
            stop(paste0('data exist in precip for the gauge(s): ', paste0(missing_gauge, collapse = ', '),
                         ', but have no corresponding location in precip_gauge.',
                         ' Either add these gauge(s) to precip_gauge or remove corresponding data from precip.'))
        }
    }

    if(! precip_only){
        if(! all(rg$site_code %in% unique(pchem$site_code))){
            missing_gauge <- rg$site_code[! rg$site_code %in% unique(pchem$site_code)]
            stop(paste0('a gauge location exists in precip_gauge for the gauge(s): ',
                              paste0(missing_gauge, collapse = ', '),
                              ', but no corresponding data exist in pchem',
                              ' Either add data to pchem for these gauges or',
                              ' remove them from precip_gauge.'))
        }
        if(! all(unique(pchem$site_code) %in% rg$site_code)){
            missing_gauge <- unique(pchem$site_code)[!unique(pchem$site_code) %in% rg$site_code]
            stop(paste0('data exist in pchem for the gauge(s): ', paste0(missing_gauge, collapse = ', '),
                         ', but have no corresponding location in precip_gauge.',
                         ' Either add these gauge(s) to precip_gauge or remove corresponding data from pchem.'))
        }
    }
    
    # get precip var name
    if(! pchem_only){
        precip_var_name <- unique(precip$var)
        if(! length(precip_var_name) == 1){
            stop('Only one unique variable code is allowed in the var column of precip.')
        }
    }
    
    # Apply val_err as errors 
    if(! pchem_only){
        errors::errors(precip$val) <- precip$val_err
        precip <- precip %>%
            dplyr::select(-val_err)
    }
    
    if(! precip_only){
        errors::errors(pchem$val) <- pchem$val_err
        pchem <- pchem %>%
            dplyr::select(-val_err)
    }
    
    if(pchem_only) precip <- NULL
    
    #### Set up spatial ####
    
    #project based on average latlong of watershed boundaries
    bbox <- as.list(sf::st_bbox(wb))
    projstring <- choose_projection(lat = mean(bbox$ymin, bbox$ymax),
                                    long = mean(bbox$xmin, bbox$xmax))
    wb <- sf::st_transform(wb, projstring)
    rg <- sf::st_transform(rg, projstring)
    
    # get a DEM that encompasses all watersheds and gauges and buffer because
    # gauges on the edge of bbox can error in extracting elevation 
    wb_rg_bbox <- sf::st_as_sf(sf::st_as_sfc(sf::st_bbox(bind_rows(wb, rg)))) %>%
        sf::st_buffer(50)
    
    if('area' %in% names(wb)){
        wb <- wb %>%
            mutate(area = as.numeric(sf::st_area(geometry)/10000))
    }
    
    # Maybe change this 
    dem_res <- ifelse(any(wb$area < 5), 9, 8)
    
    dem <- expo_backoff(
        expr = {
            suppressWarnings(elevatr::get_elev_raster(locations = wb_rg_bbox,
                                     z = dem_res,
                                     clip = 'bbox',
                                     expand = 0.005,
                                     verbose = verbose,
                                     override_size_check = TRUE))
        },
        max_attempts = 5
    )
    
    #add elev column to rain gauges
    rg$elevation <- terra::extract(dem, rg)
    
    #this avoids a lot of slow summarizing
    if(! pchem_only){
        
        status_cols <- precip %>%
            dplyr::select(datetime, ms_status, ms_interp) %>%
            group_by(datetime) %>%
            summarize(
                ms_status = numeric_any(ms_status),
                ms_interp = numeric_any(ms_interp))
        
        day_durations_byproduct <- datetimes_to_durations(
            datetime_vec = precip$datetime,
            variable_prefix_vec = ms_extract_var_prefix_(precip$var),
            unit = 'days',
            installed_maxgap = 2,
            grab_maxgap = 30)
        
        precip$val[is.na(day_durations_byproduct)] <- NA
        
        precip <- precip %>%
            dplyr::select(-ms_status, -ms_interp, -var) %>%
            tidyr::pivot_wider(names_from = site_code,
                               values_from = val) %>%
            left_join(status_cols, #they get lumped anyway
                      by = 'datetime') %>%
            arrange(datetime)
        
        day_durations <- datetimes_to_durations(
            datetime_vec = precip$datetime,
            unit = 'days')
    }
    
    
    if(! precip_only){
        
        #determine which variables can be flux converted (prefix handling clunky here)
        
        flux_vars <- ms_vars %>% 
            filter(as.logical(flux_convertible)) %>%
            pull(variable_code)
        
        pchem_vars <- unique(pchem$var)
        pchem_vars_fluxable0 <- base::intersect(ms_drop_var_prefix_(pchem_vars),
                                                flux_vars)
        pchem_vars_fluxable <- pchem_vars[ms_drop_var_prefix_(pchem_vars) %in%
                                              pchem_vars_fluxable0]
        
        #this avoids a lot of slow summarizing
        status_cols <- pchem %>%
            dplyr::select(datetime, ms_status, ms_interp) %>%
            group_by(datetime) %>%
            summarize(
                ms_status = numeric_any(ms_status),
                ms_interp = numeric_any(ms_interp))
        
        #clean pchem one variable at a time, matrixify it, insert it into list
        nvars <- length(pchem_vars)
        pchem_setlist <- as.list(rep(NA, nvars))
        for(i in 1:nvars){
            
            v <- pchem_vars[i]
            
            #clean data and arrange for matrixification
            pchem_setlist[[i]] <- pchem %>%
                filter(var == v) %>%
                dplyr::select(-var, -ms_status, -ms_interp) %>%
                tidyr::pivot_wider(names_from = site_code,
                                   values_from = val) %>%
                left_join(status_cols,
                          by = 'datetime') %>%
                arrange(datetime)
        }
    } #end conditional pchem+pflux block (1)
    
    #make sure old quickref data aren't still sitting around
    unlink(glue::glue('{ms}/precip_idw_quickref',
                      ms = out_path),
           recursive = TRUE)
    
    #send vars into interpolator with precip, one at a time. if var is flux-
    #convertible, interpolate precip, pchem, and pflux. otherwise, just precip
    #and pchem. combine and write outputs by site
    
    for(i in 1:nrow(wb)){
        
        wbi <- slice(wb, i)
        site_code <- wbi$site_code
        wbi_area_ha <- as.numeric(sf::st_area(wbi)) / 10000
        
        idw_log_wb(verbose = verbose,
                   site_code = site_code,
                   i = i,
                   nw = nrow(wb))
        
        nthreads <- parallel::detectCores()
        
        if(maxcores > nthreads){
            
            if(! is.infinite(maxcores)){
                message(paste('Requested more cores than are available. Using',
                              nthreads, 'instead.'))
            }
            
            maxcores <- nthreads
        }
        
        ## IDW INTERPOLATE PRECIP FOR ALL TIMESTEPS. STORE CELL VALUES
        ## SO THEY CAN BE USED FOR PFLUX INTERP
        
        # Conditional switch between running in parallel and series 
        `%parcond%` <- ifelse(parallel, `%dopar%`, `%do%`)
        
        if(! pchem_only){
            
            ntimesteps_precip <- nrow(precip)
            nsuperchunks <- ceiling(ntimesteps_precip / 25000 * 2)
            nchunks_precip <- nthreads * nsuperchunks
            
            precip_superchunklist <- chunk_df(d = precip,
                                              nchunks = nsuperchunks,
                                              create_index_column = TRUE)
            
            ws_mean_precip <- tibble()
            for(s in 1:length(precip_superchunklist)){
                # ws_mean_precip <- foreach::foreach(
                #     s = 1:length(precip_superchunklist),
                #     .combine = idw_parallel_combine,
                #     .init = 'first iter') %:% {
                
                precip_superchunk <- precip_superchunklist[[s]]
                
                precip_chunklist <- chunk_df(d = precip_superchunk,
                                             nchunks = nthreads,
                                             create_index_column = FALSE)
                
                idw_log_var(verbose = verbose,
                            site_code = site_code,
                            v = 'precipitation',
                            j = paste('chunk', s),
                            ntimesteps = nrow(precip_superchunk),
                            nvars = nsuperchunks)
                
                clst <- ms_parallelize(maxcores = nthreads)
                # doFuture::registerDoFuture()
                # ncores <- min(parallel::detectCores(), maxcores)
                # clst <- parallel::makeCluster(nthreads, type='FORK')
                # future::plan(future::multicore, workers = 48)
                # future::plan(future::multisession, workers = 48)
                
                # parallel::stopCluster(clst)
                # fe_junk <- foreach:::.foreachGlobals
                # rm(list = ls(name = fe_junk),
                #    pos = fe_junk)
                
                ws_mean_precip_chunk <- foreach::foreach(
                    j = 1:min(nthreads, nrow(precip_superchunk)),
                    .combine = idw_parallel_combine,
                    .init = 'first iter',
                    .packages = c('dplyr', 'raster')) %parcond% {
                        
                        pchunk <- precip_chunklist[[j]]
                        
                        # idw_log_var(verbose = verbose,
                        #             site_code = site_code,
                        #             v = 'precipitation',
                        #             j = paste('chunk', j + (nthreads * (s - 1))),
                        #             ntimesteps = nrow(pchunk),
                        #             nvars = nchunks_precip)
                        
                        foreach_return <- shortcut_idw(
                            encompassing_dem = dem,
                            wshd_bnd = wbi,
                            data_locations = rg,
                            data_values = pchunk,
                            durations_between_samples = day_durations[pchunk$ind],
                            stream_site_code = site_code,
                            output_varname = 'SPECIAL CASE PRECIP',
                            save_precip_quickref = ! precip_only,
                            elev_agnostic = elevation_agnostic,
                            p_var = precip_var_name,
                            verbose = verbose,
                            macrosheds_root = out_path)
                        
                        foreach_return
                    }
                
                ms_unparallelize(clst)
                
                rm(precip_chunklist); gc()
                
                ws_mean_precip <- bind_rows(ws_mean_precip, ws_mean_precip_chunk)
            }
            
            rm(precip_superchunklist); gc()
            
            if(any(is.na(ws_mean_precip$datetime))){
                stop('NA datetime found in ws_mean_precip')
            }
            
            # #restore original varnames by site and dt
            # ws_mean_precip <- ws_mean_precip %>%
            #     arrange(datetime) %>%
            #     dplyr::select(-var) %>% #just a placeholder
            #     left_join(precip_varnames,
            #               by = c('datetime', 'site_code'))
            
            ws_mean_precip <- ws_mean_precip %>%
                dplyr::rename_all(dplyr::recode, concentration = 'val') %>%
                # rename(val = concentration) %>%
                arrange(datetime)
            
            # ws_mean_precip <- apply_detection_limit_t(
            #     X = ws_mean_precip,
            #     network = network,
            #     domain = domain,
            #     prodname_ms = precursor_prodnames[grepl('^precipitation',
            #                                             precursor_prodnames)])
            
            write_ms_file(ws_mean_precip,
                          macrosheds_root = out_path,
                          prodname_ms = 'precipitation__ms900',
                          site_code = site_code,
                          shapefile = FALSE)
            
            rm(ws_mean_precip); gc()
        }
        
        ## NOW IDW INTERPOLATE PCHEM (IF PRECIP CHEMISTRY DATA EXIST)
        ## AND PFLUX (FOR VARIABLES THAT ARE FLUXABLE).
        
        if(! precip_only){
            
            ws_mean_chemflux <- tibble()
            for(j in 1:nvars){
                
                v <- pchem_vars[j]
                jd <- pchem_setlist[[j]]
                ntimesteps_chemflux <- nrow(jd)
                
                if(v %in% pchem_vars_fluxable && ! pchem_only){
                    is_fluxable <- TRUE
                } else {
                    is_fluxable <- FALSE
                }
                
                idw_log_var(verbose = verbose,
                            site_code = site_code,
                            v = v,
                            j = paste('var', j),
                            nvars = nvars,
                            ntimesteps = ntimesteps_chemflux,
                            is_fluxable = is_fluxable)
                
                nsuperchunks <- ceiling(ntimesteps_chemflux / 5000 * 2)
                nchunks_chemflux <- nthreads * nsuperchunks
                
                chemflux_superchunklist <- chunk_df(d = jd,
                                                    nchunks = nsuperchunks)
                nsuperchunks <- length(chemflux_superchunklist)
                
                ws_mean_chemflux_var <- tibble()
                for(s in 1:nsuperchunks){
                    
                    chemflux_superchunk <- chemflux_superchunklist[[s]]
                    
                    chemflux_chunklist <- chunk_df(d = chemflux_superchunk,
                                                   nchunks = nthreads)
                    
                    idw_log_var(verbose = verbose,
                                site_code = site_code,
                                v = v,
                                j = paste('chunk', s),
                                ntimesteps = nrow(chemflux_superchunk),
                                nvars = nsuperchunks)
                    
                    clst <- ms_parallelize(maxcores = nthreads)
                    
                    foreach_out <- foreach::foreach(
                        l = 1:length(chemflux_chunklist),
                        # l = 1:min(nthreads, nrow(chemflux_superchunk)),
                        .combine = idw_parallel_combine,
                        .init = 'first iter',
                        .packages = 'dplyr') %parcond% {
                            
                            if(is_fluxable){
                                
                                foreach_chunk <- shortcut_idw_concflux_v2(
                                    encompassing_dem = dem,
                                    wshd_bnd = wbi,
                                    ws_area = wbi_area_ha,
                                    data_locations = rg,
                                    precip_values = precip,
                                    chem_values = chemflux_chunklist[[l]],
                                    stream_site_code = site_code,
                                    output_varname = v,
                                    out_path = out_path,
                                    verbose = verbose)
                                
                            } else {
                                
                                foreach_chunk <- shortcut_idw(
                                    encompassing_dem = dem,
                                    wshd_bnd = wbi,
                                    data_locations = rg,
                                    data_values = chemflux_chunklist[[l]],
                                    stream_site_code = site_code,
                                    output_varname = v,
                                    elev_agnostic = elevation_agnostic,
                                    macrosheds_root = out_path,
                                    verbose = verbose)
                            }
                            
                            foreach_chunk
                        }
                    
                    ms_unparallelize(clst)
                    
                    
                    rm(chemflux_chunklist); gc()
                    
                    ws_mean_chemflux_var <- bind_rows(ws_mean_chemflux_var,
                                                      foreach_out)
                }
                
                rm(chemflux_superchunklist); gc()
                
                ws_mean_chemflux <- bind_rows(ws_mean_chemflux,
                                              ws_mean_chemflux_var)
            }
            
            if(any(is.na(ws_mean_chemflux$datetime))){
                stop('NA datetime found in ws_mean_chemflux')
            }
            

            if(! pchem_only){
                
                ws_mean_pflux <- ws_mean_chemflux %>%
                    dplyr::select(-concentration) %>%
                    rename(val = flux) %>%
                    arrange(var, datetime)
                
                write_ms_file(ws_mean_pflux,
                              macrosheds_root = out_path,
                              prodname_ms = 'precip_flux_inst__ms902',
                              site_code = site_code,
                              shapefile = FALSE)
                
                rm(ws_mean_pflux)
            }
            
            ws_mean_pchem <- ws_mean_chemflux %>%
                dplyr::select(-any_of('flux')) %>%
                rename(val = concentration) %>%
                arrange(var, datetime)
            
            write_ms_file(ws_mean_pchem,
                          macrosheds_root = out_path,
                          prodname_ms = 'precip_chemistry__ms901',
                          site_code = site_code,
                          shapefile = FALSE)
            
            rm(ws_mean_pchem); gc()
        } #end conditional pchem+pflux block (2)
    }
    
    unlink(glue::glue('{ms}/precip_idw_quickref',
                      ms = out_path),
           recursive = TRUE)
}
MacroSHEDS/macrosheds documentation built on Oct. 30, 2024, 11:15 a.m.