R/tempo_derive.R

Defines functions tempo_derive dtvgeom_wrapper calc_prob tempo_calc_group_betas tempo_derive_probs tempo_derive_moment tempo_calc_prob_posterior tempo_calc_moment_posterior tempo_calc_all_params_re

Documented in tempo_calc_group_betas tempo_derive

#' Calculate derived quantities
#' 
#' This function can calulate select derived quantites using the output from
#' \code{tempo_mcmc()} for observations chosed by the user. Derived quantities 
#' include the pmf and/or cdf of transition time, the transition probabilities
#' for each time step (i.e. the parameter for the tvgeom distribution), 
#' the mean predicted transition time, and the standard deviation 
#' of the transition time distribution.
#' 
#' @param draws output from \code{tempo_mcmc()}
#' 
#' @param quantity character vector; one or more of "probabilities", "pmf",
#' "cdf", "mean", and "sd", specifying which quantities to calculate. Note that
#' if your model included random effects, you must run \code{tempo_mcmc()} with
#' \code{monitor_random_effects = TRUE} in order to calculate derived
#' quantities.
#' 
#' @param obs_idx integer vector; specifies the observations for which derived
#' quantities will be calculated. Indices in \code{obs_idx} corespond to the
#' rows in the observation data supplied to \code{tempo_mcmc()}.
#' 
#' @param max_samples integer; specify the number of samples for which to 
#' calculate derived quantites \strong{for each chain}. Defaults to \code{1000}.
#' If \code{max_samples} is less than the number of MCMC samples, 
#' each chain in \code{draws} will be downsampled deterministically to size 
#' \code{max_samples}, resulting in a total of 
#' \code{max_samples * <number of chains>} samples.
#' 
#' @param parallelize Boolean, should the function be run in parallel. Defaults
#' to \code{TRUE}.
#' 
#' @param n_cores Integer, number of cores to use for parallel processing. 
#' Defaults to \code{NULL}. If \code{parallelize = TRUE}, and 
#' \code{n_corese = NULL} (the dafaults), \code{tempo_dic()} will use 
#' \code{parallel::detectCores() - 1} as the number of cores.
#' 
#' @return list of posterior distributions for the derived quantities. 
#' "probabilities", "pmf", and "cdf" each return a 3D array with mcmc 
#' iterations as rows, time steps as columns, and observations along the 
#' 3rd margin. "mean" and "sd" return matrices with mcmc iterations as rows and
#' observations as columns. All objects are returned with named dimensions. The 
#' dimension names for observations match the dimention names of the rows for
#' matrices in the list of covariates supplied to \code{tempo_mcmc()}. 
#' 
#' @importFrom tvgeom dtvgeom tvgeom_mean tvgeom_var
#' @importFrom boot inv.logit
#' @importFrom stringr str_detect
#' @importFrom abind abind
#' @name tempo_derive
#' @rdname tempo_derive
#' @export
tempo_derive <- function(draws,
                         quantity, # one of probabilities, pmf, cdf, mean, sd
                         obs_idx,
                         max_samples = 1000,
                         parallelize = TRUE,
                         n_cores = NULL) {
  ## Extract needed attributes from draws
  time_steps <- dim(attr(draws, "covariates"))[2]
  
  if (attr(draws, "rand_eff_bool") & !attr(draws, "monitor_random_effects")) {
    stop("If random effects were used, draws must include monitored ",
         "group-level\n  effects in order to use this function. ",
         "Rerun tempo_mcmc() with\n  monitor_random_effects = TRUE")
  }
  
  if (!any(quantity %in% c("probabilities", "pmf", "cdf", "mean", "sd"))) {
    stop('quantity must be a vector of one or more of "probabilities", ', 
    '"pmf", "cdf",\n  "mean", or "sd"')
  }
  
  # sub-sample draws for faster compute times
  n_mcmc <- nrow(draws[[1]])
  n_chains <- attr(draws, "n_chains")
  n_samples <- n_mcmc * n_chains
  
  if (max_samples < n_mcmc) {
    sample_idx <- round(seq(1, n_mcmc, by = n_mcmc / n_samples))
    
    for (i in 1:n_chains) {
      draws[[i]] <-  draws[[i]][sample_idx, ]
    }
  }
  
  
  # define number of workers don't put inside of (parallelize) control flow
  # for code coverage purposes
  if (!is.null(n_cores)) {
    n_workers <- n_cores 
  } else {
    n_workers <- detectCores() - 1
  }
  
  ## Set up output object
  output <- list()
  
  ## calculating probabilities so it is only done once if any of "pmf", 
  ## "probabilities", or "cdf" are in "quantity"
  if (any(quantity %in% c("pmf", "cdf", "probabilities"))) {
    probs <- tempo_derive_probs(draws, obs_idx, parallelize, n_workers)
  }
  
  ## calculate mean and/or sd first, so cl can be create inside and destroyed
  if("mean" %in% quantity) {
    means <- tempo_derive_moment(draws, obs_idx, parallelize, n_workers, "mean")
    
    output <- c(output, list("mean" = means))
  }
  
  if("sd" %in% quantity) {
    sds <- tempo_derive_moment(draws, obs_idx, parallelize, n_workers, "sd")
    
    output <- c(output, list("sd" = sds))
  }

  ## Reinitialize a cluster if parallelize = TRUE
  # make cluster
  if (parallelize) {
    cl <- makeCluster(max(1, n_workers))
    chunk_size <- (n_mcmc * n_chains) / (4 * length(cl))
  } else {
    cl <- makeCluster(1) 
    chunk_size <- n_mcmc * n_chains
  }
  
  ## Calculate the desired derived quantities and append to output list
  if ("probabilities" %in% quantity) {
    output <- c(output, list("probabilities" = probs))
  }
  
  if (any(quantity %in% c("pmf", "cdf"))) { # need to calculate pmf if user 
                                            # wants cdf
    pmf <- aperm(
      parApply(cl = cl,
               X = probs,
               MARGIN = c(1, 3),
               FUN = dtvgeom_wrapper, 
               t = 1:(time_steps + 1),
               chunk.size = chunk_size), #plus one for no transition case
      perm = c(2, 1, 3)
    )
    
    dimnames(pmf)[c(1,3, 2)] <- c(dimnames(probs)[c(1,3)],
                                  list(paste0("time_step_",
                                              1:(time_steps + 1))))
    
    if ("pmf" %in% quantity) {
      output <- c(output, list("pmf" = pmf))
    }
  }
  
  if ("cdf" %in% quantity) {
    cdf <- aperm(
      parApply(cl = cl,
               X = pmf,
               MARGIN = c(1, 3),
               FUN = cumsum,
               chunk.size = chunk_size), 
      perm = c(2, 1, 3)
    )
    dimnames(cdf) <- dimnames(pmf)
    output <- c(output, list("cdf" = cdf))
  }
  
  stopCluster(cl)
  
  output
}

dtvgeom_wrapper <- function(prob, t){
  dtvgeom(t, prob)
}

calc_prob <- function(beta, covs){
  inv.logit(covs %*% beta)
}

#' Get group-level betas
#' 
#' This function uses the output from \code{tempo_mcmc()} to combine means and 
#' group-level epsilon posteriors for betas with random effects. To use this
#' function, \code{tempo_mcmc()} must be run with 
#' \code{monitor_random_effects = TRUE}
#' 
#' @param draws the output object from \code{tempo_mcmc()}
#' 
#' @return Returns a list of matrices with named dimensions, one matrix 
#' per beta parameter, with mcmc iterations as rows, and columns as groups. 
#' The elements in the list are names using the names provided in the 
#' covariates list supplied to \code{tempo_mcmc}.
#' 
#' @name tempo_calc_group_betas
#' @rdname tempo_calc_group_betas
#' @export
tempo_calc_group_betas <- function(draws){
  # stop condition if rand_eff_bool false or monitor random effects is false
  if (!attr(draws, "rand_eff_bool")) {
    stop("This function only applies for random effects models. ",
         "The model used to produce\n  your input (draws) to this function did",
         "not include random effects")
  }
  if (!attr(draws, "monitor_random_effects")) {
    stop("If random effects were used, draws must include monitored ",
         "group-level\n  effects in order to use this function. ",
         "Rerun tempo_mcmc() with \n  monitor_random_effects = TRUE")
  }
  
  # pull needed variables from attributes
  cov_names <- dimnames(attr(draws, "covariates"))[[3]]
  rand_eff_ind <- attr(draws, "rand_eff_ind")
  rand_eff_names <- cov_names[rand_eff_ind]
  group_ids <- attr(draws, "group_ids")
  
  # convert draws to a single matrix
  draws_all <- as.matrix(do.call(rbind, draws))
  
  # Set up storage object 
  group_betas <- array(0, dim = c(nrow(draws_all),
                                  length(rand_eff_ind),
                                  length(unique(group_ids))))
  
  # extract the means and random effects
  param_means <- 
    draws_all[, str_detect(colnames(draws_all), pattern = "^mean"),
              drop = FALSE]
  
  group_effects <-
    draws_all[, str_detect(colnames(draws_all), pattern = "^eps")]
  
  # combine them to create and array with params in cols, iters in rows, and 
  # groups along 3rd margin 
  # keep as a for loop since apply messes with dims
  for (i in seq_along(rand_eff_ind)) { 
    name <- cov_names[rand_eff_ind[i]]
    mean <- param_means[, paste0("mean_beta_", name)]
    effects <- group_effects[, str_detect(colnames(group_effects),
                                          pattern = name)]
    
    group_params <- sweep(effects, 1, mean, "+")
    
    group_betas[, i, ] <- group_params
  }
  
  # This should be sort safe because group epsilons stored then
  # output by tempo in increasing order of group indices
  dimnames(group_betas) <- list(dimnames(param_means)[[1]],
                                paste0("beta_", rand_eff_names),
                                paste0("group_", seq_len(dim(group_betas)[3])))
  
  # Return a list (which will be friendlier for users than an array)
  out <- asplit(group_betas, MARGIN = 2)
  names(out) <- rand_eff_names
  
  out
}


#### unexported functions ####

# calculates the posteriors of transition probabilities for
# each time step in multiple observations
# draws is output from tempo_mcmc
# obs_idx is obs_idx arg to tempo_derive
# parallelize is a Bool, should it be run in parallel?
# n_workers, integer, defined in tempo_derive()
tempo_derive_probs <- function(draws, obs_idx, parallelize, n_workers) {
  ## Extract needed attributes from draws
  # objects
  covariates <- attr(draws, "covariates")
  rand_eff_ind <- attr(draws, "rand_eff_ind")
  group_ids <- attr(draws, "group_ids")
  # constants
  n_chains <- attr(draws, "n_chains")
  # flags
  rand_eff_bool <- attr(draws, "rand_eff_bool")
  
  ## Other variables for later reference
  cov_names <- dimnames(covariates)[[3]]
  fixed_eff_ind <- c(1:dim(covariates)[3])[-rand_eff_ind]
  fixed_eff_names <- cov_names[fixed_eff_ind]
  rand_eff_names <- cov_names[rand_eff_ind]
  n_mcmc <- nrow(draws[[1]])
  
  ## combine all of the chains in draws to 1
  draws_all <- as.matrix(do.call(rbind, draws))
  
  ## Calculate the probabilities for each iteration
  # if random effects...
  if (rand_eff_bool) {
    # create empty array to store group param values
    # params in cols, iters in rows, and groups along 3rd dim
    params <- tempo_calc_all_params_re(draws)
    
  } else {
    params_all <- 
      draws_all[, str_detect(colnames(draws_all), pattern = "^beta"),
                drop = FALSE]
    params <- array(params_all, c(dim(params_all), 1))  
    group_ids <- rep(1, nrow(attr(draws, "y")))
  }
  
  if (parallelize) {
    cl <- makeCluster(max(1, n_workers))
    chunk_size <- (n_mcmc * n_chains) / (4 * length(cl))
  } else {
    cl <- makeCluster(1) 
    chunk_size <- n_mcmc * n_chains
  }
  
  ## Calculate probability for each covariate observation
  all_probs_list  <- lapply(obs_idx,
                            FUN = tempo_calc_prob_posterior,
                            group_ids = group_ids,
                            covariates = covariates,
                            params = params,
                            cl = cl,
                            chunk_size = chunk_size)
  
  stopCluster(cl)
  
  # row = iteration, column = time step, 3rd margin = observation
  posterior_probs <- do.call(abind, c(all_probs_list, along = 3))
  
  ## Name posterior_probs dimensions
  dimnames(posterior_probs) <- list(
    dimnames(draws_all)[[1]],
    paste0("time_step_", seq_len(dim(covariates)[2])),
    dimnames(covariates)[[1]][obs_idx]
  )
  
  posterior_probs
}

## Built off tempo_derive_probs, same args docs, but optimized for memory,
## moment is one of "mean" or "sd"
tempo_derive_moment <- function(draws, obs_idx, parallelize, n_workers,
                                moment) {
  ## Extract needed attributes from draws
  # objects
  covariates <- attr(draws, "covariates")
  rand_eff_ind <- attr(draws, "rand_eff_ind")
  group_ids <- attr(draws, "group_ids")
  # constants
  n_chains <- attr(draws, "n_chains")
  # flags
  rand_eff_bool <- attr(draws, "rand_eff_bool")
  
  ## Other variables for later reference
  cov_names <- dimnames(covariates)[[3]]
  fixed_eff_ind <- c(1:dim(covariates)[3])[-rand_eff_ind]
  fixed_eff_names <- cov_names[fixed_eff_ind]
  rand_eff_names <- cov_names[rand_eff_ind]
  n_mcmc <- nrow(draws[[1]])
  
  ## combine all of the chains in draws to 1
  draws_all <- as.matrix(do.call(rbind, draws))
  
  ## Calculate the probabilities for each iteration
  # if random effects...
  if (rand_eff_bool) {
    # create empty array to store group param values
    # params in cols, iters in rows, and groups along 3rd dim
    params <- tempo_calc_all_params_re(draws)
    
  } else {
    params_all <- 
      draws_all[, str_detect(colnames(draws_all), pattern = "^beta"),
                drop = FALSE]
    params <- array(params_all, c(dim(params_all), 1))  
    group_ids <- rep(1, nrow(attr(draws, "y")))
  }
  
  if (parallelize) {
    cl <- makeCluster(max(1, n_workers))
    chunk_size <- (n_mcmc * n_chains) / (4 * length(cl))
  } else {
    cl <- makeCluster(1) 
    chunk_size <- n_mcmc * n_chains
  }
  
  all_moments_list  <- lapply(obs_idx,
                               FUN = tempo_calc_moment_posterior,
                               group_ids = group_ids,
                               covariates = covariates,
                               params = params,
                               cl = cl,
                               chunk_size = chunk_size,
                               moment = moment)
  
  stopCluster(cl)

  # row = iteration, column = observation
  posterior_moments <- do.call(cbind, all_moments_list)
  
  dimnames(posterior_moments) <- list(
    dimnames(draws_all)[[1]],
    dimnames(covariates)[[1]][obs_idx]
  )
  
  posterior_moments
}

# function to calculate posterior of transition probabilities for a single
# observation for a random effects model
# cl is the cluster created just before the funtion call in tempo_derive_probs
#' @importFrom parallel parApply
tempo_calc_prob_posterior <- function(id, group_ids, covariates, params,
                                      cl, chunk_size) {
  covs <- covariates[id, , ]
  betas <- params[, , group_ids[id]]

  # cols = iteration, rows = time step
  prob_matrix <- parApply(cl = cl,
                          X = betas,
                          MARGIN = 1,
                          FUN = calc_prob,
                          covs = covs,
                          chunk.size = chunk_size)
  
  # transpose prob matrix so rows are iters
  prob_matrix <- t(prob_matrix)
}

# Built off of tempo_calc_prob_posterior, same args, but optimized for memory
# because the larger prob_matrix object can be garbage collected
# moment is one of "mean" or "sd"
tempo_calc_moment_posterior <- function(id, group_ids, covariates, params,
                                        cl, chunk_size, moment) {
  covs <- covariates[id, , ]
  betas <- params[, , group_ids[id]]
  
  # cols = iteration, rows = time step
  prob_matrix <- parApply(cl = cl,
                          X = betas,
                          MARGIN = 1,
                          FUN = calc_prob,
                          covs = covs,
                          chunk.size = chunk_size)
  if (moment == "mean") {
    means_vector <- parApply(cl = cl,
                             X = t(prob_matrix),
                             MARGIN = 1,
                             FUN = tvgeom_mean,
                             chunk.size = chunk_size)
    
    return(means_vector)
  }
  if (moment == "sd") {
    var_vector <- parApply(cl = cl,
                           X = t(prob_matrix),
                           MARGIN = 1,
                           FUN = tvgeom_var,
                           chunk.size = chunk_size)

    sd_vector <- sqrt(var_vector)
    return(sd_vector) 
  }
}

# combine means and random effects to get an array of params, 1st dim is iter,
# 2nd dim is covariate, 3rd dim is group
tempo_calc_all_params_re <- function(draws) {
  x <- attr(draws, "covariates")
  group_ids <- attr(draws, "group_ids")
  rand_eff_ind <- attr(draws, "rand_eff_ind")
  fixed_eff_ind <- c(1:dim(x)[3])[-rand_eff_ind]
  draws_all <- as.matrix(do.call(rbind, draws))
  
  params_all <- array(0, dim = c(nrow(draws_all),
                                 dim(x)[[3]],
                                 length(unique(group_ids))))
  dimnames(params_all) <- list(rownames(draws_all),
                               dimnames(x)[[3]],
                               paste0("group_", sort(unique(group_ids))))
  fixed_betas <- 
    draws_all[, str_detect(colnames(draws_all), pattern = "^beta"),
              drop = FALSE]
  
  group_betas_list <- tempo_calc_group_betas(draws)
  
  group_betas <- aperm(do.call(abind, c(group_betas_list, along = 3)),
                       perm = c(1,3,2))
  
  params_all[, fixed_eff_ind, ] <- fixed_betas
  params_all[, rand_eff_ind, ] <- group_betas
  
  params_all
}
vlandau/tempo documentation built on March 18, 2020, 12:04 a.m.