{r echo=FALSE, results="hide"} rrqueue:::queue_clean(redux::hiredis(), "myqueue", purge=TRUE, stop_workers="kill") lang_output <- function(x, lang) { cat(c(sprintf("%s", lang), x, "```"), sep="\n") } make_reader <- function(filename, lang="plain") { n <- 0 force(filename) force(lang) function() { txt <- readLines(logfile) if (n > 0) { txt <- txt[-seq_len(n)] } n <<- n + length(txt) lang_output(txt, lang) } }

In addition to passing tasks (and results) between a controller and
workers, the controller can also send "messages" to workers.  This
vignette shows what the possible messages do.

In order to do this, we're going to need a queue and a worker:
``` {r }
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
logfile <- tempfile()
worker_id <- rrqueue::worker_spawn("myqueue", logfile)

On startup the worker log contains: ``` {r results="asis", echo=FALSE} reader <- make_reader(logfile) reader()

Because one of the main effects of messages is to print to the
worker logfile, we'll print this fairly often.

## Messages and responses

1. The queue sends a message for one or more workers to process.
   The message has an *identifier* that is derived from the current
   time.  Messages are written to a first-in-first-out queue, *per
   worker*, and are processed independently by workers who do not
   look to see if other workers have messages or are processing
   them.

2. As soon as a worker has finished processing any current job it
   will process the message (it must wait to finish a current job
   but will not start any further jobs).

3. Once the message has been processed (see below) a response will
   be written to a response list with the same identifier as the
   message.

## `PING`

The `PING` message simply asks the worker to return `PONG`.  It's
useful for diagnosing communication issues because it does so
little
``` {r }
message_id <- obj$send_message("PING")

The message id is going to be useful for getting responses:

message_id

(this is derived from the current time, according to Redis which is the central reference point of time for the whole system).

wait a little while:

Sys.sleep(.5)

``` {r results="asis", echo=TRUE} reader()

The logfile prints:

1. the request for the `PING` (`MESSAGE PING`)
2. the value `PONG` to the R message stream
3. logging a response (`RESPONSE PONG`), which means that something is written to the response stream.

We can access the same bits of information in the worker log:
``` {r }
obj$workers_log_tail(n=Inf)[-1]

This includes the ALIVE and ENVIR bits as the worker comes up.

Inspecting the logs is fine for interactive use, but it's going to be more useful often to poll for a response.

We already know that our worker has a response, but we can ask anyway:

obj$has_responses(message_id)

Or inversely we can as what messages a given worker has responses for:

obj$response_ids(worker_id)

To fetch the responses from all workers it was sent to (always returning a named list):

obj$get_responses(message_id)

or to fetch the response from a given worker:

obj$get_response(message_id, worker_id)

The response can be deleted by passing delete=TRUE to this method:

obj$get_response(message_id, worker_id, delete=TRUE)

after which recalling the message will throw an error: ``` {r error=TRUE} obj$get_response(message_id, worker_id, delete=TRUE)

There is also a `wait` argument that lets you wait until a response
is ready.  The `slowdouble` command will take a few seconds, so to
demonstrate:
``` {r }
obj$enqueue(slowdouble(2))
message_id <- obj$send_message("PING")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

Looking at the log will show what went on here:

obj$workers_log_tail(n=4)[-1]
  1. A task is recieved
  2. 2s later the task is completed
  3. Then the message is recieved
  4. Then, basically instantaneously, the message is responded to

However, because the message is only processed after the task is completed, the response takes a while to come back. Equivalently, from the worker log:

``` {r results="asis", echo=FALSE} reader()

## `ECHO`

This is basically like `PING` and not very interesting; it prints
an arbitrary string to the log.  It always returns `"OK"` as a
response.

``` {r }
message_id <- obj$send_message("ECHO", "hello world!")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

``` {r results="asis", echo=FALSE} reader()

## `INFO`

The `INFO` command refreshes and returns the worker information.

We already have a copy of the worker info; it was created when the
worker started up:
``` {r }
obj$workers_info()[[worker_id]]

Note that the envir field is currently empty ({}) because when the worker started it did not know about any environments.

message_id <- obj$send_message("INFO")

Here's the new worker information, complete with an updated envir field:

obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

This has been updated on the database copy too:

obj$workers_info()[[worker_id]]$envir

and the same information is printed to the worker log: ``` {r results="asis", echo=FALSE} reader()

## `DIR`

This is useful for listing directory contents, similar to the `dir`
function in R.  However, because file *contents* are usually more
interesting (e.g., working out why something is not running on the
remote machine), this is basically the result of passing the
results of `dir` to `tools::md5sum` in order to get the md5sum of
the file.
``` {r }
message_id <- obj$send_message("DIR")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

Additional arguments to dir can be passed through:

message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

If you pass in invalid arguments to dir, then a reasonably helpful message should be generated:

message_id <- obj$send_message("DIR", list(foo="bar"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

(note that this does not generate an error locally, but you can test to see if it did throw an error by checking the class of the returned value).

and the same information is printed to the worker log: ``` {r results="asis", echo=FALSE} reader()

## `PUSH`

The commands `PUSH` and `PULL` move files from and to the worker.
The command is interpreted as an instruction to the worker so
`PUSH` pushes files from the worker into the database while `PULL`
pulls files from the database into the worker.  There are (will be)
prefereable higher-level ways of dealing with this.

Things to be aware of here: Redis is an in memory store and rrqueue
is not at all agressive about deleting objects.  If you push a 1GB
file into Redis things *will* go badly.  There are no checks for
this at present!

`PUSH` takes a vector of filename as an argument.  The response is
not the file itself (how could it do that?) but instead the *hash*
of that file.  By the time the response is recieved the file
contents are stored in the database and can be returned.
``` {r }
message_id <- obj$send_message("PUSH", "myfuns.R")
res <- obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
res

We can save the file onto a temporary directory in the filesystem using the \code{files_unpack} method of \code{queue}:

path <- obj$files_unpack(res)
dir(path)

And the files have the expected hash:

tools::md5sum(file.path(path, names(res)))

``` {r results="asis", echo=FALSE} reader()

## `PULL`

This is the inverse of `PUSH` and takes files from the machine the
queue is running on and copies them into the worker (from the view
of the worker, the files in question are already in the database
and it will "pull" them down locally.

First, we need to save files into the database.  Let's rename the
temporary file above and save that:
``` {r }
file.rename(file.path(path, "myfuns.R"),
            "brandnewcode.R")
res <- obj$files_pack("brandnewcode.R")
res

Note that the hash here is the same as above: rrqueue can tell this is the same file even though it has the same filename. Note also that filenames will be interepted relative to the working directory, because the directory layout on the worker outside of this point could be arbitrarily different.

Now the the files have been packed, we can run the PULL command:

(note that the PULL command always unpacks files into the workers working directory).

message_id <- obj$send_message("PULL")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

And the new file will be present in the directory:

message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

``` {r results="asis", echo=FALSE} reader()

## `EVAL`

Evaluate an arbitrary R expression, passed as a string (*not* as
any sort of unevaluated or quoted expression).  This expression is
evaluated in the global environment, which is *not* the environment
in which queued code is evaluated in.

``` {r }
message_id <- obj$send_message("EVAL", "1 + 1")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

We can delete the file created above:

message_id <- obj$send_message("EVAL", "file.remove('brandnewcode.R')")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

``` {r results="asis", echo=FALSE} reader()

This could be used to evaluate code that has side effects, such as
installing packages.  However, due to limitations with how R loads
packages the only way to update and reload a package is going to be
to restart the worker.

## `STOP`

Stop sends a shutdown message to the worker.  Generally you should
prefer the `stop_workers` method, which uses `STOP` behind the
scenes.

``` {r }
message_id <- obj$send_message("STOP")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)

{r results="asis", echo=FALSE} reader()



traitecoevo/rrqueue documentation built on May 31, 2019, 7:44 p.m.