R/startParallel.R

Defines functions `startParallel`

`startParallel` <-
function(parallel.config,
	process,
	qr.taus) {

	if (any(toupper(parallel.config[['BACKEND']]) == 'MULTICORE' | toupper(parallel.config[['BACKEND']]) == 'SNOW')) {
		stop(paste('\n\t', parallel.config[['BACKEND']], "no longer supported. Please use the 'PARALLEL' package backend and R > 2.12 for parallel computation.\n"))
	}

	if (toupper(parallel.config[['BACKEND']]) == 'FOREACH' & !is.null(parallel.config[['TYPE']])) {
		if (parallel.config[['TYPE']] != "doParallel")
			stop(paste('\n\t', parallel.config[['TYPE']], "no longer supported. Please use doParallel and R > 2.12 for parallel computation.\n"))
	}

	workers <- NULL; par.type <- 'OTHER'; TAUS.LIST <- NULL

	if (!is.null(parallel.config[['CLUSTER.OBJECT']])) {
		if (!missing(qr.taus)) {
			workers <- length(eval(parse(text=parallel.config[['CLUSTER.OBJECT']])))
			chunk.size <- ceiling(length(qr.taus) / workers)
			TAUS.LIST <- vector("list", workers)
			for (chunk in 0:(workers-1)) {
				lower.index <- chunk*chunk.size+1
				upper.index <- min((chunk+1)*chunk.size, length(qr.taus))
				TAUS.LIST[[chunk+1]] <- qr.taus[lower.index:upper.index]
			}
		}
		clusterEvalQ(eval(parse(text=parallel.config[['CLUSTER.OBJECT']])), library(SGP))
		par.start <- list(internal.cl=eval(parse(text=parallel.config[['CLUSTER.OBJECT']])), par.type='SNOW')
		clusterExport(eval(parse(text=parallel.config[['CLUSTER.OBJECT']])), "par.start", envir=2)
		return(list(internal.cl=eval(parse(text=parallel.config[['CLUSTER.OBJECT']])),
			par.type='SNOW', TAUS.LIST=TAUS.LIST))
	}

	###  Basic checks - default to ANY percentiles or projections WORKERS.

	if (is.numeric(parallel.config[['WORKERS']])) {
		message(paste0("\n\tNOTE: ", process, " workers not specified.  Numeric value from WORKERS (", parallel.config[['WORKERS']], ") will be used for all processes.\n"))
		parallel.config[['WORKERS']][[process]] <- parallel.config[['WORKERS']]
	}
	if (is.null(parallel.config[['WORKERS']][[process]])) {
		if (!is.null(parallel.config[['WORKERS']])) {
			 tmp.indx <- grep(strsplit(process, "_")[[1]][2], names(parallel.config[['WORKERS']]))
			 if (any(!is.na(tmp.indx))) {
				 parallel.config[['WORKERS']][[process]] <- parallel.config[['WORKERS']][[tmp.indx]]
				 message(paste("\n\tNOTE: ", process, "workers not defined specifically.", names(parallel.config[['WORKERS']][tmp.indx]),
				 	"WORKERS will be used  (", parallel.config[['WORKERS']][tmp.indx], "worker processors)."))
			 }
		} # See if still NULL and stop:
		if (is.null(parallel.config[['WORKERS']][[process]])) {
			# stop(paste(process, "workers must be specified."))
			parallel.config[['WORKERS']][[process]] <- 1
			message(paste0("\n\tNOTE: ", process, " workers not specified!  WORKERS will be set to a single (1) process.\n"))
		}
	}

	Lower_Level_Parallel <- NULL
	if (all(c("PERCENTILES", "TAUS") %in% names(parallel.config[['WORKERS']]))) {
		# if (as.numeric(parallel.config[['WORKERS']][['PERCENTILES']])==1) {
		# 	Lower_Level_Parallel <- parallel.config
		# } else stop("Both TAUS and PERCENTILES cannot be executed in Parallel at the same time.")
		if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both TAUS and PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
		Lower_Level_Parallel <- parallel.config
	}
	if (all(c("PERCENTILES", "SIMEX") %in% names(parallel.config[['WORKERS']]))) {
		if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both SIMEX and PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
		Lower_Level_Parallel <- parallel.config
	}

	if (all(c("BASELINE_PERCENTILES", "TAUS") %in% names(parallel.config[['WORKERS']]))) {
		if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both TAUS and BASELINE_PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
		Lower_Level_Parallel <- parallel.config
	}
	if (all(c("BASELINE_PERCENTILES", "SIMEX") %in% names(parallel.config[['WORKERS']]))) {
		if (.Platform$OS.type != "unix" | "SNOW_TEST" %in% names(parallel.config)) stop("Both SIMEX and BASELINE_PERCENTILES cannot be executed in Parallel at the same time in Windows OS or using SNOW type backends.")
		Lower_Level_Parallel <- parallel.config
	}

	###  Basic configuration

	if (toupper(parallel.config[['BACKEND']]) == 'FOREACH') {
		if (!is.na(parallel.config[['TYPE']]) & !identical(parallel.config[['TYPE']], "NA")) {
			eval(parse(text=paste("suppressPackageStartupMessages(require(", parallel.config[['TYPE']], "))")))
		} else parallel.config[['TYPE']] <- "doParallel"

		# if (parallel.config[['TYPE']]=="doMC" & is.null(parallel.config[['OPTIONS']][["preschedule"]])) {
			# if (is.list(parallel.config[['OPTIONS']])) {
				# parallel.config[['OPTIONS']][["preschedule"]]=FALSE
			# }	else parallel.config[['OPTIONS']]=list(preschedule=FALSE)
		# }

		if (parallel.config[['TYPE']]=="doParallel") {
			if (.Platform$OS.type == "unix" & par.type == "OTHER") par.type <- 'MULTICORE'
			if (.Platform$OS.type != "unix" & par.type == "OTHER") par.type <- 'SNOW'
			if (par.type == 'MULTICORE' & is.null(parallel.config[['OPTIONS']][["preschedule"]])) {
				if (is.list(parallel.config[['OPTIONS']])) {
					parallel.config[['OPTIONS']][["preschedule"]]=FALSE
				}	else parallel.config[['OPTIONS']]=list(preschedule=FALSE)
			}
		} # END doParallel

		foreach.options <- parallel.config[['OPTIONS']] # works fine if NULL
	} #  END FOREACH

	# if (toupper(parallel.config[['BACKEND']]) == 'MULTICORE') {
		# par.type <- 'MULTICORE'
	# }

	# if (toupper(parallel.config[['BACKEND']]) == 'SNOW') {
		# par.type <- 'SNOW'
	# }

	if (toupper(parallel.config[['BACKEND']]) == 'PARALLEL') {
		if (is.null(parallel.config[['TYPE']]) & !is.null(parallel.config[['SNOW_TEST']])) parallel.config[['TYPE']] <- 'PSOCK'
		if (!is.null(parallel.config[['TYPE']])) {
			if (!parallel.config[['TYPE']] %in% c('SOCK', 'PSOCK', 'MPI')) {
				stop("The 'snow' package will be used when 'parallel.config$TYPE' is specified and BACKEND=='PARALLEL'.  List element must be 'SOCK' ('PSOCK') or 'MPI'.")
			}
			par.type <- 'SNOW'
		} else {
			if (.Platform$OS.type == "unix") par.type <- 'MULTICORE'
			if (.Platform$OS.type != "unix") par.type <- 'SNOW'; parallel.config[['TYPE']] <- 'PSOCK'
		}
	}

	if (par.type == 'SNOW') {
		if (is.null(parallel.config[['TYPE']])) stop("The 'parallel.config$TYPE' must be specified ('PSOCK' or 'MPI')")
		if (!parallel.config[['TYPE']] %in% c('PSOCK','MPI', 'doParallel')) stop("The 'parallel.config$TYPE' must be 'PSOCK', 'MPI' or 'doParallel'.")
	}


	###  Set up workers and spin up clusters / register workers

	if (!is.null(parallel.config[['WORKERS']][[process]])) {
		workers <- parallel.config[['WORKERS']][[process]]
	} else workers <- parallel.config[['WORKERS']]
	if (is.null(workers)) workers <- getOption("cores")
	if (is.null(workers)) stop("parallel.config$WORKERS must, at a minimum, contain the number of parallel workers for all processes,
		or getOption('cores') must be specified to use MULTICORE parallel processing.")

	###
	###  Need this for all flavors - move to startParallel
	###

	if (process=='TAUS') {
		if (workers > 3) {
			if (workers %in% 4:10) {
				tmp.sml <- ceiling((length(qr.taus) / workers)*0.75)
				tmp.lrg <- ceiling((length(qr.taus)-(2*tmp.sml))/(workers-2))
				chunk.size <- c(tmp.sml, rep(tmp.lrg, (workers-2)), tmp.sml)
				if (sum(chunk.size) > length(qr.taus)) {
					over <- (sum(chunk.size) - length(qr.taus)); index <- 0
					while(over != 0) {
						if (over %% 2 == 0) {
							index <- index + 1
							chunk.size[(length(chunk.size)-(index))] <- chunk.size[(length(chunk.size)-(index))]-1
						} else chunk.size[(index + 1)] <- chunk.size[(index + 1)]-1
						over <- over - 1
					}
				}
			}
			if (workers > 10) {
				tmp.sml.a <- ceiling((length(qr.taus) / workers)*0.334)
				tmp.sml.b <- ceiling((length(qr.taus) / workers)*0.666)
				tmp.lrg <- ceiling((length(qr.taus)-(2*sum(tmp.sml.a, tmp.sml.b)))/(workers-4))
				chunk.size <- c(tmp.sml.a, tmp.sml.b, rep(tmp.lrg, (workers-4)), tmp.sml.b, tmp.sml.a)
				if (sum(chunk.size) > length(qr.taus)) {
					over <- (sum(chunk.size) - length(qr.taus)); index <- 0
					while(over != 0) {
						if (over %% 2 != 0) {
							index <- index + 1
							chunk.size[(length(chunk.size)-(index + 1))] <- chunk.size[(length(chunk.size)-(index + 1))]-1
						} else chunk.size[(index + 2)] <- chunk.size[(index + 2)]-1
						over <- over -1
					}
				}
			}
			if (workers > length(qr.taus)) chunk.size <- rep(1, length(qr.taus))
		}	else chunk.size <- rep(ceiling(length(qr.taus) / workers), workers)

		TAUS.LIST <- vector("list", workers)
		count <- index <- 1
		for (ch in chunk.size) {
			TAUS.LIST[[index]] <- qr.taus[count:(count+ch-1)]
			count <- (count+ch); index <- index + 1
		}
		if (sum(chunk.size) > length(qr.taus)) for(l in 1:length(TAUS.LIST))  TAUS.LIST[[l]] <- TAUS.LIST[[l]][!is.na(TAUS.LIST[[l]])]
	}

	###
	### END to startParallel
	###

	if (toupper(parallel.config[['BACKEND']]) == 'FOREACH') {
		if (parallel.config[['TYPE']]=="NA") {
			registerDoSEQ() # prevents warning message
			return(list(foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
		}
		# if (parallel.config[['TYPE']]=="doMC") {
			# registerDoMC(workers)
			# return(list(foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
		# }
		# if (parallel.config[['TYPE']]=='doMPI') {
			# doPar.cl <- startMPIcluster(count=workers)
			# registerDoMPI(doPar.cl)
			# return(list(doPar.cl=doPar.cl, foreach.options=foreach.options, par.type=par.type))
		# }
		# if (parallel.config[['TYPE']]=='doRedis') {
			# redisWorker('jobs', port=10187) #  Doesn't seem to work.  Maybe get rid of this option/flavor?
			# registerDoRedis('jobs')
			# startLocalWorkers(n=workers, queue='jobs')
			# return(list(jobs='jobs', foreach.options=foreach.options, par.type=par.type))
		# }
		# if (parallel.config[['TYPE']]=='doSNOW') {
			# doPar.cl=makeCluster(workers, type='SOCK')
			# registerDoSNOW(doPar.cl)
			# return(list(doPar.cl=doPar.cl, foreach.options=foreach.options, par.type=par.type))
		# }
		if (!is.null(parallel.config[['SNOW_TEST']])) par.type <- 'SNOW' # To test SNOW on Linux
		if (parallel.config[['TYPE']]=="doParallel") {
			if (par.type == 'SNOW') {
				doPar.cl <- makePSOCKcluster(workers)
				registerDoParallel(doPar.cl)
				clusterEvalQ(doPar.cl, library(SGP))
				# foreach.options <- list(attachExportEnv=TRUE)
				return(list(doPar.cl=doPar.cl, foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
			} else {
				registerDoParallel(workers)
				eval(parse(text=paste0("later:", "::ensureInitialized()")))
				return(list(foreach.options=foreach.options, par.type=par.type, TAUS.LIST=TAUS.LIST))
			}
		}
	} # END if (FOREACH)

	if (par.type=='SNOW') {
		# if (parallel.config[['TYPE']]=='MPI') {
			# if (exists('par.start')) return() #don't try to restart a new config
		# }
		internal.cl <- makeCluster(eval(parse(text=workers)), type=parallel.config[['TYPE']]) # eval workers in case 'names' used
		clusterEvalQ(internal.cl, library(SGP))
		return(list(internal.cl=internal.cl, par.type=par.type, TAUS.LIST=TAUS.LIST)) #  workers=workers,
	}

	if (par.type=='MULTICORE') {
		eval(parse(text=paste0("later:", "::ensureInitialized()")))
		return(list(workers=workers, par.type=par.type, TAUS.LIST=TAUS.LIST, Lower_Level_Parallel=Lower_Level_Parallel))
	}
} ### END startParallel Funtion
CenterForAssessment/SGP documentation built on April 22, 2024, 3:16 p.m.