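```{r echo=FALSE, results="hide"}
## Start from a clean slate: remove any old queue and kill its workers.
rrqueue:::queue_clean(redux::hiredis(), "myqueue",
                      purge=TRUE, stop_workers="kill")
## Print lines of text wrapped in a fenced block with the given language.
lang_output <- function(x, lang) {
  cat(c(sprintf("```%s", lang), x, "```"), sep="\n")
}
## Return a function that, each time it is called, prints only the log
## lines written since the previous call.
make_reader <- function(filename, lang="plain") {
  n <- 0
  force(filename)
  force(lang)
  function() {
    txt <- readLines(filename)
    if (n > 0) {
      txt <- txt[-seq_len(n)]
    }
    n <<- n + length(txt)
    lang_output(txt, lang)
  }
}
```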
In addition to passing tasks (and results) between a controller and workers, the controller can also send "messages" to workers. This vignette shows what the possible messages do.

In order to do this, we're going to need a queue and a worker:

```{r}
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
logfile <- tempfile()
worker_id <- rrqueue::worker_spawn("myqueue", logfile)
```
On startup the worker log contains:

```{r results="asis", echo=FALSE}
reader <- make_reader(logfile)
reader()
```
Because one of the main effects of messages is to print to the worker logfile, we'll print this fairly often.

## Messages and responses

1. The queue sends a message for one or more workers to process. The message has an *identifier* that is derived from the current time. Messages are written to a first-in-first-out queue, *per worker*, and are processed independently by workers, who do not look to see if other workers have messages or are processing them.
2. As soon as a worker has finished processing any current job it will process the message (it must wait to finish a current job but will not start any further jobs).
3. Once the message has been processed (see below) a response will be written to a response list with the same identifier as the message.

## `PING`

The `PING` message simply asks the worker to return `PONG`. It's useful for diagnosing communication issues because it does so little.

```{r}
message_id <- obj$send_message("PING")
```
The message id is going to be useful for getting responses:

```{r}
message_id
```
(This is derived from the current time, according to Redis, which is the central reference point of time for the whole system.)
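As an illustration only, the sketch below builds a time-derived identifier from the two values that the Redis `TIME` command returns (seconds and microseconds since the Unix epoch). The function `make_message_id` and the `"seconds.microseconds"` format are assumptions, not rrqueue's actual scheme; the point is that zero-padding the microseconds makes a later id sort after an earlier one.

```r
# Hypothetical: build an id from the (seconds, microseconds) pair that
# Redis TIME returns as two strings. Not rrqueue's actual id format.
make_message_id <- function(time) {
  sec <- as.numeric(time[[1]])
  usec <- as.numeric(time[[2]])
  # Zero-pad microseconds to 6 digits so that string order follows
  # chronological order (for second counts of equal width).
  sprintf("%.0f.%06.0f", sec, usec)
}

ids <- c(make_message_id(c("1455000000", "999999")),
         make_message_id(c("1455000001", "42")))
ids
```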
Wait a little while:

```{r}
Sys.sleep(.5)
```

```{r results="asis", echo=TRUE}
reader()
```
The logfile prints:

1. the request for the `PING` (`MESSAGE PING`)
2. the value `PONG`, printed to the R message stream
3. a log of the response (`RESPONSE PONG`), which means that something is written to the response stream.

We can access the same bits of information in the worker log:

```{r}
obj$workers_log_tail(n=Inf)[-1]
```
This includes the `ALIVE` and `ENVIR` bits as the worker comes up.
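The three steps listed under *Messages and responses*, and the `MESSAGE`/`RESPONSE` log lines above, can be modelled in plain R. This is a toy sketch under stated assumptions: one worker's queue is an R list consumed first-in-first-out, responses live in a list keyed by message id, and `queue_message`, `process_next` and `handlers` are hypothetical names. rrqueue itself keeps all of this in Redis.

```r
# Toy, in-memory model of one worker's message handling (hypothetical).
state <- new.env()
state$queue <- list()          # per-worker FIFO of pending messages
state$responses <- list()      # responses keyed by message id
state$log <- character(0)      # stand-in for the worker log

handlers <- list(
  PING = function(args) "PONG",
  ECHO = function(args) {
    state$log <- c(state$log, args)   # ECHO writes its argument to the log
    "OK"
  }
)

queue_message <- function(id, command, args = NULL) {
  # Append the message to the tail of the queue.
  state$queue <- c(state$queue,
                   list(list(id = id, command = command, args = args)))
  invisible(id)
}

process_next <- function() {
  msg <- state$queue[[1L]]                 # take the oldest message
  state$queue <- state$queue[-1L]
  state$log <- c(state$log, paste("MESSAGE", msg$command))
  value <- handlers[[msg$command]](msg$args)
  state$responses[[msg$id]] <- value       # same identifier as the message
  state$log <- c(state$log, paste("RESPONSE", value))
  invisible(value)
}

queue_message("id1", "PING")
queue_message("id2", "ECHO", "hello world!")
process_next()
process_next()
state$log
```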
Inspecting the logs is fine for interactive use, but it will often be more useful to poll for a response.
We already know that our worker has a response, but we can ask anyway:

```{r}
obj$has_responses(message_id)
```
Or, inversely, we can ask what messages a given worker has responses for:

```{r}
obj$response_ids(worker_id)
```
To fetch the responses from all workers it was sent to (always returning a named list):

```{r}
obj$get_responses(message_id)
```

or to fetch the response from a given worker:

```{r}
obj$get_response(message_id, worker_id)
```
The response can be deleted by passing `delete=TRUE` to this method:

```{r}
obj$get_response(message_id, worker_id, delete=TRUE)
```
after which fetching the response again will throw an error:

```{r error=TRUE}
obj$get_response(message_id, worker_id, delete=TRUE)
```
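The delete-then-error behaviour can be mimicked with a small in-memory store. This is a hypothetical stand-in (an environment playing the role of the Redis response list, and a `get_response_local` function that is not part of rrqueue) meant only to show the semantics: once a response is deleted it is gone, so a second fetch fails.

```r
# Hypothetical in-memory stand-in for a response store.
response_store <- new.env()
assign("msg1", "PONG", envir = response_store)

get_response_local <- function(id, delete = FALSE) {
  if (!exists(id, envir = response_store, inherits = FALSE)) {
    stop("no response for message ", id)
  }
  value <- get(id, envir = response_store, inherits = FALSE)
  if (delete) {
    rm(list = id, envir = response_store)   # remove after reading
  }
  value
}

first <- get_response_local("msg1", delete = TRUE)
second <- tryCatch(get_response_local("msg1"), error = function(e) e)
inherits(second, "error")
```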
There is also a `wait` argument that lets you wait until a response is ready. The `slowdouble` function takes a few seconds to run, so to demonstrate:

```{r}
obj$enqueue(slowdouble(2))
message_id <- obj$send_message("PING")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
Looking at the log will show what went on here:

```{r}
obj$workers_log_tail(n=4)[-1]
```
However, because the message is only processed after the task is completed, the response takes a while to come back. Equivalently, from the worker log:

```{r results="asis", echo=FALSE}
reader()
```
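Conceptually, `wait=10` turns a single lookup into a bounded poll. The sketch below shows that idea in base R; `poll_until` and `make_check` are hypothetical helpers, and rrqueue may use a blocking Redis call rather than a sleep loop, so treat this only as an illustration of the timeout semantics.

```r
# Hypothetical helper: call `check()` until it returns non-NULL, or
# give up with an error after `timeout` seconds.
poll_until <- function(check, timeout = 10, interval = 0.05) {
  deadline <- Sys.time() + timeout
  repeat {
    value <- check()
    if (!is.null(value)) {
      return(value)
    }
    if (Sys.time() >= deadline) {
      stop("timed out after ", timeout, " seconds")
    }
    Sys.sleep(interval)
  }
}

# Simulate a response that only becomes available on the n-th check.
make_check <- function(ready_after) {
  calls <- 0
  function() {
    calls <<- calls + 1
    if (calls >= ready_after) "PONG" else NULL
  }
}

check <- make_check(3)
result <- poll_until(check, timeout = 2)
result
```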
## `ECHO`

This is basically like `PING` and not very interesting; it prints an arbitrary string to the log. It always returns `"OK"` as a response.

```{r}
message_id <- obj$send_message("ECHO", "hello world!")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
```{r results="asis", echo=FALSE}
reader()
```
## `INFO`

The `INFO` command refreshes and returns the worker information. We already have a copy of the worker info; it was created when the worker started up:

```{r}
obj$workers_info()[[worker_id]]
```
Note that the `envir` field is currently empty (`{}`) because when the worker started it did not know about any environments.

```{r}
message_id <- obj$send_message("INFO")
```
Here's the new worker information, complete with an updated `envir` field:

```{r}
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
This has been updated on the database copy too:

```{r}
obj$workers_info()[[worker_id]]$envir
```
and the same information is printed to the worker log:

```{r results="asis", echo=FALSE}
reader()
```
## `DIR`

This is useful for listing directory contents, similar to the `dir` function in R. However, because file *contents* are usually more interesting (e.g., when working out why something is not running on the remote machine), the response is basically the result of passing the output of `dir` to `tools::md5sum`, giving the md5sum of each file.

```{r}
message_id <- obj$send_message("DIR")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
Additional arguments to `dir` can be passed through:

```{r}
message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
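Locally, the idea behind `DIR` can be reproduced with base R: list the files with `dir()`, forwarding extra arguments such as `pattern`, and hash them with `tools::md5sum()`. `dir_md5` is a hypothetical name, and the worker's real implementation may differ in details such as how the names are reported.

```r
# Hypothetical local analogue of the DIR message: md5 hashes of the
# files that dir() lists, named by their relative filename.
dir_md5 <- function(path = ".", ...) {
  files <- dir(path, ...)                        # extra args go to dir()
  hashes <- tools::md5sum(file.path(path, files))
  names(hashes) <- files                         # report relative names
  hashes
}

tmp <- tempfile()
dir.create(tmp)
writeLines("double <- function(x) 2 * x", file.path(tmp, "a.R"))
writeLines("some notes", file.path(tmp, "notes.txt"))
h <- dir_md5(tmp, pattern = "\\.R$")
h
```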
If you pass in invalid arguments to `dir`, then a reasonably helpful message should be generated:

```{r}
message_id <- obj$send_message("DIR", list(foo="bar"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
(Note that this does not raise an error locally, but you can test whether the worker hit an error by checking the class of the returned value.)
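The class check can be illustrated locally. Calling `dir()` with an argument it does not accept fails, and catching the condition yields an object of class `"error"`. The sketch assumes the worker's response carries a comparable error class (a condition object, or a `try-error` as produced by `try()`); that is an assumption, not something this vignette confirms.

```r
# dir() has no `foo` argument, so this call fails with "unused argument".
res <- tryCatch(dir(foo = "bar"), error = function(e) e)

# Check the class of the value instead of relying on an error being thrown.
if (inherits(res, c("error", "try-error"))) {
  cat("error reported:", conditionMessage(res), "\n")
}
```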
and the same information is printed to the worker log:

```{r results="asis", echo=FALSE}
reader()
```
## `PUSH`

The commands `PUSH` and `PULL` move files from and to the worker. The command is interpreted as an instruction to the worker, so `PUSH` pushes files from the worker into the database while `PULL` pulls files from the database into the worker. There are (or will be) preferable higher-level ways of dealing with this.

Things to be aware of here: Redis is an in-memory store and rrqueue is not at all aggressive about deleting objects. If you push a 1GB file into Redis things *will* go badly. There are no checks for this at present!

`PUSH` takes a vector of filenames as an argument. The response is not the file itself (how could it do that?) but instead the *hash* of each file. By the time the response is received, the file contents are stored in the database and can be returned.

```{r}
message_id <- obj$send_message("PUSH", "myfuns.R")
res <- obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
res
```
We can save the file into a temporary directory in the filesystem using the `files_unpack` method of `queue`:

```{r}
path <- obj$files_unpack(res)
dir(path)
```
And the files have the expected hash:

```{r}
tools::md5sum(file.path(path, names(res)))
```
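The `PUSH`/`files_unpack` round trip rests on storing file contents under their hash. The sketch below shows that content-addressed pattern in base R, with an environment standing in for Redis; `pack_files` and `unpack_files` are hypothetical and are not rrqueue's `files_pack`/`files_unpack` implementation.

```r
# Hypothetical content-addressed file store; an environment stands in
# for Redis.
store <- new.env()

pack_files <- function(filenames) {
  hashes <- tools::md5sum(filenames)
  for (i in seq_along(filenames)) {
    # Read the raw bytes and store them under the file's md5 hash.
    bytes <- readBin(filenames[[i]], "raw", file.size(filenames[[i]]))
    assign(hashes[[i]], bytes, envir = store)
  }
  names(hashes) <- basename(filenames)   # simplification: keep basenames
  hashes
}

unpack_files <- function(hashes, path = tempfile()) {
  dir.create(path, showWarnings = FALSE)
  for (i in seq_along(hashes)) {
    bytes <- get(hashes[[i]], envir = store)
    writeBin(bytes, file.path(path, names(hashes)[[i]]))
  }
  path
}

src_dir <- tempfile()
dir.create(src_dir)
src <- file.path(src_dir, "myfuns.R")
writeLines("slowdouble <- function(x) { Sys.sleep(x); x * 2 }", src)
packed <- pack_files(src)
out <- unpack_files(packed)
dir(out)
```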
```{r results="asis", echo=FALSE}
reader()
```
## `PULL`

This is the inverse of `PUSH`: it takes files from the machine the queue is running on and copies them into the worker (from the view of the worker, the files in question are already in the database and it will "pull" them down locally).

First, we need to save files into the database. Let's rename the temporary file above and save that:

```{r}
file.rename(file.path(path, "myfuns.R"), "brandnewcode.R")
res <- obj$files_pack("brandnewcode.R")
res
```
Note that the hash here is the same as above: `rrqueue` can tell this is the same file even though it has a different filename. Note also that filenames will be interpreted relative to the working directory, because the directory layout on the worker outside of this point could be arbitrarily different.
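The hash depends only on the bytes in the file, so renaming a file leaves it unchanged; that is why the same content is recognised under the new name. A quick base-R check using throwaway temporary files:

```r
dir_tmp <- tempfile()
dir.create(dir_tmp)
old <- file.path(dir_tmp, "myfuns.R")
new <- file.path(dir_tmp, "brandnewcode.R")
writeLines("slowdouble <- function(x) x * 2", old)

hash_before <- unname(tools::md5sum(old))
file.rename(old, new)                   # same bytes, new name
hash_after <- unname(tools::md5sum(new))

identical(hash_before, hash_after)
```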
Now that the files have been packed, we can run the `PULL` command (note that `PULL` always unpacks files into the worker's working directory):

```{r}
message_id <- obj$send_message("PULL")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
And the new file will be present in the directory:

```{r}
message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
```{r results="asis", echo=FALSE}
reader()
```
## `EVAL`

Evaluate an arbitrary R expression, passed as a string (*not* as any sort of unevaluated or quoted expression). This expression is evaluated in the global environment, which is *not* the environment in which queued code is evaluated.

```{r}
message_id <- obj$send_message("EVAL", "1 + 1")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
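Evaluating a string, rather than a quoted expression, means parsing it first. A minimal base-R sketch of the idea; `eval_string` is a hypothetical name and the worker's own code may differ, but it shows the parse step and that assignments land in the global environment:

```r
# Hypothetical: evaluate R code supplied as a character string in the
# global environment, as the EVAL message is described as doing.
eval_string <- function(code) {
  eval(parse(text = code), envir = globalenv())
}

eval_string("1 + 1")

# Assignments made this way create variables in the global environment.
eval_string("eval_demo_value <- 42")
exists("eval_demo_value", envir = globalenv(), inherits = FALSE)
```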
We can delete the file created above:

```{r}
message_id <- obj$send_message("EVAL", "file.remove('brandnewcode.R')")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
```{r results="asis", echo=FALSE}
reader()
```
This could be used to evaluate code that has side effects, such as installing packages. However, due to limitations in how R loads packages, the only way to update and reload a package is going to be to restart the worker.

## `STOP`

`STOP` sends a shutdown message to the worker. Generally you should prefer the `stop_workers` method, which uses `STOP` behind the scenes.

```{r}
message_id <- obj$send_message("STOP")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
```
```{r results="asis", echo=FALSE}
reader()
```
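To see why `STOP` ends the worker's message handling, here is a toy loop that processes messages in first-in-first-out order and returns as soon as it meets `STOP`, leaving later messages untouched. The `run_messages` name and the `"OK"` replies are hypothetical placeholders; rrqueue's real shutdown also involves Redis bookkeeping that is not modelled here.

```r
# Toy worker loop (hypothetical): handle queued messages in order and
# stop as soon as a STOP message is seen.
run_messages <- function(queue) {
  responses <- character(0)
  for (i in seq_along(queue)) {
    cmd <- queue[[i]]
    reply <- if (cmd == "PING") "PONG" else "OK"   # placeholder replies
    responses <- c(responses, reply)
    if (cmd == "STOP") {
      # Anything queued after STOP is never processed.
      return(list(responses = responses, unprocessed = queue[-seq_len(i)]))
    }
  }
  list(responses = responses, unprocessed = character(0))
}

out <- run_messages(c("PING", "ECHO", "STOP", "PING"))
out$unprocessed
```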