#' Creates cluster for estimation.
#'
#' Splits the data, creates a cluster, and loads a different piece of the database on each worker.
#'
#' Internal use only. Called by \code{apollo_estimate} before estimation. Using multiple cores greatly increases memory consumption.
#'
#' @param apollo_probabilities Function. Returns probabilities of the model to be estimated. Must receive three arguments:
#' \itemize{
#' \item \strong{\code{apollo_beta}}: Named numeric vector. Names and values of model parameters.
#' \item \strong{\code{apollo_inputs}}: List containing options of the model. See \link{apollo_validateInputs}.
#' \item \strong{\code{functionality}}: Character. Can be either \strong{\code{"components"}}, \strong{\code{"conditionals"}}, \strong{\code{"estimate"}} (default), \strong{\code{"gradient"}}, \strong{\code{"output"}}, \strong{\code{"prediction"}}, \strong{\code{"preprocess"}}, \strong{\code{"raw"}}, \strong{\code{"report"}}, \strong{\code{"shares_LL"}}, \strong{\code{"validate"}} or \strong{\code{"zero_LL"}}.
#' }
#' @param apollo_inputs List grouping most common inputs. Created by function \link{apollo_validateInputs}.
#' @param silent Boolean. If TRUE, no messages are printed to the terminal and \code{apollo_inputs$silent} is ignored. If FALSE (the default), the value of \code{apollo_inputs$silent} is used instead.
#' @param cleanMemory Boolean. If TRUE, it removes \code{database} and \code{draws} from every copy of
#' \code{apollo_inputs} found in the calling stack and the global environment, freeing memory.
#' @return Cluster (i.e. an object of class \code{cluster} from package \code{parallel}).
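#' @examples
#' \dontrun{
#' ### Illustrative sketch only: apollo_makeCluster is internal and is
#' ### normally called by apollo_estimate. It assumes apollo_probabilities
#' ### and apollo_inputs already exist, with apollo_control$nCores > 1.
#' cl <- apollo_makeCluster(apollo_probabilities, apollo_inputs)
#' ### Each worker holds its own piece of the data as 'apollo_inputs'
#' obsPerWorker <- parallel::clusterEvalQ(cl, nrow(apollo_inputs$database))
#' parallel::stopCluster(cl)
#' }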
#' @export
apollo_makeCluster <- function(apollo_probabilities, apollo_inputs, silent=FALSE, cleanMemory=FALSE){
#### Split data ####
  # A silent=TRUE argument takes precedence; otherwise defer to apollo_inputs$silent
  if(silent==FALSE) silent <- apollo_inputs$silent
debug <- apollo_inputs$apollo_control$debug
### Extract useful elements
nObs <- nrow(apollo_inputs$database)
indivID <- apollo_inputs$database[,apollo_inputs$apollo_control$indivID]
namesID <- unique(indivID)
nIndiv <- length(namesID)
  # Count observations per individual. A loop is used instead of table()
  # because table() sorts by level, while namesID keeps order of appearance.
  nObsID <- rep(0, nIndiv)
  for(n in 1:nIndiv) nObsID[n] <- sum(indivID==namesID[n])
mixing <- apollo_inputs$apollo_control$mixing
nCores <- apollo_inputs$apollo_control$nCores
  ### Figure out how to split the database per individual
  if(debug) apollo_print(paste0('Attempting to split data into ', nCores, ' pieces.'))
  obj     <- ceiling(nObs/nCores) # target number of observations per worker
  counter <- 0
  currentCore <- 1
  assignedCore      <- rep(0, nObs)   # worker assigned to each observation
  assignedCoreIndiv <- rep(0, nIndiv) # worker assigned to each individual
i <- 1
for(n in 1:nIndiv){
assignedCore[i:(i+nObsID[n]-1)] <- currentCore
assignedCoreIndiv[n] <- currentCore
i <- i + nObsID[n]
counter <- counter + nObsID[n]
if(counter>=obj & currentCore<nCores){
currentCore <- currentCore + 1
counter <- 0
}
}
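  # Worked example of the greedy assignment above (hypothetical numbers):
  # with nObs=10, nCores=3 and nObsID=c(4,3,3), obj=ceiling(10/3)=4.
  # Individual 1 fills worker 1 (counter reaches 4), individuals 2 and 3
  # both land on worker 2 (counter only reaches 4 after individual 3), and
  # worker 3 receives nothing. Individuals are never split across workers,
  # so the next line shrinks nCores to the number of workers actually used.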
nCores <- max(assignedCore)
if(debug){
if(nCores!=apollo_inputs$apollo_control$nCores) apollo_print(paste0(nCores, ' workers (threads) will be used for estimation.'))
coreLoad <- setNames(rep(0, nCores), paste0('worker', 1:nCores))
for(i in 1:nCores) coreLoad[i] <- sum(assignedCore==i)
apollo_print(paste0('Obs. per worker (thread): ',paste0(coreLoad, collapse=", ")))
}
apollo_inputs$apollo_control$nCores <- nCores
rm(obj, counter, currentCore, i, n)
### Create names of temp files
inputPieceFile <- rep("", nCores)
for(i in 1:nCores) inputPieceFile[i] <- tempfile()
### Identify elements to split
# For lists, it looks at its first element
asLst <- c() # one element per individual
byObs <- c() # one row per observation
byInd <- c() # one row per individual
asIs <- c('apollo_scaling') # none of the above
for(e in ls(apollo_inputs)){ # e is name of element, E is the element
if(e %in% asIs) next
E <- apollo_inputs[[e]]
if(is.list(E) && length(E)==nIndiv) asLst <- c(asLst, e) else{
if(is.list(E)) E <- E[[1]]
if(is.array(E)) nRows <- dim(E)[1] else nRows <- length(E)
if(nRows==nObs) byObs <- c(byObs, e)
if(nRows==nIndiv) byInd <- c(byInd, e)
if(nRows!=nObs & nRows!=nIndiv) asIs <- c(asIs, e)
}
}; rm(e, E, nRows)
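  # For instance (typical contents, which may vary by model): 'database' has
  # nObs rows, so it is split by observation; an element with one row (or one
  # list entry) per individual would be split by individual; and settings such
  # as 'apollo_control' or 'apollo_scaling' are copied unchanged to every worker.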
### Create and write to disk each fragment of apollo_inputs
if(debug) cat("Writing pieces to disk") # do not turn to apollo_print
for(i in 1:nCores){
    # Elements to be kept unchanged
    L <- apollo_inputs[ asIs ]
# Elements to be split by observations
if(length(byObs)>0) for(e in byObs){
E <- apollo_inputs[[e]]
r <- assignedCore==i
      if(is.list(E) && !is.data.frame(E)){
L[[e]] <- lapply(E, apollo_keepRows, r=r)
} else L[[e]] <- apollo_keepRows(E, r)
}
# Elements to be split by individual
if(length(byInd)>0) for(e in byInd){
E <- apollo_inputs[[e]]
r <- assignedCoreIndiv==i
      if(is.list(E) && !is.data.frame(E)){
L[[e]] <- lapply(E, apollo_keepRows, r=r)
} else L[[e]] <- apollo_keepRows(E, r)
}
# Elements to be split as list
if(length(asLst)>0) for(e in asLst){
E <- apollo_inputs[[e]]
r <- which(assignedCoreIndiv==i)
L[[e]] <- E[r]
}
# Write to disk
wroteOK <- tryCatch({
saveRDS(L, file=inputPieceFile[i])
TRUE
}, warning=function(w) FALSE, error=function(e) FALSE)
if(!wroteOK) stop("INTERNAL ISSUE - Apollo could not write data pieces to disk. This is necessary for multi-core processing to work.")
if(debug) cat(".")
if(debug && i==nCores) cat("\n")
}
if(debug) apollo_print(paste0("Writing completed. ", sum(gc()[,2]), 'MB of RAM in use.'))
#### Create cluster and load data ####
### Create cluster
if(!silent) apollo_print('Preparing workers for multithreading...')
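  # On macOS ("Darwin"), workers are created sequentially as a workaround for
  # a known issue where parallel cluster setup can hang (reported under
  # RStudio with R >= 4.0).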
if(Sys.info()["sysname"] == "Darwin"){
cl <- parallel::makeCluster(nCores, setup_strategy="sequential")
} else cl <- parallel::makeCluster(nCores)
### Delete draws and database from memory in main thread
if(cleanMemory){
    # No backup of apollo_inputs is created; it is the caller's responsibility to do so (e.g. apollo_estimate)
if(debug) cat('Cleaning memory in main thread...') # do not change to apollo_print
# Go over calling stack and delete apollo_inputs$database and apollo_inputs$draws wherever found
for(e in sys.frames()){
x <- tryCatch(get('apollo_inputs', envir=e, inherits=FALSE), error=function(e) NULL)
if(!is.null(x)){
if(!is.null(x$database)) x$database <- NULL
if(!is.null(x$draws )) x$draws <- NULL
assign('apollo_inputs', x, envir=e)
}
}
tmp <- globalenv()
x <- tryCatch(get('apollo_inputs', envir=tmp, inherits=FALSE), error=function(e) NULL)
if(!is.null(x)){
if(!is.null(x$database)) x$database <- NULL
if(!is.null(x$draws )) x$draws <- NULL
assign('apollo_inputs', x, envir=tmp)
}
gc()
if(debug) cat(' Done. ',sum(gc()[,2]),'MB of RAM in use.\n',sep='') # do not change to apollo_print
}
### Load libraries in the cluster (same as in workspace)
if(debug) cat('Loading libraries and likelihood function...') # do not change to apollo_print
  excludePackages <- c('parallel')
  loadedPackages  <- search()
  loadedPackages  <- loadedPackages[grepl("^(package:)", loadedPackages)]
  loadedPackages  <- substr(loadedPackages, start=9, stop=100) # drop the "package:" prefix
  loadedPackages  <- loadedPackages[!(loadedPackages %in% excludePackages)]
if(length(loadedPackages)>0){
parallel::clusterCall(cl=cl, function(lib, path) {
.libPaths(path)
for(i in 1:length(lib)) library(lib[i],character.only=TRUE)
}, lib=loadedPackages, path=.libPaths())
}
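  # At this point each worker has the same packages attached as the master
  # session, except those listed in excludePackages.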
### Copy functions in the workspace to the workers
# apollo_probabilities is copied separately to ensure it keeps its name
funcs <- as.vector(utils::lsf.str(envir=.GlobalEnv))
  if(length(funcs)>0){ # remove functions created with Rcpp, whose compiled code is not available to the workers
    # Recursively checks whether a function or expression contains a call to .Call
    isRcppF <- function(e){
if(is.function(e)) e <- body(e)
isVal <- is.symbol(e) || is.numeric(e) || is.character(e) || is.logical(e) || is.complex(e)
if(isVal) return(FALSE)
isExp <- is.expression(e) || is.call(e)
isCall <- length(e)>1 && is.symbol(e[[1]]) && as.character(e[[1]])==".Call"
if(isCall) return(TRUE)
if(isExp && !isCall && length(e)>=1){
for(i in 1:length(e)) if(!is.null(e[[i]])) if(isRcppF(e[[i]])) return(TRUE)
}
return(FALSE)
}
for(i in 1:length(funcs)) if(isRcppF(get(funcs[i], envir=.GlobalEnv))) funcs[i] <- ""
funcs <- funcs[funcs!=""]
}
parallel::clusterExport(cl, funcs, envir=.GlobalEnv)
parallel::clusterExport(cl, "apollo_probabilities", envir=environment())
if(debug){
mbRAM <- sum(gc()[,2])
if(nCores>1){
gcClusters <- parallel::clusterCall(cl, gc)
gcClusters <- Reduce('+',lapply(gcClusters, function(x) sum(x[,2])))
mbRAM <- mbRAM + gcClusters
}
cat(' Done. ',mbRAM,'MB of RAM in use.',sep='') # do not change to apollo_print
}
### Load apollo_input pieces in each worker
if(debug) cat("\nLoading data...") # do not change to apollo_print
parallel::parLapply(cl, inputPieceFile, fun=function(fileName){
tmp <- globalenv()
txt <- " This is necessary for multi-core processing to work."
if(!file.exists(fileName)) stop(paste0("INTERNAL ISSUE - A piece of data is missing from disk.",txt))
apollo_inputs_piece <- tryCatch(readRDS(fileName),
warning = function(w) FALSE,
error = function(e) FALSE)
if(is.logical(apollo_inputs_piece) && !apollo_inputs_piece) stop(paste0("INTERNAL ISSUE - A piece of data could not be loaded from disk.",txt))
assign("apollo_inputs", apollo_inputs_piece, envir=tmp)
})
unlink(inputPieceFile) # delete tmp files
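  # Each worker now holds its own piece of the inputs as 'apollo_inputs' in
  # its global environment, so the temporary files are no longer needed.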
### Report memory usage
if(debug){
mbRAM1 <- sum(gc()[,2])
mbRAM2 <- 0
if(nCores>1){
mbRAM2 <- parallel::clusterCall(cl, gc)
mbRAM2 <- Reduce('+',lapply(mbRAM2, function(x) sum(x[,2])))
}
cat(' Done. ', mbRAM1+mbRAM2, 'MB of RAM in use\n', sep='') # do not change to apollo_print
}
  # The two commented-out lines below list the elements in each
  # worker's workspace; useful when debugging.
  #print(parallel::clusterEvalQ(cl, ls(apollo_inputs$draws)))
  #parallel::clusterCall(cl, ls, envir=.GlobalEnv)
return(cl)
}