require(plyr)
require(MSBVAR)
# TODO
# write documentation for this!
# setting up the config
# should be given
# list of model parameters including,
# * priority,
# * moment a priori connection
# * transform (optional)
# * bounds (optional)
# list of computational parameters
# * tolerance
# * iteration limit
#' create the configuration for the MCMC
#' chain.
#' @export
mopt_config <- function(p) {
stopifnot(is.list(p))
cf = list()
cf$mode = 'mpi'
cf$iter = 10
cf$i = 0
cf$use_last_run = TRUE
cf$file_chain = 'evaluations.dat'
cf$file_lastparam = 'param_submit.dat'
cf$file_errorparam= 'param_error.dat'
cf$save_error = FALSE
cf$wd = '~/git/bankruptcy/R'
cf$source_on_nodes = 'example-bgp-mpi-slaves.r'
cf$logdir = file.path(cf$wd,'sims/sim2/workers') # full path to log directory for workers
cf$debug = FALSE # switch TRUE if want to save parameter to disk before each evaluation
cf$params_to_sample = c() #This is a vector of names: c('delta', 'b')
cf$objfunc = MOPT_OBJ_FUNC
cf$modelinfo = c()
cf$run = 0
cf$shock_var = 0.1
cf$moments_to_use = c()
cf$moments.data = c()
cf$moments.sd = c()
cf$np_shock = 1
cf$n_untempered = 5
cf$save_freq = 25
cf$initial_value = p
cf$params_all = names(p) #c('sep','c','b','s0','s1','firmMass','beta','delta','sigma','f_rho','f_mx','f_my','f_a')
cf$N = 3 # default in case of serial
param.descript = data.frame()
for (n in names(p)) {
param.descript = rbind(param.descript,data.frame(param=n, lb=NA , ub=NA))
}
rownames(param.descript) <- names(p)
cf$pdesc = param.descript
class(cf) <- 'mopt_config'
return(cf)
}
#' defining sampling support for parameter
#' @export
samplep <- function(pp,lb,ub) {
res = list()
class(res) <- 'mopt_smaplep'
res$pp = pp
res$lb = lb
res$ub = ub
return(res)
}
#' add data moments to the configuration
#' @export
datamoments <- function(names,values,sds) {
res = list()
rr = data.frame(moment=names,value = values, sd = sds)
class(res) <- 'mopt_dmoms'
res$dd = rr
return(res)
}
#' @export
"+.mopt_config" <- function(cf,argb) {
if (class(argb)=='mopt_smaplep') {
cf$pdesc[argb$pp,'lb'] = argb$lb
cf$pdesc[argb$pp,'ub'] = argb$ub
cf$pdesc$param = paste(cf$pdesc$param)
}
if (class(argb)=='mopt_dmoms') {
cf$data.moments = argb$dd
}
return(cf)
}
# mopt_obj_wrapper <- function(p,objfunc=NA) {
# m = try( {
#
# get result
# r = objfunc(p)
#
# if (!is.list(r)) {
# r = list(value=r)
# }
#
# check that there is a status
# and that values is not NA
# if (!('status' %in% names(r))) {
# r$status=1
# }
#
# if (is.nan(r$value) | is.na(r$value)) {
# r$status=-1
# stop("objective function produced NA")
# }
# return(r)
#
# },silent=TRUE)
# return(m)
# returns NA if there is any sort of crash
# later it would be good to get the error message
# }
#' wraper for the objective function. Made public for multicore
#' @export
mopt_obj_wrapper <- function(p,objfunc=NA,errfile='param_error.dat') {
m = tryCatch( {
# get result
r = objfunc(p)
if (!is.list(r)) {
r = list(value=r)
}
# check that there is a status
# and that values is not NA
if (!('status' %in% names(r))) {
r$status=1
}
if (is.nan(r$value) | is.na(r$value)) {
r$status=-1
}
r
},error = function(e) {
list(status=-1,error=e$message)
} )
# if status is <0 we store the parameters in a file
if (m$status<0) {
try({
#save(p,file=paste('per.',format(Sys.time(), "%m.%d.%y-%Hh%S"), '-',sample.int(1000,1) , '.dat',sep=''))
per <- NULL
if ( file.exists(errfile)) try(load(errfile));
if (is.null(per)) per <- data.frame();
per <- rbind(per, data.frame(p))
save(per, file=errfile)
})
}
return(m)
# returns NA if there is any sort of crash
# later it would be good to get the error message
}
mopt_obj_wrapper_custom <- function(p,objfunc=NA) {
m = tryCatch( {
# get result
r = objfunc(p)
if (!is.list(r)) {
stop('MOPT_OBJ_FUNC must return a list')
}
if (!('output' %in% names(r))){
stop('MOPT_OBJ_FUNC must return a list with an element named: output')
}
r
},error = function(e) {
list(status=-1,error=e$message)
} )
return(m)
}
#' prepares mopt to run with either MPI or openMP or just serial
#' @export
prepare.mopt_config <- function(cf) {
if (cf$mode=='mpi') {
cat('[mode=mpi] USING MPI !!!!! \n')
#using RMPI instead of SNOW which is not working with Econ HPC
#save .Rprofile into your working directory first
#it will initialize RMPI
cf$N = mpi.universe.size()-1
## creating the cluster
## size of the cluster is determined by MPIRUN, i.e. in the SGE submit script. not here.
#require(snow)
#cl <- makeCluster(type='MPI',spec=cf$N)
#cf$cl <- cl # add cluster to the config as well
## worker roll call
#cf$num.worker <- length(clusterEvalQ(cl,Sys.info()))
#dir.create(cf$logdir,showWarnings=FALSE)
#cat("Master: I've got",cf$num.worker,"workers\n")
#cat("Master: doing rollcall on cluster now ( ", file.path(cf$logdir,"rollcall.txt") ," )\n")
#cat("Here is the boss talking. Worker roll call on",date(),"\n",file=file.path(cf$logdir,"rollcall.txt"),append=FALSE)
## setting up the slaves
#cat("Master: setting directory and sourcing on slaves ( ",cf$wd, cf$source_on_nodes ," )\n")
#eval(parse(text = paste("clusterEvalQ(cl,setwd('",cf$wd,"'))",sep='',collapse='')))
#eval(parse(text = paste("clusterEvalQ(cl,source('",cf$source_on_nodes,"'))",sep='',collapse='')))
#clusterCall(cl, rollcall, cf$logdir)
## adding the normal lapply
#cf$mylapply = function(a,b) { return(parLapply(cl,a,b))}
## adding the load balanced lapply
#cf$mylbapply = function(a,b) { return(clusterApplyLB(cl,a,b))}
#cf$N = length(cl)
#} else if (cf$mode=='mpiLB') {
# cat('[mode=mpiLB] USING LOAD BALANCED MPI !!!!! \n')
#
# # creating the cluster
# # size of the cluster is determined by MPIRUN, i.e. in the SGE submit script. not here.
# require(snow)
# cl <- makeCluster(type='MPI',spec=cf$N)
# cf$cl <- cl # add cluster to the config as well
#
# # worker roll call
# cf$num.worker <- length(clusterEvalQ(cl,Sys.info()))
# dir.create(cf$logdir,showWarnings=FALSE)
# cat("Master: I've got",cf$num.worker,"workers\n")
# cat("Master: doing rollcall on cluster now\n")
# cat("Here is the boss talking. Worker roll call on",date(),"\n",file=file.path(cf$logdir,"rollcall.txt"),append=FALSE)
#
# # setting up the slaves
# eval(parse(text = paste("clusterEvalQ(cl,setwd('",cf$wd,"'))",sep='',collapse='')))
# eval(parse(text = paste("clusterEvalQ(cl,source('",cf$source_on_nodes,"'))",sep='',collapse='')))
# clusterCall(cl, rollcall, cf$logdir)
#
# # adding the normal lapply
# cf$mylapply = function(a,b) { return(parLapply(cl,a,b))}
#
# # adding the load balanced lapply
# cf$mylbapply = function(a,b) { return(clusterApplyLB(cl,a,b))}
#
# if (cf$N <= length(cl)){
# warning('benefits of Load Balancing on cluster only materialize\nif you set number of chains N > number of nodes')
# }
} else if (cf$mode=='mpi2') {
cat('[mode=mpi2] we use Rmpi, and we startup the cluster ourselves\n')
library(Rmpi)
library(snow)
np <- mpi.universe.size() - 1
cluster <- makeMPIcluster(np)
cf$cluster=cluster
cf$N= np
} else if (cf$mode=='mpi3') {
cat('[mode=mpi3] we use Rmpi, and we set worker number N in advance\n')
library(Rmpi)
library(snow)
#cluster <- makeMPIcluster(cf$N)
#cf$cluster=cluster
cf$cluster <- makeCluster(type='MPI',spec=cf$N)
num.worker <- length(clusterEvalQ(cf$cluster,Sys.info()))
cat("num workers:",num.worker,'\n')
} else if (cf$mode=='multicore2') {
cat('[mode=multicore2] we use makeCluster to allocate threads once and for all\n')
cf$cluster = makeForkCluster(cf$N)
} else if (cf$mode=='multicore') {
cat('[mode=mulicore] threads will be re-allocated at each call \n')
require(parallel)
cf$N= detectCores()-1
#if(Sys.info()[['sysname']]=='Windows') {
# cl <- makeCluster(spec=pmin(detectCores(),cf$N),type='MPI')
#
# # worker roll call
# dir.create(file.path(cf$wd,"workers"),showWarnings=FALSE)
# cat("Master: I've got",num.worker,"workers\n")
# cat("Master: doing rollcall on cluster now\n")
# cat("Here is the boss talking. Worker roll call on",date(),"\n",file=file.path(cf$wd,"workers","rollcall.txt"),append=FALSE)
# # setting up the slaves
# eval(parse(text = paste("clusterEvalQ(cl,setwd('",cf$wd,"'))",sep='',collapse='')))
# eval(parse(text = paste("clusterEvalQ(cl,source('",cf$source_on_nodes,"'))",sep='',collapse='')))
# clusterCall(cl, rollcall, file.path(cf$wd,"workers"))
#
#
# # adding the normal lapply
# cf$mylapply = function(a,b) { return(parLapply(cl,a,b))}
#
# # adding the load balanced lapply
# cf$mylbapply = function(a,b) { return(clusterApplyLB(cl,a,b))}
#
# cf$N = length(cl)
#} else {
# cf$mylapply = mclapply;
# cf$mylbapply = mclapply;
# cf$N= pmin(detectCores(),cf$N)
#}
} else if (cf$mode == 'serial') {
cat('[mode=serial] NOT USING MPI !!!!! \n')
} else {
#error(cat('your selected mode: ',cf$mode,' does not exist. please choose from mpi,mpiLB,multicore and serial'))
error(cat('your selected mode: ',cf$mode,' does not exist. please choose from mpi,multicore and serial'))
}
return(cf)
}
#' cluster rollcall
#'
#' cluster reporting function. makes every worker state their name and
#' writes into a log file
#' @export
#' @param dir path to the log directory
rollcall <- function(dir){
my.name <- Sys.info()["nodename"]
cat("I am a worker (pid=", Sys.getpid() ,"), my name is ",my.name,"and I'm ready to go.\n",file=file.path(dir,"rollcall.txt"),append=TRUE)
}
#' this is the main function, it will run the
#' optimizer in parallel if called with MPI=TRUE
#' it will search for the minimum
#' @export
runMOpt <- function(cf,autoload=TRUE) {
# reading configuration
# =====================
pdesc = cf$pdesc
p = cf$initial_value
priv = list()
# setting up the cluster if MPI
# =============================
last_time = as.numeric(proc.time()[3])
# saving all evaluations with param values
# start from best last value
if (file.exists(cf$file_chain) & autoload ) {
cat("loading automatically chain and config file from disk (because autoload=TRUE)")
load(cf$file_chain)
cf$run = cf$run +1
} else {
param_data = data.frame()
# what is the meaning of these paramters?
# I find theta, breaks, nu, and kk are used algo.wl.r, acc is used to update sample var.
cf$theta = seq(1,l=50)
cf$breaks = seq(-2,0,l=50)
cf$acc = 0.5
cf$nu = seq(0,l=50)
cf$kk = 2
cf$i = 1
}
cat('Number of Chains: ',cf$N,'\n')
# we want to keep a matrix for the chain
# it should be a data.frame in long format
# with a chain and iteration indicator
# each row will also have the all parameters and the
# objective value
# we are going to carry 2 arrays
# -- param_data: will store all evaluations
# -- chain_data: will store the chain results
# ======================== MCMC loop =======================
# get initial candidates
cat('Computing intial candidates\n')
ps = computeInitialCandidates(cf$N,cf)
param_data = rbind(param_data, evaluateParameters(ps,cf))
cat('Done with intial candidates\n')
cat('Starting main MCMC loop\n')
for (i in cf$i:cf$iter) {
cf$i=i
# step 1, evaluate candidates
# --------------------------------------------------------
eval_start = as.numeric(proc.time()[3])
rd = evaluateParameters(ps,cf)
eval_time = as.numeric(proc.time()[3]) - eval_start
# SAVING VALUES
if ( (i %% cf$save_freq)==1 & i>10 ) {
cat(sprintf("%d evals, saving to %s \n",i,cf$file_chain))
save(cf,priv,param_data,file=cf$file_chain)
}
# step 2, updating chain and computing guesses
# ----------------------------------------------------------------
algo_start = as.numeric(proc.time()[3])
rr = cf$algo(rd,param_data, 0, cf, cf$pdesc, priv)
algo_time = as.numeric(proc.time()[3]) - algo_start
priv = rr$priv
ps = rr$ps
# append the accepted/rejected draws
param_data = rbind(param_data,rr$evals)
run_time = as.numeric(proc.time()[3]) - last_time
# small reporting
# ----------------
cat(sprintf('[%d/%d][total: %f][last: %f ( e:%4.2f + a:%4.2f )][m: %f] best value %f \n',
i , cf$iter ,
as.numeric(proc.time()[3]),
run_time,
eval_time/run_time,
algo_time/run_time,
sum(gc()[,"(Mb)"]),
min(param_data$value,na.rm=TRUE)))
last_time = as.numeric(proc.time()[3])
for (pp in paste('p',cf$params_to_sample,sep='.')) {
cat(' range for ',pp,' ',range(param_data[,pp]),'\n')
}
}
#saving the data set
save(cf,priv,param_data,file=cf$file_chain)
# stopping the cluster using R snow command
if (cf$mode=='mpi2') {
stopCluster(cf$cluster)
mpi.exit()
}
return(param_data)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.