knitr::opts_chunk$set( collapse = TRUE, error = FALSE, comment = "#>" ) r_output <- function(x) { cat(c("```r", x, "```"), sep = "\n") }
If you have thousands and thousands of jobs to submit at once you may not want to flood the cluster with them all at once. Each job submission is relatively slow (the HPC tools that the web interface has to use are relatively slow). The actual queue that the cluster uses doesn't seem to like processing tens of thousands of job, and can slow down. And if you take up the whole cluster someone may come and knock on your office and complain at you. At the same time, batching your jobs up into little bits and manually sending them off is a pain and work better done by a computer.
An alternative is to submit a set of "workers" to the cluster, and
then submit jobs to them. This is done with the
rrq
package, along with a
redis
server running on the cluster.
To get started you will need to install the rrq
package locally.
drat::add("mrc-ide") install.packages("rrq")
Then construct the context as before
root <- "contexts" ctx <- context::context_save(root, sources = "mysources.R")
There are two ways we can proceed from here; the first - "workers" - is very similar to the non-worker workflow and is described first. The second - "rrq" - is a bit more involved and is described second.
Then configure and create the queue; the use_workers
argument is important here as it:
rrq
package is available on the cluster, where your workers will run$enqueue
method so that jobs are not sent to the HPC scheduler but to the rrq
scheduler$submit_workers
method which you will use to create workers on the clusterHowever, everything else will appear the same.
config <- didehpc::didehpc_config(use_workers = TRUE) obj <- didehpc::queue_didehpc(ctx, config = config)
You can now submit
t <- obj$enqueue(random_walk(0, 10))
This job will stay pending forever as the HPC scheduler will never run it
t$status() t$times()
You must submit actual workers in order to actually run things. This could have been done before submitting the tasks, though workers will time out after 10 minutes of inactivity, if you have very many jobs to save your workers might exit before the work starts!
The argument is the number of workers to submit. Each worker is equivalent to a job that your configuration would otherwise create (in terms of cores selected).
workers <- obj$submit_workers(2) workers
All workers get names in the form <adjective>_<animal>_<integer>
so that you can remember which workers you set off. They will turn
off after 10 minutes of inactivity by default (you can tweak this
with the worker_timeout
argument to didehpc_config
or by
sending a TIMEOUT_SET
message).
One advantage over the usual queuing approach here is that you will not wait for anyone else's jobs to complete once you have reserved your workers.
t$wait(10)
We're going to interact with the rrq object a bit
rrq <- obj$rrq_controller() rrq
This is another R6 object, though this one at least has decent documentation - see the rrq::rrq_controller
for details of each method
You can see what your workers have been up to with the
workers_log_tail
command:
rrq$worker_log_tail(n = Inf)
The time
column represents seconds - relative seconds should still be useful here.
As before, logging works on a per-task basis:
t$log()
Find out how long your workers will persist for:
rrq$message_send_and_wait("TIMEOUT_GET", worker_ids = workers)
Other than that, hopefully everything else continues as normal. We
can submit a bunch of jobs and run them using $lapply
:
sizes <- 3:8 grp <- obj$lapply(sizes, random_walk, x = 0)
Task status:
grp$status()
Collect the results:
res <- grp$wait(5) res
While workers will turn off automatically, it's polite to turn them
off as soon as you're done using obj$stop_workers()
Alternatively, after submitting a bunch of jobs you can run
rrq$message_send("TIMEOUT_SET", 0)
which will mean that the workers will stop immediately after not receiving a task (so after they finish processing all your jobs they'll stop one by one). Practically this still takes one minute because that's the polling timeout time (I may be able to improve this later).
obj$stop_workers() Sys.sleep(1) rrq$worker_log_tail(workers, n = Inf) rrq$destroy()
In this model, we create a very lightweight queue which in turn creates very lightweight tasks. This avoids even more overhead than the approach above, though it can be more difficult to debug because less information is saved. Rather than round-tripping data through the disk, everything goes via the redis server.
The first part here looks very similar, except that we use use_rrq = TRUE
rather than use_workers
config <- didehpc::didehpc_config(use_rrq = TRUE) obj <- didehpc::queue_didehpc(ctx, config = config)
We still submit workers
workers <- obj$submit_workers(10) workers
To send tasks to these workers we directly use the rrq_controller
object - we'll not use the queue_didehpc
object from this point.
rrq <- obj$rrq_controller()
This will look and act a lot like the main didehpc queue controller, but with a few differences. Tasks will come back as plain strings rather than user-friendly objects and lapply
and enqueue_bulk
are now blocking operations by default. Most tasks will clean up after they delete rather than leaving a persistent record on disk. The payback for this is potentially very fast task turnarounds and better behaviour with the disk under heavy load.
t <- rrq$enqueue(sin(1)) rrq$task_wait(t, 10)
For example; submitting 50 trivial tasks to our pool of workers and retrieving the results:
system.time(res <- rrq$lapply(1:50, sin))
or 500 tasks:
system.time(res <- rrq$lapply(1:500, sin, progress = FALSE))
Across the network, the latency here is ~1/600 s per task. On fi--didemrchnb it will hopefully be a bit faster because of the infiniband network.
rrq$worker_stop() rrq$destroy()
It is theoretically possible to submit a cluster job that creates an rrq_controller
and controls the second queue. To do that you need to write function like:
get_rrq_controller <- function(x, ...) { queue_id <- Sys.getenv("CONTEXT_ID", "") stopifnot(queue_id != "") rrq::rrq_controller$new((queue_id) }
within your sources, then you can use it in place of running (say) a lapply()
call in your code. This approach allows a relatively simple form of inter-process communication. Talk to Rich if this is something you might have a use for, if you have simulation needs that are larger than a single node.
Suppose that we want to submit a job where we have a single process which orchestrates a group of workers each of which takes up an entire node. We used this pattern in the covid response where we wanted to run different MCMC chains on different nodes, each using 32 cores, but we also needed a single "controlling" process to organise collecting results from these nodes.
What we want to do do is specify a different set of resources to be used by the workers than by tasks submitted by $enqueue()
. Note that this only makes sense when using use_rrq = TRUE
. For example, to submit a controlling process that uses the GeneralNodes
template and one core (the default) but worker processes that can use 8 cores, we might write
config <- didehpc::didehpc_config( use_rrq = TRUE, worker_resource = didehpc::worker_resource(cores = 8)) config
See the worker_resource
section here indicates different resources to the resource
section.
Now, when submitting workers with obj$submit_workers
each worker will be able to use 8 cores, but a task submitted by $enqueue()
will only be able to use one. You might then submit a task that uses the get_rrq_controller
trick above as part of your single core job, which can then farm out work to your workers using the rrq
queue object.
Above we use rrq$destroy()
to clean up every trace of the queue. Periodically we may flush the entire Redis database, or set all keys to expire after a day or so. Do not leave any important information in here please.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.