R/doApply.R

Defines functions doRes.equal doClusterApply doMclapply doRmpi doForeach doLapply

Documented in doClusterApply doForeach doLapply doMclapply doRes.equal doRmpi

## Copyright (C) 2012-14 Marius Hofert and Martin Maechler
##
## This program is free software; you can redistribute it and/or modify it under
## the terms of the GNU General Public License as published by the Free Software
## Foundation; either version 2 of the License, or (at your option) any later
## version.
##
## This program is distributed in the hope that it will be useful, but WITHOUT
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
## FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
## details.
##
## You should have received a copy of the GNU General Public License along with
## this program; if not, see <http://www.gnu.org/licenses/>.

##' @title Function for Iterating Over All Subjobs (Non-Parallel)
##' @param vList list of variable specifications
##' @param seed see subjob(); must be NULL, NA, numeric, a list of numeric
##'        vectors, or of type character
##' @param repFirst see subjob()
##' @param sfile see saveSim(); if it refers to a previously saved result, that
##'        result is returned without computing anything
##' @param check see saveSim(); if TRUE, doCheck() is also run on doOne first
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##'        typically subjob()
##' @param monitor logical or a monitoring function such as printInfo[["default"]];
##'        passed on to subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##'        grid
##' @param ... additional arguments passed to subjob() (typically further
##'        passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note Works *sequentially*
##' { doLapply }
doLapply <- function(vList, seed="seq", repFirst=TRUE, sfile=NULL,
                     check=TRUE, doAL=TRUE, subjob.=subjob, monitor=FALSE,
                     doOne, ...)
{
    ## Reuse a previously saved result, if 'sfile' points to one
    prev <- maybeRead(sfile)
    if(!is.null(prev)) return(prev)

    ## Argument validation
    stopifnot(is.function(subjob.), is.function(doOne))
    seedOK <- is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
        (is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
        is.character(seed)
    if(!seedOK) stop(.invalid.seed.msg)
    if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)
    ## 'monitor' is validated once here, rather than in every subjob() call
    monitorOK <- is.logical(monitor) || is.function(monitor)
    if(!monitorOK)
        stop(gettextf("'monitor' must be logical or a function like %s",
                      'printInfo[["default"]]'))

    ## Physical grid and non-grid variables
    pGrid <- mkGrid(vList)
    ng <- get.nonGrids(vList) # guarantees n.sim >= 1
    nSim <- ng$n.sim
    nSubjobs <- nrow(pGrid) * nSim # number of rows of the virtual grid

    ## One subjob() call per row of the virtual grid, one after the other
    results <- lapply(seq_len(nSubjobs), subjob.,
                      pGrid=pGrid, nonGrids=ng$nonGrids, repFirst=repFirst,
                      n.sim=nSim, seed=seed, doOne=doOne, monitor=monitor, ...)

    ## Convert the list of subjob results and (possibly) save it
    saveSim(results, vList=vList, repFirst=repFirst, sfile=sfile,
            check=check, doAL=doAL)
}
##' { end } doLapply

##' @title Function for Iterating Over All Subjobs in Parallel Using Foreach
##' @param vList list of variable specifications
##' @param cluster cluster object, typically generated by makeCluster(), or NULL.
##'        Only one of cluster or cores can be non-NULL (so pass cluster = NULL
##'        when using cores).  A non-NULL cluster is shut down with
##'        stopCluster() whenever doForeach() exits -- also when a saved result
##'        is read from 'sfile' or when an argument check fails.
##' @param cores number of cores or NULL. Only one of cluster or cores can be non-NULL.
##' @param block.size size of blocks of rows in the virtual grid which are computed
##'        simultaneously; must divide n.sim
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim(); if TRUE, doCheck() is also run on doOne and the
##'        number of registered foreach workers is printed
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##'        typically subjob()
##' @param monitor logical or a monitoring function such as printInfo[["default"]];
##'        passed on to subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##'        grid
##' @param extraPkgs character vector of packages to be made available on the nodes
##' @param exports character vector of functions to export
##' @param ... additional arguments passed to subjob() (typically further
##'        passed on to doOne())
##' @return result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note - works on multiple nodes or cores
##'       - makeCluster type "MPI" is passed to snow => superseded according to CRAN
##'         although https://mailman.stat.ethz.ch/pipermail/r-sig-hpc/2013-April/001652.html
##' { doForeach }
doForeach <-
    function(vList, cluster=makeCluster(detectCores(), type="PSOCK"),
             cores=NULL, block.size=1,
             seed="seq", repFirst=TRUE,
             sfile=NULL, check=TRUE, doAL=TRUE,
             subjob.=subjob, monitor=FALSE, doOne,
             extraPkgs=character(), exports=character(), ...)
{
    ## Importing foreach/doParallel via the NAMESPACE does not work (%dopar%
    ## then fails to find 'iter' from package "iterators"), so the packages
    ## are strictly required (attached) here instead:
    getPkg <- function(pkg)
        if(!require(pkg, character.only=TRUE))
            stop(sprintf("You must install the CRAN package '%s' before you can use doForeach()",
                         pkg), call. = FALSE)
    getPkg("foreach")
    getPkg("doParallel")

    ## Evaluating is.null(cluster) forces the 'cluster' argument, i.e., it
    ## creates the default PSOCK cluster.  Register the shutdown right away so
    ## the cluster is not leaked by the early return below (existing 'sfile')
    ## or by a failing argument check.
    if(!is.null(cluster)) on.exit(stopCluster(cluster), add=TRUE)
    ## exactly one of 'cluster' and 'cores' must be non-NULL
    if(!xor(is.null(cluster), is.null(cores)))
        stop("Precisely one of 'cluster' or 'cores' has to be not NULL")
    if(!is.null(r <- maybeRead(sfile))) return(r)
    stopifnot(is.function(subjob.), is.function(doOne))
    if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
         (is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
         is.character(seed) ))
        stop(.invalid.seed.msg)
    if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)

    ## monitor checks {here, not in subjob()!}
    if(!(is.logical(monitor) || is.function(monitor)))
        stop(gettextf("'monitor' must be logical or a function like %s",
                      'printInfo[["default"]]'))

    ## variables
    pGrid <- mkGrid(vList)
    ngr <- nrow(pGrid)
    ng <- get.nonGrids(vList) # => n.sim >= 1
    n.sim <- ng$n.sim
    stopifnot(1 <= block.size, block.size <= n.sim)
    if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")

    ## Two main cases for parallel computing
    if(!is.null(cores)) { # multiple cores
        ## ?registerDoParallel -> Details -> Unix + multiple cores => 'fork' is used
        stopifnot(is.numeric(cores), length(cores) == 1)
        registerDoParallel(cores=cores) # register doParallel to be used with foreach
    } else registerDoParallel(cluster) # multiple nodes; register doParallel to be used with foreach
    if(check) cat(sprintf("getDoParWorkers(): %d\n", getDoParWorkers()))

    ## actual work: each foreach task computes one block of 'block.size'
    ## consecutive subjobs
    n.block <- n.sim %/% block.size
    i <- NULL ## <- required for R CMD check ...
    res <- ul(foreach(i=seq_len(ngr * n.block),
                      .packages=c("simsalapar", extraPkgs),
                      .export=c(".Random.seed", "iter", "mkTimer", exports)) %dopar%
          {
              lapply(seq_len(block.size), function(k)
                     subjob.((i-1)*block.size+k, pGrid=pGrid,
                             nonGrids=ng$nonGrids, repFirst=repFirst,
                             n.sim=n.sim, seed=seed, doOne=doOne,
                             monitor=monitor, ...))})
    ## convert result and save
    saveSim(res, vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doForeach
##
## doForeach() attaches the foreach and doParallel packages via require()
## instead of importing them through the NAMESPACE: using
## requireNamespace() + <pkg>::<obj> failed with the error
##   unable to find variable "iter"
##   Calls: S.T ... doForeach -> ul -> unlist -> %dopar% -> <Anonymous>
## Consequently, R CMD check (R >= 3.2.0, as required for CRAN) reports the
## following names as undefined globals; declaring them here silences that:
utils::globalVariables(c("registerDoParallel", "getDoParWorkers",
                         "%dopar%", "foreach"))



##' @title Function for Iterating Over All Subjobs in Parallel Using Rmpi
##' @param vList list of variable specifications
##' @param nslaves number of workers (passed to mpi.spawn.Rslaves()); only used
##'        when no workers are running yet.  Defaults to mpi.universe.size(),
##'        or to detectCores() if that is <= 1
##' @param load.balancing logical indicating whether to use mpi.applyLB() instead of mpi.apply()
##' @param block.size size of blocks of rows in the virtual grid which are computed
##'        simultaneously; must divide n.sim
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim(); if TRUE, doCheck() is also run on doOne first
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##'        typically subjob()
##' @param monitor logical or a monitoring function such as printInfo[["default"]];
##'        passed on to subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##'        grid
##' @param exports character vector of names of objects to broadcast to the
##'        workers (via mpi.bcast.Robj2slave()).
##'        NOTE(review): these names are evaluated in doRmpi()'s own frame, so a
##'        name coinciding with one of its arguments or locals (e.g. "e",
##'        "comm", "pGrid", "seed") broadcasts that local value instead of the
##'        caller's object -- confirm this is intended.
##' @param ... additional arguments passed to subjob() (typically further
##'        passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note Works on multiple nodes or cores
##' Email from Rmpi maintainer (hyu@stats.uwo.ca) on 2013-06-10:
##' If you are using OpenMPI, then mpi.universe.size() will always return 1
##' unless R is launched through mpirun.
##' Yes. You can use the option nslaves to launch workers as many as you want.
##' How those worker processes assigned to nodes/cores are controlled by
##' OpenMPI (different MPIs have different ways of assigning worker processes
##' but most recycle available nodes/cores). In your cases, you probably
##' choose nslaves=4 so that all cores are running in parallel. However,
##' setting nslaves to be higher than the available nodes/cores achieves some
##' kind of load balancing. For example, nslaves = 8 essentially spreads an
##' entire job into 8 small ones instead of 4 small ones. This gives some
##' advantages if one of the original 4 small jobs runs much longer than
##' others.
##' mpi.universe.size() # => 1; the total number of CPUs available in a cluster
##' mpi.spawn.Rslaves() # spawn as many workers as the MPI environment knows (=> 1 manager, 1 worker)
##' mpi.close.Rslaves()
##' mpi.spawn.Rslaves(nslaves=17) # spawn more workers than possible (?) (=> 1
##'                                 manager, 17 workers) => calculations are still
##'                                 only done on the max. available cores; see
##'                                 test script http://collaborate.bu.edu/linga/ParallelMCMC
##' mpi.close.Rslaves()
##' Note: spawning more workers than available may lead to errors (MH)
##' { doRmpi }
doRmpi <- function(vList,
                   nslaves = if((sz <- Rmpi::mpi.universe.size()) <= 1) detectCores()
                             else sz,
                   load.balancing=TRUE, block.size=1, seed="seq", repFirst=TRUE,
                   sfile=NULL, check=TRUE, doAL=TRUE, subjob.=subjob, monitor=FALSE,
                   doOne, exports=character(), ...)
{
    ## Rmpi is only a suggested package, hence requireNamespace() rather than
    ## an import; see
    ## http://cran.r-project.org/doc/manuals/r-devel/R-exts.html#Suggested-packages
    if(!requireNamespace("Rmpi", quietly=TRUE))
        stop("You must install the CRAN package 'Rmpi' before you can use doRmpi()")

    ## return a result previously saved in 'sfile', if there is one
    if(!is.null(r <- maybeRead(sfile))) return(r)
    stopifnot(is.function(subjob.), is.function(doOne))
    ## 'seed' must be NULL, NA, numeric, a list of numerics, or character
    if(!(is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
         (is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
         is.character(seed) ))
        stop(.invalid.seed.msg)
    if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)

    ## monitor checks {here, not in subjob()!}
    if(!(is.logical(monitor) || is.function(monitor)))
        stop(gettextf("'monitor' must be logical or a function like %s",
                      'printInfo[["default"]]'))

    ## variables
    pGrid <- mkGrid(vList)
    ngr <- nrow(pGrid)
    ng <- get.nonGrids(vList) # => n.sim >= 1
    n.sim <- ng$n.sim
    stopifnot(1 <= block.size, block.size <= n.sim)
    if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")

    ## use as many workers as available
    ## Note: mpi.comm.size(comm) returns the total number of members in a comm
    comm <- 1  ## communicator number
    ## spawn workers only if none are running; only then is 'nslaves' (and
    ## hence its default) evaluated
    if (!Rmpi::mpi.comm.size(comm)) ## <==> no workers are running
        Rmpi::mpi.spawn.Rslaves(nslaves=nslaves)
    ## (quiet = TRUE in mpi.spawn.Rslaves() would omit the listing of the
    ##  successfully spawned workers)
    ## NOTE(review): the workers are closed on exit unconditionally, i.e. also
    ## when they were already running before this call (spawning above is
    ## conditional) -- confirm this is intended.
    on.exit(Rmpi::mpi.close.Rslaves()) # close workers spawned by mpi.spawn.Rslaves()
    ## pass global required objects to cluster (required by mpi.apply())
    ## NOTE(review): .Random.seed only exists once the RNG has been used in this
    ## session; presumably this broadcast fails otherwise -- TODO confirm.
    Rmpi::mpi.bcast.Robj2slave(.Random.seed)
    Rmpi::mpi.bcast.Robj2slave(mkTimer)
    ## Build the call mpi.bcast.Robj2slave(<name>) with the export's *symbol*
    ## (presumably because mpi.bcast.Robj2slave() derives the workers' object
    ## name from its unevaluated argument), then evaluate it in this frame.
    for(e in exports) {
        ee <- substitute(Rmpi::mpi.bcast.Robj2slave(EXP), list(EXP = as.symbol(e)))
        eval(ee)
    }

    ## instead of initExpr, this needs a 'initFunction' + 'initArgs'
    ## if(!missing(initExpr)) do.call(mpi.bcast.cmd, c(list(initFunction), ...))

    ## actual work: each task computes one block of 'block.size' consecutive
    ## subjobs; ul() then combines the per-block result lists
    n.block <- n.sim %/% block.size
    res <- ul((if(load.balancing) Rmpi::mpi.applyLB else Rmpi::mpi.apply)(
        seq_len(ngr * n.block), function(i)
        lapply(seq_len(block.size), function(k)
            subjob.((i-1)*block.size+k, pGrid=pGrid,
                    nonGrids=ng$nonGrids, repFirst=repFirst,
                    n.sim=n.sim, seed=seed, doOne=doOne, monitor=monitor, ...))))

    ## convert result and save
    saveSim(res, vList, repFirst=repFirst, sfile=sfile, check=check, doAL=doAL)
}
##' { end } doRmpi


##' @title Function for Iterating Over All Subjobs in Parallel Using mclapply()
##' @param vList list of variable specifications
##' @param cores number of cores, passed as 'mc.cores' to mclapply(); defaults
##'        to 1 on Windows and to detectCores() elsewhere
##' @param load.balancing logical; passed as 'mc.preschedule = !load.balancing'
##'        to mclapply(), i.e., TRUE disables prescheduling of the blocks
##' @param block.size size of blocks of rows in the virtual grid which are computed
##'        simultaneously; must divide n.sim
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim(); if TRUE, doCheck() is also run on doOne first
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##'        typically subjob()
##' @param monitor logical or a monitoring function such as printInfo[["default"]];
##'        passed on to subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##'        grid
##' @param ... additional arguments passed to subjob() (typically further
##'        passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note Works on multiple cores (but runs *sequentially* on Windows)
##' { doMclapply }
doMclapply <-
    function(vList,
             cores = if(.Platform$OS.type == "windows") 1 else detectCores(),
             load.balancing=TRUE, block.size=1, seed="seq", repFirst=TRUE,
             sfile=NULL, check=TRUE, doAL=TRUE, subjob.=subjob,
             monitor=FALSE, doOne, ...)
{
    ## A result previously saved in 'sfile' (if any) is returned as is
    saved <- maybeRead(sfile)
    if(!is.null(saved)) return(saved)

    stopifnot(is.function(subjob.), is.function(doOne))
    validSeed <- is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
        (is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
        is.character(seed)
    if(!validSeed) stop(.invalid.seed.msg)
    if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)

    ## grid and simulation dimensions; block.size must split n.sim evenly
    pGrid <- mkGrid(vList)
    nonGr <- get.nonGrids(vList) # guarantees n.sim >= 1
    n.sim <- nonGr$n.sim
    stopifnot(1 <= block.size, block.size <= n.sim)
    if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")

    ## 'monitor' must be logical or a function
    if(!is.logical(monitor) && !is.function(monitor))
        stop(gettextf("'monitor' must be logical or a function like %s",
                      'printInfo[["default"]]'))

    ## Each mclapply() task computes one block of 'block.size' consecutive
    ## subjobs (rows of the virtual grid)
    runBlock <- function(i) {
        offset <- (i-1)*block.size
        lapply(seq_len(block.size), function(k)
            subjob.(offset+k, pGrid=pGrid, nonGrids=nonGr$nonGrids,
                    repFirst=repFirst, n.sim=n.sim, seed=seed,
                    doOne=doOne, monitor=monitor, ...))
    }
    nTasks <- nrow(pGrid) * (n.sim %/% block.size)
    ## mc.set.seed = FALSE: seeding is presumably left to subjob() via 'seed'
    blocks <- mclapply(seq_len(nTasks), runBlock,
                       mc.cores = cores,
                       mc.preschedule = !load.balancing, mc.set.seed=FALSE)

    ## flatten the blocks, convert and (possibly) save
    saveSim(ul(blocks), vList, repFirst=repFirst, sfile=sfile,
            check=check, doAL=doAL)
}
##' { end } doMclapply

##' @title Function for Iterating Over All Subjobs in Parallel Using clusterApply()
##' @param vList list of variable specifications
##' @param cluster cluster object, typically generated by makeCluster(); it is
##'        shut down with stopCluster() once the argument checks have passed
##'        and the function exits
##' @param load.balancing logical indicating whether to use clusterApplyLB()
##'        instead of clusterApply()
##' @param block.size size of blocks of rows in the virtual grid which are computed
##'        simultaneously; must divide n.sim
##' @param seed see subjob()
##' @param repFirst see subjob()
##' @param sfile see saveSim()
##' @param check see saveSim(); if TRUE, doCheck() is also run on doOne first
##' @param doAL see saveSim()
##' @param subjob. function for computing a subjob (one row of the virtual grid);
##'        typically subjob()
##' @param monitor logical or a monitoring function such as printInfo[["default"]];
##'        passed on to subjob()
##' @param doOne user-supplied function for computing one row of the (physical)
##'        grid
##' @param initExpr expression evaluated once on every cluster node (via
##'        clusterCall()) before the computations start (can be missing)
##' @param exports character vector of names of further objects exported to the
##'        nodes with clusterExport() (in addition to .Random.seed and mkTimer)
##' @param ... additional arguments passed to subjob() (typically further
##'        passed on to doOne())
##' @return the result of applying subjob() to all subjobs, converted with saveSim()
##' @author Marius Hofert and Martin Maechler
##' @note - works on multiple nodes or cores
##'       - makeCluster type "MPI" is passed to snow => superseded according to CRAN
##'         although https://mailman.stat.ethz.ch/pipermail/r-sig-hpc/2013-April/001652.html
##' { doClusterApply }
doClusterApply <-
    function(vList, cluster=makeCluster(detectCores(), type="PSOCK"),
             load.balancing=TRUE, block.size=1, seed="seq", repFirst=TRUE,
             sfile=NULL, check=TRUE, doAL=TRUE, subjob.=subjob, monitor=FALSE,
             doOne, initExpr, exports=character(), ...)
{
    ## Early exit: a result already stored in 'sfile' is returned directly
    stored <- maybeRead(sfile)
    if(!is.null(stored)) return(stored)

    ## argument checks
    stopifnot(is.function(subjob.), is.function(doOne))
    seedValid <- is.null(seed) || any(is.na(seed)) || is.numeric(seed) ||
        (is.list(seed) && all(vapply(seed, is.numeric, NA))) ||
        is.character(seed)
    if(!seedValid) stop(.invalid.seed.msg)
    if(check) doCheck(doOne, vList, nChks=1, verbose=FALSE)

    ## from here on, the cluster is shut down whenever the function is left
    on.exit(stopCluster(cluster))

    ## grid dimensions; block.size must split the n.sim replications evenly
    pGrid <- mkGrid(vList)
    nonGr <- get.nonGrids(vList) # guarantees n.sim >= 1
    n.sim <- nonGr$n.sim
    stopifnot(1 <= block.size, block.size <= n.sim)
    if(n.sim %% block.size != 0) stop("block.size has to divide n.sim")

    ## 'monitor' must be logical or a function
    if(!is.logical(monitor) && !is.function(monitor))
        stop(gettextf("'monitor' must be logical or a function like %s",
                      'printInfo[["default"]]'))

    ## prepare the nodes: required objects first, then the optional initExpr
    clusterExport(cluster, varlist=c(".Random.seed", "mkTimer", exports))
    if(!missing(initExpr)) clusterCall(cluster, eval, substitute(initExpr))

    ## each task computes one block of 'block.size' consecutive subjobs
    applyFun <- if(load.balancing) clusterApplyLB else clusterApply
    runBlock <- function(i) {
        offset <- (i-1)*block.size
        lapply(seq_len(block.size), function(k)
            subjob.(offset+k, pGrid=pGrid, nonGrids=nonGr$nonGrids,
                    repFirst=repFirst, n.sim=n.sim, seed=seed,
                    doOne=doOne, monitor=monitor, ...))
    }
    nTasks <- nrow(pGrid) * (n.sim %/% block.size)
    blocks <- applyFun(cluster, seq_len(nTasks), runBlock)

    ## flatten the blocks, convert and (possibly) save
    saveSim(ul(blocks), vList, repFirst=repFirst, sfile=sfile,
            check=check, doAL=doAL)
}
##' { end } doClusterApply

##' @title Compare the Results of Two do*Apply() Calls
##' @param x,y results of the do*() functions above (e.g. doLapply() and
##'        doClusterApply()); presumably each element is a list whose first
##'        three components are the ones to compare
##' @param tol numerical tolerance, passed as 'tolerance' to all.equal()
##' @param ... further arguments passed to all.equal()
##' @return TRUE if the first three components of all corresponding elements
##'         of x and y agree (up to 'tol'); otherwise a character vector
##'         describing the differences, as returned by all.equal()
##' { doRes.equal }
doRes.equal <- function(x,y, tol=1e-15, ...)
    ## spell out 'tolerance' instead of relying on partial matching of 'tol'
    all.equal(lapply(x, `[`, 1:3),
              lapply(y, `[`, 1:3), tolerance=tol, ...)
##' { end }

Try the simsalapar package in your browser

Any scripts or data that you put into this service are public.

simsalapar documentation built on April 27, 2023, 9:05 a.m.