R/seq_fusion_exp_4.R

Defines functions parallel_sequential_fusion_exp_4_rcpp parallel_sequential_fusion_TA_exp_4_rcpp

Documented in parallel_sequential_fusion_exp_4_rcpp parallel_sequential_fusion_TA_exp_4_rcpp

##########################
# these following functions allow user to perform sequential fusion for target exp(-x^4/2)
# sequential fusion means that we perform fusion in steps to try recover samples for our target
##########################

######################################## sequential_fusion ########################################

#' Standard Sequential Monte Carlo Fusion
#'
#' Sequential Monte Carlo Fusion with base level of the form exp(-((x^4)*beta)/2)
#'
#' @param N_schedule vector of length (L-1), where N_schedule[l] is the number of samples per node at level l
#' @param mean mean value
#' @param time_schedule vector of legnth(L-1), where time_schedule[l] is the time chosen for Fusion at level l
#' @param start_beta beta for the base level
#' @param base_samples list of length (1/start_beta), where samples_to_fuse[c] containg the samples for the c-th node in the level
#' @param seed seed number - default is NULL, meaning there is no seed
#'
#' @return samples: samples from Sequential fusion
#' @return time: vector of length (L-1), where time[l] is the run time for level l
#' @return rho_acc: vector of length (L-1), where rho_acc[l] is the acceptance rate for first fusion step for level l
#' @return Q_acc: vector of length (L-1), where Q_acc[l] is the acceptance rate for second fusion step for level l
#' @return input_betas: list of length (L), where input_betas[[l]] is the input betas for level l
#' @return output_beta: vector of length(L-1), where output_beta[l] is the beta for level l
#' @return diffusion_times: vector of length (L-1), where diffusion_times[l] are the times for fusion in level l (= time_schedule)
#'
#' @examples
#' input_samples <- base_rejection_sampler_exp_4(beta = 1/4, nsamples = 100000,  proposal_mean = 0, proposal_sd = 1.5, dominating_M = 1.4)
#' test <- parallel_sequential_fusion_exp_4_rcpp(N_schedule = rep(10000, 3), time_schedule = rep(1, 3),
#'                                               start_beta = 1/4, base_samples = input_samples)
#'
#' # plot results
#' plot_levels_seq_exp_4(test, from = -3, to = 3, plot_rows = 2, plot_columns = 2)
#'
#' @export
parallel_sequential_fusion_exp_4_rcpp <- function(N_schedule, mean = 0, time_schedule, start_beta, base_samples, seed = NULL) {
  # base samples is a list of length 1/beta containing samples for each node
  if (is.list(base_samples) && length(base_samples)!=(1/start_beta)) {
    stop('parallel_sequential_fusion_exp_4_rcpp: length of base_samples must be equal to 1/start_beta')
  }
  # N_schedule is how many samples you want in each node along the seq-tree
  if (is.vector(N_schedule) && length(N_schedule)!=(1/start_beta)-1) {
    stop('parallel_sequential_fusion_exp_4_rcpp: length of N_schedule must be equal to 1/start_beta - 1')
  }
  # time_schedule is the designated value for T for each node along the seq-tree
  if (is.vector(time_schedule) && length(time_schedule)!=(1/start_beta)-1) {
    stop('parallel_sequential_fusion_exp_4_rcpp: length of time_schedule must be equal to 1/start_beta - 1')
  }

  # initialising study results
  seq_samples <- list()
  seq_samples[[(1/start_beta)]] <- base_samples # base level
  time_taken_per_level <- rep(0, (1/start_beta)-1)
  input_betas <- list()
  input_betas[[(1/start_beta)]] <- NA
  output_beta <- c(1:((1/start_beta)-1), start_beta)
  overall_rho_iters <- 0
  overall_Q_iters <- 0
  overall_N <- 0

  # make some vectors for acceptance rates of the level
  rho_acc <- rep(0, (1/start_beta)-1)
  Q_acc <- rep(0, (1/start_beta)-1)
  rhoQ_acc <- rep(0, (1/start_beta)-1)

  # creating parallel cluster
  n_cores <- parallel::detectCores()
  cl <- parallel::makeCluster(n_cores, outfile = 'parallel_seq_fusion_exp_4.txt')

  # creating variable and functions list to pass into cluster using clusterExport
  varlist <- list("phi_exp_4", "bound_phi_exp_4", "diffusion_probability_exp_4", "fusion_diff_exp_4_rcpp",
                  "N_schedule", "mean", "time_schedule", "start_beta", "base_samples")
  parallel::clusterExport(cl, envir = environment(), varlist = varlist)

  # exporting functions from layeredBB package to simulate layered Brownian bridges
  parallel::clusterExport(cl, varlist = ls("package:layeredBB"))

  if (!is.null(seed)) {
    # setting seed for the cores in the cluster
    parallel::clusterSetRNGStream(cl, iseed = seed)
  }

  # add to output file that starting sequential fusion
  cat('Starting sequential fusion \n', file = 'parallel_seq_fusion_exp_4.txt')

  # setting index for which base_sample to fuse the current node with
  # starting with 3 since that's the first one we fuse with the second level
  # the first level is manually indexed as 1 and 2
  index <- 3

  for (k in ((1/start_beta)-1):1) {
    # splitting the number of samples to be obtained by each core
    # samples_per_core[i]=N_schedule[k]/n_cores for i=1,...,n_cores
    samples_per_core <- rep(floor(N_schedule[k]/n_cores), n_cores)

    # N_schedule[k]/n_cores may not be an integer
    # some cores will need to obtain one more sample so the total number of samples obtained is N_schedule[k]
    if (sum(samples_per_core) != N_schedule[k]) {
      remainder <- N_schedule[k] %% n_cores
      samples_per_core[1:remainder] <- samples_per_core[1:remainder] + 1
    }

    # performing Fusion for this level
    if (k == (1/start_beta)-1) {
      # printing out some stuff to log file to track the progress
      cat('########################\n', file = 'parallel_seq_fusion_exp_4.txt', append = T)
      cat('Starting to fuse', 2, 'densities for level', k, 'which is using', n_cores, 'cores\n',
          file = 'parallel_seq_fusion_exp_4.txt', append = T)
      cat('Fusing samples for beta =', 1, '/', (1/start_beta), 'and beta =', 1, '/', (1/start_beta), 'to get', N_schedule[k],
          'samples for beta =', 2, '/', (1/start_beta), '\n',
          file = 'parallel_seq_fusion_exp_4.txt', append = T)
      cat('########################\n', file = 'parallel_seq_fusion_exp_4.txt', append = T)

      # starting fusion
      pcm <- proc.time()
      fused <- parallel::parLapply(cl, X = 1:n_cores, fun = function(i) {
        fusion_diff_exp_4_rcpp(N = samples_per_core[i],
                               mean = mean,
                               time = time_schedule[k],
                               C = 2,
                               samples_to_fuse = list(base_samples[[1]], base_samples[[2]]),
                               betas = c(start_beta, start_beta))})
      final <- proc.time() - pcm

      # providing information of what was the input samples and what was the output
      input_betas[[k]] <- c(start_beta, start_beta)
      output_beta[k] <- 2*start_beta
    } else {
      # printing out some stuff to log file to track the progress
      cat('########################\n', file = 'parallel_seq_fusion_exp_4.txt', append = T)
      cat('Starting to fuse', 2, 'densities for level', k, 'which is using', n_cores, 'cores\n',
          file = 'parallel_seq_fusion_exp_4.txt', append = T)
      cat('Fusing samples for beta =', (index-1), '/', (1/start_beta), 'and beta =', 1, '/', (1/start_beta),
          'with time', time_schedule[k], 'to get', N_schedule[k], 'samples for beta =', index, '/', (1/start_beta),
          '\n', file = 'parallel_seq_fusion_exp_4.txt', append = T)
      cat('########################\n', file = 'parallel_seq_fusion_exp_4.txt', append = T)

      # starting fusion
      pcm <- proc.time()
      fused <- parallel::parLapply(cl, X = 1:n_cores, fun = function(i) {
        fusion_diff_exp_4_rcpp(N = samples_per_core[i],
                               mean = mean,
                               time = time_schedule[k],
                               C = 2,
                               samples_to_fuse = list(seq_samples[[k+1]], base_samples[[index]]),
                               betas = c((index-1)*start_beta, start_beta))})
      final <- proc.time() - pcm

      # providing information of what was the input samples and what was the output
      input_betas[[k]] <- c((index-1)*start_beta, start_beta)
      output_beta[k] <- index*start_beta

      # increasing index for which base_sample to fuse with for next node
      index <- index + 1
    }

    # fused is a list of length n_cores containing N/n_cores samples
    # need to combine the correct samples
    seq_samples[[k]] <- unlist(lapply(1:n_cores, function(i) fused[[i]]$samples))

    # calculating the acceptance rates for all nodes in the current level
    # summing all the iterations for each core
    rho_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_rho))
    Q_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_Q))

    rho_acc[k] <- Q_iterations / rho_iterations
    Q_acc[k] <- N_schedule[k] / Q_iterations
    rhoQ_acc[k] <- N_schedule[k] / rho_iterations
    time_taken_per_level[k] <- final['elapsed']
    overall_rho_iters <- overall_rho_iters + rho_iterations
    overall_Q_iters <- overall_Q_iters + Q_iterations
    overall_N <- overall_N + N_schedule[k]
  }

  # stopping cluster
  parallel::stopCluster(cl)
  # print completion
  cat('Completed seq fusion\n', file = 'parallel_seq_fusion_exp_4.txt', append = T)

  return(list('samples' = seq_samples, 'time' = time_taken_per_level,
              'rho_acc' = rho_acc, 'Q_acc' = Q_acc, 'rhoQ_acc' = rhoQ_acc,
              'input_betas' = input_betas, 'output_beta' = output_beta, 'diffusion_times' = time_schedule,
              'overall_rho' = overall_Q_iters/overall_rho_iters, 'overall_Q' = overall_N/overall_Q_iters,
              'overall_rhoQ' = overall_N/overall_rho_iters))
}



######################################## time_adapting sequential_fusion ########################################

#' Time-adapting Sequential Monte Carlo Fusion
#'
#' Sequential Time-adapting Monte Carlo Fusion with base level of the form exp(-((x^4)*beta)/2)
#'
#' @param N_schedule vector of length (L-1), where N_schedule[l] is the number of samples per node at level l
#' @param mean mean value
#' @param global_T time T for time-adapting fusion algorithm
#' @param start_beta beta for the base level
#' @param base_samples list of length (1/start_beta), where samples_to_fuse[c] containg the samples for the c-th node in the level
#' @param seed seed number - default is NULL, meaning there is no seed
#'
#' @return samples: samples from Sequential fusion
#' @return time: vector of length (L-1), where time[l] is the run time for level l
#' @return rho_acc: vector of length (L-1), where rho_acc[l] is the acceptance rate for first fusion step for level l
#' @return Q_acc: vector of length (L-1), where Q_acc[l] is the acceptance rate for second fusion step for level l
#' @return input_betas: list of length (L), where input_betas[[l]] is the input betas for level l
#' @return output_beta: vector of length(L-1), where output_beta[l] is the beta for level l
#' @return diffusion_times: list of length (L-1), where diffusion_times[[l]] are the scaled/adapted times for fusion in level l
#'
#' @examples
#' input_samples <- base_rejection_sampler_exp_4(beta = 1/4, nsamples = 100000,  proposal_mean = 0, proposal_sd = 1.5, dominating_M = 1.4)
#' test <- parallel_sequential_fusion_TA_exp_4_rcpp(N_schedule = rep(10000, 3), global_T = 0.5,
#'                                                  start_beta = 1/4, base_samples = input_samples)
#'
#' # plot results
#' plot_levels_seq_exp_4(test, from = -3, to = 3, plot_rows = 2, plot_columns = 2)
#'
#' @export
parallel_sequential_fusion_TA_exp_4_rcpp <- function(N_schedule, mean = 0, global_T, start_beta, base_samples, seed = NULL) {
  # base samples is a list of length 1/beta containing samples for each node
  if (is.list(base_samples) && length(base_samples)!=(1/start_beta)) {
    stop('parallel_sequential_fusion_TA_exp_4_rcpp: length of base_samples must be equal to 1/start_beta')
  }
  # N_schedule is how many samples you want in each node along the seq-tree
  if (is.vector(N_schedule) && length(N_schedule)!=(1/start_beta)-1) {
    stop('parallel_sequential_fusion_TA_exp_4_rcpp: length of N_schedule must be equal to 1/start_beta - 1')
  }

  # change global_T by multiplying it by sqrt(start_beta)
  global_T <- global_T*sqrt(start_beta)

  # initialising study results
  seq_samples <- list()
  seq_samples[[(1/start_beta)]] <- base_samples # base level
  time_taken_per_level <- rep(0, (1/start_beta)-1)
  input_betas <- list()
  input_betas[[(1/start_beta)]] <- NA
  output_beta <- c(1:((1/start_beta)-1), start_beta)
  diffusion_times <- list()
  overall_rho_iters <- 0
  overall_Q_iters <- 0
  overall_N <- 0

  # make some vectors for acceptance rates of the level
  rho_acc <- rep(0, (1/start_beta)-1)
  Q_acc <- rep(0, (1/start_beta)-1)
  rhoQ_acc <- rep(0, (1/start_beta)-1)

  # creating parallel cluster
  n_cores <- parallel::detectCores()
  cl <- parallel::makeCluster(n_cores, outfile = 'parallel_seq_TA_fusion_exp_4.txt')

  # creating variable and functions list to pass into cluster using clusterExport
  varlist <- list("phi_exp_4", "bound_phi_exp_4", "diffusion_probability_exp_4", "fusion_TA_exp_4_rcpp",
                  "N_schedule", "mean", "global_T", "start_beta", "base_samples")
  parallel::clusterExport(cl, envir = environment(), varlist = varlist)

  # exporting functions from layeredBB package to simulate layered Brownian bridges
  parallel::clusterExport(cl, varlist = ls("package:layeredBB"))

  if (!is.null(seed)) {
    # setting seed for the cores in the cluster
    parallel::clusterSetRNGStream(cl, iseed = seed)
  }

  # add to output file that starting sequential fusion
  cat('Starting sequential fusion \n', file = 'parallel_seq_TA_fusion_exp_4.txt')

  # setting index for which base_sample to fuse the current node with
  # starting with 3 since that's the first one we fuse with the second level
  # the first level is manually indexed as 1 and 2
  index <- 3

  for (k in ((1/start_beta)-1):1) {
    # splitting the number of samples to be obtained by each core
    # samples_per_core[i]=N_schedule[k]/n_cores for i=1,...,n_cores
    samples_per_core <- rep(floor(N_schedule[k]/n_cores), n_cores)

    # N_schedule[k]/n_cores may not be an integer
    # some cores will need to obtain one more sample so the total number of samples obtained is N_schedule[k]
    if (sum(samples_per_core) != N_schedule[k]) {
      remainder <- N_schedule[k] %% n_cores
      samples_per_core[1:remainder] <- samples_per_core[1:remainder] + 1
    }

    # performing Fusion for this level
    if (k == (1/start_beta)-1) {
      # printing out some stuff to log file to track the progress
      cat('########################\n', file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)
      cat('Starting to fuse', 2, 'densities for level', k, 'which is using', n_cores, 'cores\n',
          file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)
      cat('Fusing samples for beta =', 1, '/', (1/start_beta), 'with time', global_T/sqrt(start_beta),
          ', and beta =', 1, '/', (1/start_beta), 'with time', global_T/sqrt(start_beta), 'to get', N_schedule[k],
          'samples for beta =', 2, '/', (1/start_beta), '\n',
          file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)
      cat('########################\n', file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)

      # starting fusion
      pcm <- proc.time()
      fused <- parallel::parLapply(cl, X = 1:n_cores, fun = function(i) {
        fusion_TA_exp_4_rcpp(N = samples_per_core[i],
                             mean = mean,
                             time = global_T,
                             C = 2,
                             samples_to_fuse = list(base_samples[[1]], base_samples[[2]]),
                             betas = c(start_beta, start_beta))})
      final <- proc.time() - pcm

      # providing information of what was the input samples and what was the output
      input_betas[[k]] <- c(start_beta, start_beta)
      output_beta[k] <- 2*start_beta
      diffusion_times[[k]] <- global_T / sqrt(c(start_beta, start_beta))
    } else {
      # printing out some stuff to log file to track the progress
      cat('########################\n', file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)
      cat('Starting to fuse', 2, 'densities for level', k, 'which is using', n_cores, 'cores\n',
          file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)
      cat('Fusing samples for beta =', (index-1), '/', (1/start_beta), 'with time', global_T/sqrt((index-1)*start_beta),
          ', and beta =', 1, '/', (1/start_beta), 'with time', global_T/sqrt(start_beta), 'to get', N_schedule[k],
          'samples for beta =', index, '/', (1/start_beta), '\n',
          file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)
      cat('########################\n', file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)

      # starting fusion
      pcm <- proc.time()
      fused <- parallel::parLapply(cl, X = 1:n_cores, fun = function(i) {
        fusion_TA_exp_4_rcpp(N = samples_per_core[i],
                             mean = mean,
                             time = global_T,
                             C = 2,
                             samples_to_fuse = list(seq_samples[[k+1]], base_samples[[index]]),
                             betas = c((index-1)*start_beta, start_beta))})
      final <- proc.time() - pcm

      # providing information of what was the input samples and what was the output
      input_betas[[k]] <- c((index-1)*start_beta, start_beta)
      output_beta[k] <- index*start_beta
      diffusion_times[[k]] <- global_T / sqrt(c((index-1)*start_beta, start_beta))

      # increasing index for which base_sample to fuse with for next node
      index <- index + 1
    }

    # fused is a list of length n_cores containing N/n_cores samples
    # need to combine the correct samples
    seq_samples[[k]] <- unlist(lapply(1:n_cores, function(i) fused[[i]]$samples))

    # calculating the acceptance rates for all nodes in the current level
    # summing all the iterations for each core
    rho_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_rho))
    Q_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_Q))

    rho_acc[k] <- Q_iterations / rho_iterations
    Q_acc[k] <- N_schedule[k] / Q_iterations
    rhoQ_acc[k] <- N_schedule[k] / rho_iterations
    time_taken_per_level[k] <- final['elapsed']
    overall_rho_iters <- overall_rho_iters + rho_iterations
    overall_Q_iters <- overall_Q_iters + Q_iterations
    overall_N <- overall_N + N_schedule[k]
  }

  # stopping cluster
  parallel::stopCluster(cl)
  # print completion
  cat('Completed seq fusion\n', file = 'parallel_seq_TA_fusion_exp_4.txt', append = T)

  return(list('samples' = seq_samples, 'time' = time_taken_per_level,
              'rho_acc' = rho_acc, 'Q_acc' = Q_acc, 'rhoQ_acc' = rhoQ_acc,
              'input_betas' = input_betas, 'output_beta' = output_beta, 'diffusion_times' = diffusion_times,
              'overall_rho' = overall_Q_iters/overall_rho_iters, 'overall_Q' = overall_N/overall_Q_iters,
              'overall_rhoQ' = overall_N/overall_rho_iters))
}
rchan26/exp4FusionRCPP documentation built on Nov. 6, 2019, 7:01 p.m.