Nothing
### These functions are modified from "task_pull.R" originally provided
### at "http://math.acadiau.ca/ACMMaC/Rmpi/examples.html".
###
### The "task_pull.R" is mainly designed to mimic Rmpi's legacy manager/workers
### parallel environment. However, the following functions are aiming
### for SPMD and presuming one of ranks (default 0) has the role of manager.
# e.g. Suppose everyone knows FUN and ..., then the code should like
#
# if(comm.rank() != 0){
# ret <- task.pull.workers(FUN)
# } else{
# ret <- task.pull.manager(jids = 1:100, FUN)
# }
#
# where ret is the results in manager, and NULL in all workers.
task.pull.workers <- function(FUN = function(jid, ...){ return(jid) },
..., rank.manager = .pbd_env$SPMD.CT$rank.root,
comm = .pbd_env$SPMD.CT$comm,
try = .pbd_env$SPMD.TP$try, try.silent = .pbd_env$SPMD.TP$try.silent){
### FUN <- function(jid, ...) is a user defined function.
### Note the use of the tag for sent messages:
### 1 = ready_for_job, 2 = done_job, and 3 = exiting.
### Note the use of the tag for received messages:
### 1 = job, and 2 = done_jobs.
done <- 0L
while(done != 1L){
### Signal being ready to receive a new job.
spmd.send.default(0L, rank.dest = rank.manager, tag = 1L, comm = comm,
check.type = FALSE)
### Receive a job id.
jid <- spmd.recv.default(rank.source = rank.manager,
tag = spmd.anytag(), comm = comm,
check.type = FALSE)
sourcetag <- spmd.get.sourcetag()
tag <- sourcetag[2]
if(tag == 1){
if(try){
tmp <- try(FUN(jid, ...), silent = try.silent)
} else{
tmp <- FUN(jid, ...)
}
ret <- list(jid = jid, ret = tmp)
### Send a results message back to the manager.
spmd.send.default(ret, rank.dest = rank.manager, tag = 2L, comm = comm,
check.type = FALSE)
} else if(tag == 2){
done <- 1L
}
### We'll just ignore any unknown messages.
}
spmd.send.default(0L, rank.dest = rank.manager, tag = 3L, comm = comm,
check.type = FALSE)
invisible()
} # End of task.pull.workers().
task.pull.manager <- function(jids, rank.manager = .pbd_env$SPMD.CT$rank.root,
comm = .pbd_env$SPMD.CT$comm){
### Check.
if(spmd.comm.rank(comm) != rank.manager){
comm.stop("Wrong manager id.")
}
if(spmd.comm.size(comm) == 1){
comm.stop("Comm size >= 2 is required.")
}
### Note the use of the tag for received messages:
### 1 = ready_for_job, 2 = done_job, and 3 = exiting.
### Note the use of the tag for sent messages:
### 1 = job, and 2 = done_jobs.
closed.workers <- 0L
n.workers <- spmd.comm.size(comm) - 1
# ret <- vector("list", n.workers)
# for(i in 1:n.workers){
# ret[[i]] <- list()
# }
if(! all(jids > 0) || ! is.integer(jids)){
comm.stop("jids should be all positive integers.")
}
ret <- vector("list", max(jids))
while(closed.workers < n.workers){
### Receive a message from a worker.
ret.worker <- spmd.recv.default(rank.source = spmd.anysource(),
tag = spmd.anytag(), comm = comm,
check.type = FALSE)
sourcetag <- spmd.get.sourcetag()
worker.id <- sourcetag[1]
tag <- sourcetag[2]
if(tag == 1L){
### The worker is ready for a job.
### Give the next job to the worker if there is any available.
### Or, tell the worker that it's works are all done if there is none.
if(length(jids) > 0){
### Send a job, and then remove it from the job list.
spmd.send.default(jids[1], rank.dest = worker.id, tag = 1L, comm = comm,
check.type = FALSE)
jids <- jids[-1]
} else{
spmd.send.default(0L, rank.dest = worker.id, tag = 2L, comm = comm,
check.type = FALSE)
}
} else if (tag == 2L){
### The message contains results. Do something with the results.
### Store them in the data structure.
# ret[[worker.id]][[length(ret[[worker.id]]) + 1]] <- ret.worker
ret[[ret.worker$jid]] <- ret.worker$ret
} else if(tag == 3L){
### A worker has closed down.
closed.workers <- closed.workers + 1
}
}
ret
} # End of task.pull.manager().
task.pull <- function(jids, FUN, ...,
rank.manager = .pbd_env$SPMD.CT$rank.root,
comm = .pbd_env$SPMD.CT$comm, bcast = .pbd_env$SPMD.TP$bcast,
barrier = .pbd_env$SPMD.TP$barrier,
try = .pbd_env$SPMD.TP$try, try.silent = .pbd_env$SPMD.TP$try.silent){
if(spmd.comm.rank(comm) != rank.manager){
ret <- task.pull.workers(FUN, ..., rank.manager = rank.manager, comm = comm,
try = try, try.silent = try.silent)
} else{
ret <- task.pull.manager(jids, rank.manager = rank.manager, comm = comm)
}
if(bcast){
ret <- spmd.bcast.object(ret, rank.source = rank.manager, comm = comm)
}
if(barrier){
spmd.barrier(comm = comm)
}
ret
} # End of task.pull().
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.