Description Usage Arguments Details Methods
Create an rrqueue queue. A queue requires a queue name and a set
of packages and sources to load. The sources and packages
together define an "environment"; on the worker these packages
will be loaded and source files will be source
-ed.
1 2 3 |
queue_name |
Queue name (scalar character) |
packages |
Optional character vector of packages to load |
sources |
Optional character vector of files to source |
redis_host |
Redis hostname |
redis_port |
Redis port number |
global |
Source files into the global environment? This is a good idea for working with the "user friendly" functions. See issue 2 on github. |
config |
Configuration file of key/value pairs in yaml format. See the package README for an example. If given, additional arguments to this function override values in the file which in turn override defaults of this function. |
The default values for redis_host
and redis_port
correspond to Redis' defaults; if your Redis server is configured
differently or available over an internet connection you will need
to adjust these accordingly.
The queue
objects can be created and desroyed at will; all
the data is stored on the server. Once a queue is created it can
also be connected to by the observer
object for read-only
access.
workers_list
Generalises the workers_list
method in observer
by adding optional support for listing workers that can work on the queue's environment.
Usage:
workers_list(envir_only = FALSE)
Arguments:
envir_only
List workers that can carry out tasks in this queue's environment (see Details below for limitations). By default this is FALSE
and this method is identical in behaviour to the observer workers_list
method.
Details:
Workers that are started after the queue will be listed here immediately as they start; by the time they have started they will report if they can work on this environment.
Workers that are started before the queue will only be listed after they finish working on any current task and have cleared any messages in the message queue. Practically this should be very quick.
workers_list_exited
Generalises the workers_list_exited
method in observer
by adding optional support for listing workers that use to work on the queue's environment. See workers_list
for further details
Usage:
workers_list_exited(envir_only = FALSE)
Arguments:
envir_only
List workers that could have carried out tasks in this queue's environment before exiting. By default this is FALSE
and this method is identical in behaviour to the observer workers_list_exited
method.
enqueue
The main queuing function.
Usage:
enqueue(expr, envir = parent.frame(), key_complete = NULL,
group = NULL)
Arguments:
expr
An unevaluated expression to be evaluated
envir
An environment in which local variables required to compute expr
can be found. These will be evaluated and added to the Redis database.
key_complete
an optional string representing the Redis key to write to when the task is complete. You generally don't need to modify this, but is used in some higher-level functions (such as link{rrqlapply}
) to keep track of task completions efficiently.
group
An optional human-readable "group" to add the task to. There are methods for addressing sets of tasks using this group.
Details:
This method uses non standard evaluation and the enqueue_
form may be prefereable for programming.
Value:
invisibly, a link{task}
object, which can be used to monitor the status of the task.
enqueue_
The workhorse version of enqueue
which uses standard evaluation and is therefore more suitable for programming. All arguments are the same as enqueue_
except for eval
.
Usage:
enqueue_(expr, envir = parent.frame(), key_complete = NULL,
group = NULL)
Arguments:
expr
Either a language object (quoted expression)
envir
Environment to find locals (see 'enqueue')
key_complete
See 'enqueue'
group
See 'enqueue'
requeue
Re-queue a task that has been orphaned by worker failure.
Usage:
requeue(task_id)
Arguments:
task_id
Task id number
Details:
If a worker fails (either an unhandled exception, R crash, network loss or machine loss) then if the worker was running a heartbeat process the task will eventually be flagged as orphaned. If this happens then the task can be requeued. Functions for fetching and querying tasks take a follow_redirect
argument which can be set to TRUE
so that this new, requeued, task is found instead of the old task.
Value:
invisibly, a task
object.
send_message
Send a message to one or more (or all) workers. Messages can be used to retrieve information from workers, to shut them down and are useful in debugging. See Details for possible messages and their action.
Usage:
send_message(command, args = NULL, worker_ids = NULL)
Arguments:
command
Name of the command to run; one of "PING", "ECHO", "EVAL", "STOP", "PAUSE", "RESUME", "INFO", "ENVIR", "PUSH", "PULL", "DIR". See Details.
args
Arguments to pass through to commands. Some commands require arguments, others do not. See Details.
worker_ids
Optional vector of worker ids to send the message to. If this is omitted (or NULL
then try all workers that rrqueue knows about.
Details:
The possible types of message are
PING
send a "PING" to the worker. It will respond by
replying PONG to its stderr, to its log (see observer
for
how to access) and to the response queue. Ignores any argument.
ECHO
Like "PING", but the worker responds by echoing the string given. Requires one argument.
PAUSE
Tell the worker to stop polling for new jobs,
but continue polling for new messages. Calling PAUSE
multiple times in a row is not an error and leaves the worker in a
paused state. A paused worker will report "PAUSED" for its status.
RESUME
Tell the worker to resume polling for new jobs, if paused. All previous environments will be polled.
INFO
Refresh the worker info (see workers_info
in
observer
. Worker will print info to stderr, write
it to the appropriate place in the database and return it in the
response queue. Ignores any argument.
DIR
Tell the worker to return directory contents and md5 hashes of files.
PUSH
Tell the worker to push files into the database. The arguments should be a vector of filenames to copy. The response queue will contain appropriate data for retrieving the files, bu the interface here will change to make this nice to use.
PULL
Tells the worker to pull files into its working directory. Can be used to keep the worker in sync.
EVAL
Evaluate an arbitrary R expression as a string (e.g.,
run_message("EVAL", "sin(1)")
). The output is printed to
stdout, the worker log and to the response queue. Requires a
single argument.
# the interface here is likely to change, so I'll withdraw the
# documentation for now:
# ENVIR
: Tell the worker to try an load an environment, whose
# id is given as a single argument. Requires a single argument.
STOP
Tell the worker to stop cleanly. Ignores any argument.
After sending a message, there is no guarantee about how long i will take to process. If the worker is involved in a long-running computation it will be unavailable to process the message. However, it will process the message before running any new task.
The message id is worth saving. It can be passed to the method
get_respones
to wait for and retrieve responses from one or
more workers.
Value:
The "message id" which can be used to retrieve messages with has_responses
, get_responses
and get_response
.
has_responses
Detect which workers have responses ready for a given message id.
Usage:
has_responses(message_id, worker_ids = NULL)
Arguments:
message_id
id of the message (as returned by send_message
worker_ids
Optional vector of worker ids to send the message to. If this is omitted (or NULL
then try all workers that rrqueue knows about.
Value:
A named logical vector; names are worker ids, the value is TRUE
for each worker for which a response is ready and FALSE
for workers where a response is not ready.
get_responses
Retrieve responses to a give message id from one or more workers.
Usage:
get_responses(message_id, worker_ids = NULL, delete = FALSE, wait = 0)
Arguments:
message_id
id of the message (as returned by send_message
worker_ids
Optional vector of worker ids to send the message to. If this is omitted (or NULL
then try all workers that rrqueue knows about.
delete
delete the response after a successful retrieval of all responses?
wait
Number of seconds to wait for a response. We poll the database repeatedly during this interval. If 0, then a response is requested immediately. If no response is recieved from all workers in time, an error is raised.
Value: Always returns a list, even if only one worker id is given.
get_response
As for get_responses
, but only for a single worker id, and returns the value of the response rather than a list.
Usage:
get_response(message_id, worker_id, delete = FALSE, wait = 0)
Arguments:
message_id
message id
worker_id
single worker id
delete
delete response after successful retrieval?
wait
how long to wait for a message, in seconds
response_ids
Get list of message ids that a given worker has responses for.
Usage:
response_ids(worker_id)
Arguments:
worker_id
single worker id
tasks_drop
Drop tasks from the database.
Usage:
tasks_drop(task_ids)
Arguments:
task_ids
Vector of task ids to drop
files_pack
Pack files into the Redis database
Usage:
files_pack(..., files = c(...))
Arguments:
...
filenames
files
a vector of filename, used in place of ...
files_unpack
Unpack files from the Redis database onto the filesystem.
Usage:
files_unpack(pack, path = tempfile())
Arguments:
pack
a files_pack
object, created by files_pack
or returned as a response to a PUSH
response.
path
path to unpack files. Files will be overwritten without warning, so using tempfile()
(the default) guarantees not to overwrite anything. This method returns path
invisibly so you can move files around easily afterwards.
tasks_set_group
Set the group name for one or more tasks. The tasks can be pending, running or completed, and the tasks can already have a group ir can be groupless. Once tasks have been grouped they can be easier to work with as a set (see tasks_in_groups
and task_bundle_get
in observer
.
Usage:
tasks_set_group(task_ids, group, exists_action = "stop")
Arguments:
task_ids
Vector of task ids
group
Single group name
exists_action
Behaviour when a group name already exists for a given task. Options are "stop"
(throw an error, the default), "warn"
(warn, but don't rename), "pass"
(don't warn, don't rename) and "overwrite"
(replace the group name).
stop_workers
Stop some or all rrqueue workers.
Usage:
stop_workers(worker_ids = NULL, type = "message", interrupt = TRUE,
wait = 0)
Arguments:
worker_ids
Optional vector of worker ids to send the message to. If this is omitted (or NULL
then try all workers that rrqueue knows about.
type
way to stop workers; options are "message"
(the default) or "kill"
. See Details for more information.
interrupt
Should busy workers be interrupted after sending a message? See Details.
wait
How long to wait after sending a message for a response to be retrieved. If this is greater than zero, any unresponsive workers will be killed.
Details:
Stopping remote workers is fairly tricky because we can't really talk to them, they might be working on a task, or worse they might be working on a task that does not listen for interrupt (custom C/C++ code is a common culprit here).
The default behaviour of this function is to send a STOP
message and then immediately send an interrupt signal to all workers that have status "BUSY"
. This should work in most cases. Wait a second or two and then check workers_list_exited()
to make sure that all workers are listed.
To let workers finish whatever task they are working on, specify interrupt=FALSE
. The STOP
message will be the next thing the workers process, so they will shut down as soon as they finish the task.
To ensure that workers do stop in some timeframe, specify a time. Passing time=5
will send a STOP
signal (and possibly an interrupt) and then poll for responses from all workers for 5 seconds. Any worker that has not completed within this time will then be killed. If all workers respond in time, the function will exit more quickly, so you can use an overestimate.
If you just want to kill the workers outright, use type="kill"
which will send a SIGTERM
via the database. No other checks are done as the worker will be unceremoniously halted.
If you want to kill a local worker and just want it dead, you can use type="kill_local"
which will use tools::pskill
to terminate the process. This is really a line of last resort.
refresh_environment
Refresh environment contents and inform workers of the update. If the environment has not changed (i.e., no changes to source files) then nothing happens. All new tasks will be started with the new environment but all old
tasks will continue to use the previous environment. If you want old tasks to use the new environment you will need to drop and requeue them (there is no support for this automatically).
Usage:
refresh_environment(global = TRUE)
Arguments:
global
logical, indicating if environment contents should be sourced locally. Ideally, use the same value as you did when creating the original queue object.
Value:
Invisibly returns TRUE
if the environment was updated or FALSE
if not.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.