Description Usage Arguments Details Methods
Create an rrqueue queue. A queue requires a queue name and a set
of packages and sources to load. The sources and packages
together define an "environment"; on the worker these packages
will be loaded and source files will be source-ed.
1 2 3 |
queue_name |
Queue name (scalar character) |
packages |
Optional character vector of packages to load |
sources |
Optional character vector of files to source |
redis_host |
Redis hostname |
redis_port |
Redis port number |
global |
Source files into the global environment? This is a good idea for working with the "user friendly" functions. See issue 2 on github. |
config |
Configuration file of key/value pairs in yaml format. See the package README for an example. If given, additional arguments to this function override values in the file which in turn override defaults of this function. |
The default values for redis_host and redis_port
correspond to Redis' defaults; if your Redis server is configured
differently or available over an internet connection you will need
to adjust these accordingly.
The queue objects can be created and desroyed at will; all
the data is stored on the server. Once a queue is created it can
also be connected to by the observer object for read-only
access.
workers_listGeneralises the workers_list method in observer by adding optional support for listing workers that can work on the queue's environment.
Usage:
workers_list(envir_only = FALSE)
Arguments:
envir_onlyList workers that can carry out tasks in this queue's environment (see Details below for limitations). By default this is FALSE and this method is identical in behaviour to the observer workers_list method.
Details:
Workers that are started after the queue will be listed here immediately as they start; by the time they have started they will report if they can work on this environment.
Workers that are started before the queue will only be listed after they finish working on any current task and have cleared any messages in the message queue. Practically this should be very quick.
workers_list_exitedGeneralises the workers_list_exited method in observer by adding optional support for listing workers that use to work on the queue's environment. See workers_list for further details
Usage:
workers_list_exited(envir_only = FALSE)
Arguments:
envir_onlyList workers that could have carried out tasks in this queue's environment before exiting. By default this is FALSE and this method is identical in behaviour to the observer workers_list_exited method.
enqueueThe main queuing function.
Usage:
enqueue(expr, envir = parent.frame(), key_complete = NULL,
group = NULL)
Arguments:
exprAn unevaluated expression to be evaluated
envirAn environment in which local variables required to compute expr can be found. These will be evaluated and added to the Redis database.
key_completean optional string representing the Redis key to write to when the task is complete. You generally don't need to modify this, but is used in some higher-level functions (such as link{rrqlapply}) to keep track of task completions efficiently.
groupAn optional human-readable "group" to add the task to. There are methods for addressing sets of tasks using this group.
Details:
This method uses non standard evaluation and the enqueue_ form may be prefereable for programming.
Value:
invisibly, a link{task} object, which can be used to monitor the status of the task.
enqueue_The workhorse version of enqueue which uses standard evaluation and is therefore more suitable for programming. All arguments are the same as enqueue_ except for eval.
Usage:
enqueue_(expr, envir = parent.frame(), key_complete = NULL,
group = NULL)
Arguments:
exprEither a language object (quoted expression)
envirEnvironment to find locals (see 'enqueue')
key_completeSee 'enqueue'
groupSee 'enqueue'
requeueRe-queue a task that has been orphaned by worker failure.
Usage:
requeue(task_id)
Arguments:
task_idTask id number
Details:
If a worker fails (either an unhandled exception, R crash, network loss or machine loss) then if the worker was running a heartbeat process the task will eventually be flagged as orphaned. If this happens then the task can be requeued. Functions for fetching and querying tasks take a follow_redirect argument which can be set to TRUE so that this new, requeued, task is found instead of the old task.
Value:
invisibly, a task object.
send_messageSend a message to one or more (or all) workers. Messages can be used to retrieve information from workers, to shut them down and are useful in debugging. See Details for possible messages and their action.
Usage:
send_message(command, args = NULL, worker_ids = NULL)
Arguments:
commandName of the command to run; one of "PING", "ECHO", "EVAL", "STOP", "PAUSE", "RESUME", "INFO", "ENVIR", "PUSH", "PULL", "DIR". See Details.
argsArguments to pass through to commands. Some commands require arguments, others do not. See Details.
worker_idsOptional vector of worker ids to send the message to. If this is omitted (or NULL then try all workers that rrqueue knows about.
Details:
The possible types of message are
PINGsend a "PING" to the worker. It will respond by
replying PONG to its stderr, to its log (see observer for
how to access) and to the response queue. Ignores any argument.
ECHOLike "PING", but the worker responds by echoing the string given. Requires one argument.
PAUSETell the worker to stop polling for new jobs,
but continue polling for new messages. Calling PAUSE
multiple times in a row is not an error and leaves the worker in a
paused state. A paused worker will report "PAUSED" for its status.
RESUMETell the worker to resume polling for new jobs, if paused. All previous environments will be polled.
INFORefresh the worker info (see workers_info in
observer. Worker will print info to stderr, write
it to the appropriate place in the database and return it in the
response queue. Ignores any argument.
DIRTell the worker to return directory contents and md5 hashes of files.
PUSHTell the worker to push files into the database. The arguments should be a vector of filenames to copy. The response queue will contain appropriate data for retrieving the files, bu the interface here will change to make this nice to use.
PULLTells the worker to pull files into its working directory. Can be used to keep the worker in sync.
EVALEvaluate an arbitrary R expression as a string (e.g.,
run_message("EVAL", "sin(1)")). The output is printed to
stdout, the worker log and to the response queue. Requires a
single argument.
# the interface here is likely to change, so I'll withdraw the
# documentation for now:
# ENVIR: Tell the worker to try an load an environment, whose
# id is given as a single argument. Requires a single argument.
STOPTell the worker to stop cleanly. Ignores any argument.
After sending a message, there is no guarantee about how long i will take to process. If the worker is involved in a long-running computation it will be unavailable to process the message. However, it will process the message before running any new task.
The message id is worth saving. It can be passed to the method
get_respones to wait for and retrieve responses from one or
more workers.
Value:
The "message id" which can be used to retrieve messages with has_responses, get_responses and get_response.
has_responsesDetect which workers have responses ready for a given message id.
Usage:
has_responses(message_id, worker_ids = NULL)
Arguments:
message_idid of the message (as returned by send_message
worker_idsOptional vector of worker ids to send the message to. If this is omitted (or NULL then try all workers that rrqueue knows about.
Value:
A named logical vector; names are worker ids, the value is TRUE for each worker for which a response is ready and FALSE for workers where a response is not ready.
get_responsesRetrieve responses to a give message id from one or more workers.
Usage:
get_responses(message_id, worker_ids = NULL, delete = FALSE, wait = 0)
Arguments:
message_idid of the message (as returned by send_message
worker_idsOptional vector of worker ids to send the message to. If this is omitted (or NULL then try all workers that rrqueue knows about.
deletedelete the response after a successful retrieval of all responses?
waitNumber of seconds to wait for a response. We poll the database repeatedly during this interval. If 0, then a response is requested immediately. If no response is recieved from all workers in time, an error is raised.
Value: Always returns a list, even if only one worker id is given.
get_responseAs for get_responses, but only for a single worker id, and returns the value of the response rather than a list.
Usage:
get_response(message_id, worker_id, delete = FALSE, wait = 0)
Arguments:
message_idmessage id
worker_idsingle worker id
deletedelete response after successful retrieval?
waithow long to wait for a message, in seconds
response_idsGet list of message ids that a given worker has responses for.
Usage:
response_ids(worker_id)
Arguments:
worker_idsingle worker id
tasks_dropDrop tasks from the database.
Usage:
tasks_drop(task_ids)
Arguments:
task_idsVector of task ids to drop
files_packPack files into the Redis database
Usage:
files_pack(..., files = c(...))
Arguments:
...filenames
filesa vector of filename, used in place of ...
files_unpackUnpack files from the Redis database onto the filesystem.
Usage:
files_unpack(pack, path = tempfile())
Arguments:
packa files_pack object, created by files_pack or returned as a response to a PUSH response.
pathpath to unpack files. Files will be overwritten without warning, so using tempfile() (the default) guarantees not to overwrite anything. This method returns path invisibly so you can move files around easily afterwards.
tasks_set_groupSet the group name for one or more tasks. The tasks can be pending, running or completed, and the tasks can already have a group ir can be groupless. Once tasks have been grouped they can be easier to work with as a set (see tasks_in_groups and task_bundle_get in observer.
Usage:
tasks_set_group(task_ids, group, exists_action = "stop")
Arguments:
task_idsVector of task ids
groupSingle group name
exists_actionBehaviour when a group name already exists for a given task. Options are "stop" (throw an error, the default), "warn" (warn, but don't rename), "pass" (don't warn, don't rename) and "overwrite" (replace the group name).
stop_workersStop some or all rrqueue workers.
Usage:
stop_workers(worker_ids = NULL, type = "message", interrupt = TRUE,
wait = 0)
Arguments:
worker_idsOptional vector of worker ids to send the message to. If this is omitted (or NULL then try all workers that rrqueue knows about.
typeway to stop workers; options are "message" (the default) or "kill". See Details for more information.
interruptShould busy workers be interrupted after sending a message? See Details.
waitHow long to wait after sending a message for a response to be retrieved. If this is greater than zero, any unresponsive workers will be killed.
Details:
Stopping remote workers is fairly tricky because we can't really talk to them, they might be working on a task, or worse they might be working on a task that does not listen for interrupt (custom C/C++ code is a common culprit here).
The default behaviour of this function is to send a STOP message and then immediately send an interrupt signal to all workers that have status "BUSY". This should work in most cases. Wait a second or two and then check workers_list_exited() to make sure that all workers are listed.
To let workers finish whatever task they are working on, specify interrupt=FALSE. The STOP message will be the next thing the workers process, so they will shut down as soon as they finish the task.
To ensure that workers do stop in some timeframe, specify a time. Passing time=5 will send a STOP signal (and possibly an interrupt) and then poll for responses from all workers for 5 seconds. Any worker that has not completed within this time will then be killed. If all workers respond in time, the function will exit more quickly, so you can use an overestimate.
If you just want to kill the workers outright, use type="kill" which will send a SIGTERM via the database. No other checks are done as the worker will be unceremoniously halted.
If you want to kill a local worker and just want it dead, you can use type="kill_local" which will use tools::pskill to terminate the process. This is really a line of last resort.
refresh_environmentRefresh environment contents and inform workers of the update. If the environment has not changed (i.e., no changes to source files) then nothing happens. All new tasks will be started with the new environment but all old tasks will continue to use the previous environment. If you want old tasks to use the new environment you will need to drop and requeue them (there is no support for this automatically).
Usage:
refresh_environment(global = TRUE)
Arguments:
globallogical, indicating if environment contents should be sourced locally. Ideally, use the same value as you did when creating the original queue object.
Value:
Invisibly returns TRUE if the environment was updated or FALSE if not.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.