`startParallel` <-
function(parallel.config,
process,
qr.taus) {
if (any(toupper(parallel.config[['BACKEND']]) == 'MULTICORE' | toupper(parallel.config[['BACKEND']]) == 'SNOW')) {
stop(paste('\n\t', parallel.config[['BACKEND']], "no longer supported. Please use the 'PARALLEL' package backend and R > 2.12 for parallel computation.\n"))
}
if (toupper(parallel.config[['BACKEND']]) == 'FOREACH' & !is.null(parallel.config[['TYPE']])) {
if (parallel.config[['TYPE']] != "doParallel")
stop(paste('\n\t', parallel.config[['TYPE']], "no longer supported. Please use doParallel and R > 2.12 for parallel computation.\n"))
}
workers <- NULL; par.type <- 'OTHER'; TAUS.LIST <- NULL
if (!is.null(parallel.config[['CLUSTER.OBJECT']])) {
if (!missing(qr.taus)) {
workers <- length(eval(parse(text=parallel.config[['CLUSTER.OBJECT']])))
chunk.size <- ceiling(length(qr.taus) / workers)
TAUS.LIST <- vector("list", workers)
for (chunk in 0:(workers-1)) {
lower.index <- chunk*chunk.size+1
upper.index <- min((chunk+1)*chunk.size, length(qr.taus))
TAUS.LIST[[chunk+1]] <- qr.taus[lower.index:upper.index]
}
}
clusterEvalQ(eval(parse(text=parallel.config[['CLUSTER.OBJECT']])), library(SGP))
par.start <- list(internal.cl=eval(parse(text=parallel.config[['CLUSTER.OBJECT']])), par.type='SNOW')
clusterExport(eval(parse(text=parallel.config[['CLUSTER.OBJECT']])), "par.start", envir=2)
return(list(internal.cl=eval(parse(text=parallel.config[['CLUSTER.OBJECT']])),
par.type='SNOW', TAUS.LIST=TAUS.LIST))
}
### Basic checks - default to ANY percentiles or projections WORKERS.
if (is.numeric(parallel.config[['WORKERS']])) {
message(paste0("\n\tNOTE: ", process, " workers not specified. Numeric value from WORKERS (", parallel.config[['WORKERS']], ") will be used for all processes.\n"))
parallel.config[['WORKERS']][[process]] <- parallel.config[['WORKERS']]
}
if (is.null(parallel.config[['WORKERS']][[process]])) {
if (!is.null(parallel.config[['WORKERS']])) {
tmp.indx <- grep(strsplit(process, "_")[[1]][2], names(parallel.config[['WORKERS']]))
if (any(!is.na(tmp.indx))) {
parallel.config[['WORKERS']][[process]] <- parallel.config[['WORKERS']][[tmp.indx]]
message(paste("\n\tNOTE: ", process, "workers not defined specifically.", names(parallel.config[['WORKERS']][tmp.indx]),
"WORKERS will be used (", parallel.config[['WORKERS']][tmp.indx], "worker processors)."))
}
} # See if still NULL and stop:
if (is.null(parallel.config[['WORKERS']][[process]])) {
# stop(paste(process, "workers must be specified."))
parallel.config[['WORKERS']][[process]] <- 1
message(paste0("\n\tNOTE: ", process, " workers not specified! WORKERS will be set to a single (1) process.\n"))
}
}
Lower_Level_Parallel <- NULL
if (all(c("PERCENTILES", "TAUS") %in% names(parallel.config[['WORKERS']]))) {
# if (as.numeric(parallel.config[['WORKERS']][['PERCENTILES']])==1) {
# Lower_Level_Parallel <- parallel.config
# } else stop("Both TAUS and PERCENTILES cannot be executed in Parallel at the same time.")
if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both TAUS and PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
Lower_Level_Parallel <- parallel.config
}
if (all(c("PERCENTILES", "SIMEX") %in% names(parallel.config[['WORKERS']]))) {
if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both SIMEX and PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
Lower_Level_Parallel <- parallel.config
}
if (all(c("BASELINE_PERCENTILES", "TAUS") %in% names(parallel.config[['WORKERS']]))) {
if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both TAUS and BASELINE_PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
Lower_Level_Parallel <- parallel.config
}
if (all(c("BASELINE_PERCENTILES", "SIMEX") %in% names(parallel.config[['WORKERS']]))) {
if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both SIMEX and BASELINE_PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
Lower_Level_Parallel <- parallel.config
}
### Basic configuration
if (toupper(parallel.config[['BACKEND']]) == 'FOREACH') {
if (!is.na(parallel.config[['TYPE']]) & !identical(parallel.config[['TYPE']], "NA")) {
eval(parse(text=paste("suppressPackageStartupMessages(require(", parallel.config[['TYPE']], "))")))
} else parallel.config[['TYPE']] <- "doParallel"
# if (parallel.config[['TYPE']]=="doMC" & is.null(parallel.config[['OPTIONS']][["preschedule"]])) {
# if (is.list(parallel.config[['OPTIONS']])) {
# parallel.config[['OPTIONS']][["preschedule"]]=FALSE
# } else parallel.config[['OPTIONS']]=list(preschedule=FALSE)
# }
if (parallel.config[['TYPE']]=="doParallel") {
if (.Platform$OS.type == "unix" & par.type == "OTHER") par.type <- 'MULTICORE'
if (.Platform$OS.type != "unix" & par.type == "OTHER") par.type <- 'SNOW'
if (par.type == 'MULTICORE' & is.null(parallel.config[['OPTIONS']][["preschedule"]])) {
if (is.list(parallel.config[['OPTIONS']])) {
parallel.config[['OPTIONS']][["preschedule"]]=FALSE
} else parallel.config[['OPTIONS']]=list(preschedule=FALSE)
}
} # END doParallel
foreach.options <- parallel.config[['OPTIONS']] # works fine if NULL
} # END FOREACH
# if (toupper(parallel.config[['BACKEND']]) == 'MULTICORE') {
# par.type <- 'MULTICORE'
# }
# if (toupper(parallel.config[['BACKEND']]) == 'SNOW') {
# par.type <- 'SNOW'
# }
if (toupper(parallel.config[['BACKEND']]) == 'PARALLEL') {
if (is.null(parallel.config[['TYPE']]) & !is.null(parallel.config[['SNOW_TEST']])) parallel.config[['TYPE']] <- 'PSOCK'
if (!is.null(parallel.config[['TYPE']])) {
if (!parallel.config[['TYPE']] %in% c('SOCK', 'PSOCK', 'MPI')) {
stop("The 'snow' package will be used when 'parallel.config$TYPE' is specified and BACKEND=='PARALLEL'. List element must be 'SOCK' ('PSOCK') or 'MPI'.")
}
par.type <- 'SNOW'
} else {
if (.Platform$OS.type == "unix") par.type <- 'MULTICORE'
if (.Platform$OS.type != "unix") par.type <- 'SNOW'; parallel.config[['TYPE']] <- 'PSOCK'
}
}
if (par.type == 'SNOW') {
if (is.null(parallel.config[['TYPE']])) stop("The 'parallel.config$TYPE' must be specified ('PSOCK' or 'MPI')")
if (!parallel.config[['TYPE']] %in% c('PSOCK','MPI', 'doParallel')) stop("The 'parallel.config$TYPE' must be 'PSOCK', 'MPI' or 'doParallel'.")
}
### Set up workers and spin up clusters / register workers
if (!is.null(parallel.config[['WORKERS']][[process]])) {
workers <- parallel.config[['WORKERS']][[process]]
} else workers <- parallel.config[['WORKERS']]
if (is.null(workers)) workers <- getOption("cores")
if (is.null(workers)) stop("parallel.config$WORKERS must, at a minimum, contain the number of parallel workers for all processes,
or getOption('cores') must be specified to use MULTICORE parallel processing.")
###
### Need this for all flavors - move to startParallel
###
if (process=='TAUS') {
if (workers > 3) {
if (workers %in% 4:10) {
tmp.sml <- ceiling((length(qr.taus) / workers)*0.75)
tmp.lrg <- ceiling((length(qr.taus)-(2*tmp.sml))/(workers-2))
chunk.size <- c(tmp.sml, rep(tmp.lrg, (workers-2)), tmp.sml)
if (sum(chunk.size) > length(qr.taus)) {
over <- (sum(chunk.size) - length(qr.taus)); index <- 0
while(over != 0) {
if (over %% 2 == 0) {
index <- index + 1
chunk.size[(length(chunk.size)-(index))] <- chunk.size[(length(chunk.size)-(index))]-1
} else chunk.size[(index + 1)] <- chunk.size[(index + 1)]-1
over <- over - 1
}
}
}
if (workers > 10) {
tmp.sml.a <- ceiling((length(qr.taus) / workers)*0.334)
tmp.sml.b <- ceiling((length(qr.taus) / workers)*0.666)
tmp.lrg <- ceiling((length(qr.taus)-(2*sum(tmp.sml.a, tmp.sml.b)))/(workers-4))
chunk.size <- c(tmp.sml.a, tmp.sml.b, rep(tmp.lrg, (workers-4)), tmp.sml.b, tmp.sml.a)
if (sum(chunk.size) > length(qr.taus)) {
over <- (sum(chunk.size) - length(qr.taus)); index <- 0
while(over != 0) {
if (over %% 2 != 0) {
index <- index + 1
chunk.size[(length(chunk.size)-(index + 1))] <- chunk.size[(length(chunk.size)-(index + 1))]-1
} else chunk.size[(index + 2)] <- chunk.size[(index + 2)]-1
over <- over -1
}
}
}
if (workers > length(qr.taus)) chunk.size <- rep(1, length(qr.taus))
} else chunk.size <- rep(ceiling(length(qr.taus) / workers), workers)
TAUS.LIST <- vector("list", workers)
count <- index <- 1
for (ch in chunk.size) {
TAUS.LIST[[index]] <- qr.taus[count:(count+ch-1)]
count <- (count+ch); index <- index + 1
}
if (sum(chunk.size) > length(qr.taus)) for(l in 1:length(TAUS.LIST)) TAUS.LIST[[l]] <- TAUS.LIST[[l]][!is.na(TAUS.LIST[[l]])]
}
###
### END to startParallel
###
if (toupper(parallel.config[['BACKEND']]) == 'FOREACH') {
if (parallel.config[['TYPE']]=="NA") {
registerDoSEQ() # prevents warning message
return(list(foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
}
# if (parallel.config[['TYPE']]=="doMC") {
# registerDoMC(workers)
# return(list(foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
# }
# if (parallel.config[['TYPE']]=='doMPI') {
# doPar.cl <- startMPIcluster(count=workers)
# registerDoMPI(doPar.cl)
# return(list(doPar.cl=doPar.cl, foreach.options=foreach.options, par.type=par.type))
# }
# if (parallel.config[['TYPE']]=='doRedis') {
# redisWorker('jobs', port=10187) # Doesn't seem to work. Maybe get rid of this option/flavor?
# registerDoRedis('jobs')
# startLocalWorkers(n=workers, queue='jobs')
# return(list(jobs='jobs', foreach.options=foreach.options, par.type=par.type))
# }
# if (parallel.config[['TYPE']]=='doSNOW') {
# doPar.cl=makeCluster(workers, type='SOCK')
# registerDoSNOW(doPar.cl)
# return(list(doPar.cl=doPar.cl, foreach.options=foreach.options, par.type=par.type))
# }
if (!is.null(parallel.config[['SNOW_TEST']])) par.type <- 'SNOW' # To test SNOW on Linux
if (parallel.config[['TYPE']]=="doParallel") {
if (par.type == 'SNOW') {
doPar.cl <- makePSOCKcluster(workers)
registerDoParallel(doPar.cl)
clusterEvalQ(doPar.cl, library(SGP))
# foreach.options <- list(attachExportEnv=TRUE)
return(list(doPar.cl=doPar.cl, foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
} else {
registerDoParallel(workers)
eval(parse(text=paste0("later:", "::ensureInitialized()")))
return(list(foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
}
}
} # END if (FOREACH)
if (par.type=='SNOW') {
# if (parallel.config[['TYPE']]=='MPI') {
# if (exists('par.start')) return() #don't try to restart a new config
# }
internal.cl <- makeCluster(eval(parse(text=workers)), type=parallel.config[['TYPE']]) # eval workers in case 'names' used
clusterEvalQ(internal.cl, library(SGP))
return(list(internal.cl=internal.cl, par.type=par.type, TAUS.LIST=TAUS.LIST)) # workers=workers,
}
if (par.type=='MULTICORE') {
eval(parse(text=paste0("later:", "::ensureInitialized()")))
return(list(workers=workers, par.type=par.type, TAUS.LIST=TAUS.LIST, Lower_Level_Parallel=Lower_Level_Parallel))
}
} ### END startParallel Funtion
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.