Overview

rrqueue is a distributed task queue for R, implemented on top of Redis. At the cost of a little more work, it allows for more flexible parallelisation than mclapply affords. The main goal is to support non-map-style operations: submit some tasks, collect the completed results, and queue more tasks even while others are still running.

The basic workflow is:

  1. Create a queue
  2. Submit tasks to the queue
  3. Start workers
  4. Collect results

The workers can be started at any time (before or after tasks are queued), though they do need to be started before results can be collected.

Documenting things that work asynchronously is difficult. This document gives a tutorial-style overview of working with rrqueue.

Getting started

The queue and workers can be started in any order, but it's easiest to explain starting the queue first.

Suppose we have some simulation code; it needs to be in a file that the queue can see. For now, I'll use the file myfuns.R, which is part of the test code. It contains a function called slowdouble that takes a number, sleeps for that many seconds, and then returns twice the number. It's useful for testing.
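The exact contents of the test file may differ, but a minimal sketch of a slowdouble function matching that description would be:

``` {r eval=FALSE}
## Sleep for 'x' seconds, then return twice 'x'; handy for simulating
## a task with a predictable running time.
slowdouble <- function(x) {
  Sys.sleep(x)
  x * 2
}
```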

``` {r echo=FALSE, results="hide"}
rrqueue:::queue_clean(redux::hiredis(), "myqueue", purge=TRUE, stop_workers="kill")
lang_output <- function(x, lang) {
  cat(c(sprintf("```%s", lang), x, "```"), sep="\n")
}
cpp_output <- function(x) lang_output(x, "c++")
r_output <- function(x) lang_output(x, "r")
yaml_output <- function(x) lang_output(x, "yaml")
plain_output <- function(x) lang_output(x, "plain")
```

You'll also need a running Redis server.  I have one operating with
the default parameters, so this works:
``` {r }
redux::hiredis()$PING()
```

Create a queue called "myqueue", telling it to load the source file "myfuns.R". If the code needed packages, then passing packages=c("package1", "package2") would indicate that workers need to load those packages, too.

obj <- rrqueue::queue("myqueue", sources="myfuns.R")
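If packages were needed as well, the call might look roughly like this hypothetical variant (not run here; the package names are placeholders):

``` {r eval=FALSE}
## workers would load these packages before running any tasks
obj <- rrqueue::queue("myqueue", sources="myfuns.R",
                      packages=c("package1", "package2"))
```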

The "creating new queue" message from that first call indicates that rrqueue did not find any previous queue in place. Queues are designed to be re-attachable, so we can immediately attach to the same queue again:

obj <- rrqueue::queue("myqueue", sources="myfuns.R")

The message also notes that we have no workers available, so no work is going to get done. But we can still queue some tasks.

Queuing tasks

The simplest sort of task queuing is to pass an expression into enqueue:

t <- obj$enqueue(1 + 1)

The expression is not evaluated but stored and will be evaluated on the worker. Saving the result of this gives a task object which can be inspected.

t

The expression stored in the task:

t$expr()

The status of the task:

t$status()

The result of the task, which will throw an error if we try to fetch it before the task has run:

``` {r error=TRUE}
t$result()
```

And how long the task has been waiting:
``` {r }
t$times()
```

Tasks can use local variables, too:

x <- 10
t2 <- obj$enqueue(x * 2)
t2$expr()

And because using unevaluated expressions can be problematic, rrqueue has a standard-evaluation version (enqueue_) which takes either strings representing expressions or quoted expressions:

obj$enqueue_(quote(x / 2))
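Based on the description above, the same task could also be given as a string (shown here without running it, to avoid queuing a fourth task):

``` {r eval=FALSE}
## equivalent to the quoted-expression form above
obj$enqueue_("x / 2")
```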

Now we have three tasks:

obj$tasks_list()

All the tasks are waiting to be run:

obj$tasks_status()

We can get an overview of the tasks:

obj$tasks_overview()

Starting workers

rrqueue includes a script, rrqueue_worker, for starting workers from the command line (install it with rrqueue::install_scripts()). Workers can also be started from within R using the worker_spawn function:

logfile <- tempfile()
wid <- rrqueue::worker_spawn("myqueue", logfile)

``` {r echo=FALSE}
Sys.sleep(.5)
```

This function returns the *worker identifier*, which is also
printed to the screen.

It's probably informative at this point to read the logfile of the
worker to see what it did on startup:

``` {r results="asis", echo=TRUE}
plain_output(readLines(logfile))
```

The worker first prints a lot of diagnostic information to the screen (or log file) indicating the name of the worker, the version of rrqueue, machine information, and special keys in the database where important information is stored.

Then, after broadcasting that it is awake (ALIVE), it detects that there is a controller on the queue and attempts to construct the environment that the controller wants, `r paste("ENVIR", obj$envir_id)`.

After that, there are a series of TASK_START, EXPR, and TASK_COMPLETE lines as each of the three tasks is processed.

obj$tasks_status()

The times here give an indication of the rrqueue overhead; the running time of these simple expressions should be close to zero.

obj$tasks_times()

The task handle created before can now give a result:

t$result()

Similarly, results can be retrieved from the queue directly:

obj$task_result(1)
obj$task_result(2)
obj$task_result(3)

The worker that we created can be seen here:

obj$workers_list()

Queue a slower task, this time using the slowdouble function. This will take 1 second:

``` {r echo=TRUE}
t <- obj$enqueue(slowdouble(1))
t$status()
Sys.sleep(.3)
t$status()
Sys.sleep(1)
t$status()
t$result()
```

Again, times are available:
``` {r }
t$times()
```

Finishing up

obj$stop_workers()

``` {r echo=FALSE}
Sys.sleep(.5)
```

``` {r results="asis", echo=FALSE}
plain_output(readLines(logfile))
```

The worker is now in the exited list:

obj$workers_list_exited()

The full log from our worker (dropping the first column, which is the worker id and takes up valuable space here):

obj$workers_log_tail(wid, Inf)[-1]

``` {r echo=FALSE, results="hide"}
rrqueue:::queue_clean(obj$con, obj$queue_name, purge=TRUE)
```


