Nothing
#
# Utilities
#
docall <- function(fun, args) {
if ((is.character(fun) && length(fun) == 1) || is.name(fun))
fun <- get(as.character(fun), envir = .GlobalEnv, mode = "function")
do.call("fun", lapply(args, enquote))
}
shQuoteIfNeeded <- function(p) {
if (length(grep("[[:space:]]", p)) == 0)
p
else if (.Platform$OS.type == "windows")
shQuote(p)
else stop("file names with spaces do not work properly on this platform")
}
#
# Checking and subsetting
#
checkCluster <- function(cl) {
if (! inherits(cl, "cluster"))
stop("not a valid cluster");
}
## Now defined in 'parallel'
## "[.cluster" <-function(cl,...) {
## v<-unclass(cl)[...]
## class(v)<-class(cl)
## v
## }
#
# Work Loop Function
#
workLoop <- function(master) {
repeat tryCatch({
msg <- recvData(master)
cat(paste("Type:", msg$type, "\n"))
if (msg$type == "DONE") {
closeNode(master)
break;
}
else if (msg$type == "EXEC") {
success <- TRUE
## This uses the message, rather than the exception since
## the exception class/methods may not be available on the
## master.
handler <- function(e) {
success <<- FALSE
structure(conditionMessage(e),
class=c("snow-try-error","try-error"))
}
t1 <- proc.time()
value <- tryCatch(docall(msg$data$fun, msg$data$args),
error = handler)
t2 <- proc.time()
value <- list(type = "VALUE", value = value, success = success,
time = t2 - t1, tag = msg$data$tag)
msg <- NULL ## release for GC
sendData(master, value)
value <- NULL ## release for GC
}
}, interrupt = function(e) NULL)
}
sinkWorkerOutput <- function(outfile) {
if (outfile != "") {
if (.Platform$OS.type == "windows" && outfile == "/dev/null")
outfile <- "nul:"
outcon <- file(outfile, open = "w")
sink(outcon)
sink(outcon, type = "message")
}
}
#
# Higher-Level Node Functions
#
closeNode <- function(node) UseMethod("closeNode")
closeNode.default <- function(node) {}
sendData <- function(node, data) UseMethod("sendData")
recvData <- function(node) UseMethod("recvData")
postNode <- function(con, type, value = NULL, tag = NULL) {
sendData(con, list(type = type, data = value, tag = tag))
}
stopNode <- function(n) {
postNode(n, "DONE")
closeNode(n)
}
recvOneData <- function(cl) UseMethod("recvOneData")
#
# Cluster Creation and Destruction
#
defaultClusterOptions <- NULL
#**** check valid cluster option
initDefaultClusterOptions <- function(libname) {
rhome <- Sys.getenv("R_HOME")
if (Sys.getenv("R_SNOW_LIB") != "")
homogeneous <- FALSE
else homogeneous <- TRUE
if (.Platform$OS.type == "windows")
rscript <- file.path(R.home("bin"), "Rscript.exe")
else rscript <- file.path(R.home("bin"), "Rscript")
port <- Sys.getenv("R_PARALLEL_PORT")
port <- if (identical(port, "random")) NA else as.integer(port)
if (is.na(port))
port <-
11000 + 1000 * ((stats::runif(1L) + unclass(Sys.time())/300) %% 1)
options <- list(port = as.integer(port),
timeout = 60 * 60 * 24 * 30, # 30 days
master = Sys.info()["nodename"],
homogeneous = homogeneous,
type = NULL,
outfile = "/dev/null",
rhome = rhome,
user = Sys.info()["user"],
rshcmd = "ssh",
rlibs = Sys.getenv("R_LIBS"),
scriptdir = file.path(libname, "snow"),
rprog = file.path(rhome, "bin", "R"),
snowlib = libname,
rscript = rscript,
useRscript = file.exists(rscript),
manual = FALSE)
defaultClusterOptions <<- addClusterOptions(emptyenv(), options)
}
addClusterOptions <- function(options, new) {
if (! is.null(new)) {
options <- new.env(parent = options)
names <- names(new)
for (i in seq(along = new))
assign(names[i], new[[i]], envir = options)
}
options
}
getClusterOption <- function(name, options = defaultClusterOptions)
get(name, envir = options)
setDefaultClusterOptions <- function(...) {
list <- list(...)
names <- names(list)
for (i in seq(along = list))
assign(names[i], list[[i]], envir = defaultClusterOptions)
}
makeCluster <- function(spec, type = getClusterOption("type"), ...) {
if (is.null(type))
stop("need to specify a cluster type")
switch(type,
SOCK = makeSOCKcluster(spec, ...),
# PVM = makePVMcluster(spec, ...),
MPI = makeMPIcluster(spec, ...),
# NWS = makeNWScluster(spec, ...),
stop("unknown cluster type"))
}
stopCluster <- function(cl) UseMethod("stopCluster")
stopCluster.default <- function(cl)
for (n in cl) stopNode(n)
#
# Cluster Functions
#
sendCall <- function (con, fun, args, return = TRUE, tag = NULL) {
#**** mark node as in-call
timing <- .snowTimingData$running()
if (timing)
start <- proc.time()[3]
postNode(con, "EXEC", list(fun = fun, args = args, return = return,
tag = tag))
if (timing)
.snowTimingData$enterSend(con$rank, start, proc.time()[3])
NULL
}
recvResult <- function (con) {
if (.snowTimingData$running()) {
start <- proc.time()[3]
r <- recvData(con)
end <- proc.time()[3]
.snowTimingData$enterRecv(con$rank, start, end, r$time[3])
}
else r <- recvData(con)
r$value
}
checkForRemoteErrors <- function(val) {
count <- 0
firstmsg <- NULL
for (v in val) {
if (inherits(v, "try-error")) {
count <- count + 1
if (count == 1) firstmsg <- v
}
}
if (count == 1)
stop("one node produced an error: ", firstmsg)
else if (count > 1)
stop(count, " nodes produced errors; first error: ", firstmsg)
val
}
clusterCall <- function(cl, fun, ...) {
checkCluster(cl)
for (i in seq(along = cl))
sendCall(cl[[i]], fun, list(...))
checkForRemoteErrors(lapply(cl, recvResult))
}
staticClusterApply <- function(cl, fun, n, argfun) {
checkCluster(cl)
p <- length(cl)
if (n > 0 && p > 0) {
val <- vector("list", n)
start <- 1
while (start <= n) {
end <- min(n, start + p - 1)
jobs <- end - start + 1
for (i in 1:jobs)
sendCall(cl[[i]], fun, argfun(start + i - 1))
val[start:end] <- lapply(cl[1:jobs], recvResult)
start <- start + jobs
}
checkForRemoteErrors(val)
}
}
clusterApply <- function(cl, x, fun, ...) {
argfun <- function(i) c(list(x[[i]]), list(...))
staticClusterApply(cl, fun, length(x), argfun)
}
clusterEvalQ<-function(cl, expr)
clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)
clusterExport <- local({
env <- as.environment(1) ## .GlobalEnv
gets <- function(n, v) { assign(n, v, envir = env); NULL }
function(cl, list, envir = .GlobalEnv) {
## do this with only one clusterCall--loop on workers?
for (name in list) {
clusterCall(cl, gets, name, get(name, envir = envir))
}
}
})
## A variant that does the exports one at at ime--this may be useful
## when large objects are being sent
# clusterExportSerial <- function(cl, list) {
# gets <- function(n, v) { assign(n, v, envir = .GlobalEnv); NULL }
# for (name in list) {
# v <- get(name, envir = .GlobalEnv)
# for (i in seq(along = cl)) {
# sendCall(cl[[i]], gets, list(name, v))
# recvResult(cl[[i]])
# }
# }
# }
recvOneResult <- function (cl) {
if (.snowTimingData$running()) {
start <- proc.time()[3]
v <- recvOneData(cl)
end <- proc.time()[3]
.snowTimingData$enterRecv(v$node, start, end, v$value$time[3])
}
else v <- recvOneData(cl)
list(value = v$value$value, node = v$node, tag = v$value$tag)
}
findRecvOneTag <- function(cl, anytag) {
rtag <- NULL
for (node in cl) {
if (is.null(rtag))
rtag <- node$RECVTAG
else if (rtag != node$RECVTAG) {
rtag <- anytag
break;
}
}
rtag
}
dynamicClusterApply <- function(cl, fun, n, argfun) {
checkCluster(cl)
p <- length(cl)
if (n > 0 && p > 0) {
submit <- function(node, job)
sendCall(cl[[node]], fun, argfun(job), tag = job)
for (i in 1 : min(n, p))
submit(i, i)
val <- vector("list", n)
for (i in 1:n) {
d <- recvOneResult(cl)
j <- i + min(n, p)
if (j <= n)
submit(d$node, j)
val[d$tag] <- list(d$value)
}
checkForRemoteErrors(val)
}
}
clusterApplyLB <- function(cl, x, fun, ...) {
## **** this closure is sending all of x to all nodes
argfun <- function(i) c(list(x[[i]]), list(...))
dynamicClusterApply(cl, fun, length(x), argfun)
}
## **** should this allow load balancing?
## **** disallow recycling if one arg is length zero?
clusterMap <- function (cl, fun, ..., MoreArgs = NULL, RECYCLE = TRUE) {
checkCluster(cl)
args <- list(...)
if (length(args) == 0)
stop("need at least one argument")
n <- sapply(args, length)
if (RECYCLE) {
vlen <- max(n)
if (!all(n == vlen))
for (i in 1:length(args)) args[[i]] <- rep(args[[i]],
length = max(n))
}
else vlen = min(n)
## **** this closure is sending all of ... to all nodes
argfun <- function(i) c(lapply(args, function(x) x[[i]]), MoreArgs)
staticClusterApply(cl, fun, vlen, argfun)
}
#
# Cluster RNG Support
#
clusterSetupRNG <- function (cl, type="RNGstream", ...) {
RNGnames <- c("RNGstream", "SPRNG")
rng <- pmatch (type, RNGnames)
if (is.na(rng))
stop(paste("'", type,
"' is not a valid choice. Choose 'RNGstream' or 'SPRNG'.",
sep = ""))
type <- RNGnames[rng]
if (rng == 1)
clusterSetupRNGstream(cl, ...)
else clusterSetupSPRNG(cl, ...)
type
}
#
# Cluster SPRNG Support
#
# adapted from rpvm (Li & Rossini)
## clusterSetupSPRNG <- function (cl, seed = round(2^32 * runif(1)),
## prngkind = "default", para = 0, ...)
## {
## if (!is.character(prngkind) || length(prngkind) > 1)
## stop("'rngkind' must be a character string of length 1.")
## if (!is.na(pmatch(prngkind, "default")))
## prngkind <- "LFG"
## prngnames <- c("LFG", "LCG", "LCG64", "CMRG", "MLFG", "PMLCG")
## kind <- pmatch(prngkind, prngnames) - 1
## if (is.na(kind))
## stop(paste("'", prngkind, "' is not a valid choice", sep = ""))
## nc <- length(cl)
## invisible(clusterApply(cl, 0:(nc-1), initSprngNode,
## nc, seed, kind, para))
## }
clusterSetupSPRNG <- function (cl, seed = round(2^32 * runif(1)),
prngkind = "default", para = 0, ...)
stop("SPRNG is currently not supported")
## initSprngNode <- function (streamno, nstream, seed, kind, para)
## {
## if (! requireNamespace("rsprng"))
## stop("the `rsprng' package is needed for SPRNG support.")
## .Call("r_init_sprng", as.integer(kind), as.integer(streamno),
## as.integer(nstream), as.integer(seed), as.integer(para),
## PACKAGE = "rsprng")
## RNGkind("user")
## }
#
# rlecuyer support
#
clusterSetupRNGstream <- function (cl, seed=rep(12345,6), ...) {
if (! requireNamespace("rlecuyer"))
stop("the `rlecuyer' package is needed for RNGstream support.")
rlecuyer::.lec.init()
rlecuyer::.lec.SetPackageSeed(seed)
nc <- length(cl)
names <- as.character(1:nc)
rlecuyer::.lec.CreateStream(names)
states <- lapply(names, rlecuyer::.lec.GetStateList)
invisible(clusterApply(cl, states, initRNGstreamNode))
}
initRNGstreamNode <- function (stream) {
if (! requireNamespace("rlecuyer"))
stop("the `rlecuyer' package is needed for RNGstream support.")
.lec.Random.seed.table <- get(".lec.Random.seed.table", .GlobalEnv)
if (length(.lec.Random.seed.table$name) > 0) {
rm(".lec.Random.seed.table", envir=.GlobalEnv)
.lec.Random.seed.table <- list(Cg=matrix(0,nrow=0,ncol=6),
Bg=matrix(0,nrow=0,ncol=6),
Ig=matrix(0,nrow=0,ncol=6),
AIP=matrix(0,nrow=0,ncol=2),
name=c())
}
.lec.Random.seed.table$Cg <- rbind(.lec.Random.seed.table$Cg,
stream$Cg[1:6])
.lec.Random.seed.table$Bg <- rbind(.lec.Random.seed.table$Bg,stream$Bg)
.lec.Random.seed.table$Ig <- rbind(.lec.Random.seed.table$Ig,stream$Ig)
.lec.Random.seed.table$AIP <- rbind(.lec.Random.seed.table$AIP,
c(stream$Anti, stream$IncPrec))
.lec.Random.seed.table$name <- c(.lec.Random.seed.table$name, stream$name)
env <- as.environment(1) ## .GlobalEnv
assign(".lec.Random.seed.table", .lec.Random.seed.table, envir = env)
old.kind<-rlecuyer::.lec.CurrentStream(stream$name)
old.kind
}
#
# Parallel Functions
#
splitIndices <- function(nx, ncl) {
batchsize <- if (nx %% ncl == 0) nx %/% ncl else 1 + nx %/% ncl
batches <- (nx + batchsize - 1) %/% batchsize
split(1:nx, rep(1:batches, each = batchsize)[1:nx])
}
splitIndices <- function(nx, ncl) {
i <- 1:nx;
if (ncl == 1) i
else structure(split(i, cut(i, ncl)), names=NULL)
}
# The fuzz used by cut() is too small when nx and ncl are both large
# and causes some groups to be empty. The definition below avoids that
# while minimizing changes from the results produced by the definition
# above.
splitIndices <- function(nx, ncl) {
i <- seq_len(nx)
if (ncl == 0) list()
else if (ncl == 1 || nx == 1) list(i)
else {
fuzz <- min((nx - 1) / 1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1)
structure(split(i, cut(i, breaks)), names = NULL)
}
}
clusterSplit <- function(cl, seq)
lapply(splitIndices(length(seq), length(cl)), function(i) seq[i])
splitList <- function(x, ncl)
lapply(splitIndices(length(x), ncl), function(i) x[i])
splitRows <- function(x, ncl)
lapply(splitIndices(nrow(x), ncl), function(i) x[i,, drop=FALSE])
splitCols <- function(x, ncl)
lapply(splitIndices(ncol(x), ncl), function(i) x[,i, drop=FALSE])
parLapply <- function(cl, x, fun, ...)
docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, fun, ...))
parRapply <- function(cl, x, fun, ...)
docall(c, clusterApply(cl, splitRows(x,length(cl)), apply, 1, fun, ...))
parCapply <- function(cl, x, fun, ...)
docall(c, clusterApply(cl, splitCols(x,length(cl)), apply, 2, fun, ...))
parMM <- function(cl, A, B)
docall(rbind,clusterApply(cl, splitRows(A, length(cl)), get("%*%"), B))
# adapted from sapply in the R sources
parSapply <- function (cl, X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE)
{
FUN <- match.fun(FUN) # should this be done on worker?
answer <- parLapply(cl,as.list(X), FUN, ...)
if (USE.NAMES && is.character(X) && is.null(names(answer)))
names(answer) <- X
if (simplify && length(answer) != 0) {
common.len <- unique(unlist(lapply(answer, length)))
if (common.len == 1)
unlist(answer, recursive = FALSE)
else if (common.len > 1)
array(unlist(answer, recursive = FALSE),
dim = c(common.len, length(X)),
dimnames = list(names(answer[[1]]), names(answer)))
else answer
}
else answer
}
# adapted from apply in the R sources
parApply <- function(cl, X, MARGIN, FUN, ...)
{
FUN <- match.fun(FUN) # should this be done on worker?
## Ensure that X is an array object
d <- dim(X)
dl <- length(d)
if(dl == 0)
stop("dim(X) must have a positive length")
ds <- 1:dl
# for compatibility with R versions prior to 1.7.0
if (! exists("oldClass"))
oldClass <- class
if(length(oldClass(X)) > 0)
X <- if(dl == 2) as.matrix(X) else as.array(X)
dn <- dimnames(X)
## Extract the margins and associated dimnames
s.call <- ds[-MARGIN]
s.ans <- ds[MARGIN]
d.call <- d[-MARGIN]
d.ans <- d[MARGIN]
dn.call<- dn[-MARGIN]
dn.ans <- dn[MARGIN]
## dimnames(X) <- NULL
## do the calls
d2 <- prod(d.ans)
if(d2 == 0) {
## arrays with some 0 extents: return ``empty result'' trying
## to use proper mode and dimension:
## The following is still a bit `hackish': use non-empty X
newX <- array(vector(typeof(X), 1), dim = c(prod(d.call), 1))
ans <- FUN(if(length(d.call) < 2) newX[,1] else
array(newX[,1], d.call, dn.call), ...)
return(if(is.null(ans)) ans else if(length(d.call) < 2) ans[1][-1]
else array(ans, d.ans, dn.ans))
}
## else
newX <- aperm(X, c(s.call, s.ans))
dim(newX) <- c(prod(d.call), d2)
if(length(d.call) < 2) {# vector
if (length(dn.call)) dimnames(newX) <- c(dn.call, list(NULL))
arglist <- lapply(1:d2, function(i) newX[,i])
} else
arglist <- lapply(1:d2, function(i) array(newX[,i], d.call, dn.call))
ans <- parLapply(cl, arglist, FUN, ...)
## answer dims and dimnames
ans.list <- is.recursive(ans[[1]])
l.ans <- length(ans[[1]])
ans.names <- names(ans[[1]])
if(!ans.list)
ans.list <- any(unlist(lapply(ans, length)) != l.ans)
if(!ans.list && length(ans.names)) {
all.same <- sapply(ans, function(x) identical(names(x), ans.names))
if (!all(all.same)) ans.names <- NULL
}
len.a <- if(ans.list) d2 else length(ans <- unlist(ans, recursive = FALSE))
if(length(MARGIN) == 1 && len.a == d2) {
names(ans) <- if(length(dn.ans[[1]])) dn.ans[[1]] # else NULL
return(ans)
}
if(len.a == d2)
return(array(ans, d.ans, dn.ans))
if(len.a > 0 && len.a %% d2 == 0)
return(array(ans, c(len.a %/% d2, d.ans),
if(is.null(dn.ans)) {
if(!is.null(ans.names)) list(ans.names,NULL)
} else c(list(ans.names), dn.ans)))
return(ans)
}
#
# Library Initialization
#
.onLoad <- function(libname, pkgname) {
initDefaultClusterOptions(libname)
if (exists("mpi.comm.size"))
type <- "MPI"
## else if (length(find.package("rpvm", quiet = TRUE)) != 0)
## type <- "PVM"
else if (length(find.package("Rmpi", quiet = TRUE)) != 0)
type <- "MPI"
## else if (length(find.package("nws", quiet = TRUE)) != 0)
## type <- "NWS"
else type <- "SOCK"
setDefaultClusterOptions(type = type)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.