inst/sbc/sbc_tools.R

#'
#' Utilities for SBC validation
#'
load_rbest_dev <- function() {
    here::i_am("inst/sbc/sbc_tools.R")
    if(!("RBesT" %in% .packages())) {
        cat("RBesT not yet loaded, bringing up local dev version.\n")
        library(devtools)
        devtools::load_all(here::here())
    } else {
        cat("RBesT is already loaded.\n")
    }
}

setup_lecuyer_seeds <- function(lecuyer_seed, num) {
    ## note: the seed must be in the L'Ecuyer format. Just set
    ## RNGkind("L'Ecuyer-CMRG")
    ## and then use .Random.seed
    job_seeds <- list()
    job_seeds[[1]] <- parallel::nextRNGStream(lecuyer_seed)
    i <- 2
    while (i <= num) {
        job_seeds[[i]] <- parallel::nextRNGSubStream(job_seeds[[i-1]])
        i <- i + 1
    }
    job_seeds
}
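
## Minimal usage sketch for deriving per-job seed streams from a single
## master seed (the seed value below is purely illustrative):
if (FALSE) {
    RNGkind("L'Ecuyer-CMRG")
    set.seed(24681)
    job_seeds <- setup_lecuyer_seeds(.Random.seed, 4)
    length(job_seeds)  ## 4 independent seed streams, one per job
}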

#'
#' Simulates a draw from the prior and fake data conditional on it. This
#' is the data-generating step of the simulation. The function receives
#' the problem data, job specifics and the hyper-parameters which
#' determine the scenario (choices for the prior).
#'
simulate_fake <- function(data, family, mean_mu, sd_mu, sd_tau, samp_sd) {
    G <- length(unique(data$group))
    N <- length(data$group)
    rl <- rle(data$group)
    Ng <- rl$lengths
    mu <- rnorm(1, mean_mu, sd_mu)
    tau <- abs(rnorm(1, 0, sd_tau))
    theta <- rnorm(G, mu, tau)
    family <- get(family, mode = "function", envir = parent.frame())
    inv_link <- family()$linkinv
    alpha <- inv_link(rep(theta, times=Ng))
    likelihood <- family()$family
    if (likelihood == "binomial") {
        r <- rbinom(N, 1, alpha)
        y <- as.numeric(tapply(r, data$group, sum))
        fake <- data.frame(r=y, nr=Ng-y, group=1:G)
    }
    if (likelihood == "poisson") {
        count <- rpois(N, alpha)
        y <- as.numeric(tapply(count, data$group, sum))
        fake <- data.frame(y=y, n=Ng, group=1:G)
    }
    if (likelihood == "gaussian") {
        y_i <- rnorm(N, alpha, samp_sd)
        y <- as.numeric(tapply(y_i, data$group, mean))
        fake <- data.frame(y=y, y_se=samp_sd/sqrt(Ng), group=1:G)
    }
    list(fake=fake,
         draw=c(mu=mu, tau=tau),
         draw_theta=theta)
}
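
## Minimal usage sketch for the data-generating step. The layout of the
## scenario data (one row per observation with a `group` column) is assumed
## from how `simulate_fake` uses it; all numbers are illustrative:
if (FALSE) {
    scenario_data <- data.frame(group = rep(1:4, each = 25))
    sim <- simulate_fake(scenario_data, "binomial",
                         mean_mu = 0, sd_mu = 1, sd_tau = 0.5, samp_sd = 1)
    sim$fake  ## aggregated fake data set, one row per group
    sim$draw  ## the prior draw (mu, tau) which generated it
}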

#'
#'
#' Fits one fake data set with the RBesT `gMAP` model. This function
#' receives the problem data, job details and an **instance** of the
#' scenario as generated by `simulate_fake`.
#'

fit_rbest <- function(fake, draw, draw_theta, family, prior_mean_mu, prior_sd_mu, prior_sd_tau, samp_sd) {
    Ng <- length(draw_theta)

    ##pars <- job$pars$prob.pars
    ##prior_mean_mu <- pars$mean_mu
    ##prior_sd_mu <- pars$sd_mu
    ##prior_sd_tau <- pars$sd_tau
    ##samp_sd <- pars$samp_sd
    ##family <- pars$family

    model <- switch(family,
                    binomial=cbind(r, nr) ~ 1 | group,
                    poisson=y ~ 1 + offset(log(n)) | group,
                    gaussian=cbind(y, y_se) ~ 1 | group)

    options(RBesT.MC.warmup=2000, RBesT.MC.iter=4000, RBesT.MC.thin=1, RBesT.MC.init=0.1,
            RBesT.MC.control=list(adapt_delta=0.999, stepsize=0.01, max_treedepth=10,
                                  adapt_init_buffer=100, adapt_term_buffer=300))

    if(Ng > 5) {
        ## for the dense case we can be a bit less aggressive with the sampling tuning parameters
        options(RBesT.MC.control=list(adapt_delta=0.95, stepsize=0.01, max_treedepth=10,
                                      adapt_init_buffer=100, adapt_term_buffer=300))
    }
    fit <- gMAP(model, data=fake, family=family,
                tau.dist="HalfNormal",
                tau.prior=cbind(0, prior_sd_tau),
                beta.prior=cbind(c(prior_mean_mu),c(prior_sd_mu)),
                chains=2)

    params <- c("beta[1]", "tau[1]")
    params_group <- paste0("theta[", 1:Ng, "]")

    sampler_params <- rstan::get_sampler_params(fit$fit, inc_warmup=FALSE)
    n_divergent <- sum(sapply(sampler_params, function(x) sum(x[,'divergent__'])) )

    fit_sum <- rstan::summary(fit$fit)$summary
    samp_diags <- fit_sum[params, c("n_eff", "Rhat")]
    min_Neff <- ceiling(min(samp_diags[, "n_eff"], na.rm=TRUE))
    max_Rhat <- max(samp_diags[, "Rhat"], na.rm=TRUE)

    lp_ess  <- as.numeric(rstan::monitor(as.array(fit$fit, pars="lp__"), print=FALSE)[1, c("Bulk_ESS", "Tail_ESS")])

    post <- as.matrix(fit)[,params]
    post_group <- as.matrix(fit)[,params_group]
    S <- nrow(post)
    ## thin down to 1023 draws so that the ranks can take the 1024 values 0..1023
    idx <- round(seq(1, S, length=1024-1))
    post <- post[idx,]
    post_group <- post_group[idx,]
    colnames(post) <- c("mu", "tau")
    unlist(list(rank=c(colSums(sweep(post, 2, draw) < 0), colSums(sweep(post_group, 2, draw_theta) < 0)),
                min_Neff = min_Neff,
                n_divergent = n_divergent,
                max_Rhat = max_Rhat,
                lp_ess_bulk = lp_ess[1],
                lp_ess_tail = lp_ess[2]))
}
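
## Minimal end-to-end sketch for a single SBC replication (slow, since it
## runs the full MCMC fit; scenario and prior values are illustrative and
## the fitting prior must match the data-generating prior for SBC to be valid):
if (FALSE) {
    scenario_data <- data.frame(group = rep(1:4, each = 25))
    sim <- simulate_fake(scenario_data, "binomial",
                         mean_mu = 0, sd_mu = 1, sd_tau = 0.5, samp_sd = 1)
    res <- fit_rbest(sim$fake, sim$draw, sim$draw_theta, "binomial",
                     prior_mean_mu = 0, prior_sd_mu = 1,
                     prior_sd_tau = 0.5, samp_sd = 1)
    res["rank.mu"]  ## rank of the prior draw of mu among the posterior draws
}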

scale_ranks <- function(Nbins, scale=1) {
    ## scale must evenly divide the total number of bins
    assertthat::assert_that(round(Nbins/scale) == Nbins/scale)
    breaks <- (0:(Nbins/scale))
    Nbreaks <- length(breaks)
    function(scen) {
        vars <- grep("^rank.", names(scen), value=TRUE)
        res <- lapply(vars, function(v) hist(ceiling((scen[[v]]+1)/scale), breaks=breaks, plot=FALSE, include.lowest=FALSE)$counts)
        names(res) <- gsub("^rank", "count", vars)
        res$rank <- breaks[-Nbreaks]
        res <- as.data.frame(do.call(cbind, res))
        res
    }
}
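
## Minimal sketch of collapsing the raw SBC ranks (0..1023) into 64 bins for
## plotting. The `rank.*` column layout mimics what `fit_rbest` returns; the
## rank values are simulated here purely for illustration:
if (FALSE) {
    fake_ranks <- data.frame(rank.mu  = sample(0:1023, 500, replace = TRUE),
                             rank.tau = sample(0:1023, 500, replace = TRUE))
    bin_64 <- scale_ranks(1024, scale = 16)
    head(bin_64(fake_ranks))
}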


run_sbc_case <- function(job.id, repl, data_scenario, family, mean_mu, sd_mu, sd_tau, samp_sd, base_scenarios, seeds) {
    RNGkind("L'Ecuyer-CMRG")
    .Random.seed <<- seeds[[job.id]]

    runtime <- system.time({
        suppressMessages(load_rbest_dev())
        data <- base_scenarios[[data_scenario]]
        fake <- simulate_fake(data, family, mean_mu, sd_mu, sd_tau, samp_sd)
        fit <- fit_rbest(fake$fake, fake$draw, fake$draw_theta, family, mean_mu, sd_mu, sd_tau, samp_sd)
    })
    c(list(job.id=job.id, time.running=runtime["elapsed"]), fit)
}
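
## Minimal sketch of one SBC case run outside the batch framework. The
## structure of `base_scenarios` (a named list of data frames with a `group`
## column) and of `seeds` is assumed from how `run_sbc_case` uses them; all
## names and values are illustrative:
if (FALSE) {
    base_scenarios <- list(sparse = data.frame(group = rep(1:2, each = 20)),
                           dense  = data.frame(group = rep(1:10, each = 10)))
    RNGkind("L'Ecuyer-CMRG")
    set.seed(5886935)
    seeds <- setup_lecuyer_seeds(.Random.seed, 2)
    run_sbc_case(1, 1, "sparse", "binomial",
                 mean_mu = 0, sd_mu = 1, sd_tau = 0.5, samp_sd = 1,
                 base_scenarios = base_scenarios, seeds = seeds)
}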

## Submits jobs to a batchtools cluster with fault tolerance, i.e. failed
## jobs are resubmitted up to max_num_tries times.
auto_submit <- function(jobs, registry, resources=list(), max_num_tries = 10) {
  all_unfinished_jobs <- jobs

  num_unfinished_jobs <- nrow(all_unfinished_jobs)
  num_all_jobs <- num_unfinished_jobs
  remaining_tries <- max_num_tries
  all_jobs_finished <- FALSE
  while (remaining_tries > 0 && !all_jobs_finished) {
    remaining_tries <- remaining_tries - 1

    message("Submitting jobs at ", Sys.time())
    # Once things run fine let's submit this work to the cluster.
    submitJobs(all_unfinished_jobs, resources=resources)
    # Wait for results.
    waitForJobs()
    message("Finished waiting for jobs at ", Sys.time())

    # Check status:
    print(getStatus())

    # Ensure that all jobs are done
    not_done_jobs <- findNotDone()
    if (nrow(not_done_jobs) != 0) {
      print(getErrorMessages(not_done_jobs))
      ##browser()
      ##invisible(readline(prompt="Press [enter] to continue"))

      message("Some jobs did not complete. Please check the batchtools registry ", registry$file.dir)
      all_unfinished_jobs <- inner_join(not_done_jobs, all_unfinished_jobs)

      if (num_unfinished_jobs == nrow(all_unfinished_jobs) && nrow(all_unfinished_jobs) > 0.25 * num_all_jobs)
      {
        # Unfinished job count did not change and more than 25% of all jobs
        # are still unfinished -> retrying will probably not help. Abort!
        warning("Error: unfinished job count is not decreasing. Aborting job retries.")
        remaining_tries <- 0
      }

      if (nrow(all_unfinished_jobs) == nrow(jobs))
      {
        # All jobs errored -> retrying will probably not help. Abort!
        warning("Error: all jobs errored. Aborting job retries.")
        remaining_tries <- 0
      }

      num_unfinished_jobs <- nrow(all_unfinished_jobs)
      message("Trying to resubmit jobs. Remaining tries: ", remaining_tries, " / ", max_num_tries)
    } else {
      all_jobs_finished <- TRUE
    }
  }

  invisible(all_jobs_finished)
}
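
## Minimal sketch of how the submission helper is intended to be driven from
## a batchtools registry. The registry path, resource list and the explicit
## loading of dplyr (auto_submit uses inner_join) are illustrative assumptions:
if (FALSE) {
    library(batchtools)
    library(dplyr)
    reg <- loadRegistry("sbc_registry", writeable = TRUE)
    jobs <- findJobs(reg = reg)
    auto_submit(jobs, reg,
                resources = list(walltime = 3600, memory = 2000),
                max_num_tries = 5)
}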
