R/h_fusion_mixG.R

##########################
# these following functions allow user to perform hierarchical fusion
# hierarchical fusion means that we perform fusion in steps to try recover samples for our target
##########################

######################################## h_fusion ########################################

#' Standard Hierarchical Monte Carlo Fusion
#'
#' Hierarchical Monte Carlo Fusion with base level with nodes that are tempered mixture Gaussians  with same weights, means, sds components
#'
#' @param N_schedule vector of length (L-1), where N_schedule[l] is the number of samples per node at level l
#' @param time_schedule vector of legnth(L-1), where time_schedule[l] is the time chosen for Fusion at level l
#' @param start_beta beta for the base level
#' @param C_schedule vector of length (L-1), where C_schedule[l] is the number of samples to fuse for level l
#' @param L total number of levels in the hierarchy
#' @param base_samples list of length (1/start_beta), where samples_to_fuse[c] containg the samples for the c-th node in the level
#' @param weights vector: weights of mixture Gaussian
#' @param means vector: means of mixture Gassuan
#' @param sds vector: st.devs of mixture Gaussian
#' @param study boolean value: defaults to F, determines whether or not to return acceptance probabilities
#'
#' @return samples samples from hierarchical fusion
#' @return time vector of length (L-1), where time[l] is the run time for level l
#' @return rho_acc vector of length (L-1), where rho_acc[l] is the acceptance rate for first fusion step for level l
#' @return Q_acc vector of length (L-1), where Q_acc[l] is the acceptance rate for second fusion step for level l
#' @return input_betas list of length (L), where input_betas[[l]] is the input betas for level l
#' @return output_beta vector of length(L-1), where output_beta[l] is the beta for level l
#' @return diffusion_times vector of length (L-1), where diffusion_times[l] are the times for fusion in level l (= time_schedule)
#'
#' @examples
#' # setting variables
#' w_example <- c(0.35, 0.65)
#' m_example <- c(-3, 8)
#' s_example <- c(1, 1.5)
#' b_example <- 1/4
#'
#' # sampling from tempered density
#' nsamples <- 500000
#' base <- hmc_base_sampler_mixG(w_example, m_example, s_example, b_example, nsamples, 4)
#'
#' test <- parallel_h_fusion_mixG(N_schedule = rep(10000, 2), time_schedule = rep(1, 2),
#'                                start_beta = b_example, C_schedule = rep(2, 2), L = 3, base_samples = base,
#'                                weights = w_example, means = m_example, sds = s_example, study = T)
#'
#' # plot results
#' plot_levels_hier_mixG(test, weights = w_example, means = m_example, sds = s_example,
#'                       from = -15, to = 20, plot_rows = 3, plot_columns = 1, bw = c(0.2, 0.3, 0.4))
#' @export
parallel_h_fusion_mixG <- function(N_schedule, time_schedule, start_beta, C_schedule, L, base_samples, weights, means, sds, study = F) {
  # N_schedule is a vector of length (L-1), where N_schedule[i] for i=1,..(L-1), is the number of samples per node at level i
  if (length(N_schedule) != (L-1)) stop('length of N_schedule must be equal to (L-1)')
  # time_schedule is a vector of length (L-1), where time_schedule[i] for i=1,...,(L-1), is the time for Fusion at level i
  if (length(time_schedule) != (L-1)) stop('length of time_schedule must be equal to (L-1)')
  # C_schedule is a vector of length (L-1) where C_schedule[l] is the number of samples to fuse for level l
  if (length(C_schedule) == (L-1)) {
    # check that at each level, we are fusing a suitable number
    for (l in length(C_schedule):1) {
      if (((1/start_beta)/prod(C_schedule[(L-1):l]))%%1 != 0) {
        stop('check that (1/beta)/prod(C_schedule[(L-1):l]) is an integer for l=L-1,...,1')
      }
    }
  } else {
    stop('C_schedule must be a vector of length (L-1)')
  }

  # we append 1 to the vector C_schedule to make the indices work later on when we call fusion
  # we need this so that we can set the right value for beta when fusing up the levels
  C_schedule <- c(C_schedule, 1)

  # initialising study results
  hier_samples <- list()
  hier_samples[[L]] <- base_samples # base level
  time_taken_per_level <- rep(0, L-1)
  input_betas <- list()
  input_betas[[L]] <- NA
  output_beta <- c(1:(L-1), start_beta)

  # make some vectors for acceptance rates of the level
  rho_acc <- rep(0, L-1)
  Q_acc <- rep(0, L-1)
  rhoQ_acc <- rep(0, L-1)

  # creating parallel cluster
  n_cores <- parallel::detectCores()
  cl <- parallel::makeCluster(n_cores, outfile = 'parallel_h_fusion_mixG.txt')

  # creating variable and functions list to pass into cluster using clusterExport
  varlist <- list("check_equal_dim", "rmix_norm", "dnorm_mix_tempered_unnorm", "dnorm_mix_tempered", "dnorm_mix",
                  "dnorm_deriv1", "dnorm_deriv2","dnorm_mix_deriv1", "dnorm_mix_deriv2",
                  "dnorm_mix_tempered_deriv1", "dnorm_mix_tempered_deriv2", "phi_function_mixG", "bounds_phi_function_mixG",
                  "lower_bound_phi_function_mixG", "simulate_langevin_diffusion_mixG", "fusion_diff_mixG",
                  "N_schedule", "time_schedule", "start_beta", "C_schedule", "L", "base_samples", "weights", "means", "sds")
  parallel::clusterExport(cl, envir = environment(), varlist = varlist)

  # exporting functions from layeredBB package to simulate layered Brownian bridges
  parallel::clusterExport(cl, varlist = ls("package:layeredBB"))

  # add to output file that starting hierarchical fusion
  cat('Starting hierarchical fusion \n', file = 'parallel_h_fusion_mixG.txt')

  # parallelising tasks for each level going up the hiearchy
  for (k in ((L-1):1)) {
    # previous level has (1/beta)/prod(C_schedule[L:(k-1)]) nodes and we fuse C_schedule[k] of these
    n_nodes <- (1/start_beta)/prod(C_schedule[L:k])
    # calculating how much we can split the work along the cores
    a <- floor(n_cores / n_nodes)

    if (a <= 1) {
      ##### IF a==0:
      # there are more nodes than cores
      # we make a cluster of size n_cores and parallelise task by asking each core to obtain samples for each node
      # when a node is completed, the core will start on a new node
      ##### IF a==1:
      # there are more cores than nodes, but not enough to split the number of samples needed by each node among the extra cores
      ###### we make a cluster of size n_nodes and parallelise task by asking each core to obtain samples for each node

      # we cannot split the number of samples to be obtained by each core
      # samples_per_core[i]=N_schedule[k] for i=1,...,n_nodes
      samples_per_core <- rep(N_schedule[k], n_nodes)

      # linking each core to samples to perform fusion on
      linker <- 1:n_nodes
    } else if (a >= 2) {
      ###### IF a >=2
      # there are more cores than nodes AND there is enough extra cores so that the number of samples by each node can be reduced
      # i.e. work can be shared among the extra cores
      # we make a cluster of size a*n_nodes and each core obtains N_schedule[k]/a samples

      # splitting the number of samples to be obtained by each core
      # samples_per_core[i]=N_schedule[k]/a for i=1,...,n_nodes
      samples_per_core <- rep(floor(N_schedule[k]/a), a)

      # N_schedule[k]/a may not be an integer
      # some cores will need to obtain one more sample so the total number of samples obtained is N_schedule[k]
      if (sum(samples_per_core) != N_schedule[k]) {
        remainder <- N_schedule[k] %% a
        samples_per_core[1:remainder] <- samples_per_core[1:remainder] + 1
      }
      samples_per_core <- rep(samples_per_core, n_nodes)

      # linking each core to samples to perform fusion on
      linker <- unlist(lapply(1:n_nodes, function(i) rep(i, a)))
    }

    # performing Fusion for this level
    # printing out some stuff to log file to track the progress
    cat('########################\n', file = 'parallel_h_fusion_mixG.txt', append = T)
    cat('Starting to fuse', C_schedule[k], 'densities for level', k, 'with time', time_schedule[k],
        '- using', min(length(samples_per_core), n_cores), 'cores\n',
        file = 'parallel_h_fusion_mixG.txt', append = T)
    cat('There are', n_nodes, 'nodes at this level each giving', N_schedule[k],
        'samples for beta =', prod(C_schedule[L:k]), '/', (1/start_beta),
        '\n', file = 'parallel_h_fusion_mixG.txt', append = T)
    cat('########################\n', file = 'parallel_h_fusion_mixG.txt', append = T)

    # starting fusion
    pcm <- proc.time()
    fused <- parallel::parLapplyLB(cl, X = 1:length(samples_per_core), fun = function(i) {
      fusion_diff_mixG(N = samples_per_core[i],
                       time = time_schedule[k],
                       C = C_schedule[k],
                       samples_to_fuse = hier_samples[[k+1]][((C_schedule[k]*linker[i])-(C_schedule[k]-1)):(C_schedule[k]*linker[i])],
                       weights = weights,
                       means = means,
                       sds = sds,
                       betas = prod(C_schedule[L:(k+1)])*(start_beta),
                       level = k,
                       acceptance_rate = TRUE)})
    final <- proc.time() - pcm

    # fused is a list of length a*n_cores containing N_schedule[k]/a samples
    # need to combine the correct samples
    if (a <= 1) {
      hier_samples[[k]] <- lapply(1:n_nodes, function(i) fused[[which(linker==i)]]$samples)
    } else if (a >= 2) {
      fused_samples <- lapply(1:length(samples_per_core), function(i) fused[[i]]$samples)
      hier_samples[[k]] <- lapply(1:n_nodes, function(i) unlist(fused_samples[which(linker==i)]))
    }

    if (study) {
      # calculating the acceptance rates for all nodes in the current level
      # summing all the iterations for each core
      rho_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_rho))
      Q_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_Q))

      rho_acc[k] <- Q_iterations / rho_iterations
      Q_acc[k] <- (N_schedule[k]*n_nodes) / Q_iterations
      rhoQ_acc[k] <- (N_schedule[k]*n_nodes) / rho_iterations
      time_taken_per_level[k] <- final['elapsed']
      input_betas[[k]] <- rep(prod(C_schedule[L:(k+1)])*(start_beta), C_schedule[k])
      output_beta[k] <- prod(C_schedule[L:k])*(start_beta)
    }
  }

  # stopping cluster
  parallel::stopCluster(cl)
  # print completion
  cat('Completed hierarchical fusion\n', file = 'parallel_h_fusion_mixG.txt', append = T)

  hier_samples[[1]] <- unlist(hier_samples[[1]])
  if (study) {
    return(list('samples' = hier_samples, 'time' = time_taken_per_level,
                'rho_acc' = rho_acc, 'Q_acc' = Q_acc, 'rhoQ_acc' = rhoQ_acc,
                'input_betas' = input_betas, 'output_beta' = output_beta, 'diffusion_times' = time_schedule))
  } else {
    return(hier_samples)
  }
}





######################################## time_adapting h_fusion ########################################

#' Time-adapting Hierarchical Monte Carlo Fusion
#'
#' Hierarchical Time-adapting Monte Carlo Fusion with base level with nodes that are tempered mixture Gaussians  with same weights, means, sds components
#'
#' @param N_schedule vector of length (L-1), where N_schedule[l] is the number of samples per node at level l
#' @param global_T time T for time-adapting fusion algorithm
#' @param start_beta beta for the base level
#' @param C_schedule vector of length (L-1), where C_schedule[l] is the number of samples to fuse for level l
#' @param L total number of levels in the hierarchy
#' @param base_samples list of length (1/start_beta), where samples_to_fuse[c] containg the samples for the c-th node in the level
#' @param weights vector: weights of mixture Gaussian
#' @param means vector: means of mixture Gassuan
#' @param sds vector: st.devs of mixture Gaussian
#' @param study boolean value: defaults to F, determines whether or not to return acceptance probabilities
#'
#' @return samples samples from hierarchical fusion
#' @return time vector of length (L-1), where time[l] is the run time for level l
#' @return rho_acc vector of length (L-1), where rho_acc[l] is the acceptance rate for first fusion step for level l
#' @return Q_acc vector of length (L-1), where Q_acc[l] is the acceptance rate for second fusion step for level l
#' @return input_betas list of length (L), where input_betas[[l]] is the input betas for level l
#' @return output_beta vector of length(L-1), where output_beta[l] is the beta for level l
#' @return diffusion_times list of length (L-1), where diffusion_times[[l]] are the scaled/adapted times for fusion in level l
#'
#' @examples
#' # setting variables
#' w_example <- c(0.35, 0.65)
#' m_example <- c(-3, 8)
#' s_example <- c(1, 1.5)
#' b_example <- 1/4
#'
#' # sampling from tempered density
#' nsamples <- 500000
#' base <- hmc_base_sampler_mixG(w_example, m_example, s_example, b_example, nsamples, 4)
#'
#' test <- parallel_h_fusion_TA_mixG(N_schedule = rep(10000, 2), global_T = 1,
#'                                   start_beta = b_example, C_schedule = rep(2, 2), L = 3, base_samples = base,
#'                                   weights = w_example, means = m_example, sds = s_example, study = T)
#'
#' # plot results
#' plot_levels_hier_mixG(test, weights = w_example, means = m_example, sds = s_example,
#'                       from = -15, to = 20, plot_rows = 3, plot_columns = 1, bw = c(0.2, 0.3, 0.4))
#'
#' @export
parallel_h_fusion_TA_mixG <- function(N_schedule, global_T, start_beta, C_schedule, L, base_samples, weights, means, sds, study = F) {
  # N_schedule is a vector of length (L-1), where N_schedule[i] for i=1,..(L-1), is the number of samples per node at level i
  if (length(N_schedule) != (L-1)) stop('length of N_schedule must be equal to (L-1)')
  # C_schedule is a vector of length (L-1) where C_schedule[l] is the number of samples to fuse for level l
  if (length(C_schedule) == (L-1)) {
    # check that at each level, we are fusing a suitable number
    for (l in length(C_schedule):1) {
      if (((1/start_beta)/prod(C_schedule[(L-1):l]))%%1 != 0) {
        stop('check that (1/beta)/prod(C_schedule[(L-1):l]) is an integer for l=L-1,...,1')
      }
    }
  } else {
    stop('C_schedule must be a vector of length (L-1)')
  }

  # change global_T by multiplying it by start_beta
  global_T <- global_T*start_beta

  # we append 1 to the vector C_schedule to make the indices work later on when we call fusion
  # we need this so that we can set the right value for beta when fusing up the levels
  C_schedule <- c(C_schedule, 1)

  # initialising study results
  hier_samples <- list()
  hier_samples[[L]] <- base_samples # base level
  time_taken_per_level <- rep(0, L-1)
  input_betas <- list()
  input_betas[[L]] <- NA
  output_beta <- c(1:(L-1), start_beta)
  diffusion_times <- list()

  # make some vectors for acceptance rates of the level
  rho_acc <- rep(0, L-1)
  Q_acc <- rep(0, L-1)
  rhoQ_acc <- rep(0, L-1)

  # creating parallel cluster
  n_cores <- parallel::detectCores()
  cl <- parallel::makeCluster(n_cores, outfile = 'parallel_h_fusion_mixG.txt')

  # creating variable and functions list to pass into cluster using clusterExport
  varlist <- list("check_equal_dim", "rmix_norm", "dnorm_mix_tempered_unnorm", "dnorm_mix_tempered", "dnorm_mix",
                  "dnorm_deriv1", "dnorm_deriv2","dnorm_mix_deriv1", "dnorm_mix_deriv2",
                  "dnorm_mix_tempered_deriv1", "dnorm_mix_tempered_deriv2", "phi_function_mixG", "bounds_phi_function_mixG",
                  "lower_bound_phi_function_mixG", "simulate_langevin_diffusion_mixG", "fusion_TA_mixG",
                  "N_schedule", "global_T", "start_beta", "C_schedule", "L", "base_samples", "weights", "means", "sds")
  parallel::clusterExport(cl, envir = environment(), varlist = varlist)

  # exporting functions from layeredBB package to simulate layered Brownian bridges
  parallel::clusterExport(cl, varlist = ls("package:layeredBB"))

  # add to output file that starting hierarchical fusion
  cat('Starting hierarchical fusion \n', file = 'parallel_h_fusion_TA_mixG.txt')

  # parallelising tasks for each level going up the hiearchy
  for (k in ((L-1):1)) {
    # previous level has (1/beta)/prod(C_schedule[L:(k-1)]) nodes and we fuse C_schedule[k] of these
    n_nodes <- (1/start_beta)/prod(C_schedule[L:k])
    # calculating how much we can split the work along the cores
    a <- floor(n_cores / n_nodes)

    if (a <= 1) {
      ##### IF a==0:
      # there are more nodes than cores
      # we make a cluster of size n_cores and parallelise task by asking each core to obtain samples for each node
      # when a node is completed, the core will start on a new node
      ##### IF a==1:
      # there are more cores than nodes, but not enough to split the number of samples needed by each node among the extra cores
      ###### we make a cluster of size n_nodes and parallelise task by asking each core to obtain samples for each node

      # we cannot split the number of samples to be obtained by each core
      # samples_per_core[i]=N_schedule[k] for i=1,...,n_nodes
      samples_per_core <- rep(N_schedule[k], n_nodes)

      # linking each core to samples to perform fusion on
      linker <- 1:n_nodes
    } else if (a >= 2) {
      ###### IF a >=2
      # there are more cores than nodes AND there is enough extra cores so that the number of samples by each node can be reduced
      # i.e. work can be shared among the extra cores
      # we make a cluster of size a*n_nodes and each core obtains N_schedule[k]/a samples

      # splitting the number of samples to be obtained by each core
      # samples_per_core[i]=N_schedule[k]/a for i=1,...,n_nodes
      samples_per_core <- rep(floor(N_schedule[k]/a), a)

      # N_schedule[k]/a may not be an integer
      # some cores will need to obtain one more sample so the total number of samples obtained is N_schedule[k]
      if (sum(samples_per_core) != N_schedule[k]) {
        remainder <- N_schedule[k] %% a
        samples_per_core[1:remainder] <- samples_per_core[1:remainder] + 1
      }
      samples_per_core <- rep(samples_per_core, n_nodes)

      # linking each core to samples to perform fusion on
      linker <- unlist(lapply(1:n_nodes, function(i) rep(i, a)))
    }

    # performing Fusion for this level
    # printing out some stuff to log file to track the progress
    cat('########################\n', file = 'parallel_h_fusion_TA_mixG.txt', append = T)
    cat('Starting to fuse', C_schedule[k], 'densities of pi^beta, where beta =', prod(C_schedule[L:(k+1)]),
        '/', (1/start_beta), 'for level', k, 'with time', global_T/(prod(C_schedule[L:(k+1)])*(start_beta)),
        '- using', min(length(samples_per_core), n_cores), 'cores\n',
        file = 'parallel_h_fusion_TA_mixG.txt', append = T)
    cat('There are', n_nodes, 'nodes at this level each giving', N_schedule[k], 'samples for beta =',
        prod(C_schedule[L:k]), '/', (1/start_beta),
        '\n', file = 'parallel_h_fusion_TA_mixG.txt', append = T)
    cat('########################\n', file = 'parallel_h_fusion_TA_mixG.txt', append = T)

    # starting fusion
    pcm <- proc.time()
    fused <- parallel::parLapplyLB(cl, X = 1:length(samples_per_core), fun = function(i) {
      fusion_TA_mixG(N = samples_per_core[i],
                     time = global_T,
                     C = C_schedule[k],
                     samples_to_fuse = hier_samples[[k+1]][((C_schedule[k]*linker[i])-(C_schedule[k]-1)):(C_schedule[k]*linker[i])],
                     weights = weights,
                     means = means,
                     sds = sds,
                     betas = prod(C_schedule[L:(k+1)])*(start_beta),
                     level = k,
                     acceptance_rate = TRUE)})
    final <- proc.time() - pcm

    # fused is a list of length a*n_cores containing N_schedule[k]/a samples
    # need to combine the correct samples
    if (a <= 1) {
      hier_samples[[k]] <- lapply(1:n_nodes, function(i) fused[[which(linker==i)]]$samples)
    } else if (a >= 2) {
      fused_samples <- lapply(1:length(samples_per_core), function(i) fused[[i]]$samples)
      hier_samples[[k]] <- lapply(1:n_nodes, function(i) unlist(fused_samples[which(linker==i)]))
    }

    if (study) {
      # calculating the acceptance rates for all nodes in the current level
      # summing all the iterations for each core
      rho_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_rho))
      Q_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_Q))

      rho_acc[k] <- Q_iterations / rho_iterations
      Q_acc[k] <- (N_schedule[k]*n_nodes) / Q_iterations
      rhoQ_acc[k] <- (N_schedule[k]*n_nodes) / rho_iterations
      time_taken_per_level[k] <- final['elapsed']
      input_betas[[k]] <- rep(prod(C_schedule[L:(k+1)])*(start_beta), C_schedule[k])
      output_beta[k] <- prod(C_schedule[L:k])*(start_beta)
      diffusion_times[[k]] <- global_T / rep(sqrt(prod(C_schedule[L:(k+1)])*(start_beta)), C_schedule[k])
    }
  }

  # stopping cluster
  parallel::stopCluster(cl)
  # print completion
  cat('Completed hierarchical fusion\n', file = 'parallel_h_fusion_TA_mixG.txt', append = T)

  hier_samples[[1]] <- unlist(hier_samples[[1]])
  if (study) {
    return(list('samples' = hier_samples, 'time' = time_taken_per_level,
                'rho_acc' = rho_acc, 'Q_acc' = Q_acc, 'rhoQ_acc' = rhoQ_acc,
                'input_betas' = input_betas, 'output_beta' = output_beta, 'diffusion_times' = diffusion_times))
  } else {
    return(hier_samples)
  }
}
rchan26/mixGaussTempering documentation built on June 14, 2019, 3:26 p.m.