# R/aggregateSolute.R

#' Aggregate loads by the time periods specified by the user
#' 
#' This will aggregate the total loads or mean concentrations per aggregation 
#' interval, specified by \code{agg.by}. The time frame specified by 
#' \code{agg.by} can be "unit", "day", "month", "water year", "calendar year", 
#' "mean water year", "mean calendar year", "total", or the name of a column in
#' \code{custom} that can be used to group the data.
#' 
#' This also calculates the uncertainty in the sum over a regular time series 
#' of load estimates (\code{preds}) with known standard errors 
#' (\code{se.preds}) for each short-term load estimate.
#' 
#' The general equation for propagation of error in a sum of potentially 
#' autocorrelated values is:
#' 
#' var(sum_t(x[t])) = sum_t(var(x[t])) + 2*sum_{a<b}(cov(x[a], x[b]))
#' 
#' where the covariance terms are determined by the correlation structure we 
#' assume or estimate via \code{cormat.function}; for two values with standard 
#' errors s1 and s2 and error correlation rho, cov(x[1], x[2]) = rho*s1*s2.
#' 
#' However, we will deviate from the above equation to accommodate the lognormal
#' distribution of each flux prediction.
#' 
#' @importFrom dplyr %>% group_by_ summarise filter n n_groups
#' @importFrom lubridate tz
#' @importFrom smwrBase waterYear
#' @importFrom unitted u v get_units
#' @importFrom methods is
#' @importFrom stats aggregate qt qnorm setNames
#' @param preds Either a vector of predicted instantaneous fluxes or 
#'   concentrations or a data.frame containing the columns "fit", "se.pred", and
#'   "date"
#' @param metadata A metadata object describing the model
#' @param format character. The desired format of the aggregated values. If 
#'   "conc", preds is assumed to already be formatted as "conc". If "flux" or 
#'   "flux rate", preds is assumed to already be formatted as "flux rate". If 
#'   preds has a "units" attribute, that attribute is checked for consistency 
#'   with \code{format} and \code{metadata}, but if preds has no "units" 
#'   attribute then no such checks can be made.
#' @param agg.by character. The date interval or grouping column by which to 
#'   aggregate. If agg.by="unit", values will be returned unaggregated but in 
#'   the standard post-aggregation format. If agg.by is one of "day", "month", 
#'   "water year", or "calendar year", the dates vector will be split into 
#'   periods corresponding to those intervals, and the flux or concentration 
#'   will be computed for each period. If agg.by="total", \code{dates} will be 
#'   ignored and the entire vector \code{preds} will be aggregated. If agg.by 
#'   names one or more columns of \code{custom}, aggregation will occur for 
#'   each unique value (or combination of values) in those columns.
#' @param se.preds A vector of standard errors of prediction for instantaneous 
#'   flux or concentration predictions. This data may also be given as a column 
#'   named "se.pred" in preds when preds is a data.frame.
#' @param dates A vector, of the same length as preds, containing the dates to 
#'   aggregate over. This data may also be given as a column named "date" in 
#'   preds when preds is a data.frame.
#' @param custom An optional data.frame of one or more columns each containing 
#'   factors or other labels on which to aggregate. The columns to be used are 
#'   set by \code{agg.by}.
#' @param cormat.function A function that takes a vector of datetimes (Date, 
#'   POSIXct, chron, etc.) and returns a Matrix indicating the assumed/estimated
#'   correlation between prediction errors on each pair of datetimes. See 
#'   \link{correlations-2D} for predefined options.
#' @param ci.agg logical. Should confidence intervals for the aggregate 
#'   predictions be returned?
#' @param level numeric. The confidence level, e.g., 0.95 for 95\% confidence 
#'   intervals.
#' @param deg.free numeric. The degrees of freedom to use in calculating 
#'   confidence intervals from SEPs. If NA, a normal distribution is used rather
#'   than the more standard t distribution.
#' @param ci.distrib character. The distribution to assume for uncertainty in 
#'   the aggregate flux or concentration distribution. The default is 
#'   "lognormal".
#' @param se.agg logical. Should standard errors of the aggregate predictions be
#'   returned?
#' @param na.rm logical. Should NA values be ignored during aggregation (TRUE), 
#'   or should NA be returned for intervals that contain one or more NA 
#'   predictions (FALSE)?
#' @param attach.units logical. If TRUE, units will be attached as an attribute 
#'   of the estimate columns of the returned data.frame.
#' @param min.n numeric. The minimum number of observations per \code{agg.by} 
#'   interval (e.g., per year); intervals with fewer observations are 
#'   considered incomplete and are discarded
#' @return A data.frame. The first column contains the aggregation period or 
#'   custom aggregation unit and is named after the value of \code{agg.by}. 
#'   The second column contains the aggregate flux or concentration estimates, 
#'   is named after the value of \code{format}, and is in the units specified 
#'   by \code{metadata}. Additional columns give the standard error (SE), the 
#'   number of observations per interval (n), and, if \code{ci.agg=TRUE}, the 
#'   confidence bounds (CI_lower, CI_upper).
#'   
#' @examples
#' \dontrun{
#' data(eg_metadata)
#' metadata_example <- updateMetadata(eg_metadata, dates="date")
#' preds_example <- data.frame(fit=abs(rnorm(365, 5, 2)), se.pred=abs(rnorm(365, 1, 0.2)), 
#'   date=seq(as.Date("2018-05-15"), as.Date("2019-05-14"), by=as.difftime(1, units="days")))
#' aggregateSolute(preds_example, metadata=metadata_example, format="conc", agg.by="month")
#' 
#' # with a custom aggregation group
#' preds_regrouped <- transform(preds_example, simple.season=ordered(
#'   c("winter","spring","summer","fall")[floor(((as.numeric(strftime(date, "%m"))+0)%%12)/3)+1], 
#'   c("winter","spring","summer","fall")))
#' aggregateSolute(preds_example, metadata=metadata_example, format="conc", 
#'                 agg.by="simple.season", custom=preds_regrouped)
#' 
#' # with a custom prediction error correlation matrix
#' new_correlation_assumption <- getCormatFirstOrder(rho=0.9, 
#' time.step=as.difftime(1, units="days"), max.tao=as.difftime(10, units="days"))
#' aggregateSolute(preds_example, metadata=metadata_example, format="conc", agg.by="month",
#'                 cormat.function=new_correlation_assumption)
#' }
aggregateSolute <- function(
  preds, metadata, format=c("conc", "flux rate"), 
  agg.by=c("unit", "day", "month", "water year", "calendar year", "total", 
           "mean water year", "mean calendar year", "[custom]"),
  se.preds, dates, custom=NA, 
  cormat.function=cormat1DayBand,
  ci.agg=TRUE, level=0.95, deg.free=NA, ci.distrib=c("lognormal","normal"), se.agg=TRUE,
  na.rm=FALSE, attach.units=FALSE, min.n = 0) {
  
  # Warn users about flaws in uncertainty
  warning("Shoot, we've discovered a big problem in aggregateSolute. ",
          "The Values are fine, but the uncertainty estimates (SE, CI_lower, CI_upper) ",
          "are too low by a factor of 3 to 10 or more. Consequently we are returning NAs",
          "for standard errors and confidence intervals (SE, CI_lower, and CI_upper).",
          "We'll be working on this over the coming year (it's not a trivial challenge). ",
          "In the meantime, please consider reporting instantaneous uncertainties only",
          "(by setting agg.by = \"unit\"), or using predLoad(getFittedModel(load.model), by=[format]) ",
          "if you need aggregated uncertainties from a loadReg2 model. Sorry about this!")
  
  # Validate arguments
  if(length(format) == 1 && format == "flux total") {
    warning("format == \"flux total\" is deprecated; multiply flux rate by duration to get total flux")
  }
  format <- match.arg.loadflex(format, c("conc", "flux rate"))
  attach.units <- match.arg.loadflex(attach.units)
  default_agg.by <- c("unit", "day", "month", "water year", "calendar year", 
                      "total", "mean water year", "mean calendar year")
  for(abi in seq_along(agg.by)) {
    agg.by[abi] <- match.arg.loadflex(agg.by[abi], c(default_agg.by, colnames(custom)))
  }
  agg.by <- .reSpace(agg.by,"_") # replace spaces with underscores to use agg.by as a column name
  if(!is(custom, "data.frame")) {
    if(!isTRUE(is.na(custom))) {
      stop("custom must be NA or a data.frame")
    }
  } else {
    numpreds <- if(is.data.frame(preds)) nrow(preds) else length(preds)
    if(nrow(custom) != numpreds) {
      stop("When custom is a data.frame, it must have as many rows as there are values in preds, se.preds, etc.")
    }
    colnames(custom) <- .reSpace(colnames(custom),"_") # do this after the match.arg.loadflex call
  }
  ci.distrib <- match.arg.loadflex(ci.distrib, c("lognormal","normal"))
  if(is.data.frame(preds)) {
    # check for required columns
    need_col <- c('date', 'fit')
    missing_col <- need_col[!need_col %in% colnames(preds)]
    if(length(missing_col) > 0) 
      stop(paste0("missing column[s] ", paste0("'", missing_col, "'", collapse=' & '), " in the preds data.frame"))
    
    # extract columns into vectors (se.pred is optional; see the se.preds param)
    dates <- preds[,'date']
    if('se.pred' %in% colnames(preds)) se.preds <- preds[,'se.pred']
    preds <- preds[,'fit']
  }
  
  # Check that dates contains actual dates
  if(!(is(dates, "POSIXt") || is(dates, "Date") || is(dates, "chron"))) {
    stop("Unexpected format for dates - must be POSIXt, Date, or chron")
  }
  
  # If possible, check the units of preds against the units implied by format and metadata
  pred_units <- get_units(preds)
  if(!is.na(pred_units)) {
    expected_pred_units <- switch(
      format,
      "conc"=metadata@conc.units,
      "flux rate"=metadata@load.rate.units
    )
    if(pred_units != expected_pred_units) {
      stop(paste0("The units of preds should be ", expected_pred_units, 
                  ", given the metadata and format==", format))
    }
  }
  
  # Cohn WRR 2005, section 5, indicates that "one usually approximates the 
  # integral [of loads]" as a sum of sums, where the inner sums are days or 
  # hours and the outermost sum is over the entire period of interest. This is
  # quite conceptually different from pre-aggregating the predictors before
  # making the predictions, and it is something that I would like to implement
  # in a flexible way. See issue #109.
  #   if(isTRUE(preaggregate)) {
  #     if(all(agg.by %in% c("Month", "Water Year", "Calendar Year", "Total"))) {
  #       preaggregation <- aggregateSolute(
  #         preds, se.preds, format, metadata, dates, custom, agg.by="Day",
  #         cormat.function, preaggregate=FALSE, ci.agg, level, se.agg, na.rm, attach.units)
  #       preds
  #       se.preds
  #       dates
  #     }
  #   }
  
  # Decide on the aggregation vector (the usual case) or list of vectors
  # (uncommon, but possible for "custom")
  if(length(agg.by) == 1 && all(agg.by %in% gsub(" ", "_", default_agg.by))) {
    aggregate_by <- setNames(
      data.frame(
        switch(
          agg.by,
          "unit"=1:length(preds),
          "day"=strftime(dates, "%Y-%m-%d", tz=tz(dates)),
          "month"=strftime(dates, "%Y-%m", tz=tz(dates)),
          "water_year"=waterYear(dates),
          "calendar_year"=strftime(dates, "%Y", tz=tz(dates)),
          "total"=rep(1,length(preds)),
          # 2-phase aggregation:
          "mean_water_year"=waterYear(dates),
          "mean_calendar_year"=strftime(dates, "%Y", tz=tz(dates)),
          stop("")
        )),
      agg.by)
  } else {
    aggregate_by <- custom[agg.by]
  }
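  # For example, agg.by="month" yields a one-column data.frame of labels like 
  # c("2018-05", "2018-05", ..., "2018-06", ...); rows sharing a label are 
  # aggregated together below.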
  
  # Group the estimates as requested
  preds_grp <- group_by_(
    v(data.frame(preds, dates, aggregate_by)), 
    .dots=as.list(agg.by)) 
  groupsInRecord <- n_groups(preds_grp)
  # Remove grouping periods with insufficient data
  preds_filt <- filter(preds_grp, n() >= min.n)
  groupsComplete <- n_groups(preds_filt)
  # Compute the mean and SE of the values in each group
  agg_preds <- as.data.frame(summarise(
    preds_filt, 
    Value = mean(preds, na.rm=na.rm), 
    SE = NA, # returning NAs until aggregate SEs can be estimated accurately (see warning above)
    n = n()))
  
  ### Notes on Uncertainty ### 
  
  # We may find that a faster way to estimate the uncertainty in this sum 
  # is by Monte Carlo simulation, using the means and variances (or se.preds) of 
  # instantaneous fluxes to parametrically resample from those distributions, 
  # find the sum, and repeat until we have a population of sums from which we 
  # can estimate a distribution. See, for example, 
  # http://eprints.sics.se/2253/1/SICS-T--2002-01--SE.pdf ("Evaluating the CDF 
  # for m weighted sums of n correlated lognormal random variables", Lars 
  # Rasmusson, 2002, Swedish Institute of Computer Science, Report T2002:01,
  # ISRN: SICS-T-2002/01-SE, ISSN: 1100-3154)
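  
  # A minimal sketch of that parametric Monte Carlo approach (a hypothetical 
  # helper, assuming independent normal prediction errors for brevity; a 
  # correlated version would draw from a multivariate normal instead):
  #   mcSEofMean <- function(preds, se.preds, n.reps=1000) {
  #     means <- replicate(n.reps, mean(rnorm(length(preds), preds, se.preds)))
  #     sd(means)  # spread of resampled means estimates the SE of the mean
  #   }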
 
  # Compute confidence intervals if requested.
  # Returning NAs until implemented correctly (see warning above)
  if(ci.agg) {
      agg_preds$CI_lower <- NA
      agg_preds$CI_upper <- NA
  }
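  # A sketch of how lognormal CIs could be derived from each group's mean and 
  # SE once aggregate SEs are trustworthy again (a hypothetical helper; mu and 
  # sigma2 are the log-space parameters implied by the linear-space moments, 
  # matching the default ci.distrib="lognormal"):
  #   lognormalCI <- function(mean.lin, se.lin, level=0.95) {
  #     sigma2 <- log(1 + (se.lin/mean.lin)^2)  # log-space variance
  #     mu <- log(mean.lin) - sigma2/2          # log-space mean
  #     q <- qnorm(c((1-level)/2, 1-(1-level)/2))
  #     exp(mu + q*sqrt(sigma2))                # back-transformed quantiles
  #   }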
  
  # If requested, determine the new units for the conc/flux/fluxrate columns.
  # Other columns (e.g., Period, Duration) are either non-unitted or already
  # have units attached.
  if(attach.units) {
    new_units <- switch(
      format,
      "conc"=metadata@conc.units,
      "flux rate"=metadata@load.rate.units
    )
    retDF <- u(agg_preds, replace(
      rep(NA, ncol(agg_preds)), 
      names(agg_preds) %in% c("Value","SE","CI_lower","CI_upper"), 
      new_units))
  } else {
    # Shake off any pre-existing units - e.g., those attached to Duration
    retDF <- v(agg_preds)
  }
  # Aggregate to a multi-year mean if requested
  if(length(agg.by) == 1 && grepl(pattern = "mean", x = agg.by, ignore.case = TRUE)) {
    # Treat the final mean as a normal distribution
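    # (for independent annual estimates, the SE of the mean of n values is 
    # sqrt(sum of squared SEs)/n)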
    multiSE <- sqrt(sum(retDF$SE ^ 2)) / nrow(retDF)
    CI_quantile <- qnorm(1 - (1 - level)/2)
    retDF <- data.frame(
      Value = mean(retDF$Value),
      SE = multiSE, 
      CI_lower = mean(retDF$Value) - CI_quantile*multiSE,
      CI_upper = mean(retDF$Value) + CI_quantile*multiSE,
      years.record = groupsInRecord,
      years.complete = groupsComplete,
      stringsAsFactors = FALSE)
  }
  
  # Give the data.frame nice column names
  names(retDF)[1] <- .reSpace(.sentenceCase(names(retDF)[1]), "_")
  names(retDF)[match("Value", names(retDF))] <- .reSpace(.sentenceCase(format), "_")
  
  return(retDF)
}

SEofSum <- function(dates, se.preds, cormat.function) {
  # By Cohn 2005 Equation 51, Var[L-muL] = 
  # sum_j(sum_i(rho_ij*mui*muj*(exp(sigma^2)-1))) where sigma^2 is the
  # variance in log space and mui is the mean in linear space (mui = exp(mulog
  # + 0.5*sigma^2)). In other words, this is the standard equation relating 
  # correlation to covariance as cov=cor*se_i*se_j. We can use the same 
  # equation, except that the SEs of the instantaneous loads have already been
  # calculated for us as the values returned when you call predictSolute with 
  # se.pred=TRUE. So it's sum_j(sum_i(cor_ij*se_i*se_j)).
  
  # Newey and West argued that autocovariance at longer lags should be
  # downweighted in calculating the variance of the sum, largely because it's
  # really hard to calculate the variance at a lag close to the length of the
  # time series (because there are only a few pairs of points with that lag
  # distance); this is therefore a small sample problem. So if you choose, you
  # can multiply each term by a weight specified by weight.fun. The default is 1
  # for any value of lag.num, but another good choice is 1-(l/(L+1)) as 
  # suggested by Newey and West (1987); other weights were suggested in papers 
  # subsequent to that one. If you do use such a weighting function, you should
  # choose L wisely; it should probably be constant across all aggregation
  # periods rather than changing from period to period, and it should probably
  # depend on the temporal resolution of your predictions. The weight function
  # is sometimes also called a kernel function.
  #   weight.fun=function(lag.num) { rep(1, length(lag.num)) }
  #   acfvec <- acfvec * weight.fun(0:num.lags)
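  #   # e.g., the Bartlett kernel of Newey and West (1987), with bandwidth L 
  #   # (a hypothetical helper shown for illustration only):
  #   bartlettWeights <- function(lag.num, L) { pmax(0, 1 - lag.num/(L + 1)) }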
  
  # First compute the correlations among observation errors, based on
  # assumptions or estimates embodied in cormat.function
  cor_matrix <- cormat.function(dates)
  
  # The covariance matrix is the element-wise product of cor_matrix and 
  # varvar_matrix, where varvar_matrix[i,j] is se.preds[i]*se.preds[j]. 
  # Because what we really want is the sum of all terms in the covariance 
  # matrix, and because varvar_matrix could be huge, we don't actually ever 
  # create varvar_matrix. Instead we use sparse matrix multiplication to get 
  # us first to another sparse matrix (half_cov_matrix) and then to a column 
  # vector (sums by row of products), and then we take the sum of that vector
  # straight away.
  half_cov_matrix <- cor_matrix*se.preds
  cov_matrix_sum <- sum(half_cov_matrix %*% se.preds)
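  # i.e., cov_matrix_sum == the sum over all i,j of cor[i,j]*se[i]*se[j], 
  # computed without ever materializing the dense n-by-n covariance matrix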
  
  # At this point (between creating cov_matrix and computing the summation,
  # actually), the AMLE algorithm in rloadest adds an additional term for
  # covariance arising from coefficient uncertainty - see src/TAC_LOAD.f - but
  # for other model types this additional covariance is captured in se.pred
  # and the chosen cormat.function already.
  
  # Return the calculated SE
  return(sqrt(cov_matrix_sum)/length(se.preds))
}
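
# Example usage of SEofSum (hypothetical inputs), with the cormat1DayBand 
# correlation assumption (see correlations-2D for other options):
#   dates <- seq(as.Date("2018-01-01"), by="day", length.out=10)
#   ses <- rep(0.5, 10)
#   SEofSum(dates, ses, cormat.function=cormat1DayBand)  # SE of the 10-day mean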