Nothing
## Copyright (C) 2012-14 Marius Hofert and Martin Maechler
##
## This program is free software; you can redistribute it and/or modify it under
## the terms of the GNU General Public License as published by the Free Software
## Foundation; either version 2 of the License, or (at your option) any later
## version.
##
## This program is distributed in the hope that it will be useful, but WITHOUT
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
## FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
## details.
##
## You should have received a copy of the GNU General Public License along with
## this program; if not, see <http://www.gnu.org/licenses/>.
##' @title Function for Iterating Over All Subjobs (Non-Parallel)
##' @param vList list of variable specifications
##' @param seed repFirst: see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim()
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##' typically subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##' grid
##' @param ... additional arguments passed to subjob() (typically further
##' passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note Works *sequentially*
##' { doLapply }
doLapply <- function(vList, seed="seq", repFirst=TRUE, sfile=NULL,
check=TRUE, doAL=TRUE, subjob.=subjob, monitor=FALSE,
doOne, ...)
{
if(!is.null(r <- maybeRead(sfile))) return(r)
stopifnot(is.function(subjob.), is.function(doOne))
if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
(is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
is.character(seed) ))
stop(.invalid.seed.msg)
if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)
## monitor checks {here, not in subjob()!}
if(!(is.logical(monitor) || is.function(monitor)))
stop(gettextf("'monitor' must be logical or a function like %s",
'printInfo[["default"]]'))
## variables
pGrid <- mkGrid(vList)
ngr <- nrow(pGrid)
ng <- get.nonGrids(vList) # => n.sim >= 1
n.sim <- ng$n.sim # get n.sim
## actual work
res <- lapply(seq_len(ngr * n.sim), subjob.,
pGrid=pGrid, nonGrids = ng$nonGrids, repFirst=repFirst,
n.sim=n.sim, seed=seed, doOne=doOne, monitor=monitor, ...)
## convert result and save
saveSim(res, vList=vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doLapply
##' @title Function for Iterating Over All Subjobs in Parallel Using Foreach
##' @param vList list of variable specifications
##' @param cluster cluster object, typically generated by makeCluster(), or NULL.
##' Only one of cluster or cores can be non-NULL.
##' @param cores number of cores or NULL. Only one of cluster or cores can be non-NULL.
##' @param block.size size of blocks of rows in the virtual grid which are computed
##' simultaneously
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim()
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##' typically subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##' grid
##' @param extraPkgs character vector of packages to be made available on the nodes
##' @param exports character vector of functions to export
##' @param ... additional arguments passed to subjob() (typically further
##' passed on to doOne())
##' @return result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note - works on multiple nodes or cores
##' - makeCluster type "MPI" is passed to snow => superseded according to CRAN
##' although https://mailman.stat.ethz.ch/pipermail/r-sig-hpc/2013-April/001652.html
##' { doForeach }
doForeach <-
function(vList, cluster=makeCluster(detectCores(), type="PSOCK"),
cores=NULL, block.size=1,
seed="seq", repFirst=TRUE,
sfile=NULL, check=TRUE, doAL=TRUE,
subjob.=subjob, monitor=FALSE, doOne,
extraPkgs=character(), exports=character(), ...)
{
## Unfortunately, imports() ends not finding 'iter' from pkg "iterators":
## --> rather strictly require things here:
getPkg <- function(pkg)
if(!require(pkg, character.only=TRUE))
stop(sprintf("You must install the CRAN package '%s' before you can use doForeach()",
pkg), call. = FALSE)
getPkg("foreach")
getPkg("doParallel")
if((is.null(cluster) && is.null(cores)) || (!is.null(cluster) && !is.null(cores)))
stop("Precisely one of 'cluster' or 'cores' has to be not NULL")
if(!is.null(r <- maybeRead(sfile))) return(r)
stopifnot(is.function(subjob.), is.function(doOne))
if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
(is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
is.character(seed) ))
stop(.invalid.seed.msg)
if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)
if(!is.null(cluster)) on.exit(stopCluster(cluster)) # shut down cluster and execution environment
## monitor checks {here, not in subjob()!}
if(!(is.logical(monitor) || is.function(monitor)))
stop(gettextf("'monitor' must be logical or a function like %s",
'printInfo[["default"]]'))
## variables
pGrid <- mkGrid(vList)
ngr <- nrow(pGrid)
ng <- get.nonGrids(vList) # => n.sim >= 1
n.sim <- ng$n.sim
stopifnot(1 <= block.size, block.size <= n.sim)
if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")
## Two main cases for parallel computing
if(!is.null(cores)) { # multiple cores
## ?registerDoParallel -> Details -> Unix + multiple cores => 'fork' is used
stopifnot(is.numeric(cores), length(cores) == 1)
registerDoParallel(cores=cores) # register doParallel to be used with foreach
} else registerDoParallel(cluster) # multiple nodes; register doParallel to be used with foreach
if(check) cat(sprintf("getDoParWorkers(): %d\n", getDoParWorkers()))
## actual work
n.block <- n.sim %/% block.size
i <- NULL ## <- required for R CMD check ...
res <- ul(foreach(i=seq_len(ngr * n.block),
.packages=c("simsalapar", extraPkgs),
.export=c(".Random.seed", "iter", "mkTimer", exports)) %dopar%
{
lapply(seq_len(block.size), function(k)
subjob.((i-1)*block.size+k, pGrid=pGrid,
nonGrids=ng$nonGrids, repFirst=repFirst,
n.sim=n.sim, seed=seed, doOne=doOne,
monitor=monitor, ...))})
## convert result and save
saveSim(res, vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doForeach
##
## Because we must use foreach and doParallel packages via require() instead of importing,
## {trying the requireNamespace() + <pkg>::<obj> fails with Error
## unable to find variable "iter"
## Calls: S.T ... doForeach -> ul -> unlist -> %dopar% -> <Anonymous>
## }
## CRAN checks now (>= 3.2.0) need this -- unfortunately:
utils::globalVariables(c("registerDoParallel", "getDoParWorkers",
"%dopar%", "foreach"))
##' @title Function for Iterating Over All Subjobs in Parallel Using Rmpi
##' @param vList list of variable specifications
##' @param nslaves number of workers (passed to mpi.spawn.Rslaves)
##' @param load.balancing logical indicating whether to use mpi.applyLB() instead of mpi.apply()
##' @param block.size size of blocks of rows in the virtual grid which are computed
##' simultaneously
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim()
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##' typically subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##' grid
##' @param exports vector of objects to export
##' @param ... additional arguments passed to subjob() (typically further
##' passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note Works on multiple nodes or cores
##' Email from Rmpi maintainer (hyu@stats.uwo.ca) on 2013-06-10:
##' If you are using OpenMPI, then mpi.universe.size() will always return 1
##' unless R is launched through mpirun.
##' Yes. You can use the option nslaves to launch workers as many as you want.
##' How those worker processes assigned to nodes/cores are controlled by
##' OpenMPI (different MPIs have different ways of assigning worker processes
##' but most recycle available notes/cores). In your cases, you probably
##' choose nslaves=4 so that all cores are running in parallel. However,
##' setting nslaves to be higher than the available notes/codes achieves some
##' kind loading balancing. For example, nslaves = 8 essentially spreads an
##' entire job into 8 small ones instead of 4 small ones. This gives some
##' advantages if one of the original 4 small jobs runs much longer than
##' others.
##' mpi.universe.size() # => 1; the total number of CPUs available in a cluster
##' mpi.spawn.Rslaves() # spawn as many workers as the MPI environment knows (=> 1 manager, 1 worker)
##' mpi.close.Rslaves()
##' mpi.spawn.Rslaves(nslaves=17) # spawn more workers than possible (?) (=> 1
##' manager, 17 workers) => calculations are still
##' only done on the max. available cores; see
##' test script http://collaborate.bu.edu/linga/ParallelMCMC
##' mpi.close.Rslaves()
##' Note: spawning more workers than available may lead to errors (MH)
##' { doRmpi }
doRmpi <- function(vList,
nslaves = if((sz <- Rmpi::mpi.universe.size()) <= 1) detectCores()
else sz,
load.balancing=TRUE, block.size=1, seed="seq", repFirst=TRUE,
sfile=NULL, check=TRUE, doAL=TRUE, subjob.=subjob, monitor=FALSE,
doOne, exports=character(), ...)
{
## see http://cran.r-project.org/doc/manuals/r-devel/R-exts.html#Suggested-packages
if(!requireNamespace("Rmpi", quietly=TRUE))
stop("You must install the CRAN package 'Rmpi' before you can use doRmpi()")
if(!is.null(r <- maybeRead(sfile))) return(r)
stopifnot(is.function(subjob.), is.function(doOne))
if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
(is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
is.character(seed) ))
stop(.invalid.seed.msg)
if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)
## monitor checks {here, not in subjob()!}
if(!(is.logical(monitor) || is.function(monitor)))
stop(gettextf("'monitor' must be logical or a function like %s",
'printInfo[["default"]]'))
## variables
pGrid <- mkGrid(vList)
ngr <- nrow(pGrid)
ng <- get.nonGrids(vList) # => n.sim >= 1
n.sim <- ng$n.sim
stopifnot(1 <= block.size, block.size <= n.sim)
if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")
## use as many workers as available
## Note: mpi.comm.size(comm) returns the total number of members in a comm
comm <- 1 ## communicator number
if (!Rmpi::mpi.comm.size(comm)) ## <==> no workers are running
Rmpi::mpi.spawn.Rslaves(nslaves=nslaves)
## quiet = TRUE would omit successfully spawned workers
on.exit(Rmpi::mpi.close.Rslaves()) # close workers spawned by mpi.spawn.Rslaves()
## pass global required objects to cluster (required by mpi.apply())
Rmpi::mpi.bcast.Robj2slave(.Random.seed)
Rmpi::mpi.bcast.Robj2slave(mkTimer)
for(e in exports) {
ee <- substitute(Rmpi::mpi.bcast.Robj2slave(EXP), list(EXP = as.symbol(e)))
eval(ee)
}
## instead of initExpr, this needs a 'initFunction' + 'initArgs'
## if(!missing(initExpr)) do.call(mpi.bcast.cmd, c(list(initFunction), ...))
## actual work
n.block <- n.sim %/% block.size
res <- ul((if(load.balancing) Rmpi::mpi.applyLB else Rmpi::mpi.apply)(
seq_len(ngr * n.block), function(i)
lapply(seq_len(block.size), function(k)
subjob.((i-1)*block.size+k, pGrid=pGrid,
nonGrids=ng$nonGrids, repFirst=repFirst,
n.sim=n.sim, seed=seed, doOne=doOne, monitor=monitor, ...))))
## convert result and save
saveSim(res, vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doRmpi
##' @title Function for Iterating Over All Subjobs in Parallel Using mclapply()
##' @param vList list of variable specifications
##' @param cores number of cores
##' @param load.balancing logical indicating whether to use mpi.applyLB() instead of mpi.apply()
##' @param block.size size of blocks of rows in the virtual grid which are computed
##' simultaneously
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim()
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##' typically subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##' grid
##' @param ... additional arguments passed to subjob() (typically further
##' passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note Works on multiple cores (but runs *sequentially* on Windows)
##' { doMclapply }
doMclapply <-
function(vList,
cores = if(.Platform$OS.type == "windows") 1 else detectCores(),
load.balancing=TRUE, block.size=1, seed="seq", repFirst=TRUE,
sfile=NULL, check=TRUE, doAL=TRUE, subjob.=subjob,
monitor=FALSE, doOne, ...)
{
if(!is.null(r <- maybeRead(sfile))) return(r)
stopifnot(is.function(subjob.), is.function(doOne))
if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
(is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
is.character(seed) ))
stop(.invalid.seed.msg)
if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)
## variables
pGrid <- mkGrid(vList)
ngr <- nrow(pGrid)
ng <- get.nonGrids(vList) # => n.sim >= 1
n.sim <- ng$n.sim
stopifnot(1 <= block.size, block.size <= n.sim)
if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")
## monitor checks
if(!(is.logical(monitor) || is.function(monitor)))
stop(gettextf("'monitor' must be logical or a function like %s",
'printInfo[["default"]]'))
## actual work
n.block <- n.sim %/% block.size
res <- ul(mclapply(seq_len(ngr * n.block), function(i)
lapply(seq_len(block.size), function(k)
subjob.((i-1)*block.size+k, pGrid=pGrid,
nonGrids=ng$nonGrids, repFirst=repFirst,
n.sim=n.sim, seed=seed, doOne=doOne,
monitor=monitor, ...)),
mc.cores = cores,
mc.preschedule = !load.balancing, mc.set.seed=FALSE))
## convert result and save
saveSim(res, vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doMclapply
##' @title Function for Iterating Over All Subjobs in Parallel Using clusterApply()
##' @param vList list of variable specifications
##' @param cluster cluster object, typically generated by makeCluster()
##' @param load.balancing logical indicating whether to use clusterApplyLB()
##' instead of clusterApply()
##' @param block.size size of blocks of rows in the virtual grid which are computed
##' simultaneously
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim()
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##' typically subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##' grid
##' @param initExpr expression initially evaluated on the cluster (can be missing)
##' @param ... additional arguments passed to subjob() (typically further
##' passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note - works on multiple nodes or cores
##' - makeCluster type "MPI" is passed to snow => superseded according to CRAN
##' although https://mailman.stat.ethz.ch/pipermail/r-sig-hpc/2013-April/001652.html
##' { doClusterApply }
doClusterApply <-
function(vList, cluster=makeCluster(detectCores(), type="PSOCK"),
load.balancing=TRUE, block.size=1, seed="seq", repFirst=TRUE,
sfile=NULL, check=TRUE, doAL=TRUE, subjob.=subjob, monitor=FALSE,
doOne, initExpr, exports=character(), ...)
{
if(!is.null(r <- maybeRead(sfile))) return(r)
stopifnot(is.function(subjob.), is.function(doOne))
if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
(is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
is.character(seed) ))
stop(.invalid.seed.msg)
if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)
on.exit(stopCluster(cluster)) # shut down cluster and execution environment
## variables
pGrid <- mkGrid(vList)
ngr <- nrow(pGrid)
ng <- get.nonGrids(vList) # => n.sim >= 1
n.sim <- ng$n.sim
stopifnot(1 <= block.size, block.size <= n.sim)
if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")
## monitor checks
if(!(is.logical(monitor) || is.function(monitor)))
stop(gettextf("'monitor' must be logical or a function like %s",
'printInfo[["default"]]'))
clusterExport(cluster, varlist=c(".Random.seed", "mkTimer", exports))
if(!missing(initExpr)) clusterCall(cluster, eval, substitute(initExpr))
## actual work
n.block <- n.sim %/% block.size
res <- ul((if(load.balancing) clusterApplyLB else clusterApply)(
cluster, seq_len(ngr * n.block), function(i)
lapply(seq_len(block.size), function(k)
subjob.((i-1)*block.size+k, pGrid=pGrid,
nonGrids=ng$nonGrids, repFirst=repFirst,
n.sim=n.sim, seed=seed, doOne=doOne, monitor=monitor, ...))))
## convert result and save
saveSim(res, vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doClusterApply
##' Function for comparing do*Apply() results:
##' { doRes.equal }
doRes.equal <- function(x,y, tol=1e-15, ...)
all.equal(lapply(x, `[`, 1:3),
lapply(y, `[`, 1:3), tol=tol, ...)
##' { end }
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.