if (getRversion() >= "2.15.1")  utils::globalVariables(c("toexec", "tmpx"))
# general "Snow" (the part of 'parallel' adapted from the old Snow)
# utilities, some used in Snowdoop but generally applicable
# split matrix/data frame into chunks of rows, placing a chunk into each
# of the cluster nodes
#
# arguments:
#
# cls: a 'parallel' cluster
# m: a matrix, data frame or data.table
# mchunkname: name to be given to the chunks of m at the cluster nodes
# scramble: if TRUE, randomly assign rows of the data frame to the chunks; 
#           otherwsie, the first rows go to the first chunk, the next set
#           of rows go to the second chunk, and so on
#
# places the chunk named mchunkname in the global space of the worker
formrowchunks <- function(cls,m,mchunkname,scramble=FALSE) {
   nr <- nrow(m)
   idxs <- if (!scramble) 1:nr else sample(1:nr,nr,replace=FALSE)
   rcs <- clusterSplit(cls,idxs)
   rowchunks <- lapply(rcs, function(rc) m[rc,])
   # need to write chunk i to global at worker node i
   # need very convoluted workaround to avoid CRAN check's aversion
   # to globals, even though these globals are only at the worker
   # nodes; Uwe Ligges of CRAN advised workaround based on
   # globalVariables(), but this didn't work here; anyway, Uwe was big
   # on having CRAN automated, i.e. no special human intervention, so he
   # wants a workaround, and here it is; it involves a "surreptitious"
   # assign 
   invisible(
      clusterApply(
         cls,rowchunks,  
         function(onechunk) {
            cmd <- 
               paste('assign("',mchunkname,'",onechunk,envir = .GlobalEnv)',
                  sep='')
            eval(parse(text = cmd))
         }
      )
   )
}
# extracts rows (rc=1) or columns (rc=2) of a matrix, producing a list
matrixtolist <- function(rc,m) {
   if (rc == 1) {
      Map(function(rownum) m[rownum,],1:nrow(m))
   } else Map(function(colnum) m[,colnum],1:ncol(m))
}
# "add" 2 lists, applying the operation 'add' to elements in common,
# copying non-null others
addlists <- function(lst1,lst2,add) {
   lst <- list()
   for (nm in union(names(lst1),names(lst2))) {
      if (is.null(lst1[[nm]])) lst[[nm]] <- lst2[[nm]] else
      if (is.null(lst2[[nm]])) lst[[nm]] <- lst1[[nm]] else
      lst[[nm]] <- add(lst1[[nm]],lst2[[nm]])
   }
   lst
}
# give each node in the cluster cls an ID number myid, global to that
# node; does the same for nclus, the number of nodes in the cluster
setclsinfo <- function(cls) {
   clusterEvalQ(cls,partoolsenv <- new.env())
   clusterApply(cls,1:length(cls),function(i) tmpvar <<- i)
   clusterEvalQ(cls,partoolsenv$myid <- tmpvar)
   tmpvar <- length(cls)
   clusterExport(cls,"tmpvar",envir=environment())
   clusterEvalQ(cls,partoolsenv$ncls <- tmpvar)
   exportlibpaths(cls)
   clusterEvalQ(cls,library(partools))
   # doclscmd(cls,'library(partools)')
}
# returns a pointer to partoolsenv
getpte <- function() {
   # get("partoolsenv",envir=.GlobalEnv)
   get("partoolsenv",envir=.GlobalEnv)
}
# set the R library paths at the workers to that of the calling node
exportlibpaths <- function(cls) {
   lp <- .libPaths()
   clusterCall(cls,function(p) .libPaths(p),lp)
}
# split a vector/matrix/data frame into approximately equal-sized
# chunks across a cluster 
#
# arguments:
#
# cls: a 'parallel' cluster
# dfname: quoted name of matrix/data frame to be split
# scramble: if TRUE, randomly assign rows of the data frame to the chunks; 
#           otherwsie, the first rows go to the first chunk, the next set
#           of rows go to the second chunk, and so on
#
# each remote chunk, a data frame, will have the same name as the
# full object
distribsplit <- function(cls,dfname,scramble=FALSE) {
   dfr <- get(dfname,envir=sys.parent())
   if(!is.data.table(dfr)) dfr <- as.data.frame(dfr)
   formrowchunks(cls,dfr,dfname,scramble)
   for (j in 1:ncol(dfr)) {
      mdj <- mode(dfr[[j]])
      if (mdj == 'character') 
         warning('character column converted to factor') else
      if (mdj == 'factor') {
         usubj <- dfname
         ipstrcat(usubj,'[,')
         ipstrcat(usubj,as.character(j))
         ipstrcat(usubj,']')
         remotecmd <- usubj
         ipstrcat(remotecmd,' <- ')
         ipstrcat(remotecmd,'as.factor(')
         ipstrcat(remotecmd,usubj)
         ipstrcat(remotecmd,')')
         clusterEvalQ(cls,docmd(remotecmd))
      }
   }
}
# collects a distributed matrix/data frame specified by dfname at
# manager (i.e. caller), again with the name dfname, in global space of
# the latter
distribcat <- function(cls,dfname) {
   toexec <- paste("clusterEvalQ(cls,",dfname,")")
   tmp <- eval(parse(text=toexec))
   Reduce(rbind,tmp)
}
# distributed version of aggregate()
# arguments:
#    cls: cluster
#    ynames: names of variables on which FUN is to be applied
#    xnames: names of variables that define the grouping
#    dataname: quoted name of the data frame or data.table
#    FUN: quoted name(s) of aggregation function to be used in 
#         aggregation within cluster nodes; for a data frame,
#         this function has a single argument; in the case of 
#         a data.table, this will be a vector of length the 
#         same as the length of ynames (or recycling will be used)
#    FUN1: quoted name of the aggregation function to be used in 
#          aggregation between cluster nodes
# e.g. for the call at the nodes
#
#    aggregate(cbind(mpg,disp,hp) ~ cyl+gear,data=mtcars,FUN=max)
#
# we would call
#
#    distribagg(cls,c("mpg","disp","hp"),c("cyl","gear"),mtcars,max)
# return value: aggregate()-style data frame, with column of cell counts
# appended at the end
# currently cannot have FUNdim > 1 for data.tables;  duplicate ynames;
#
# distribagg(cls,c('mpg','mpg','disp','disp','hp','hp'),
#             c('cyl','gear'),'mtc1',
#             FUN=c('sum','length'))
# doesn't matter here 
distribagg <- function(cls,ynames,xnames,dataname,FUN,FUNdim=1,FUN1=FUN) {
   ncellvars <- length(xnames) # number of cell-specification variables
   nagg <- length(ynames) # number of variables to tabulate
   isdt <- distribisdt(cls,dataname)
   if (isdt) {
      if (length(FUN) != length(ynames))
         stop('lengths of FUN and ynames must be the same for data.tables')
      if (FUNdim > 1) stop('FUNdim must be 1 for data.tables')
      # if (length(FUN) < nagg) FUN <- rep_len(FUN,nagg*length(FUN))
      remotecmd <- paste(dataname,'[,.(',sep='')
      for (i in 1:nagg) {
         ipstrcat(remotecmd,FUN[i],'(',ynames[i],')')
         if (i == nagg) ipstrcat(remotecmd,')')
         ipstrcat(remotecmd,',') 
      }
      xns <- xnames
      quotemark <- '"'
      for (i in 1:length(xns)) {
         xns[i] <- paste(quotemark,xns[i],quotemark,sep='')
      }
      ipstrcat(remotecmd,'by=c(',xns,')]',innersep=',')
   } else {
        if (length(FUN) > 1) 
           stop('for data.frames,FUN must be a single string')
        # set up aggregate() command to be run on the cluster nodes
        ypart <- paste("cbind(",paste(ynames,collapse=","),")",sep="")
        xpart <- paste(xnames,collapse="+")
        # the formula
        frmla <- paste(ypart,"~",paste(xnames,collapse="+"))
        remotecmd <-
           paste("aggregate(",frmla,",data=",dataname,",",FUN,")",sep="")
   }
   clusterExport(cls,"remotecmd",envir=environment())
   # run the command, and combine the returned data frames into one big
   # data frame
   aggs <- clusterEvalQ(cls,docmd(remotecmd))
   agg <- Reduce(rbind,aggs)
   # typically a given cell will found at more than one cluster node;
   # they must be combined, using FUN1
   FUN1 <- get(FUN1)
   # if FUN returns a vector rather than a scalar, the "columns" of agg
   # associated with ynames will be matrices; need to expand so that
   # have real columns
   if (FUNdim > 1) {
      # note: names will be destroyed
      tmp  <- agg[,1:ncellvars,drop=FALSE]
      for (i in 1:(ncol(agg)-ncellvars))
         tmp <- cbind(tmp,agg[,ncellvars+i])
      agg <- tmp
      names(agg)[-(1:ncellvars)] <- rep(ynames,each=FUNdim)
   }
   if (isdt) {
      agg <- as.data.frame(agg)
      names(agg)[-(1:ncellvars)] <- ynames
   }
   aggregate(x=agg[,-(1:ncellvars)],by=agg[,1:ncellvars,drop=FALSE],FUN1)
}
# get the indicated cell counts, cells defined according to the
# variables in xnames 
distribcounts <- function(cls,xnames,dataname) {
   isdt <- distribisdt(cls,dataname)
   res <- 
      if (isdt) distribagg(cls,'.N',xnames,dataname,"sum",FUN1="sum") else
      distribagg(cls,xnames[1],xnames,dataname,"length",FUN1="sum")
   names(res)[length(xnames)+1] <- 'count'
   res
}
# TODO: Clark - Write this function
#distribunique <- function(cls,ynames,dataname) {
#}
# determine whether the distributed object dataname is a data.table
distribisdt <- function(cls,dataname) {
   rcmd <- paste('is.data.table(',dataname,')',sep='')
   clusterExport(cls,"rcmd",envir=environment())
   clusterEvalQ(cls,docmd(rcmd))[[1]]
}
sumlength <- function(a) c(sum(a),length(a))
# get the indicated cell means of the variables in ynames,
# cells defined according to the variables in xnames; if saveni is TRUE,
# then add one column 'yni', giving the number of Y values in this cell
distribmeans <- function(cls,ynames,xnames,dataname,saveni=FALSE) {
   clusterExport(cls,'sumlength',envir=environment())
   isdt <- distribisdt(cls,dataname)
   if (!isdt) {
      da <- distribagg(cls,ynames,xnames,dataname,
         FUN='sumlength',FUNdim=2,FUN1='sum')
   } else {
      da <- distribagg(cls,rep(ynames,each=2),
         xnames,dataname,
         FUN=rep(c('sum','length'),length((ynames))),FUN1='sum')
   }
   nx <- length(xnames)
   tmp <- da[,1:nx]
   day <- da[,-(1:nx)]  # Y values in da
   ny <- length(ynames)
   for (i in 1:ny) {
      tmp <- cbind(tmp,day[,2*i-1] / day[,2*i])
   }
   tmp <- as.data.frame(tmp)
   if (saveni) {
      tmp$yni <- day[,2]
      names(tmp) <- c(xnames,ynames,'yni')
   }
   names(tmp)[-(1:nx)] <- ynames
   tmp
}
# currently not in service; xtabs() call VERY slow
# distribtable <- function(cls,xnames,dataname) {
#    tmp <- distribagg(cls,xnames[1],xnames,dataname,"length","sum")
#    names(tmp)[ncol(tmp)] <- "counts"
#    xtabs(counts ~ .,data=tmp)
# }
# find the smallest value in the indicated distributed vector; return
# value is a pair (node number, row number)
dwhich.min <- function(cls,vecname) {
   cmd <- paste('which.min(',vecname,')',sep='')
   mininfo <- function(x) {i <- which.min(x); c(i,x[i])}
   cmd <- paste('mininfo(',vecname,')',sep='')
   clusterExport(cls,c('mininfo','cmd'),envir=environment())
   rslt <- clusterEvalQ(cls,docmd(cmd))
   tmp <- unlist(Reduce(rbind,rslt))
   nodenum <- which.min(tmp[,2])
   rownum <- tmp[nodenum,1]
   c(nodenum,rownum)
}
# find the largest value in the indicated distributed vector; return
# value is a pair (node number, row number)
dwhich.max <- function(cls,vecname) {
   cmd <- paste('which.max(',vecname,')',sep='')
   maxinfo <- function(x) {i <- which.max(x); c(i,x[i])}
   cmd <- paste('maxinfo(',vecname,')',sep='')
   clusterExport(cls,c('maxinfo','cmd'),envir=environment())
   rslt <- clusterEvalQ(cls,docmd(cmd))
   tmp <- unlist(Reduce(rbind,rslt))
   nodenum <- which.max(tmp[,2])
   rownum <- tmp[nodenum,1]
   c(nodenum,rownum)
}
# find the k largest or smallest values in the specified distributed
# vector; UNDER CONSTRUCTION
# dwhich.extremek <- function(cls,vecname,k,largest=TRUE) {
#    extremek <- function(vecname,k,largest) {
#       tmpvec <- get('vecname')
#       tmporder <- order(tmpvec,!largest)[1:k]
#       cbind(order(tmporder,tmpvec[tmporder][
#    }
#    cmd <- paste('extremek(',vecname,',',k,'.',largest,')')
#    clusterExport(cls,'cmd),envir=environment())
#    rslt <- clusterEvalQ(cls,docmd(cmd))
#    tmp <- unlist(Reduce(rbind,rslt))
# 
# }
distribrange <- function(cls,vec,na.rm=FALSE) {
   narm <- if(na.rm) 'TRUE' else 'FALSE'  
   narm <- paste("na.rm=",narm)
   tmp <- paste("range(", vec, ",",narm,")",collapse = "")
   rangeout <- clusterCall(cls,docmd,tmp)
   vecmin <- min(geteltis(rangeout,1))
   vecmax <- max(geteltis(rangeout,2))
   c(vecmin,vecmax)
}
# execute the command cmd at each cluster node, typically select(), then
# collect using rbind() at the caller
distribgetrows <- function(cls,cmd,who=NULL) {
   # clusterExport(cls,'cmd',envir=environment())
   # res <- clusterEvalQ(cls,docmd(cmd))
   res <- doclscmd(cls,cmd,who)
   tmp <- Reduce(rbind,res,init = NULL)
   notallna <- function(row) any(!is.na(row))
   tmp[apply(tmp,1,notallna),]
}
######################### misc. utilities ########################
# execute the contents of a quoted command; main use is with
# clusterEvalQ()
docmd <- function(toexec) eval(parse(text=toexec))
# execute the contents of a quoted command 'toexec' at the cluster
# nodes; optionally, set 'who' to have the command done only at the
# nodes in 'who'
doclscmd <- function(cls,toexec,who=NULL)
{  dotoexec <- function() {
      if (!is.null(who) && !(myid %in% who)) 
         toexec <- 'NULL'
      docmd(toexec)
   }
   clusterEvalQ(cls,myid <- getpte()$myid)
   clusterExport(cls, c("dotoexec","toexec","who"), 
      envir = environment())
   clusterEvalQ(cls, dotoexec())
}
# extract element i of each element in the R list 'lst', which is a list
# of vectors
geteltis <- function(lst,i) {
   get1elti <- function(lstmember) lstmember[i]
   sapply(lst,get1elti)
}
# in-place string concatenation; appends the strings in ... to str1,
# assigning back to str1 (at least in name, if not memory location), in
# the spirit of string.append() in Python; here DOTS is one of more
# expressions separated by commas, with each expression being either a
# string or a vector of strings; within a vector, innersep is used as a
# separator, while between vectors it is outersep
# generaed from gtools code:
# ipstrcat <- defmacro(str1,DOTS,outersep='',innersep='',expr = (
#       for (str in list(...)) {
#          lstr <- length(str)
#          tmp <- str[1]
#          if (lstr > 1) 
#             for (i in 2:lstr) 
#                tmp <- paste(tmp,str[i],sep=innersep)
#          str1 <- paste(str1,tmp,sep=outersep)
#       }
#    )
# )
ipstrcat <- function (str1 = stop("str1 not supplied"), ..., outersep = "", 
    innersep = "") 
{
    tmp <- substitute((for (str in list(...)) {
        lstr <- length(str)
        tmp <- str[1]
        if (lstr > 1) for (i in 2:lstr) tmp <- paste(tmp, str[i], 
            sep = innersep)
        str1 <- paste(str1, tmp, sep = outersep)
    }))
    eval(tmp, parent.frame())
}
# try to load the package specified in pkgname
tryloadpkg <- function(pkgname) {
   cmd <- paste('require(',pkgname,')',sep='')
   docmd(cmd)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.