SnowParam-class: Enable simple network of workstations (SNOW)-style parallel...

SnowParam-class    R Documentation

Enable simple network of workstations (SNOW)-style parallel evaluation

Description

This class is used to parameterize simple network of workstations (SNOW) parallel evaluation on one or several physical computers. snowWorkers() chooses the number of workers.

Usage


## constructor
## ------------------------------------

SnowParam(workers = snowWorkers(type), type=c("SOCK", "MPI", "FORK"),
    tasks = 0L, stop.on.error = TRUE,
    progressbar = FALSE, RNGseed = NULL,
    timeout = WORKER_TIMEOUT, exportglobals = TRUE, exportvariables = TRUE,
    log = FALSE, threshold = "INFO", logdir = NA_character_,
    resultdir = NA_character_, jobname = "BPJOB",
    force.GC = FALSE, fallback = TRUE,
    manager.hostname = NA_character_, manager.port = NA_integer_,
    ...)

## coercion
## ------------------------------------

## as(SOCKcluster, SnowParam)
## as(spawnedMPIcluster,SnowParam)

## detect workers
## ------------------------------------

snowWorkers(type = c("SOCK", "MPI", "FORK"))

Arguments

workers

integer(1) Number of workers. Defaults to the maximum of 1 or the number of cores determined by detectCores minus 2 unless environment variables R_PARALLELLY_AVAILABLECORES_FALLBACK or BIOCPARALLEL_WORKER_NUMBER are set otherwise. For a SOCK cluster, workers can be a character() vector of host names.

type

character(1) Type of cluster to use. Possible values are SOCK (default) and MPI. Instead of type=FORK use MulticoreParam.

tasks

integer(1) The number of tasks per job. The value must be a scalar integer >= 0L.

In this documentation a job is defined as a single call to a function, such as bplapply, bpmapply etc. A task is the division of the X argument into chunks. When tasks == 0 (default), X is divided as evenly as possible over the number of workers.

A tasks value of > 0 specifies the exact number of tasks. Values can range from 1 (all of X to a single worker) to the length of X (each element of X to a different worker).

When the length of X is less than the number of workers each element of X is sent to a worker and tasks is ignored.

stop.on.error

logical(1) Enable stop on error.

progressbar

logical(1) Enable progress bar (based on plyr:::progress_text).

RNGseed

integer(1) Seed for random number generation. The seed is used to set a new, independent random number stream for each element of X. The ith element receives the same stream seed, regardless of use of SerialParam(), SnowParam(), or MulticoreParam(), and regardless of worker or task number. When RNGseed = NULL, a random seed is used.

timeout

numeric(1) Time (in seconds) allowed for a worker to complete a task. This value is passed to base::setTimeLimit() as both the cpu and elapsed arguments. If the computation exceeds timeout, an error is thrown with the message 'reached elapsed time limit'.

exportglobals

logical(1) Export base::options() from manager to workers? Default TRUE.

exportvariables

logical(1) Automatically export the variables which are defined in the global environment and used by the function from manager to workers. Default TRUE.

log

logical(1) Enable logging.

threshold

character(1) Logging threshold as defined in futile.logger.

logdir

character(1) Log files directory. When not provided, log messages are returned to stdout.

resultdir

character(1) Job results directory. When not provided, results are returned as an R object (list) to the workspace.

jobname

character(1) Job name that is prepended to log and result files. Default is "BPJOB".

force.GC

logical(1) Whether to invoke the garbage collector after each call to FUN. The default (FALSE, do not explicitly call the garbage collection) rarely needs to be changed.

fallback

logical(1) When TRUE, fall back to using SerialParam when SnowParam has not been started and the number of workers is no greater than 1.

manager.hostname

character(1) Host name of manager node. See 'Global Options', below.

manager.port

integer(1) Port on manager with which workers communicate. See 'Global Options', below.

...

Additional arguments passed to makeCluster
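
As an illustration of the workers and tasks arguments described above, the following sketch runs the same job with the default division of X (tasks = 0L) and with one task per element. The worker count of 2 and the toy input are arbitrary choices made for this illustration.

```r
library(BiocParallel)

X <- 1:8

## Default: tasks = 0L, so X is divided as evenly as possible over workers
p_default <- SnowParam(workers = 2)
bptasks(p_default)

## One task per element of X: finer-grained scheduling, more communication
p_fine <- SnowParam(workers = 2, tasks = length(X))
bptasks(p_fine)

## The result should not depend on how X is divided into tasks
res_default <- bplapply(X, sqrt, BPPARAM = p_default)
res_fine <- bplapply(X, sqrt, BPPARAM = p_fine)
identical(res_default, res_fine)
```

A larger tasks value can help balance load when elements of X take very different times to process, at the cost of more manager-worker communication.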

Details

SnowParam is used for distributed memory computing and supports 2 cluster types: ‘SOCK’ (default) and ‘MPI’. The SnowParam builds on infrastructure in the snow and parallel packages and provides the additional features of error handling, logging and writing out results.

See ?BIOCPARALLEL_WORKER_NUMBER to control the default and maximum number of workers.

error handling:

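When stop.on.error = FALSE all computations are attempted and partial results are returned with any error messages. When stop.on.error = TRUE (the default shown in Usage) evaluation stops once an error occurs.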

  • stop.on.error A logical. When TRUE, all jobs are stopped as soon as one job fails; when FALSE, evaluation waits for all jobs to terminate. When FALSE, the return value is a list of successful results along with error messages as 'conditions'.

  • The bpok(x) function returns a logical() vector that is FALSE for any jobs that threw an error. The input x is a list output from a bp*apply function such as bplapply or bpmapply.

logging:

When log = TRUE the futile.logger package is loaded on the workers. All log messages written in the futile.logger format are captured by the logging mechanism and returned real-time (i.e., as each task completes) instead of after all jobs have finished.

Messages sent to stdout and stderr are returned to the workspace by default. When log = TRUE these are diverted to the log output. Those familiar with the outfile argument to makeCluster can think of log = FALSE as equivalent to outfile = NULL; providing a logdir is the same as providing a name for outfile except that BiocParallel writes a log file for each task.

The log output includes additional statistics such as memory use and task runtime. Memory use is computed by calling gc(reset=TRUE) before code evaluation and gc() (no reset) after. The output of the second gc() call is sent to the log file.
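
A minimal sketch of the logging mechanism described above, assuming the futile.logger package is available on the workers. The function fun and its message text are hypothetical; flog.info() is the futile.logger call for messages at the INFO level.

```r
library(BiocParallel)

## Hypothetical worker function that writes a futile.logger message
fun <- function(i) {
    futile.logger::flog.info("processing element %d", i)
    i^2
}

## With log = TRUE, messages at or above 'threshold' are captured and
## reported as tasks complete
p <- SnowParam(workers = 2, log = TRUE, threshold = "INFO")
res <- bplapply(1:4, fun, BPPARAM = p)
unlist(res)
```

Raising the threshold (for example to "WARN") would be expected to suppress the INFO messages while leaving the returned values unchanged.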

log and result files:

Results and logs can be written to a file instead of returned to the workspace. Writing to files is done from the master as each task completes. Options can be set with the logdir and resultdir fields in the constructor or with the accessors, bplogdir and bpresultdir.

random number generation:

For MulticoreParam, SnowParam, and SerialParam, random number generation is controlled through the RNGseed = argument. BiocParallel uses the L'Ecuyer-CMRG random number generator described in the parallel package to generate independent random number streams. One stream is associated with each element of X, and used to seed the random number stream for the application of FUN() to X[[i]]. Thus setting RNGseed = ensures reproducibility across MulticoreParam(), SnowParam(), and SerialParam(), regardless of worker or task number. The default value RNGseed = NULL means that each evaluation of bplapply proceeds independently.

For details of the L'Ecuyer generator, see ?clusterSetRNGStream.
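
The reproducibility described above can be checked with a short sketch. The seed value 123, the helper draw, and the worker and task counts are arbitrary choices for illustration.

```r
library(BiocParallel)

## Hypothetical function drawing one random number per element of X
draw <- function(i) rnorm(1)

serial <- bplapply(1:4, draw, BPPARAM = SerialParam(RNGseed = 123))
snow2 <- bplapply(1:4, draw,
                  BPPARAM = SnowParam(workers = 2, RNGseed = 123))
snow3 <- bplapply(1:4, draw,
                  BPPARAM = SnowParam(workers = 3, tasks = 4, RNGseed = 123))

## Per the description above, the same seed should give the same draws
## regardless of back-end, worker count, or task count
identical(serial, snow2)
identical(serial, snow3)
```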

NOTE: The PSOCK cluster from the parallel package does not support cluster options scriptdir and useRscript. PSOCK is not supported because these options are needed to re-direct to an alternate worker script located in BiocParallel.

Constructor

SnowParam(workers = snowWorkers(type), type=c("SOCK", "MPI", "FORK"), tasks = 0L, stop.on.error = TRUE, progressbar = FALSE, RNGseed = NULL, timeout = WORKER_TIMEOUT, exportglobals = TRUE, exportvariables = TRUE, log = FALSE, threshold = "INFO", logdir = NA_character_, resultdir = NA_character_, jobname = "BPJOB", force.GC = FALSE, fallback = TRUE, manager.hostname = NA_character_, manager.port = NA_integer_, ...):

Return an object representing a SNOW cluster. The cluster is not created until bpstart is called. Named arguments in ... are passed to makeCluster.
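
Because the cluster is not created until bpstart is called, a param object can be started once and reused for several jobs. The following is a sketch of that lifecycle; the worker count and the toy computations are arbitrary.

```r
library(BiocParallel)

p <- SnowParam(workers = 2)
bpisup(p)                 # the cluster has not been created yet

p <- bpstart(p)           # launch the workers
bpisup(p)

## Reuse the running cluster for several jobs
r1 <- bplapply(1:4, sqrt, BPPARAM = p)
r2 <- bplapply(1:4, function(i) i + 1L, BPPARAM = p)

bpstop(p)                 # shut the workers down
bpisup(p)
```

Starting the cluster explicitly avoids paying the worker start-up cost on every call, which matters most when many short jobs are run in sequence.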

Accessors: Logging and results

In the following code, x is a SnowParam object.

bpprogressbar(x), bpprogressbar(x) <- value:

Get or set the value to enable text progress bar. value must be a logical(1).

bpjobname(x), bpjobname(x) <- value:

Get or set the job name.

bpRNGseed(x), bpRNGseed(x) <- value:

Get or set the seed for random number generation. value must be a numeric(1) or NULL.

bplog(x), bplog(x) <- value:

Get or set the value to enable logging. value must be a logical(1).

bpthreshold(x), bpthreshold(x) <- value:

Get or set the logging threshold. value must be a character(1) string of one of the levels defined in the futile.logger package: “TRACE”, “DEBUG”, “INFO”, “WARN”, “ERROR”, or “FATAL”.

bplogdir(x), bplogdir(x) <- value:

Get or set the directory for the log file. value must be a character(1) path, not a file name. The file is written out as BPLOG.out. If no logdir is provided and bplog=TRUE log messages are sent to stdout.

bpresultdir(x), bpresultdir(x) <- value:

Get or set the directory for the result files. value must be a character(1) path, not a file name. Separate files are written for each job with the prefix TASK (e.g., TASK1, TASK2, etc.). When no resultdir is provided the results are returned to the session as a list.
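
A short sketch of the accessors listed above, applied to a param object that has not been started. The job name "DEMO" and the temporary log directory are hypothetical values chosen for illustration.

```r
library(BiocParallel)

p <- SnowParam(workers = 2)

bpjobname(p) <- "DEMO"            # hypothetical job name
bplog(p) <- TRUE                  # enable logging before setting logdir
bpthreshold(p) <- "DEBUG"
dir.create(logdir <- tempfile())  # temporary directory for log files
bplogdir(p) <- logdir

bpjobname(p)
bpthreshold(p)
bplogdir(p)
```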

Accessors: Back-end control

In the code below x is a SnowParam object. See the ?BiocParallelParam man page for details on these accessors.

  • bpworkers(x), bpworkers(x) <- value, bpnworkers(x)

  • bptasks(x), bptasks(x) <- value

  • bpstart(x)

  • bpstop(x)

  • bpisup(x)

  • bpbackend(x), bpbackend(x) <- value

Accessors: Error Handling

In the code below x is a SnowParam object. See the ?BiocParallelParam man page for details on these accessors.

  • bpstopOnError(x), bpstopOnError(x) <- value

Methods: Evaluation

In the code below BPPARAM is a SnowParam object. Full documentation for these functions is on separate man pages: see ?bpmapply, ?bplapply, ?bpvec, ?bpiterate and ?bpaggregate.

  • bpmapply(FUN, ..., MoreArgs=NULL, SIMPLIFY=TRUE, USE.NAMES=TRUE, BPPARAM=bpparam())

  • bplapply(X, FUN, ..., BPPARAM=bpparam())

  • bpvec(X, FUN, ..., AGGREGATE=c, BPPARAM=bpparam())

  • bpiterate(ITER, FUN, ..., BPPARAM=bpparam())

  • bpaggregate(x, data, FUN, ..., BPPARAM=bpparam())

Methods: Other

In the code below x is a SnowParam object.

show(x):

Displays the SnowParam object.

bpok(x):

Returns a logical() vector: FALSE for any jobs that resulted in an error. x is the result list output by a BiocParallel function such as bplapply or bpmapply.

Coercion

as(from, "SnowParam"):

Creates a SnowParam object from a SOCKcluster or spawnedMPIcluster object. Instances created in this way cannot be started or stopped.
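
A sketch of the coercion, assuming a socket cluster created with parallel::makeCluster() carries the SOCKcluster class. Since instances created this way cannot be started or stopped through BiocParallel, the cluster is shut down with parallel::stopCluster().

```r
library(BiocParallel)

cl <- parallel::makeCluster(2L)        # socket cluster from 'parallel'
p <- methods::as(cl, "SnowParam")

methods::is(p, "SnowParam")
bpnworkers(p)

## The lifecycle of the cluster stays with the code that created it
parallel::stopCluster(cl)
```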

Global Options

The environment variable BIOCPARALLEL_WORKER_NUMBER and the global option mc.cores influence the number of workers determined by snowWorkers() (described above) or multicoreWorkers() (see multicoreWorkers).

Workers communicate to the master through socket connections. Socket connections require a hostname and port. These are determined by arguments manager.hostname and manager.port; default values are influenced by global options.

The default manager hostname is "localhost" when the number of workers is specified as a numeric(1), and Sys.info()[["nodename"]] otherwise. The hostname can be overridden by the environment variable MASTER, or the global option bphost (e.g., options(bphost=Sys.info()[["nodename"]])).

The default port is chosen as a random value between 11000 and 11999. The port may be over-ridden by the environment variable R_PARALLEL_PORT or PORT, and by the option ports, e.g., options(ports=12345L).
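
The two ways of choosing the manager address can be sketched as follows. The port numbers are arbitrary and assumed to be free on the manager; no cluster is started here.

```r
library(BiocParallel)

## Explicit manager address for this object only
p <- SnowParam(
    workers = 2,
    manager.hostname = "localhost",
    manager.port = 11500L              # arbitrary port, assumed free
)

## Session-wide default port through the global option 'ports'
old <- options(ports = 11501L)         # arbitrary port, assumed free
q <- SnowParam(workers = 2)
options(old)                           # restore the previous setting
```

Fixing the port explicitly is mainly useful when a firewall only allows specific ports between the manager and the workers.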

Author(s)

Martin Morgan and Valerie Obenchain.

See Also

  • register for registering parameter classes for use in parallel evaluation.

  • MulticoreParam for computing in shared memory

  • DoparParam for computing with foreach

  • SerialParam for non-parallel evaluation

Examples


## -----------------------------------------------------------------------
## Job configuration:
## -----------------------------------------------------------------------

## SnowParam supports distributed memory computing. The object fields
## control the division of tasks, error handling, logging and result
## format.
bpparam <- SnowParam()
bpparam

## Fields are modified with accessors of the same name:
bplog(bpparam) <- TRUE
dir.create(resultdir <- tempfile())
bpresultdir(bpparam) <- resultdir
bpparam

## -----------------------------------------------------------------------
## Logging:
## -----------------------------------------------------------------------

## When 'log == TRUE' the workers use a custom script (in BiocParallel)
## that enables logging and access to other job statistics. Log messages
## are returned as each job completes rather than waiting for all to
## finish.

## In 'fun', a value of 'x = 1' will throw a warning, 'x = 2' is ok
## and 'x = 3' throws an error. Because 'x = 1' sleeps, the warning
## should return after the error.

X <- 1:3
fun <- function(x) {
    if (x == 1) {
        Sys.sleep(2)
        log(-x)                    ## warning
    } else if (x == 2) {
        x                          ## ok
    } else if (x == 3) {
        sqrt("FOO")                ## error
    }
}

## By default logging is off. Turn it on with the bplog()<- setter
## or by specifying 'log = TRUE' in the constructor.
bpparam <- SnowParam(3, log = TRUE, stop.on.error = FALSE)
tryCatch({
    bplapply(X, fun, BPPARAM = bpparam)
}, error=identity)

## When a 'logdir' location is given the messages are redirected to a
## file:
## Not run: 
dir.create(logdir <- tempfile())
bplogdir(bpparam) <- logdir
bplapply(X, fun, BPPARAM = bpparam)
list.files(bplogdir(bpparam))

## End(Not run)

## -----------------------------------------------------------------------
## Managing results:
## -----------------------------------------------------------------------

## By default results are returned as a list. When 'resultdir' is given
## files are saved in the directory specified by job, e.g., 'TASK1.Rda',
## 'TASK2.Rda', etc.
## Not run: 
dir.create(resultdir <- tempfile())
bpparam <- SnowParam(2, resultdir = resultdir)
bplapply(X, fun, BPPARAM = bpparam)
list.files(bpresultdir(bpparam))

## End(Not run)

## -----------------------------------------------------------------------
## Error handling:
## -----------------------------------------------------------------------

## When 'stop.on.error' is TRUE the process returns as soon as an error
## is thrown.

## When 'stop.on.error' is FALSE all computations are attempted. Partial
## results are returned along with errors. Use bptry() to see the
## partial results
bpparam <- SnowParam(2, stop.on.error = FALSE)
res <- bptry(bplapply(list(1, "two", 3, 4), sqrt, BPPARAM = bpparam))
res

## Calling bpok() on the result list returns TRUE for elements with no
## error.
bpok(res)

## -----------------------------------------------------------------------
## Random number generation:
## -----------------------------------------------------------------------

## Random number generation is controlled with the 'RNGseed' field.
## This seed is passed to parallel::clusterSetRNGStream
## which uses the L'Ecuyer-CMRG random number generator and distributes
## streams for each job

bpparam <- SnowParam(3, RNGseed = 7739465)
bplapply(seq_len(bpnworkers(bpparam)), function(i) rnorm(1),
         BPPARAM = bpparam)


Bioconductor/BiocParallel documentation built on Oct. 31, 2024, 6:58 a.m.