#' Convert config specifications into a data input table
#' 
#' Turns a single config row into a list of data.frames of input data for the 
#' specified metabolism modeling function
#' 
#' @param config_row a 1-row config data.frame
#' @param row_num the row number/name that this config_row held in the original 
#'   config
#' @param metab_fun a metabolism modeling function
#' @param on_error character. What to do if data cannot be acquired or merged? 
#'   If 'stop' or 'warn', the corresponding condition is thrown. If 'warn' or 
#'   'quiet', the function also attaches any errors and/or warnings to the 
#'   output object as character-vector attributes "errors" and "warnings"
#' @return a list with elements 'data' and/or 'data_daily', as expected by 
#'   metab_fun; each element is a unitted data.frame, or NA if acquisition 
#'   failed. Attributes "errors" and/or "warnings" may be attached (see on_error)
#' @import dplyr
#' @importFrom unitted u v
#' @importFrom stats setNames
#' @import streamMetabolizer
#' @examples 
#' \dontrun{
#' depth_file <- download_ts("depth_calcDischHarvey", "nwis_04087142", 
#'   on_local_exists="replace")
#' config <- stage_metab_config(tag="0.0.1", strategy="try stage_metab_config", 
#'   site="nwis_04087142", dosat=choose_data_source("dosat", "nwis_04087142", 
#'     logic="simple dosat", type="const", src="12,mg L^-1"), 
#'     disch=choose_data_source("disch", "nwis_04087142"),
#'   depth=choose_data_source("depth", "nwis_04087142", logic="local file", 
#'     type="ts_file", src=depth_file), filename=NULL)
#' cdat <- config_to_data(config[1,], row_num=1, metab_fun=streamMetabolizer::metab_bayes)
#' names(cdat)
#' head(cdat[['data']][which(complete.cases(cdat[['data']])),])
#'  
#' login_sb()
#' site="nwis_01646000"
#' config <- stage_metab_config(tag="0.0.1", strategy="try stage_metab_config", 
#'   model="metab_Kmodel", site=site, filename=NULL,
#'   sitetime=choose_data_source("sitetime", site, logic="unused var"),
#'   doobs=choose_data_source("doobs", site, logic="unused var"),
#'   dosat=choose_data_source("dosat", site, logic="unused var"),
#'   depth=choose_data_source("depth", site, logic="unused var"),
#'   wtr=choose_data_source("wtr", site, logic="unused var"),
#'   par=choose_data_source("par", site, logic="unused var"),
#'   sitedate=choose_data_source("sitedate", site, logic="priority local"),
#'   K600=choose_data_source("K600", site, logic="nighttime", src="0.0.6", type="pred"),
#'   dischdaily=choose_data_source("dischdaily", site, logic="priority local"),
#'   velocdaily=choose_data_source("velocdaily", site, logic="priority local"),
#'   omit_incomplete=FALSE)
#' cdat <- config_to_data(config[1,], row_num=1, 
#'   metab_fun=streamMetabolizer::metab_Kmodel)
#' }
#' @export
config_to_data <- function(config_row, row_num, metab_fun, on_error=c('stop','warn','quiet')) {

  # process inputs
  on_error <- match.arg(on_error)
  
  # get a lookup table to translate between mda.streams vars and metab vars
  metab_var <- '.dplyr.var'
  var_lookup <- unique(get_var_src_codes(out=c("metab_var","var","metab_units","units"))) %>%
    filter(!is.na(metab_var))
  
  # get a list of vars and the relevant columns supplied in the config_row
  spec_suffixes <- colnames(choose_data_source(var="doobs", site=NA, logic="unused var"))
  spec_prefixes <- gsub(".src$", "", names(config_row)[grepl(".src$", names(config_row))])
  spec_data_names <- var_lookup[match(spec_prefixes, var_lookup$var),"metab_var"]
  data_specs <- lapply(setNames(spec_prefixes,spec_data_names), function(var) {
    setNames(paste0(var, ".", spec_suffixes), spec_suffixes)
  })
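  # e.g. (illustrative, assuming the usual spec columns): data_specs[['DO.obs']]
  # would be a named vector like c(type="doobs.type", site="doobs.site",
  # src="doobs.src", ...), so each metab variable's specs can be pulled straight
  # out of config_row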
  
  # look to formals(metab_fun) for a list of vars for which we expect complete info
  data_needs_list <- var_needs_list <- optional_list <- list() 
  data_args <- intersect(c('data','data_daily'), names(formals(metab_fun)))
  for(possible_arg in data_args) {
    arg_default <- eval(formals(metab_fun)[[possible_arg]])
    data_needs_list[[possible_arg]] <- names(arg_default)
    var_needs_list[[possible_arg]] <- var_lookup[match(names(arg_default), var_lookup$metab_var), 'var']
    optional_list[[possible_arg]] <- attr(arg_default, 'optional')
    # vars we couldn't find in var_lookup aren't data needs we should try to import
    na_needs <- which(is.na(var_needs_list[[possible_arg]]))
    if(length(na_needs) > 0) {
      data_needs_list[[possible_arg]] <- data_needs_list[[possible_arg]][-na_needs]
      var_needs_list[[possible_arg]] <- var_needs_list[[possible_arg]][-na_needs]
    }
  }
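  # at this point data_needs_list holds the metab-style column names expected by
  # metab_fun (for metab_bayes the 'data' default typically includes solar.time,
  # DO.obs, DO.sat, depth, temp.water, and light) and var_needs_list holds the
  # corresponding mda.streams vars (sitetime, doobs, dosat, depth, wtr, par)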
  
  # prepare vectors to collect any errors/warnings
  err_strs <- warn_strs <- character()

  # check the vars for which we expect the specs to be NAs
  data_ignored <- which(names(data_specs) %in% setdiff(names(data_specs), c(data_needs_list[['data']],data_needs_list[['data_daily']])))
  for(ignored in data_ignored) {
    type <- config_row[[1,data_specs[[ignored]][['type']]]]
    site <- config_row[[1,data_specs[[ignored]][['site']]]]
    src <- config_row[[1,data_specs[[ignored]][['src']]]]
    
    if(is.na(type) || type != 'none') {
      warn_strs <- c(warn_strs, paste0("need type='none' for unused var ", spec_prefixes[ignored]))
    }
  }
    
  final_data_list <- lapply(data_args, function(datatype) {
    
    data_needs <- data_needs_list[[datatype]]
    var_needs <- var_needs_list[[datatype]]
    optional <- optional_list[[datatype]]
    if(length(optional)==1) {
      optional <- switch(
        optional,
        'all'=data_needs,
        'none'=NULL,
        optional)
    }
    
    # check and acquire the vars we need to return
    data_list <- lapply(seq_along(data_needs), function(needed) {
      # don't waste time if we've already hit errors
      if(length(err_strs) > 0) return(NA)
      
      var <- var_needs[needed]
      data <- data_needs[needed]
      type <- config_row[[1,data_specs[[data]][['type']]]]
      site <- config_row[[1,data_specs[[data]][['site']]]]
      src <- config_row[[1,data_specs[[data]][['src']]]]
      opt <- data %in% optional
      # create a conversion factor if needed
      units <- var_lookup[var_lookup$var==var,c('metab_units','units')]
      conv <- if(units$metab_units == units$units) NA else {
        if(var == 'disch') u(0.028316847, 'm^3 ft^-3') else # multiplier to convert from units=ft^3/s to metab_units=m^3/s
          stop("units != metab_units but only know how to convert for var == 'disch'")
      }
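      # e.g., a disch value of 35.3147 ft^3 s^-1 times u(0.028316847, 'm^3 ft^-3')
      # comes out as ~1 m^3 s^-1, with unitted handling the unit cancellation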
                 
      # run config_to_data_column (defined below) in a tryCatch block
      data <- withCallingHandlers(
        tryCatch({
          dat <- config_to_data_column(var, type, site, src, optional=opt)
          if(is.data.frame(dat) && !is.na(conv)) dat[[2]] <- dat[[2]] * conv
          dat
        }, error=function(e) { 
          err_strs <<- c(err_strs, paste0(var,"-",type,": ",e$message))
          NA 
        }),
        warning=function(w) { 
          warn_strs <<- c(warn_strs, paste0(var,"-",type,": ",w$message))
          invokeRestart("muffleWarning")
        })
      data
    }) %>% setNames(data_needs)
    
    # check for missing and non-optional columns
    omitted_cols <- sapply(data_list, function(d) isTRUE(is.na(d)))
    if(all(omitted_cols) && 'all' %in% optional_list[[datatype]]) {
      return(NULL) # we're allowed to return nothing in this case
    } else if(any(omitted_cols)) {
      if(all(names(which(omitted_cols)) %in% optional)) {
        # if the missing columns are all optional, then we can proceed. just
        # need to lessen our expectations
        data_needs <- data_needs[!omitted_cols]
        var_needs <- var_needs[!omitted_cols]
        data_list[omitted_cols] <- NULL
      } else {
        # these missing cols should already have been identified by warning in
        # config_to_data_column. plan a full-on error here.
        err_strs <<- c(err_strs, paste0("required-but-missing data columns in config row ", row_num))
      }
    }
    
    # if something is broken, return without attempting combine_ts
    if(length(err_strs) != 0) return(NA) # NA is the flag for a bad result; won't be deleted when we pare down final_data_list
    
    # combine the data into a single data.frame
    data_df <- withCallingHandlers(
      tryCatch({
        combo <- do.call(combine_ts, c(data_list, list(
          method='approx', approx_tol=as.difftime(3, units="hours")
        )))
        
        # restrict dates of data if requested
        datetime_col <- names(combo)[na.omit(match(c('sitetime','sitedate','DateTime'), names(combo)))[1]]
        if(!is.na(start_date <- config_row[[1, "start_date"]])) {
          combo_datetime <- v(combo[[datetime_col]])
          combo <- switch(
            datatype,
            'data'=combo[combo_datetime >= as.POSIXct(start_date, tz="UTC"), ],
            'data_daily'=combo[combo_datetime >= as.Date(start_date), ])
        }
        if(!is.na(end_date <- config_row[[1, "end_date"]])) {
          combo_datetime <- v(combo[[datetime_col]])
          combo <- switch(
            datatype, # use < for data or <= for data_daily for end_date
            'data'=combo[combo_datetime < as.POSIXct(end_date, tz="UTC"), ],
            'data_daily'=combo[combo_datetime <= as.Date(end_date), ])
        }
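        # e.g., with start_date="2014-01-01" and end_date="2014-02-01",
        # instantaneous data run from 2014-01-01 00:00 UTC up to (but not
        # including) 2014-02-01 00:00, while daily data include the 2014-02-01
        # row itself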
        
        # rename
        combo %>% select_(.dots=var_needs) %>% setNames(data_needs)
      },
      error=function(e) { 
        err_strs <<- c(err_strs, paste0("combine: ",e$message))
        NA # NA is the flag for a bad result; won't be deleted when we pare down final_data_list
      }),
      warning=function(w) { 
        warn_strs <<- c(warn_strs, paste0("combine: ",w$message))
        invokeRestart("muffleWarning")
      })
    
    data_df
  }) %>% setNames(data_args)
  
  # return the results and/or error/warning strings
  if(length(warn_strs) > 0) {
    attr(final_data_list, "warnings") <- warn_strs
    warn_str <- paste0(warn_strs,collapse="\n  ")
    if(on_error != 'quiet') warning(warn_str)
  }
  if(length(err_strs) > 0) {
    err_str <- paste0(err_strs, collapse="\n  ")
    switch(
      on_error,
      'stop'=stop(err_str),
      'warn'={
        warning(err_str)
        attr(final_data_list, "errors") <- err_strs
      },
      'quiet'={
        attr(final_data_list, "errors") <- err_strs
      })
  }
  
  # remove data or data_daily if they were empty and optional (and hence NULL
  # within final_data_list)
  final_data_list[sapply(final_data_list, function(d) isTRUE(is.null(d)))] <- NULL
  final_data_list
  
}

#' Read a single column worth of data
#' 
#' For use within config_to_data
#' 
#' @inheritParams choose_data_source
#' @param optional logical. If TRUE, type='none' is permitted and yields NA; if 
#'   FALSE, type='none' for this variable triggers an error
#' @return a unitted data.frame, or NA if no data are available for this type
#' @import dplyr
#' @import streamMetabolizer
#' @importFrom unitted u v get_units
#' @importFrom stats setNames
#' @keywords internal
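#' @examples 
#' \dontrun{
#' # A minimal usage sketch (not from the original docs), assuming a local 
#' # timeseries .rds such as the depth file downloaded in the config_to_data 
#' # example above:
#' depth_file <- download_ts("depth_calcDischHarvey", "nwis_04087142", 
#'   on_local_exists="replace")
#' config_to_data_column(var="depth", type="ts_file", site="nwis_04087142", 
#'   src=depth_file)
#' # constants are specified as "number,units":
#' config_to_data_column(var="dosat", type="const", site="nwis_04087142", 
#'   src="12,mg L^-1")
#' }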
config_to_data_column <- function(var, type, site, src, optional=FALSE) {
  data <- NA
  switch(
    type,
    'ts'={
      num_tries <- 3
      for(i in 1:num_tries) {
        dfile <- tryCatch(
          download_ts(make_var_src(var, src), site, version="rds", on_local_exists="skip", on_remote_missing="stop"),
          error=function(e) { warning(paste0("download try ",i,": ",e$message)); NULL })
        if(!is.null(dfile)) break else Sys.sleep(1)
      }
      if(is.null(dfile)) stop("failed to download file after ", num_tries, " tries")
      data <- read_ts(dfile)
    },
    'meta'={
      warning("meta type not currently implemented")
    },
    'ts_file'={
      data <- read_ts(src)
    },
    'const'={
      const_strs <- strsplit(src, ',')[[1]]
      const_num <- as.numeric(const_strs[1])
      if(length(const_strs) != 2 || is.na(const_num)) {
        warning("value could not be interpreted as 'number,units'")
      }
      const <- u(const_num, const_strs[2])
      data <- data.frame(NA, const) %>%
        setNames(c("DateTime", var)) %>%
        u()
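      # e.g., src="12,mg L^-1" yields a one-row unitted data.frame with 
      # DateTime=NA and the var column (here, dosat) equal to 12 mg L^-1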
    },
    'pred'={
      data <- config_preds_to_data_column(var=var, site=site, src=src, type=type)
    },
    'pred_file'={
      data <- config_preds_to_data_column(var=var, site=site, src=src, type=type)
    },
    'none'={
      if(!isTRUE(optional)) {
        stop("need type!='none' for needed var ", var)
      }
    },
    stop("unrecognized type for needed var ", var)
  )
  data
}

#' Get predictions of metabolism or DO from a model
#' 
#' Most of this code is the same whether it's a pred (the name of a model on 
#' ScienceBase) or a pred_file (the path to a local model file)
#' 
#' The trick is that model predictions only store the input time, which was 
#' either local.time (< streamMetabolizer 0.8.0) or solar.time (>=0.8.0) rather 
#' than UTC time. So this function draws on our knowledge of the site-specific 
#' mapping between UTC time and solar.time or date (as stored in sitetime 
#' and sitedate, respectively) to come up with the UTC DateTime stamps that 
#' correspond to these predictions. We could potentially just use
#' convert_solartime_to_UTC, but I'm nervous about tiny imprecisions that would
#' make these not match up to other DateTime stamps.
#' 
#' @param var the variable name to retrieve
#' @param site the site whose sitetime or sitedate timeseries is used to 
#'   translate prediction times into UTC DateTimes
#' @param src the model name (for type='pred') or the model file name (for 
#'   type='pred_file')
#' @param type character in c("pred","pred_file") indicating whether src is a 
#'   model on ScienceBase or a local file name
#' @importFrom unitted u v
#' @import streamMetabolizer
#' @import dplyr
#' @import tibble
#' @keywords internal
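#' @examples 
#' \dontrun{
#' # A minimal usage sketch (not from the original docs); 'model_name' is a 
#' # placeholder for the name of a metabolism model retrievable with 
#' # get_metab_model(), e.g. one fit by a nighttime-regression run
#' k600_daily <- config_preds_to_data_column(
#'   var="K600", site="nwis_01646000", src=model_name, type="pred")
#' head(k600_daily)
#' }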
config_preds_to_data_column <- function(var, site, src, type) {
  # load the model
  mm <- if(type=="pred") {
    get_metab_model(src) 
  } else if(type=="pred_file") {
    varname <- load(src)
    get(varname) %>% 
      modernize_metab_model()
  }
  
  # determine which temporal resolution we'll be working in and which 
  # prediction column we'll need
  dailypredvars <- c(
    K600="K600",K600lwr="K600.lower",K600upr="K600.upper",
    gpp="GPP",gpplwr="GPP.lower",ggpupr="GPP.upper",
    er="ER",erlwr="ER.lower",erupr="ER.upper")
  if(var=='doobs') {
    resolution <- 'inst'
    preds_var <- c(doobs='DO.obs')
  } else if(var %in% names(dailypredvars)) {
    resolution <- 'daily'
    preds_var <- dailypredvars[var]
  } else {
    stop('cannot determine temporal resolution of desired prediction')
  }
  
  # get predictions
  preds <- switch(
    resolution,
    inst=predict_DO(mm),
    daily=predict_metab(mm))
  
  # determine which time translation needs to happen
  preds_res <- '.dplyr.var'
  time_relation <- 
    tibble(
      preds_res = c('inst','daily'),
      preds_col = c('solar.time','date'),
      ts_col = c('sitetime','sitedate')) %>%
    filter(preds_res == resolution)
  
  if(resolution=='inst') {
    # remove NA predictions, which might occur if the predictors were available
    # for a partial day and GPP, etc. were not fit for that partial day
    preds <- preds[!is.na(preds[[unname(preds_var)]]), ]
    
    # filter predictions to only those that are the first of each timestamp (even 
    # though some models will have had >24-hour 'days' and therefore two 
    # predictions per time that falls into two 'days'). match() picks the first
    # match when there's more than one available
    preds <- preds[match(unique(preds[[time_relation$preds_col]]), preds[[time_relation$preds_col]]),]
  }
  
  # download and format the appropriate datetime key for translating from preds_time to DateTime
  preds_time_config <- c(var=time_relation$ts_col, as.list(choose_data_source(time_relation$ts_col, site, "priority local"))[c('type','site','src')])
  datetime_key <- v(do.call(config_to_data_column, preds_time_config))
    
  # add translated DateTime to preds with an inner join on the site time 
  # column. duplicate timestamps were already removed above (via match), so we 
  # expect exactly one joined row per prediction
  preds_w_DateTime <- inner_join(preds, datetime_key, by=setNames(time_relation$ts_col, time_relation$preds_col))
  if(nrow(preds_w_DateTime) != nrow(preds)) stop('failed to match up model predictions to UTC DateTime')
  preds <- preds_w_DateTime
  
  # select just the two columns we need
  preds <- preds %>%
    select_('DateTime', unname(preds_var)) %>%
    setNames(c('DateTime', names(preds_var)))
  
  # add units and return
  # copy var locally so the filter in get_var_src_codes compares against this 
  # value rather than the codes table's own 'var' column
  myvar <- var
  varunits <- unique(get_var_src_codes(var==myvar, out='units'))
  data <- preds %>% u(c(NA, varunits))  
  data
}