## ---
## title: "Introduction to rrqueue"
## author: "Rich FitzJohn"
## date: "`r Sys.Date()`"
## output: rmarkdown::html_vignette
## vignette: >
##   %\VignetteIndexEntry{Introduction to rrqueue}
##   %\VignetteEngine{knitr::rmarkdown}
##   %\VignetteEncoding{UTF-8}
## ---
## # Overview
## `rrqueue` is a *distributed task queue* for R, implemented on top of [Redis](http://redis.io).
## At the cost of a little more work, it allows for more flexible parallelisation than `mclapply` affords.
## The main goal is to support non-map style operations: submit some tasks, collect the completed results,
## and queue more, even while some tasks are still running.
## Other features include:

## * Low-level task submission / retrieval has a simple API so that asynchronous task queues can be created.
## * Objects representing tasks, workers, queues, etc. can be queried.
## * While blocking `mclapply`-like functions are available, the package is designed to be non-blocking so that intermediate results can be used.
## * Automatic fingerprinting of environments so that code run on a remote machine will correspond to the code found locally.
## * Works well connecting to a Redis database running in the cloud (e.g., on an AWS machine over an ssh tunnel).
## * Local workers can be added to a remote pool, so long as everything can talk to the same Redis server.
## * The worker pool can be scaled at any time (up or down).
## * Basic fault tolerance, supporting re-queuing tasks lost on crashed workers.

## The basic workflow is:

## 1. Create a queue
## 2. Submit tasks to the queue
## 3. Start workers
## 4. Collect results

## Step 3 can happen at any point relative to steps 1 and 2, since the queue and workers can be started in any order, but workers must be running before results can be collected.

## Documenting things that work asynchronously is difficult. This
## document gives a tutorial-style overview of working with rrqueue.
## # Getting started
## The queue and workers can be started in any order, but it's easiest
## to explain starting the queue first.
## Suppose we have some simulation code; it needs to be in a file that
## the queue can see. For now, I'll use the file `myfuns.R` which is
## the test code. It has a function in it called `slowdouble` that
## takes a number, sleeps for that many seconds, and then returns
## twice the number. It's useful for testing.
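## The contents of `myfuns.R` are not reproduced in this document. As a
## rough illustration only, a function matching the description above
## could be written as follows. The name `slowdouble_sketch` is
## hypothetical and is chosen so that it cannot be confused with the
## real `slowdouble` that the workers load.
slowdouble_sketch <- function(x) {
  # Sleep for x seconds to simulate a slow computation
  Sys.sleep(x)
  # Return twice the input
  x * 2
}
slowdouble_sketch(0.1)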
##+ echo=FALSE, results="hide"
rrqueue:::queue_clean(redux::hiredis(), "myqueue",
                      purge=TRUE, stop_workers="kill")
lang_output <- function(x, lang) {
  cat(c(sprintf("```%s", lang), x, "```"), sep="\n")
}
cpp_output <- function(x) lang_output(x, "c++")
r_output <- function(x) lang_output(x, "r")
yaml_output <- function(x) lang_output(x, "yaml")
plain_output <- function(x) lang_output(x, "plain")
## You'll also need a running Redis server. I have one operating with
## the default parameters, so this works:
redux::hiredis()$PING()
## Create a queue called "myqueue" and tell it to load the source file
## "myfuns.R". If the tasks also needed packages, then passing
## `packages=c("package1", "package2")` would indicate that workers
## need to load those packages, too.
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## The message "creating new queue" here indicates that `rrqueue` did
## not find a previous queue in place. Queues are designed to be
## re-attachable, so we can attach to the same queue again straight away:
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## The message also notes that we have no workers available, so no
## work is going to get done. But we can still queue some tasks.
## # Queuing tasks
## The simplest sort of task queuing is to pass an expression into `enqueue`:
t <- obj$enqueue(1 + 1)
## The expression is not evaluated but stored and will be evaluated on
## the worker. Saving the result of this gives a `task` object which
## can be inspected.
t
## The expression stored in the task:
t$expr()
## The status of the task:
t$status()
## The result of the task, which is not available yet because no worker has run it, so asking for it throws an error:
##+ error=TRUE
t$result()
## And how long the task has been waiting:
t$times()
## Tasks can use local variables, too:
x <- 10
t2 <- obj$enqueue(x * 2)
t2$expr()
## And because using unevaluated expressions can be problematic,
## `rrqueue` has a standard-evaluation version (`enqueue_`) which takes
## either strings representing expressions or quoted expressions:
### obj$enqueue_("x / 2")
obj$enqueue_(quote(x / 2))
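## How `enqueue_` handles its argument internally is not shown here. As
## a base-R sketch of how the two forms relate, a string can be parsed
## into the same unevaluated call that `quote` produces, and neither is
## evaluated until `eval` is called. The names `expr_quoted` and
## `expr_parsed` are illustrative only.
expr_quoted <- quote(x / 2)
# parse() returns an expression vector; [[1]] extracts the call itself
expr_parsed <- parse(text = "x / 2")[[1]]
identical(expr_quoted, expr_parsed)
# Evaluate against an explicit value of x rather than a global variable
eval(expr_quoted, list(x = 10))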
## Now we have three tasks:
obj$tasks_list()
## All the tasks are waiting to be run:
obj$tasks_status()
## We can get an overview of the tasks:
obj$tasks_overview()
## # Starting workers
## `rrqueue` includes a script `rrqueue_worker` for starting workers
## from the command line (install with `rrqueue::install_scripts()`).
## Workers can also be started from within R using the `worker_spawn`
## function:
logfile <- tempfile()
wid <- rrqueue::worker_spawn("myqueue", logfile)
##+ echo=FALSE
Sys.sleep(.5)
## This function returns the *worker identifier*, which is also
## printed to the screen.
## It's probably informative at this point to read the logfile of the
## worker to see what it did on startup:
### See the RcppR6 tutorial for how to do this nicely.
##+ results="asis", echo=TRUE
plain_output(readLines(logfile))
## The worker first prints a lot of diagnostic information to the
## screen (or log file) indicating the name of the worker, the version
## of rrqueue, machine information, and special keys in the database
## where important information is stored.
## After broadcasting that it is awake (`ALIVE`), the worker detects that
## there is a controller on the queue and attempts to construct
## the environment that the controller wants (`r paste("ENVIR", obj$envir_id)`).
## After that, there are a series of `TASK_START`, `EXPR`, and
## `TASK_COMPLETE` lines as each of the three tasks is processed.
obj$tasks_status()
## The times here give an indication of the rrqueue overhead; the
## running time of these simple expressions should be close to zero.
obj$tasks_times()
## The task handle created before can now give a result:
t$result()
## Similarly, results can be retrieved from the queue directly:
obj$task_result(1)
obj$task_result(2)
obj$task_result(3)
## The worker that we created can be seen here:
obj$workers_list()
## Now queue a slower task, this time using the `slowdouble` function.
## This one takes about 1s to run:
##+ echo=TRUE
t <- obj$enqueue(slowdouble(1))
t$status()
Sys.sleep(.3)
t$status()
Sys.sleep(1)
t$status()
t$result()
## Again, times are available:
t$times()
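## The fixed `Sys.sleep` calls above work because the task duration is
## known in advance. When it is not, one option is to poll until a
## condition holds. The helper below is a generic sketch and is not part
## of `rrqueue`; `wait_until`, its arguments and `n_checks` are
## hypothetical names.
wait_until <- function(is_done, timeout = 10, interval = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    # Stop as soon as the predicate reports success
    if (isTRUE(is_done())) {
      return(TRUE)
    }
    # Give up once the time budget is spent
    if (Sys.time() >= deadline) {
      return(FALSE)
    }
    Sys.sleep(interval)
  }
}
# Demonstration with a stand-in predicate that succeeds on the third check
n_checks <- 0
wait_until(function() {
  n_checks <<- n_checks + 1
  n_checks >= 3
}, timeout = 5, interval = 0.05)
# With a task handle, the predicate might instead be
#   function() t$status() == "COMPLETE"
# where the status string "COMPLETE" is an assumption, not confirmed here.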
## # Finishing up
obj$stop_workers()
##+ echo=FALSE
Sys.sleep(.5)
##+ results="asis", echo=FALSE
plain_output(readLines(logfile))
## The worker is now in the exited list:
obj$workers_list_exited()
## The full log from our worker (dropping the first column which is
## the worker id and takes up valuable space here):
obj$workers_log_tail(wid, Inf)[-1]
### This is unfortunate, but I really need things cleaned up nicely so
### that I can get tests to pass in R CMD check. Will try to expose
### this function soon.
##+ echo=FALSE, results="hide"
rrqueue:::queue_clean(obj$con, obj$queue_name, purge=TRUE)