mpifarm: Farm out jobs across a cluster using MPI

Description Usage Arguments Details Value Author(s) See Also Examples

Description

Farm out a procedure (an arbitrary block of R-code) to a cluster of slaves, with different values of the variables.

Usage

1
2
3
4
5
6
mpi.farm(proc, joblist, common = list(), status = NULL, chunk = 1,
         stop.condition = TRUE, info = TRUE,
         checkpoint = 0, checkpoint.file = NULL,
         max.backup = 20, sleep = 0.01, blocking = FALSE,
         verbose = getOption("verbose"))
mpi.farm.slave(fn, common=list(), verbose = getOption("verbose"))

Arguments

proc

An arbitrary block of R code that will be executed for each element in joblist. At each execution, this code will be evaluated in an environment consisting of the corresponding element of joblist plus the common list. That is, when the k-th job is executed, the variables referred to in proc will be sought for in joblist[[k]] and, if they are not found there, in common.

joblist

A list of the jobs that will be farmed out. Each element of joblist should be a named list, defining some or all of the variables referred to by the code in proc.

common

An optional list of variables common to all the jobs.

status

An optional integer vector giving the status of the individual jobs in joblist. This will typically only arise when restarting from a checkpoint file.

chunk

optional integer; the chunk size to be used. By default, chunk=1 and each job in joblist is sent individually to a slave and returned when finished. If chunk>1, chunks of this size are sent and processed sequentially. In other words, chunk adjusts the granularity of the parallel computation.

stop.condition

After each execution, this expression will be evaluated in the context of the return-value of proc. For this option to have an effect, proc must return a named list and stop.condition must evaluate to TRUE or FALSE in the context of that list. If it evaluates to FALSE, the job is not finished; the return-value of proc will be queued for another round of processing. If it evaluates to TRUE, the job is finished. This allows one to complete the calcuation in several steps, which in turn allows for more effective load-balancing. If the return-value of proc is not a list, this option has no effect.

info

If info=TRUE, information on the progress of each slave will be printed.

checkpoint

optional integer specifying the granularity of checkpointing. That is, the checkpoint file will be saved once every checkpoint jobs from joblist are completed. If checkpoint<1, no checkpointing will be performed.

checkpoint.file

optional filename. If checkpoint.file is specified, then once every checkpoint jobs are finished, checkpoint information will be saved to a binary-format file with this name. This file will contain the joblist and common lists (the former having been updated) and an integer vector status, which indicates the status of each element in joblist. mpi.farm does not itself ever load the checkpoint file.

max.backup

positive integer; maximum number of backup checkpoint files that will be created.

sleep

amount of time (in seconds) that the master will sleep between consecutive polls of the slave pool.

blocking

If TRUE, blocking MPI calls will be used. If FALSE non-blocking MPI calls are used for greater evenness and efficiency in the scheduling.

verbose

logical; if TRUE, information will be printed both by the master and the slaves.

fn

A arbitrary block of R code, deparsed to a string.

Details

A higher-level code is mpi.farmer, which calls mpi.farm and handles checkpointing. However, it may sometimes happen that a user wants to call mpi.farm directly. mpi.farm.slave does the work on the slaves and should never need to be called by the user.

mpi.farm will execute the code in proc repeatedly in environments defined by the entries of joblist and the common environment common. If Rmpi slaves have been spawned, the jobs will be farmed out to them according to a load-balancing algorithm; if no slaves are running, or Rmpi has not been loaded, the jobs will be executed serially.

For many of the applications envisioned, the jobs the slaves are assigned involve stochastic simulations. Because of the way that R initializes its pseudorandom number generators (RNGs), it is easy to make the mistake of failing to initialize the RNGs on different slaves to different states. If one fails to do this (and doesn't use a sophisticated parallel RNG like SPRNG) then it is possible that the random numbers generated on different slaves will be correlated or even identical. For this reason, it is a good idea to set the seed of the RNG as part of the block of code proc. Storing the state of the RNG before doing so is often desirable, but this can be frustrating if the RNG has not been initialized. mpi.farm checks to see if .Random.seed exists and, if it does not, initializes the RNG with a call to runif. Thus, the user is guaranteed that the RNG has been initialized on each slave.

A user interrupt to mpi.farm results in an attempt to terminate the slaves cleanly. This may take some time, since each slave has to finish the job it is currently working on before it becomes receptive to messages from the master. A user interrupt issued during the abort process will leave some finished jobs in the MPI queue and therefore compromise the integrity of future parallel computations. For this reason, when it is necessary to abort mpi.farm and not possible to allow it to terminate cleanly, it is recommended that the slaves be closed (via mpi.close.Rslaves) and restarted before further parallel computations are attempted.

Value

The value returned by mpi.farm and farm is a list with one entry for each of the elements in joblist. The elements in the result correspond to the elements in the joblist argument.

Author(s)

Aaron A. King

See Also

mpi.farmer

Examples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  ## Not run: 
    mpi.spawn.Rslaves(needlog=F)

    x <- lapply(1:100,function(k)list(a=k,b=rnorm(1)))
    y <- mpi.farm(a+b,x)
    print(unlist(y))

    x <- lapply(1:100,function(k)list(a=k,b=0,done=0))
    y <- mpi.farm({
                   b <- b+rnorm(1)
                   list(a=a,b=b,done=done+1)
                   },x,stop.condition=((abs(b)>2)|(done>10)))

    mpi.close.Rslaves()

  
## End(Not run)
## run some jobs in serial mode
  x <- lapply(1:100,function(k)list(a=k,b=0,done=0))
  y <- mpi.farm({
                 b <- b+rnorm(1)
                 list(a=a,b=b,done=done+1)
                 },x,
                 stop.condition=((abs(b)>2)|(done>10)))

kingaa/mpifarm documentation built on May 20, 2019, 10:01 a.m.