# Apply Operations using Clusters

### Description

These functions provide several ways to parallelize computations using a cluster.

### Usage

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | ```
clusterCall(cl = NULL, fun, ...)
clusterApply(cl = NULL, x, fun, ...)
clusterApplyLB(cl = NULL, x, fun, ...)
clusterEvalQ(cl = NULL, expr)
clusterExport(cl = NULL, varlist, envir = .GlobalEnv)
clusterMap(cl = NULL, fun, ..., MoreArgs = NULL, RECYCLE = TRUE,
SIMPLIFY = FALSE, USE.NAMES = TRUE,
.scheduling = c("static", "dynamic"))
clusterSplit(cl = NULL, seq)
parLapply(cl = NULL, X, fun, ...)
parSapply(cl = NULL, X, FUN, ..., simplify = TRUE,
USE.NAMES = TRUE)
parApply(cl = NULL, X, MARGIN, FUN, ...)
parRapply(cl = NULL, x, FUN, ...)
parCapply(cl = NULL, x, FUN, ...)
parLapplyLB(cl = NULL, X, fun, ...)
parSapplyLB(cl = NULL, X, FUN, ..., simplify = TRUE,
USE.NAMES = TRUE)
``` |

### Arguments

`cl` |
a cluster object, created by this package or by package
snow. If |

`fun, FUN` |
function or character string naming a function. |

`expr` |
expression to evaluate. |

`seq` |
vector to split. |

`varlist` |
character vector of names of objects to export. |

`envir` |
environment from which t export variables |

`x` |
a vector for |

`...` |
additional arguments to pass to |

`MoreArgs` |
additional arguments for |

`RECYCLE` |
logical; if true shorter arguments are recycled. |

`X` |
A vector (atomic or list) for |

`MARGIN` |
vector specifying the dimensions to use. |

`simplify, USE.NAMES` |
logical; see |

`SIMPLIFY` |
logical; see |

`.scheduling` |
should tasks be statically allocated to nodes or dynamic load-balancing used? |

### Details

`clusterCall`

calls a function `fun`

with identical
arguments `...`

on each node.

`clusterEvalQ`

evaluates a literal expression on each cluster
node. It is a parallel version of `evalq`

, and is a
convenience function invoking `clusterCall`

.

`clusterApply`

calls `fun`

on the first node with
arguments `seq[[1]]`

and `...`

, on the second node with
`seq[[2]]`

and `...`

, and so on, recycling nodes as needed.

`clusterApplyLB`

is a load balancing version of
`clusterApply`

. If the length `p`

of `seq`

is not
greater than the number of nodes `n`

, then a job is sent to
`p`

nodes. Otherwise the first `n`

jobs are placed in order
on the `n`

nodes. When the first job completes, the next job is
placed on the node that has become free; this continues until all jobs
are complete. Using `clusterApplyLB`

can result in better
cluster utilization than using `clusterApply`

, but increased
communication can reduce performance. Furthermore, the node that
executes a particular job is non-deterministic.

`clusterMap`

is a multi-argument version of `clusterApply`

,
analogous to `mapply`

and `Map`

. If
`RECYCLE`

is true shorter arguments are recycled (and either none
or all must be of length zero); otherwise, the result length is the
length of the shortest argument. Nodes are recycled if the length of
the result is greater than the number of nodes. (`mapply`

always
uses `RECYCLE = TRUE`

, and has argument `SIMPLIFY = TRUE`

.
`Map`

always uses `RECYCLE = TRUE`

.)

`clusterExport`

assigns the values on the master **R** process of
the variables named in `varlist`

to variables of the same names
in the global environment (aka ‘workspace’) of each node. The
environment on the master from which variables are exported defaults
to the global environment.

`clusterSplit`

splits `seq`

into a consecutive piece for
each cluster and returns the result as a list with length equal to the
number of nodes. Currently the pieces are chosen to be close
to equal in length: the computation is done on the master.

`parLapply`

, `parSapply`

, and `parApply`

are parallel
versions of `lapply`

, `sapply`

and `apply`

.
`parLapplyLB`

, `parSapplyLB`

are load-balancing versions,
intended for use when applying `FUN`

to different elements of
`X`

takes quite variable amounts of time, and either the function
is deterministic or reproducible results are not required.

`parRapply`

and `parCapply`

are parallel row and column
`apply`

functions for a matrix `x`

; they may be slightly
more efficient than `parApply`

but do less post-processing of the
result.

### Value

For `clusterCall`

, `clusterEvalQ`

and `clusterSplit`

, a
list with one element per node.

For `clusterApply`

and `clusterApplyLB`

, a list the same
length as `seq`

.

`clusterMap`

follows `mapply`

.

`clusterExport`

returns nothing.

`parLapply`

returns a list the length of `X`

.

`parSapply`

and `parApply`

follow `sapply`

and
`apply`

respectively.

`parRapply`

and `parCapply`

always return a vector. If
`FUN`

always returns a scalar result this will be of length the
number of rows or columns: otherwise it will be the concatenation of
the returned values.

An error is signalled on the master if any of the workers produces an error.

### Note

These functions are almost identical to those in package snow.

Two exceptions: `parLapply`

has argument `X`

not `x`

for consistency with `lapply`

, and
`parSapply`

has been updated to match `sapply`

.

### Author(s)

Luke Tierney and R Core.

Derived from the snow package.

### Examples

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | ```
## Use option cl.cores to choose an appropriate cluster size.
cl <- makeCluster(getOption("cl.cores", 2))
clusterApply(cl, 1:2, get("+"), 3)
xx <- 1
clusterExport(cl, "xx")
clusterCall(cl, function(y) xx + y, 2)
## Use clusterMap like an mapply example
clusterMap(cl, function(x, y) seq_len(x) + y,
c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = -10))
parSapply(cl, 1:20, get("+"), 3)
## A bootstrapping example, which can be done in many ways:
clusterEvalQ(cl, {
## set up each worker. Could also use clusterExport()
library(boot)
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
NULL
})
res <- clusterEvalQ(cl, boot(cd4, corr, R = 100,
sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle))
library(boot)
cd4.boot <- do.call(c, res)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
## or
library(boot)
run1 <- function(...) {
library(boot)
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
boot(cd4, corr, R = 500, sim = "parametric",
ran.gen = cd4.rg, mle = cd4.mle)
}
cl <- makeCluster(mc <- getOption("cl.cores", 2))
## to make this reproducible
clusterSetRNGStream(cl, 123)
cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
``` |