queue: Create an rrqueue queue


Description

Create an rrqueue queue. A queue requires a queue name and a set of packages and sources to load. The sources and packages together define an "environment"; on the worker these packages will be loaded and source files will be source-ed.

Usage

queue(queue_name, packages = NULL, sources = NULL,
  redis_host = "127.0.0.1", redis_port = 6379, global = TRUE,
  config = NULL)

Arguments

queue_name

Queue name (scalar character)

packages

Optional character vector of packages to load

sources

Optional character vector of files to source

redis_host

Redis hostname

redis_port

Redis port number

global

Source files into the global environment? This is a good idea for working with the "user friendly" functions. See issue 2 on github.

config

Configuration file of key/value pairs in yaml format. See the package README for an example. If given, additional arguments to this function override values in the file which in turn override defaults of this function.

Details

The default values for redis_host and redis_port correspond to Redis' defaults; if your Redis server is configured differently or is available over a network connection you will need to adjust these accordingly.

The queue objects can be created and destroyed at will; all the data is stored on the server. Once a queue is created it can also be connected to by the observer object for read-only access.
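
For example, assuming a Redis server is running locally on the default port (the queue name, package and source file below are placeholders), a queue object can be created and its methods accessed with $:

library(rrqueue)
# Connect to (or create) the queue "myqueue"; workers will load the
# "digest" package and source "mycode.R" to construct the environment.
obj <- queue("myqueue", packages = "digest", sources = "mycode.R")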

Methods

workers_list

Generalises the workers_list method in observer by adding optional support for listing workers that can work on the queue's environment.

Usage: workers_list(envir_only = FALSE)

Arguments:

envir_only

List workers that can carry out tasks in this queue's environment (see Details below for limitations). By default this is FALSE and this method is identical in behaviour to the observer workers_list method.

Details:

Workers that are started after the queue is created will be listed here immediately as they start; by the time they have started they will have reported whether they can work on this environment.

Workers that are started before the queue will only be listed after they finish working on any current task and have cleared any messages in the message queue. Practically this should be very quick.

workers_list_exited

Generalises the workers_list_exited method in observer by adding optional support for listing workers that used to work on the queue's environment. See workers_list for further details.

Usage: workers_list_exited(envir_only = FALSE)

Arguments:

envir_only

List workers that could have carried out tasks in this queue's environment before exiting. By default this is FALSE and this method is identical in behaviour to the observer workers_list_exited method.
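
For example, to compare all known workers with those that can (or, before exiting, could) work in this queue's environment (continuing with the obj object from above):

obj$workers_list()                           # all workers rrqueue knows about
obj$workers_list(envir_only = TRUE)          # only workers able to use this environment
obj$workers_list_exited(envir_only = TRUE)   # exited workers that could have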

enqueue

The main queuing function.

Usage: enqueue(expr, envir = parent.frame(), key_complete = NULL, group = NULL)

Arguments:

expr

An unevaluated expression to be evaluated

envir

An environment in which local variables required to compute expr can be found. These will be evaluated and added to the Redis database.

key_complete

an optional string representing the Redis key to write to when the task is complete. You generally don't need to modify this, but it is used in some higher-level functions (such as rrqlapply) to keep track of task completions efficiently.

group

An optional human-readable "group" to add the task to. There are methods for addressing sets of tasks using this group.

Details:

This method uses non-standard evaluation and the enqueue_ form may be preferable for programming.

Value:

invisibly, a task object, which can be used to monitor the status of the task.
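
For example (a sketch, continuing with the obj queue from above), a task is queued directly from an expression and local variables are found in the calling environment:

x <- 10
t <- obj$enqueue(sin(x))                     # x is saved to Redis along with the task
t2 <- obj$enqueue(sqrt(x), group = "maths")  # optionally tag the task with a group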

enqueue_

The workhorse version of enqueue which uses standard evaluation and is therefore more suitable for programming. All arguments are the same as enqueue, except that expr must be a quoted expression.

Usage: enqueue_(expr, envir = parent.frame(), key_complete = NULL, group = NULL)

Arguments:

expr

A language object (quoted expression)

envir

Environment to find locals (see 'enqueue')

key_complete

See 'enqueue'

group

See 'enqueue'
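
For example, a quoted expression can be built up programmatically and then queued:

t <- obj$enqueue_(quote(sin(1)))
# or construct the expression with bquote(), substituting in a local value:
make_task <- function(obj, n) {
  obj$enqueue_(bquote(sin(.(n))))
}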

requeue

Re-queue a task that has been orphaned by worker failure.

Usage: requeue(task_id)

Arguments:

task_id

Task id number

Details:

If a worker fails (through an unhandled exception, an R crash, network loss or machine loss) and it was running a heartbeat process, the task will eventually be flagged as orphaned. If this happens then the task can be requeued. Functions for fetching and querying tasks take a follow_redirect argument which can be set to TRUE so that this new, requeued task is found instead of the old task.

Value:

invisibly, a task object.
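
For example, if the task with id "42" (a placeholder) has been orphaned, it can be requeued and the replacement found via follow_redirect:

t <- obj$requeue("42")
# fetching the original task with follow_redirect = TRUE now finds this new task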

send_message

Send a message to one or more (or all) workers. Messages can be used to retrieve information from workers or to shut them down, and are useful in debugging. See Details for possible messages and their action.

Usage: send_message(command, args = NULL, worker_ids = NULL)

Arguments:

command

Name of the command to run; one of "PING", "ECHO", "EVAL", "STOP", "PAUSE", "RESUME", "INFO", "ENVIR", "PUSH", "PULL", "DIR". See Details.

args

Arguments to pass through to commands. Some commands require arguments, others do not. See Details.

worker_ids

Optional vector of worker ids to send the message to. If this is omitted (or NULL) then try all workers that rrqueue knows about.

Details:

The possible types of message are

PING

Send a "PING" to the worker. It will respond by replying PONG to its stderr, to its log (see observer for how to access) and to the response queue. Ignores any argument.

ECHO

Like "PING", but the worker responds by echoing the string given. Requires one argument.

PAUSE

Tell the worker to stop polling for new jobs, but continue polling for new messages. Calling PAUSE multiple times in a row is not an error and leaves the worker in a paused state. A paused worker will report "PAUSED" for its status.

RESUME

Tell the worker to resume polling for new jobs, if paused. All previous environments will be polled.

INFO

Refresh the worker info (see workers_info in observer). The worker will print info to stderr, write it to the appropriate place in the database and return it in the response queue. Ignores any argument.

DIR

Tell the worker to return directory contents and md5 hashes of files.

PUSH

Tell the worker to push files into the database. The arguments should be a vector of filenames to copy. The response queue will contain appropriate data for retrieving the files, but the interface here will change to make this nicer to use.

PULL

Tells the worker to pull files into its working directory. Can be used to keep the worker in sync.

EVAL

Evaluate an arbitrary R expression as a string (e.g., send_message("EVAL", "sin(1)")). The output is printed to stdout, the worker log and to the response queue. Requires a single argument.

ENVIR

Tell the worker to try and load an environment, whose id is given as a single argument. Requires a single argument. (The interface here is likely to change, so this message is only minimally documented for now.)

STOP

Tell the worker to stop cleanly. Ignores any argument.

After sending a message, there is no guarantee about how long it will take to process. If the worker is involved in a long-running computation it will be unavailable to process the message. However, it will process the message before running any new task.

The message id is worth saving. It can be passed to the method get_responses to wait for and retrieve responses from one or more workers.

Value:

The "message id" which can be used to retrieve messages with has_responses, get_responses and get_response.
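
For example, to ping all workers and collect their replies (continuing with obj from above):

id <- obj$send_message("PING")        # save the message id
obj$has_responses(id)                 # named logical: who has replied so far?
obj$get_responses(id, wait = 5)       # wait up to 5 seconds for all replies
obj$send_message("ECHO", "hello")     # ECHO requires one argument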

has_responses

Detect which workers have responses ready for a given message id.

Usage: has_responses(message_id, worker_ids = NULL)

Arguments:

message_id

id of the message (as returned by send_message)

worker_ids

Optional vector of worker ids to send the message to. If this is omitted (or NULL) then try all workers that rrqueue knows about.

Value:

A named logical vector; names are worker ids, the value is TRUE for each worker for which a response is ready and FALSE for workers where a response is not ready.

get_responses

Retrieve responses to a given message id from one or more workers.

Usage: get_responses(message_id, worker_ids = NULL, delete = FALSE, wait = 0)

Arguments:

message_id

id of the message (as returned by send_message)

worker_ids

Optional vector of worker ids to send the message to. If this is omitted (or NULL) then try all workers that rrqueue knows about.

delete

delete the response after a successful retrieval of all responses?

wait

Number of seconds to wait for a response. We poll the database repeatedly during this interval. If 0, then a response is requested immediately. If no response is received from all workers in time, an error is raised.

Value: Always returns a list, even if only one worker id is given.
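
For example (a sketch, where id is a message id saved from send_message and w is a single worker id):

obj$get_responses(id, wait = 10, delete = TRUE)   # all workers, then tidy up
obj$get_response(id, w, wait = 10)                # just the value for worker w
obj$response_ids(w)                               # message ids that w has answered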

get_response

As for get_responses, but only for a single worker id, and returns the value of the response rather than a list.

Usage: get_response(message_id, worker_id, delete = FALSE, wait = 0)

Arguments:

message_id

message id

worker_id

single worker id

delete

delete response after successful retrieval?

wait

how long to wait for a message, in seconds

response_ids

Get list of message ids that a given worker has responses for.

Usage: response_ids(worker_id)

Arguments:

worker_id

single worker id

tasks_drop

Drop tasks from the database.

Usage: tasks_drop(task_ids)

Arguments:

task_ids

Vector of task ids to drop

files_pack

Pack files into the Redis database

Usage: files_pack(..., files = c(...))

Arguments:

...

filenames

files

a vector of filenames, used in place of ...

files_unpack

Unpack files from the Redis database onto the filesystem.

Usage: files_unpack(pack, path = tempfile())

Arguments:

pack

a files_pack object, created by files_pack or returned as the response to a PUSH message.

path

path to unpack files. Files will be overwritten without warning, so using tempfile() (the default) guarantees not to overwrite anything. This method returns path invisibly so you can move files around easily afterwards.
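
For example (file names are placeholders):

pack <- obj$files_pack("data.csv", "params.json")   # store the files in Redis
path <- obj$files_unpack(pack)                       # unpack into a fresh tempfile() directory
dir(path)                                            # the unpacked copies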

tasks_set_group

Set the group name for one or more tasks. The tasks can be pending, running or completed, and the tasks can already have a group or can be groupless. Once tasks have been grouped they can be easier to work with as a set (see tasks_in_groups and task_bundle_get in observer).

Usage: tasks_set_group(task_ids, group, exists_action = "stop")

Arguments:

task_ids

Vector of task ids

group

Single group name

exists_action

Behaviour when a group name already exists for a given task. Options are "stop" (throw an error, the default), "warn" (warn, but don't rename), "pass" (don't warn, don't rename) and "overwrite" (replace the group name).
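
For example, to group two existing tasks (ids are placeholders), overwriting any group they already have, and later drop them from the database:

obj$tasks_set_group(c("3", "4"), "my-group", exists_action = "overwrite")
obj$tasks_drop(c("3", "4"))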

stop_workers

Stop some or all rrqueue workers.

Usage: stop_workers(worker_ids = NULL, type = "message", interrupt = TRUE, wait = 0)

Arguments:

worker_ids

Optional vector of worker ids to send the message to. If this is omitted (or NULL) then try all workers that rrqueue knows about.

type

way to stop workers; options are "message" (the default) or "kill". See Details for more information.

interrupt

Should busy workers be interrupted after sending a message? See Details.

wait

How long to wait after sending a message for a response to be retrieved. If this is greater than zero, any unresponsive workers will be killed.

Details:

Stopping remote workers is fairly tricky because we can't really talk to them, they might be working on a task, or worse they might be working on a task that does not listen for interrupt (custom C/C++ code is a common culprit here).

The default behaviour of this function is to send a STOP message and then immediately send an interrupt signal to all workers that have status "BUSY". This should work in most cases. Wait a second or two and then check workers_list_exited() to make sure that all workers are listed.

To let workers finish whatever task they are working on, specify interrupt=FALSE. The STOP message will be the next thing the workers process, so they will shut down as soon as they finish the task.

To ensure that workers do stop in some timeframe, specify a wait time. Passing wait=5 will send a STOP signal (and possibly an interrupt) and then poll for responses from all workers for 5 seconds. Any worker that has not completed within this time will then be killed. If all workers respond in time, the function will exit more quickly, so you can use an overestimate.

If you just want to kill the workers outright, use type="kill" which will send a SIGTERM via the database. No other checks are done as the worker will be unceremoniously halted.

If you want to kill a local worker and just want it dead, you can use type="kill_local" which will use tools::pskill to terminate the process. This is really a line of last resort.
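
For example (a sketch of the options described above):

obj$stop_workers()                    # STOP message, interrupting BUSY workers
obj$stop_workers(interrupt = FALSE)   # let workers finish their current task first
obj$stop_workers(wait = 5)            # kill anything still unresponsive after 5 seconds
obj$stop_workers(type = "kill")       # SIGTERM via the database, no checks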

refresh_environment

Refresh environment contents and inform workers of the update. If the environment has not changed (i.e., no changes to source files) then nothing happens. All new tasks will be started with the new environment but all old tasks will continue to use the previous environment. If you want old tasks to use the new environment you will need to drop and requeue them (there is no support for this automatically).

Usage: refresh_environment(global = TRUE)

Arguments:

global

logical, indicating if environment contents should be sourced locally. Ideally, use the same value as you did when creating the original queue object.

Value: Invisibly returns TRUE if the environment was updated or FALSE if not.
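
For example, after editing one of the files given as sources when the queue was created:

updated <- obj$refresh_environment()
if (updated) message("workers informed of the new environment")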

