R/mpi.R

Defines functions stopCluster.spawnedMPIcluster stopCluster.MPIcluster runMPIworker makeMPIcluster recvOneData.MPIcluster recvData.MPInode sendData.MPInode makeMPImaster newMPInode

Documented in makeMPIcluster makeMPImaster newMPInode recvData.MPInode recvOneData.MPIcluster runMPIworker sendData.MPInode stopCluster.MPIcluster stopCluster.spawnedMPIcluster

#
# MPI Implementation
#

newMPInode <- function(rank, comm)
    structure(list(rank = rank, RECVTAG = 33, SENDTAG = 22, comm = comm),
              class = "MPInode")

makeMPImaster <- function(comm = 0)
    structure(list(rank = 0, RECVTAG = 22, SENDTAG = 33, comm = comm),
              class = "MPInode")

sendData.MPInode <- function(node, data)
    Rmpi::mpi.send.Robj(data, node$rank, node$SENDTAG, node$comm)

recvData.MPInode <- function(node)
    Rmpi::mpi.recv.Robj(node$rank, node$RECVTAG, node$comm)

recvOneData.MPIcluster <- function(cl) {
    rtag <- findRecvOneTag(cl, Rmpi::mpi.any.tag())
    comm <- cl[[1]]$comm  # should all be the same
    status <- 0
    Rmpi::mpi.probe(Rmpi::mpi.any.source(), rtag, comm, status)
    srctag <- Rmpi::mpi.get.sourcetag(status)
    data <- Rmpi::mpi.recv.Robj(srctag[1], srctag[2], comm)
    list(node = srctag[1], value = data)
}

getMPIcluster <- NULL
setMPIcluster <- NULL
local({
    cl <- NULL
    getMPIcluster <<- function() cl
    setMPIcluster <<- function(new) cl <<- new
})

makeMPIcluster <- function(count, ..., options = defaultClusterOptions) {
    options <- addClusterOptions(options, list(...))
    cl <- getMPIcluster()
    if (! is.null(cl)) {
        if (missing(count) || count == length(cl))
             cl
        else stop(sprintf("MPI cluster of size %d already running",
                          length(cl)))
    }
    else if (missing(count)) {
        # assume something like mpirun -np count+1 has been used to start R
        count <- Rmpi::mpi.comm.size(0) - 1
        if (count <= 0)
            stop("no nodes available.")
        cl <- vector("list",count)
        for (i in seq(along=cl))
            cl[[i]] <- newMPInode(i, 0)
        class(cl) <- c("MPIcluster","cluster")
        setMPIcluster(cl)
        cl
    }
    else {
	# use process spawning to create cluster
        if (! requireNamespace("Rmpi"))
            stop("the `Rmpi' package is needed for MPI clusters.")
        comm <- 1
        intercomm <- 2
        if (Rmpi::mpi.comm.size(comm) > 0)
            stop(paste("a cluster already exists", comm))
        scriptdir <- getClusterOption("scriptdir", options)
        outfile <- getClusterOption("outfile", options)
        homogeneous <- getClusterOption("homogeneous", options)
        if (getClusterOption("useRscript", options)) {
            if (homogeneous) {
                rscript <- shQuoteIfNeeded(getClusterOption("rscript", options))
                snowlib <- getClusterOption("snowlib", options)
                script <- shQuoteIfNeeded(file.path(snowlib, "snow",
                                                    "RMPInode.R"))
                args <- c(script,
                          paste("SNOWLIB=", snowlib, sep=""),
                          paste("OUT=", outfile, sep=""))
                mpitask <- rscript
            }
            else {
                args <- c("RMPInode.R",
                          paste("OUT=", outfile, sep=""))
                mpitask <- "RunSnowWorker"
            }
        }
        else {
            if (homogeneous) {
                script <- shQuoteIfNeeded(file.path(scriptdir, "RMPInode.sh"))
                rlibs <- paste(getClusterOption("rlibs", options),
                               collapse = ":")
                rprog <- shQuoteIfNeeded(getClusterOption("rprog", options))
                args <- c(paste("RPROG=", rprog, sep=""),
                          paste("OUT=", outfile, sep=""),
                          paste("R_LIBS=", rlibs, sep=""),
                          script)
            }
            else {
                args <- c(paste("OUT=", outfile, sep=""),
                          "RunSnowNode", "RMPInode.sh")
            }
            mpitask <- "/usr/bin/env"
        }
        count <- Rmpi::mpi.comm.spawn(slave = mpitask, slavearg = args,
                                      nslaves = count, intercomm = intercomm)
        if (Rmpi::mpi.intercomm.merge(intercomm, 0, comm)) {
            Rmpi::mpi.comm.set.errhandler(comm)
            Rmpi::mpi.comm.disconnect(intercomm)
        }
        else stop("Failed to merge the comm for master and workers.")
        cl <- vector("list",count)
        for (i in seq(along=cl))
            cl[[i]] <- newMPInode(i, comm)
        class(cl) <- c("spawnedMPIcluster",  "MPIcluster", "cluster")
        setMPIcluster(cl)
        cl
    }
}

runMPIworker <- function() {
    comm <- 1
    intercomm <- 2
    Rmpi::mpi.comm.get.parent(intercomm)
    Rmpi::mpi.intercomm.merge(intercomm,1,comm)
    Rmpi::mpi.comm.set.errhandler(comm)
    Rmpi::mpi.comm.disconnect(intercomm)

    workLoop(makeMPImaster(comm))

    Rmpi::mpi.comm.disconnect(comm)
    Rmpi::mpi.quit()
}

stopCluster.MPIcluster <- function(cl) {
    NextMethod()
    setMPIcluster(NULL)
}
    
stopCluster.spawnedMPIcluster <- function(cl) {
    comm <- 1
    NextMethod()
    Rmpi::mpi.comm.disconnect(comm)
}

#**** figure out how to get Rmpi::mpi.quit called (similar issue for pvm?)
#**** fix things so stopCluster works in both versions.
#**** need .Last to make sure cluster is shut down on exit of master
#**** figure out why the workers busy wait under mpirun

Try the snow package in your browser

Any scripts or data that you put into this service are public.

snow documentation built on Oct. 27, 2021, 5:08 p.m.