## ---
## title: "rrqueue messages"
## author: "Rich FitzJohn"
## date: "`r Sys.Date()`"
## output: rmarkdown::html_vignette
## vignette: >
## %\VignetteIndexEntry{rrqueue messages}
## %\VignetteEngine{knitr::rmarkdown}
## %\VignetteEncoding{UTF-8}
## ---
##+ echo=FALSE, results="hide"
# Clear any existing "myqueue" state before the demo starts
# (purge=TRUE, stop_workers="kill").
rrqueue:::queue_clean(redux::hiredis(), "myqueue",
                      purge = TRUE, stop_workers = "kill")
# Print `x` one element per line, wrapped in a markdown code fence whose
# opening line is tagged with `lang`. Used from results="asis" chunks so
# the output is inserted into the document as raw markdown.
lang_output <- function(x, lang) {
  fenced <- c(sprintf("```%s", lang), x, "```")
  cat(fenced, sep = "\n")
}
# Create a closure that follows a text file (here, the worker log). Each
# call prints only the lines appended since the previous call, wrapped in
# a markdown code fence via lang_output().
#   filename: path of the file to follow.
#   lang:     tag written after the opening fence (default "plain").
# Returns a zero-argument function; calling it returns whatever
# lang_output() returns.
make_reader <- function(filename, lang = "plain") {
  n <- 0
  force(filename)
  force(lang)
  function() {
    # Read the file captured by this closure, not a global variable, so a
    # reader always follows the file it was created for.
    txt <- readLines(filename)
    # Skip lines already shown. The guard matters because
    # txt[-integer(0)] would drop every line.
    if (n > 0) {
      txt <- txt[-seq_len(n)]
    }
    n <<- n + length(txt)
    lang_output(txt, lang)
  }
}
## In addition to passing tasks (and results) between a controller and
## workers, the controller can also send "messages" to workers. This
## vignette shows what the possible messages do.
## In order to do this, we're going to need a queue and a worker:
obj <- rrqueue::queue("myqueue", sources = "myfuns.R")
logfile <- tempfile()
worker_id <- rrqueue::worker_spawn("myqueue", logfile)
## On startup the worker log contains:
##+ results="asis", echo=FALSE
# Each reader() call prints only the lines added to `logfile` since the
# previous call.
reader <- make_reader(filename = logfile)
reader()
## Because one of the main effects of messages is to print to the
## worker logfile, we'll print this fairly often.
## ## Messages and responses
## 1. The queue sends a message for one or more workers to process.
## The message has an *identifier* that is derived from the current
## time. Messages are written to a first-in-first-out queue, *per
## worker*, and are processed independently by workers who do not
## look to see if other workers have messages or are processing
## them.
##
## 2. As soon as a worker has finished processing any current job it
## will process the message (it must wait to finish a current job
## but will not start any further jobs).
##
## 3. Once the message has been processed (see below) a response will
## be written to a response list with the same identifier as the
## message.
## ## `PING`
## The `PING` message simply asks the worker to return `PONG`. It's
## useful for diagnosing communication issues because it does so
## little.
message_id <- obj$send_message("PING")
## The message id is going to be useful for getting responses:
message_id
## (this is derived from the current time, according to Redis which is
## the central reference point of time for the whole system).
## Wait a little while:
Sys.sleep(0.5)
##+ results="asis", echo=FALSE
reader()
## The logfile prints:
## 1. the request for the `PING` (`MESSAGE PING`)
## 2. the value `PONG` to the R message stream
## 3. logging a response (`RESPONSE PONG`), which means that something is written to the response stream.
## We can access the same bits of information in the worker log:
obj$workers_log_tail(n = Inf)[-1]
## This includes the `ALIVE` and `ENVIR` bits as the worker comes up.
## Inspecting the logs is fine for interactive use, but it will often
## be more useful to poll for a response.
## We already know that our worker has a response, but we can ask anyway:
obj$has_responses(message_id)
## Or, inversely, we can ask which messages a given worker has responses for:
obj$response_ids(worker_id)
## To fetch the responses from all workers it was sent to (always
## returning a named list):
obj$get_responses(message_id)
## or to fetch the response from a given worker:
obj$get_response(message_id, worker_id)
## The response can be deleted by passing `delete=TRUE` to this method:
obj$get_response(message_id, worker_id, delete = TRUE)
## after which requesting the response again will throw an error:
##+ error=TRUE
obj$get_response(message_id, worker_id, delete = TRUE)
## There is also a `wait` argument that lets you wait until a response
## is ready. The `slowdouble` command will take a few seconds, so to
## demonstrate:
obj$enqueue(slowdouble(2))
message_id <- obj$send_message("PING")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## Looking at the log will show what went on here:
obj$workers_log_tail(n = 4)[-1]
## 1. A task is received
## 2. 2s later the task is completed
## 3. Then the message is received
## 4. Then, basically instantaneously, the message is responded to
## However, because the message is only processed after the task is
## completed, the response takes a while to come back. Equivalently,
## from the worker log:
##+ results="asis", echo=FALSE
# Print the worker-log lines written since the previous reader() call.
reader()
## ## `ECHO`
## This is basically like `PING` and not very interesting; it prints
## an arbitrary string to the log. It always returns `"OK"` as a
## response.
message_id <- obj$send_message("ECHO", "hello world!")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
##+ results="asis", echo=FALSE
reader()
## ## `INFO`
## The `INFO` command refreshes and returns the worker information.
## We already have a copy of the worker info; it was created when the
## worker started up:
obj$workers_info()[[worker_id]]
## Note that the `envir` field is currently empty (`{}`) because when
## the worker started it did not know about any environments.
message_id <- obj$send_message("INFO")
## Here's the new worker information, complete with an updated `envir`
## field:
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## This has been updated on the database copy too:
obj$workers_info()[[worker_id]]$envir
## and the same information is printed to the worker log:
##+ results="asis", echo=FALSE
reader()
## ## `DIR`
## This is useful for listing directory contents, similar to the `dir`
## function in R. However, because file *contents* are usually more
## interesting (e.g., working out why something is not running on the
## remote machine), the response is basically the result of passing
## the output of `dir` to `tools::md5sum`, giving the md5sum of each
## file.
message_id <- obj$send_message("DIR")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## Additional arguments to `dir` can be passed through:
message_id <- obj$send_message("DIR", list(pattern = "\\.R$"))
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## If you pass in invalid arguments to `dir`, then a reasonably
## helpful message should be generated:
message_id <- obj$send_message("DIR", list(foo = "bar"))
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## (note that this does not generate an error locally, but you can
## test to see if it did throw an error by checking the class of the
## returned value).
## and the same information is printed to the worker log:
##+ results="asis", echo=FALSE
reader()
## ## `PUSH`
## The commands `PUSH` and `PULL` move files from and to the worker.
## The command is interpreted as an instruction to the worker so
## `PUSH` pushes files from the worker into the database while `PULL`
## pulls files from the database into the worker. There are (or will be)
## preferable higher-level ways of dealing with this.
## Things to be aware of here: Redis is an in-memory store and rrqueue
## is not at all aggressive about deleting objects. If you push a 1GB
## file into Redis things *will* go badly. There are no checks for
## this at present!
## `PUSH` takes a vector of filenames as an argument. The response is
## not the file itself (how could it do that?) but instead the *hash*
## of each file. By the time the response is received the file
## contents are stored in the database and can be returned.
message_id <- obj$send_message("PUSH", "myfuns.R")
res <- obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
res
## We can save the file into a temporary directory in the filesystem
## using the `files_unpack` method of `queue`:
path <- obj$files_unpack(res)
dir(path)
## And the files have the expected hash:
tools::md5sum(file.path(path, names(res)))
##+ results="asis", echo=FALSE
# Print the worker-log lines written since the previous reader() call.
reader()
## ## `PULL`
## This is the inverse of `PUSH` and takes files from the machine the
## queue is running on and copies them into the worker (from the view
## of the worker, the files in question are already in the database
## and it will "pull" them down locally).
## First, we need to save files into the database. Let's rename the
## temporary file above and save that:
file.rename(file.path(path, "myfuns.R"), "brandnewcode.R")
res <- obj$files_pack("brandnewcode.R")
res
## Note that the hash here is the same as above: `rrqueue` can tell
## this is the same file even though it has a different filename. Note
## also that filenames will be interpreted relative to the working
## directory, because the directory layout on the worker outside of
## this point could be arbitrarily different.
## Now that the files have been packed, we can run the `PULL` command
## (note that the `PULL` command *always* unpacks files into the
## worker's working directory):
message_id <- obj$send_message("PULL")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## And the new file will be present in the directory:
message_id <- obj$send_message("DIR", list(pattern = "\\.R$"))
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
##+ results="asis", echo=FALSE
reader()
## ## `EVAL`
## Evaluate an arbitrary R expression, passed as a string (*not* as
## any sort of unevaluated or quoted expression). This expression is
## evaluated in the global environment, which is *not* the environment
## in which queued code is evaluated.
message_id <- obj$send_message("EVAL", "1 + 1")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
## We can delete the file created above:
message_id <- obj$send_message("EVAL", "file.remove('brandnewcode.R')")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
##+ results="asis", echo=FALSE
reader()
## This could be used to evaluate code that has side effects, such as
## installing packages. However, due to limitations with how R loads
## packages the only way to update and reload a package is going to be
## to restart the worker.
## ## `STOP`
## Stop sends a shutdown message to the worker. Generally you should
## prefer the `stop_workers` method, which uses `STOP` behind the
## scenes.
message_id <- obj$send_message("STOP")
obj$get_response(message_id, worker_id, delete = TRUE, wait = 10)
##+ results="asis", echo=FALSE
reader()
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.