R/Rdsm.R

Defines functions loadex stoprdsm getmatrix getidxs writesync readsync barr makebarr backunlock backlock boostunlock boostlock mgrmakelock rdsmunlock rdsmlock mgrmakevar mgrinit

# author:  N. Matloff

# note:  uses the portion of the "parallel" package derived from the
# "snow" package, to be referred to here as "snow" for brevity; the
# terms "manager" and "worker" will refer to the master and cluster
# nodes, respectively

# builds on R's "parallel" and "bigmemory packages" to form a threads
# type of environment on shared-memory machines (or on clusters, using
# file storage for sharing); each worker runs one thread

# shared variables are set up in physical shared memory (or in file
# storage); due to "bigmemory" restriction, all shared variables are
# matrices; they are accessed via name, no quotes, e.g.

#    mgrmakevar(cls,"w",1000,1000)  # at manager, create variable
#    w[2,5] <- 8  # run at worker, then 8 visible at other threads and mgr.

# initializing Rdsm:

#   create a snow cluster, cls
#   run mgrinit(cls) at the manager to initialize Rdsm

# running an Rdsm app:
#   make shared variables at the manager for the app, using mgrmakevar()
#   possibly export to cluster additional objects
#   run Rdsm app code, using clusterEvalQ() at the manager to launch
#      threads running that code

# CRAN issue; no global variables are formed at the manager, so the code
# is CRAN-compliant, but CRAN check doesn't realize this
#' @importFrom utils globalVariables
if (getRversion() >= "2.15.1") {
    globalVariables(c(
        "myinfo",
        "brlock",
        "barrlock",
        "gbl",
        "realrdsmlock",
        "realrdsmunlock"
    ))
}

# options(bigmemory.typecast.warning=FALSE)

# *************************  INITIALIZATION  ***********************

# mgrinit() starts Rdsm

# arguments:

#    cls:  snow cluster
#    boost:  if TRUE, then use synchronicity locks, else use
#            backing store locks;
#    barrback:  if TRUE, store certain variables involved with the
#               barrier in backing store rather than in memory

mgrinit <- function(cls,
                    boost = FALSE,
                    barrback = FALSE) {
    # set up so that each worker node will have a global variable myinfo
    # that contains the thread ID and number of threads
    setmyinfo <- function(i, n) {
        assign("myinfo", list(id = i, nwrkrs = n), pos = tmpenv)
    }
    ncls <- length(cls)
    parallel::clusterEvalQ(cls, tmpenv <- new.env())
    parallel::clusterApply(cls, seq_len(ncls), setmyinfo, ncls)
    parallel::clusterEvalQ(cls, myinfo <- get("myinfo", tmpenv))
    # we create global variables only at the workers, thus OK for CRAN,
    # but CRAN check complains anyway, so here is a workaround
    parallel::clusterEvalQ(cls, gbl <- globalenv())
    # set up the requested locking type, via synchronicity or in backing store
    if (boost) {
        rdsmlock <- boostlock
        rdsmunlock <- boostunlock
    } else {
        rdsmlock <- backlock
        rdsmunlock <- backunlock
    }
    # set up the workers to use the proper lock type
    parallel::clusterExport(cls, "rdsmlock", envir = environment())
    parallel::clusterExport(cls, "rdsmunlock", envir = environment())
    parallel::clusterEvalQ(cls, realrdsmlock <- rdsmlock)
    parallel::clusterEvalQ(cls, realrdsmunlock <- rdsmunlock)
    # send the threads needed Rdsm functions
    parallel::clusterExport(cls, "barr", envir = loadNamespace("TSEAL"))
    parallel::clusterExport(cls, "getidxs", envir = loadNamespace("TSEAL"))
    parallel::clusterExport(cls, "getmatrix", envir = loadNamespace("TSEAL"))
    parallel::clusterExport(cls, "readsync", envir = loadNamespace("TSEAL"))
    parallel::clusterExport(cls, "writesync", envir = loadNamespace("TSEAL"))
    # make a single barrier, set it up on the worker nodes
    makebarr(cls, boost, barrback)
}

# ******************  CREATING SHARED VARIABLES  *********************

# mgrmakevar() is used to created shared variables; executed at manager

# arguments:

#    cls:  snow cluster
#    varname:  name of variable
#    nr, nc:  number of rows, columns in matrix
#    vartype:  "double", "integer", etc.
#    fs:  if TRUE, shared variable will be stored in the file system,
#         not shared memory
#    mgrcpy:  if TRUE, the shared variable will be visible from the mgr
#    savedesc:  if TRUE, the descriptor of the variable will be save to
#               a file, e.g. "x.desc" if varname is "x"

# the variable is created in the global spaces of the worker nodes; at
# the manager, the variable is created in the environment of the caller,
# or not at all, depending on mgrcpy

mgrmakevar <- function(cls,
                       varname,
                       nr,
                       nc,
                       vartype = "double",
                       fs = FALSE,
                       mgrcpy = TRUE,
                       savedesc = TRUE) {
    if (!fs) {
        tmp <- bigmemory::big.matrix(nrow = nr,
                                     ncol = nc,
                                     type = vartype)
        if (savedesc) {
            dput(bigmemory::describe(tmp),
                 paste(varname, ".desc", sep = ""))
        }
    } else {
        tmp <- bigmemory::filebacked.big.matrix(
            nrow = nr,
            ncol = nc,
            type = vartype,
            backingfile = varname,
            descriptorfile = paste(varname, ".desc", sep = "")
        )
    }
    # make accessible to manager
    if (mgrcpy) {
        assign(varname, tmp, pos = parent.frame())
    }
    # get the descriptor for this big.matrix object, to send to the
    # worker nodes
    parallel::clusterExport(cls, "varname", envir = environment())
    if (!fs) {
        desc <- bigmemory::describe(tmp)
        parallel::clusterExport(cls, "desc", envir = environment())
        parallel::clusterEvalQ(cls,
                               tmp <-
                                   bigmemory::attach.big.matrix(desc))
    } else {
        parallel::clusterEvalQ(cls,
                               tmp <-
                                   bigmemory::attach.big.matrix(paste(varname, ".desc", sep = "")))
    }
    parallel::clusterEvalQ(cls, assign(varname, tmp))
    invisible(0)
}

# **********************  LOCKS, ETC.  ***********************

# LOCKS

# these are just stubs, need for CRAN check to be replaced at runtime
rdsmlock <- function(lck) {
    0
}
rdsmunlock <- function(lck) {
    0
}

#' @importFrom synchronicity boost.mutex describe attach.mutex
#' @noRd
mgrmakelock <- function(cls, lockname, boost = FALSE) {
    if (boost) {
        # require(synchronicity)
        tmp <- synchronicity::boost.mutex()
        desc <- synchronicity::describe(tmp)
        parallel::clusterEvalQ(cls, TRUE)
        parallel::clusterExport(cls, "desc", envir = environment())
        parallel::clusterEvalQ(cls,
                               tmp <-
                                   synchronicity::attach.mutex(desc))
        parallel::clusterExport(cls, "lockname", envir = environment())
        parallel::clusterEvalQ(cls, assign(lockname, tmp))
    } else {
        # make sure lock dir not left over from a previous run
        unlink(lockname, recursive = FALSE)
    }
}

# synchronicity lock/unlock

#' @importFrom synchronicity lock
#' @noRd
boostlock <- function(lck) {
    if (is.character(lck)) {
        lck <- get(lck)
    }
    # require(synchronicity)
    synchronicity::lock(lck)
}

#' @importFrom synchronicity unlock
#' @noRd
boostunlock <- function(lck) {
    if (is.character(lck)) {
        lck <- get(lck)
    }
    # require(synchronicity)
    synchronicity::unlock(lck)
}

# general lock/unlock (writing to a shared filesystem)
# lockname must be quoted
backlock <- function(lockname) {
    repeat {
        if (dir.create(lockname)) {
            return()
        }
    }
}
backunlock <- function(lockname) {
    unlink(lockname, recursive = FALSE)
}

# BARRIER

# sense-reversing barrier implementation; see mgrinit() above regarding
# boost and barrback
makebarr <- function(cls,
                     boost = FALSE,
                     barrback = FALSE) {
    mgrmakevar(cls,
               "barrnumleft",
               1,
               1,
               "integer",
               fs = barrback,
               mgrcpy = FALSE)
    mgrmakevar(cls,
               "barrsense",
               1,
               1,
               "integer",
               fs = barrback,
               mgrcpy = FALSE)
    mgrmakelock(cls, "barrlock", boost)
    if (boost) {
        clusterEvalQ(cls, brlock <- barrlock)
    } else {
        clusterEvalQ(cls, brlock <- "barrlock")
    }
    # first component is the count, second is "parity";
    # barrnumleft is number left before barrier done
    clusterEvalQ(cls, barrnumleft[1] <- myinfo$nwrkrs)
    clusterEvalQ(cls, barrsense[1] <- 0) # sense (0 or 1)
}

barr <- function() {
    realrdsmlock(brlock)
    # rdsmlock(brlock)
    count <- barrnumleft[1]
    sense <- barrsense[1]
    if (count == 1) {
        # all done
        # reset count
        barrnumleft[1] <- myinfo$nwrkrs
        # reverse sense
        barrsense[1] <- 1 - barrsense[1]
        realrdsmunlock(brlock)
        # rdsmunlock(brlock)
        return()
    } else {
        barrnumleft[1] <- barrnumleft[1] - 1
        realrdsmunlock(brlock)
        # rdsmunlock(brlock)
        repeat {
            if (barrsense[1] != sense) {
                break
            }
        }
    }
}

# SYNC ROUTINES

# these are needed only if the shared variables are in backing store

# typical usages:

#    critical section:

#        readers call readsync() upon entry to the
#        section, writer calls writesync() before exit

#    barrier:

#        before barrier, one or more nodes write to a variable,
#        then call writesync()

#        after barrier, one or more nodes call readsync()

#    manager:

#       typically the manager calls readsync() to get final result

# sync so can read the variable after others have written to it;
readsync <- function(varname) {
    rm(list = varname, envir = gbl)
    gc()
    tmp <-
        bigmemory::attach.big.matrix(paste(varname, ".desc", sep = ""))
    assign(varname, tmp, envir = gbl)
}

# sync so write to variable is visible by others;
# callable only at the worker nodest
writesync <- function(varname) {
    backlock("synclock")
    bigmemory::flush(getmatrix(varname))
    # rm(list=varname,envir=.GlobalEnv)
    rm(list = varname, envir = gbl)
    tmp <-
        bigmemory::attach.big.matrix(paste(varname, ".desc", sep = ""))
    # assign(varname,tmp,envir=.GlobalEnv)
    assign(varname, tmp, envir = gbl)
    backunlock("synclock")
}

# *************************  UTILITIES  ***********************

# find indices among 1:m to be handled by the given node
getidxs <- function(m) {
    parallel::splitIndices(m, myinfo$nwrkrs)[[myinfo$id]]
}

# obtain access to the matrix m
# the matrix m can be either any of the following:
#    an ordinary R matrix
#    a bigmemory matrix
#    a bigmemory matrix descriptor
#    a bigmemory matrix name, i.e. a character string
# this function determines which, and returns an R matrix (first case
#    above) or a bigmemory matrix (last 3 cases above)
# in the last case, it is assumed that the descriptor is in the file
#    m.dexc
getmatrix <- function(m) {
    # require(bigmemory)
    cl <- class(m)
    if (cl %in% c("matrix", "big.matrix")) {
        return(m)
    }
    if (cl == "big.matrix.descriptor") {
        return(bigmemory::attach.big.matrix(m))
    }
    if (cl == "character") {
        dfilename <- paste(m, ".desc", sep = "")
        if (!(dfilename %in% list.files())) {
            stop("file ", dfilename, "missing")
        }
        return(bigmemory::attach.big.matrix(dfilename))
    }
}

# to shut down cluster (important!):
#' @importFrom parallel stopCluster
#' @noRd
stoprdsm <- function(cls) {
    stopCluster(cls)
    rm(cls)
    # clean up .desc files
    unlink("*.desc")
}

# loads the examples file exfile, from the subdir directory within the
# tree for the package pkg ; if exfile=NA, the names of the files are
# displayed, without loading
loadex <- function(pkg,
                   exfile = NA,
                   subdir = "examples") {
    sp <- searchpaths()
    idx <- grep(pkg, sp)
    dir <- paste(sp[idx], "/", subdir, sep = "")
    if (is.na(exfile)) {
        return(dir(dir))
    }
    source(paste(dir, "/", exfile, sep = ""))
}

Try the TSEAL package in your browser

Any scripts or data that you put into this service are public.

TSEAL documentation built on July 2, 2024, 5:06 p.m.