##########################
# these following functions allow user to perform hierarchical fusion for target exp(-x^4/2)
# hierarchical fusion means that we perform fusion in steps to try recover samples for our target
##########################
######################################## h_fusion ########################################
#' Standard Hierarchical Monte Carlo Fusion
#'
#' Hierarchical Monte Carlo Fusion with base level of the form exp(-((x^4)*beta)/2)
#'
#' @param N_schedule vector of length (L-1), where N_schedule[l] is the number of samples per node at level l
#' @param mean mean value
#' @param time_schedule vector of legnth(L-1), where time_schedule[l] is the time chosen for Fusion at level l
#' @param start_beta beta for the base level
#' @param C_schedule vector of length (L-1), where C_schedule[l] is the number of samples to fuse for level l
#' @param L total number of levels in the hierarchy
#' @param base_samples list of length (1/start_beta), where samples_to_fuse[c] containg the samples for the c-th node in the level
#' @param seed seed number - default is NULL, meaning there is no seed
#'
#' @return samples: samples from hierarchical fusion
#' @return time: vector of length (L-1), where time[l] is the run time for level l
#' @return rho_acc: vector of length (L-1), where rho_acc[l] is the acceptance rate for first fusion step for level l
#' @return Q_acc: vector of length (L-1), where Q_acc[l] is the acceptance rate for second fusion step for level l
#' @return input_betas: list of length (L), where input_betas[[l]] is the input betas for level l
#' @return output_beta: vector of length(L-1), where output_beta[l] is the beta for level l
#' @return diffusion_times: vector of length (L-1), where diffusion_times[l] are the times for fusion in level l (= time_schedule)
#'
#' @examples
#' input_samples <- base_rejection_sampler_exp_4(beta = 1/4, nsamples = 100000, proposal_mean = 0, proposal_sd = 1.5, dominating_M = 1.4)
#' test <- parallel_h_fusion_exp_4_rcpp(N_schedule = rep(10000, 2), time_schedule = rep(1, 2),
#' start_beta = 1/4, C_schedule = rep(2, 2), L = 3,
#' base_samples = input_samples)
#'
#' # plot results
#' plot_levels_hier_exp_4(test, from = -3, to = 3, plot_rows = 3, plot_columns = 1)
#'
#' @export
parallel_h_fusion_exp_4_rcpp <- function(N_schedule, mean = 0, time_schedule, start_beta, C_schedule,
L, base_samples, seed = NULL) {
if (length(N_schedule) != (L-1)) stop('parallel_h_fusion_exp_4_rcpp: length of N_schedule must be equal to (L-1)')
if (length(time_schedule) != (L-1)) stop('parallel_h_fusion_exp_4_rcpp: length of time_schedule must be equal to (L-1)')
if (length(C_schedule) == (L-1)) {
# check that at each level, we are fusing a suitable number
for (l in length(C_schedule):1) {
if (((1/start_beta)/prod(C_schedule[(L-1):l]))%%1 != 0) {
stop('parallel_h_fusion_exp_4_rcpp: check that (1/beta)/prod(C_schedule[(L-1):l]) is an integer for l=L-1,...,1')
}
}
} else {
stop('parallel_h_fusion_exp_4_rcpp: C_schedule must be a vector of length (L-1)')
}
# we append 1 to the vector C_schedule to make the indices work later on when we call fusion
# we need this so that we can set the right value for beta when fusing up the levels
C_schedule <- c(C_schedule, 1)
# initialising study results
hier_samples <- list()
hier_samples[[L]] <- base_samples # base level
time_taken_per_level <- rep(0, L-1)
input_betas <- list()
input_betas[[L]] <- NA
output_beta <- c(1:(L-1), start_beta)
overall_rho_iters <- 0
overall_Q_iters <- 0
overall_N <- 0
# make some vectors for acceptance rates of the level
rho_acc <- rep(0, L-1)
Q_acc <- rep(0, L-1)
rhoQ_acc <- rep(0, L-1)
# creating parallel cluster
n_cores <- parallel::detectCores()
cl <- parallel::makeCluster(n_cores, outfile = 'parallel_h_fusion_exp_4.txt')
# creating variable and functions list to pass into cluster using clusterExport
varlist <- list("phi_exp_4", "bound_phi_exp_4", "diffusion_probability_exp_4", "fusion_diff_exp_4_rcpp",
"N_schedule", "mean", "time_schedule", "start_beta", "C_schedule", "L", "base_samples")
parallel::clusterExport(cl, envir = environment(), varlist = varlist)
# exporting functions from layeredBB package to simulate layered Brownian bridges
parallel::clusterExport(cl, varlist = ls("package:layeredBB"))
if (!is.null(seed)) {
# setting seed for the cores in the cluster
parallel::clusterSetRNGStream(cl, iseed = seed)
}
# add to output file that starting hierarchical fusion
cat('Starting hierarchical fusion \n', file = 'parallel_h_fusion_exp_4.txt')
# parallelising tasks for each level going up the hiearchy
for (k in ((L-1):1)) {
n_nodes <- (1/start_beta)/prod(C_schedule[L:k]) # since previous level has (1/beta)/prod(C_schedule[L:(k-1)]) nodes and we fuse C_schedule[k] of these
a <- floor(n_cores / n_nodes) # calculating how much we can split the work along the cores
if (a <= 1) {
##### IF a==0:
# there are more nodes than cores
# we make a cluster of size n_cores and parallelise task by asking each core to obtain samples for each node
# when a node is completed, the core will start on a new node
##### IF a==1:
# there are more cores than nodes, but not enough to split the number of samples needed by each node among the extra cores
###### in either case, we make a cluster of size n_nodes and parallelise task by asking each core to obtain samples for each node
# we cannot split the number of samples to be obtained by each core
# samples_per_core[i]=N_schedule[k] for i=1,...,n_nodes
samples_per_core <- rep(N_schedule[k], n_nodes)
# linking each core to samples to perform fusion on
linker <- 1:n_nodes
} else if (a >= 2) {
###### IF a >=2
# there are more cores than nodes AND there is enough extra cores so that the number of samples by each node can be reduced
# i.e. work can be shared among the extra cores
# we make a cluster of size a*n_nodes and each core obtains N_schedule[k]/a samples
# splitting the number of samples to be obtained by each core
# samples_per_core[i]=N_schedule[k]/a for i=1,...,n_nodes
samples_per_core <- rep(floor(N_schedule[k]/a), a)
# N_schedule[k]/a may not be an integer
# some cores will need to obtain one more sample so the total number of samples obtained is N_schedule[k]
if (sum(samples_per_core) != N_schedule[k]) {
remainder <- N_schedule[k] %% a
samples_per_core[1:remainder] <- samples_per_core[1:remainder] + 1
}
samples_per_core <- rep(samples_per_core, n_nodes)
# linking each core to samples to perform fusion on
linker <- unlist(lapply(1:n_nodes, function(i) rep(i, a)))
}
# performing Fusion for this level
# printing out some stuff to log file to track the progress
cat('########################\n', file = 'parallel_h_fusion_exp_4.txt', append = T)
cat('Starting to fuse', C_schedule[k], 'densities of pi^beta, where beta =', prod(C_schedule[L:(k+1)]), '/',
(1/start_beta), 'for level', k, 'with time', time_schedule[k], ', which is using', n_cores, 'cores\n',
file = 'parallel_h_fusion_exp_4.txt', append = T)
cat('There are', n_nodes, 'nodes at this level each giving', N_schedule[k],
'samples for beta =', prod(C_schedule[L:k]), '/', (1/start_beta),
'\n', file = 'parallel_h_fusion_exp_4.txt', append = T)
cat('########################\n', file = 'parallel_h_fusion_exp_4.txt', append = T)
# starting fusion
pcm <- proc.time()
fused <- parallel::parLapply(cl, X = 1:length(samples_per_core), fun = function(i) {
fusion_diff_exp_4_rcpp(N = samples_per_core[i],
mean = mean,
time = time_schedule[k],
C = C_schedule[k],
samples_to_fuse = hier_samples[[k+1]][((C_schedule[k]*linker[i])-(C_schedule[k]-1)):(C_schedule[k]*linker[i])],
betas = prod(C_schedule[L:(k+1)])*(start_beta))})
final <- proc.time() - pcm
# fused is a list of length a*n_cores containing N_schedule[k]/a samples
# need to combine the correct samples
if (a <= 1) {
hier_samples[[k]] <- lapply(1:n_nodes, function(i) fused[[which(linker==i)]]$samples)
} else if (a >= 2) {
fused_samples <- lapply(1:length(samples_per_core), function(i) fused[[i]]$samples)
hier_samples[[k]] <- lapply(1:n_nodes, function(i) unlist(fused_samples[which(linker==i)]))
}
# calculating the acceptance rates for all nodes in the current level
# summing all the iterations for each core
rho_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_rho))
Q_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_Q))
rho_acc[k] <- Q_iterations / rho_iterations
Q_acc[k] <- (N_schedule[k]*n_nodes) / Q_iterations
rhoQ_acc[k] <- (N_schedule[k]*n_nodes) / rho_iterations
time_taken_per_level[k] <- final['elapsed']
input_betas[[k]] <- rep(prod(C_schedule[L:(k+1)])*(start_beta), C_schedule[k])
output_beta[k] <- prod(C_schedule[L:k])*(start_beta)
overall_rho_iters <- overall_rho_iters + rho_iterations
overall_Q_iters <- overall_Q_iters + Q_iterations
overall_N <- overall_N + N_schedule[k]*n_nodes
}
# stopping cluster
parallel::stopCluster(cl)
# print completion
cat('Completed hierarchical fusion\n', file = 'parallel_h_fusion_exp_4.txt', append = T)
hier_samples[[1]] <- unlist(hier_samples[[1]])
return(list('samples' = hier_samples, 'time' = time_taken_per_level,
'rho_acc' = rho_acc, 'Q_acc' = Q_acc, 'rhoQ_acc' = rhoQ_acc,
'input_betas' = input_betas, 'output_beta' = output_beta, 'diffusion_times' = time_schedule,
'overall_rho' = overall_Q_iters/overall_rho_iters, 'overall_Q' = overall_N/overall_Q_iters,
'overall_rhoQ' = overall_N/overall_rho_iters))
}
######################################## time_adapting h_fusion ########################################
#' Time-adapting Hierarchical Monte Carlo Fusion
#'
#' Hierarchical Time-adapting Monte Carlo Fusion with base level of the form exp(-((x^4)*beta)/2)
#'
#' @param N_schedule vector of length (L-1), where N_schedule[l] is the number of samples per node at level l
#' @param mean mean value
#' @param global_T time T for time-adapting fusion algorithm
#' @param start_beta beta for the base level
#' @param C_schedule vector of length (L-1), where C_schedule[l] is the number of samples to fuse for level l
#' @param L total number of levels in the hierarchy
#' @param base_samples list of length (1/start_beta), where samples_to_fuse[c] containg the samples for the c-th node in the level
#' @param seed seed number - default is NULL, meaning there is no seed
#'
#' @return samples: samples from hierarchical fusion
#' @return time: vector of length (L-1), where time[l] is the run time for level l
#' @return rho_acc: vector of length (L-1), where rho_acc[l] is the acceptance rate for first fusion step for level l
#' @return Q_acc: vector of length (L-1), where Q_acc[l] is the acceptance rate for second fusion step for level l
#' @return input_betas: list of length (L), where input_betas[[l]] is the input betas for level l
#' @return output_beta: vector of length(L-1), where output_beta[l] is the beta for level l
#' @return diffusion_times: list of length (L-1), where diffusion_times[[l]] are the scaled/adapted times for fusion in level l
#'
#' @examples
#' input_samples <- base_rejection_sampler_exp_4(beta = 1/4, nsamples = 100000, proposal_mean = 0, proposal_sd = 1.5, dominating_M = 1.4)
#' test <- parallel_h_fusion_TA_exp_4_rcpp(N_schedule = rep(10000, 2), global_T = 0.5,
#' start_beta = 1/4, C_schedule = rep(2, 2), L = 3,
#' base_samples = input_samples)
#'
#' # plot results
#' plot_levels_hier_exp_4(test, from = -3, to = 3, plot_rows = 3, plot_columns = 1)
#'
#' @export
parallel_h_fusion_TA_exp_4_rcpp <- function(N_schedule, mean = 0, global_T, start_beta, C_schedule,
L, base_samples, seed = NULL) {
if (length(N_schedule) != (L-1)) stop('parallel_h_fusion_TA_exp_4_rcpp: length of N_schedule must be equal to (L-1)')
if (length(C_schedule) == (L-1)) {
# check that at each level, we are fusing a suitable number
for (l in length(C_schedule):1) {
if (((1/start_beta)/prod(C_schedule[(L-1):l]))%%1 != 0) {
stop('parallel_h_fusion_TA_exp_4_rcpp: check that (1/beta)/prod(C_schedule[(L-1):l]) is an integer for l=L-1,...,1')
}
}
} else {
stop('parallel_h_fusion_TA_exp_4_rcpp: C_schedule must be a vector of length (L-1)')
}
# change global_T by multiplying it by sqrt(start_beta)
global_T <- global_T*sqrt(start_beta)
# we append 1 to the vector C_schedule to make the indices work later on when we call fusion
# we need this so that we can set the right value for beta when fusing up the levels
C_schedule <- c(C_schedule, 1)
# initialising study results
hier_samples <- list()
hier_samples[[L]] <- base_samples # base level
time_taken_per_level <- rep(0, L-1)
input_betas <- list()
input_betas[[L]] <- NA
output_beta <- c(1:(L-1), start_beta)
diffusion_times <- list()
overall_rho_iters <- 0
overall_Q_iters <- 0
overall_N <- 0
# make some vectors for acceptance rates of the level
rho_acc <- rep(0, L-1)
Q_acc <- rep(0, L-1)
rhoQ_acc <- rep(0, L-1)
# creating parallel cluster
n_cores <- parallel::detectCores()
cl <- parallel::makeCluster(n_cores, outfile = 'parallel_h_fusion_TA_exp_4.txt')
# creating variable and functions list to pass into cluster using clusterExport
varlist <- list("phi_exp_4", "bound_phi_exp_4", "diffusion_probability_exp_4", "fusion_TA_exp_4_rcpp",
"N_schedule", "mean", "global_T", "start_beta", "C_schedule", "L", "base_samples")
parallel::clusterExport(cl, envir = environment(), varlist = varlist)
# exporting functions from layeredBB package to simulate layered Brownian bridges
parallel::clusterExport(cl, varlist = ls("package:layeredBB"))
if (!is.null(seed)) {
# setting seed for the cores in the cluster
parallel::clusterSetRNGStream(cl, iseed = seed)
}
# parallelising tasks for each level going up the hiearchy
for (k in ((L-1):1)) {
n_nodes <- (1/start_beta)/prod(C_schedule[L:k]) # since previous level has (1/beta)/prod(C_schedule[L:(k-1)]) nodes and we fuse C_schedule[k] of these
a <- floor(n_cores / n_nodes) # calculating how much we can split the work along the cores
if (a <= 1) {
##### IF a==0:
# there are more nodes than cores
# we make a cluster of size n_cores and parallelise task by asking each core to obtain samples for each node
# when a node is completed, the core will start on a new node
##### IF a==1:
# there are more cores than nodes, but not enough to split the number of samples needed by each node among the extra cores
###### in either case, we make a cluster of size n_nodes and parallelise task by asking each core to obtain samples for each node
# we cannot split the number of samples to be obtained by each core
# samples_per_core[i]=N_schedule[k] for i=1,...,n_nodes
samples_per_core <- rep(N_schedule[k], n_nodes)
# linking each core to samples to perform fusion on
linker <- 1:n_nodes
} else if (a >= 2) {
###### IF a >=2
# there are more cores than nodes AND there is enough extra cores so that the number of samples by each node can be reduced
# i.e. work can be shared among the extra cores
# we make a cluster of size a*n_nodes and each core obtains N_schedule[k]/a samples
# splitting the number of samples to be obtained by each core
# samples_per_core[i]=N_schedule[k]/a for i=1,...,n_nodes
samples_per_core <- rep(floor(N_schedule[k]/a), a)
# N_schedule[k]/a may not be an integer
# some cores will need to obtain one more sample so the total number of samples obtained is N_schedule[k]
if (sum(samples_per_core) != N_schedule[k]) {
remainder <- N_schedule[k] %% a
samples_per_core[1:remainder] <- samples_per_core[1:remainder] + 1
}
samples_per_core <- rep(samples_per_core, n_nodes)
# linking each core to samples to perform fusion on
linker <- unlist(lapply(1:n_nodes, function(i) rep(i, a)))
}
# performing Fusion for this level
# printing out some stuff to log file to track the progress
cat('########################\n', file = 'parallel_h_fusion_TA_exp_4.txt', append = T)
cat('Starting to fuse', C_schedule[k], 'densities of pi^beta, where beta =', prod(C_schedule[L:(k+1)]), '/', (1/start_beta),
'for level', k, 'with time', global_T/sqrt(prod(C_schedule[L:(k+1)])*(start_beta)),
', which is using', min(length(samples_per_core), n_cores), 'cores\n',
file = 'parallel_h_fusion_TA_exp_4.txt', append = T)
cat('There are', n_nodes, 'nodes at this level each giving', N_schedule[k], 'samples for beta =', prod(C_schedule[L:k]), '/', (1/start_beta),
'\n', file = 'parallel_h_fusion_TA_exp_4.txt', append = T)
cat('########################\n', file = 'parallel_h_fusion_TA_exp_4.txt', append = T)
# starting fusion
pcm <- proc.time()
fused <- parallel::parLapply(cl, X = 1:length(samples_per_core), fun = function(i) {
fusion_TA_exp_4_rcpp(N = samples_per_core[i],
mean = mean,
time = global_T,
C = C_schedule[k],
samples_to_fuse = hier_samples[[k+1]][((C_schedule[k]*linker[i])-(C_schedule[k]-1)):(C_schedule[k]*linker[i])],
betas = prod(C_schedule[L:(k+1)])*(start_beta))})
final <- proc.time() - pcm
# fused is a list of length a*n_cores containing N_schedule[k]/a samples
# need to combine the correct samples
if (a <= 1) {
hier_samples[[k]] <- lapply(1:n_nodes, function(i) fused[[which(linker==i)]]$samples)
} else if (a >= 2) {
fused_samples <- lapply(1:length(samples_per_core), function(i) fused[[i]]$samples)
hier_samples[[k]] <- lapply(1:n_nodes, function(i) unlist(fused_samples[which(linker==i)]))
}
# calculating the acceptance rates for all nodes in the current level
# summing all the iterations for each core
rho_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_rho))
Q_iterations <- sum(sapply(1:length(fused), function(i) fused[[i]]$iters_Q))
rho_acc[k] <- Q_iterations / rho_iterations
Q_acc[k] <- (N_schedule[k]*n_nodes) / Q_iterations
rhoQ_acc[k] <- (N_schedule[k]*n_nodes) / rho_iterations
time_taken_per_level[k] <- final['elapsed']
input_betas[[k]] <- rep(prod(C_schedule[L:(k+1)])*(start_beta), C_schedule[k])
output_beta[k] <- prod(C_schedule[L:k])*(start_beta)
diffusion_times[[k]] <- global_T / rep(sqrt(prod(C_schedule[L:(k+1)])*(start_beta)), C_schedule[k])
overall_rho_iters <- overall_rho_iters + rho_iterations
overall_Q_iters <- overall_Q_iters + Q_iterations
overall_N <- N_schedule[k]*n_nodes
}
# stopping cluster
parallel::stopCluster(cl)
# print completion
cat('Completed hierarchical fusion\n', file = 'parallel_h_fusion_TA_exp_4.txt', append = T)
hier_samples[[1]] <- unlist(hier_samples[[1]])
return(list('samples' = hier_samples, 'time' = time_taken_per_level,
'rho_acc' = rho_acc, 'Q_acc' = Q_acc, 'rhoQ_acc' = rhoQ_acc,
'input_betas' = input_betas, 'output_beta' = output_beta, 'diffusion_times' = diffusion_times,
'overall_rho' = overall_Q_iters/overall_rho_iters, 'overall_Q' = overall_N/overall_Q_iters,
'overall_rhoQ' = overall_N/overall_rho_iters))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.