rrq_controller: rrq queue controller

rrq_controller R Documentation

rrq queue controller

Description

A queue controller. Use this to interact with a queue/cluster.

Task lifecycle

  • A task is queued with ⁠$enqueue()⁠, at which point it becomes PENDING

  • Once a worker selects the task to run, it becomes RUNNING

  • If the task completes successfully without error it becomes COMPLETE

  • If the task throws an error, it becomes ERROR

  • If the task was cancelled (e.g., via ⁠$task_cancel()⁠) it becomes CANCELLED

  • If the task is killed by an external process, crashes or the worker dies (and is running a heartbeat) then the task becomes DIED.

  • The status of an unknown task is MISSING

  • Tasks in any terminal state (except IMPOSSIBLE) may be retried with $task_retry(), at which point they become MOVED; see vignette("fault-tolerance") for details
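The transitions above can be observed from the controller. The following is a minimal sketch, assuming that obj is an rrq_controller connected to a running Redis server with at least one worker; the helper name lifecycle_demo is hypothetical and not part of rrq.

```r
# Hypothetical helper: follow one task through its lifecycle.
# `obj` is assumed to be an rrq_controller with at least one worker running.
lifecycle_demo <- function(obj) {
  id <- obj$enqueue(sqrt(4))                 # the task is created as PENDING
  before <- obj$task_status(id)              # PENDING or RUNNING, depending on timing
  value <- obj$task_wait(id, timeout = 10)   # blocks until the task finishes
  after <- obj$task_status(id)               # COMPLETE if no error was thrown
  list(before = before, value = value, after = after)
}
```

Calling lifecycle_demo(obj) returns the two observed statuses together with the task result, which makes it easy to see where a task is stuck if no worker picks it up.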

Worker lifecycle

  • A worker appears and is IDLE

  • When running a task it is BUSY

  • If it receives a PAUSE message it becomes PAUSED until it receives a RESUME message

  • If it exits cleanly (e.g., via a STOP message or a timeout) it becomes EXITED

  • If it crashes and was running a heartbeat, it becomes LOST

Messages

Most of the time workers process tasks, but you can also send them "messages". Messages take priority over tasks, so if a worker becomes idle (by coming online or by finishing a task) it will consume all available messages before starting on a new task, even if both are available.

Each message has a "command" and may have "arguments" to that command. The supported messages are:

  • PING (no args): "ping" the worker; if alive, it will respond with "PONG"

  • ECHO (accepts an argument of a string): Print a string to the terminal and log of the worker. Will respond with OK once the message has been printed.

  • EVAL (accepts a string or a quoted expression): Evaluate an arbitrary R expression on the worker. Responds with the value of this expression.

  • STOP (accepts a string to print as the worker exits, defaults to "BYE"): Tells the worker to stop.

  • INFO (no args): Returns information about the worker (versions of packages, hostname, pid, etc).

  • PAUSE (no args): Tells the worker to stop accepting tasks (until it receives a RESUME message). Messages are processed as normal.

  • RESUME (no args): Tells a paused worker to resume accepting tasks.

  • REFRESH (no args): Tells the worker to rebuild its environment using the create function registered with $envir().

  • TIMEOUT_SET (accepts a number, representing seconds): Updates the worker timeout - the length of time after which it will exit if it has not processed a task.

  • TIMEOUT_GET (no args): Tells the worker to respond with its current timeout.
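As a rough sketch of how messages might be sent from the controller: the helper below assumes that the controller exposes a $message_send_and_wait(command, args) method. That method is not described in this section, so treat both its name and its argument order as assumptions and check the full method list before relying on it. The helper name message_demo is hypothetical.

```r
# Hypothetical helper. `$message_send_and_wait()` is an assumed method that
# sends a command to the workers and collects their responses.
message_demo <- function(obj) {
  pong <- obj$message_send_and_wait("PING")           # each live worker answers "PONG"
  echo <- obj$message_send_and_wait("ECHO", "hello")  # workers print "hello", answer OK
  list(pong = pong, echo = echo)
}
```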

Bulk interface (lapply)

The bulk interface is a bit more complicated than the basic enqueue interface. In the majority of cases you can ignore the details and use the lapply method in much the same way as you would in normal R. Assuming that obj is your rrq_controller object, you might write:

ans <- obj$lapply(1:10, sqrt)

which will return the same thing as lapply(1:10, sqrt) (provided that you have a Redis server running and workers registered).

There is some sleight of hand here, though, as we need to identify that it is the symbol sqrt that matters, corresponding to the builtin sqrt function. You can make this more explicit by passing in the name of the function using $lapply_():

ans <- obj$lapply_(1:10, quote(sqrt))

The same treatment applies to the dots; this is allowed:

b <- 2
ans <- obj$lapply(1:10, log, base = b)

But this will look up the bindings of log and b in the context in which the call is made. This may not always do what is expected, so you can use the names directly:

b <- 2
ans <- obj$lapply_(1:10, quote(log), base = quote(b))
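The difference between passing a name and passing a value can be seen in plain R, without a queue. This is only a local illustration of name lookup, not a description of how rrq implements it; the environments env_two and env_ten are made up for the example.

```r
# A symbol is just a name; nothing is looked up until it is evaluated.
fun_name <- quote(sqrt)
is.name(fun_name)

# Build the call log(8, base = b) from names only.
call_by_name <- as.call(list(quote(log), 8, base = quote(b)))

# The bindings of `log` and `b` are resolved in whichever environment
# evaluates the call, so two environments can give different answers.
env_two <- new.env()
env_two$b <- 2
env_ten <- new.env()
env_ten$b <- 10

res_two <- eval(call_by_name, env_two)  # evaluates log(8, base = 2)
res_ten <- eval(call_by_name, env_ten)  # evaluates log(8, base = 10)
```

The same call gives different results depending on where b is bound, which is why the lookup context matters when names rather than values are sent.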

Public fields

con

The redis connection. This is part of the public API and can be used to access the same redis database as the queue.

queue_id

The queue id used on creation. This is read-only after creation.

Methods

Public methods


Method new()

Constructor

Usage
rrq_controller$new(
  queue_id,
  con = redux::hiredis(),
  timeout_task_wait = NULL,
  follow = NULL,
  check_version = TRUE
)
Arguments
queue_id

An identifier for the queue. This will prefix all keys in redis, so a prefix might be useful here depending on your use case (e.g. rrq:<user>:<id>)

con

A redis connection. The default tries to create a redis connection using default ports, or environment variables set as in redux::hiredis()

timeout_task_wait

An optional default timeout to use when waiting for tasks (e.g., with ⁠$task_wait()⁠, ⁠$tasks_wait()⁠, ⁠$lapply()⁠, etc). If not given, then we fall back on the global option rrq.timeout_task_wait, and if that is not set,

follow

An optional default logical to use for tasks that may (or may not) be retried. If not given we fall back on the global option rrq.follow, and if that is not set then TRUE (i.e., we do follow). The value follow = TRUE is potentially slower than follow = FALSE for some operations because we need to dereference every task id. If you never use ⁠$task_retry⁠ then this dereference never has an effect and we can skip it. See vignette("fault-tolerance") for more information.

check_version

Check that the schema version is correct
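The fallback order described for follow (explicit value, then the value given to the controller, then the global option rrq.follow, then TRUE) can be written out as a small function. This is an illustration of the documented precedence only, not rrq's internal code; resolve_follow and its arguments are hypothetical names.

```r
# Hypothetical helper mirroring the documented precedence for `follow`:
# explicit argument, then the controller default, then the global option
# rrq.follow, then TRUE.
resolve_follow <- function(follow = NULL, controller_default = NULL) {
  if (!is.null(follow)) {
    return(follow)
  }
  if (!is.null(controller_default)) {
    return(controller_default)
  }
  getOption("rrq.follow", TRUE)
}

old <- options(rrq.follow = NULL)           # make sure the option is unset
default_value <- resolve_follow()           # falls through to TRUE
options(rrq.follow = FALSE)
from_option <- resolve_follow()             # picks up the global option
explicit <- resolve_follow(follow = TRUE)   # an explicit argument wins
options(old)                                # restore the previous setting
```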


Method to_v2()

Convert controller to the new-style object. Please don't use this in packages directly.

Usage
rrq_controller$to_v2()

Method destroy()

Entirely destroy a queue, by deleting all keys associated with it from the Redis database. This is a very destructive action and cannot be undone.

Usage
rrq_controller$destroy(
  delete = TRUE,
  worker_stop_type = "message",
  timeout_worker_stop = 0
)
Arguments
delete

Either TRUE (the default) indicating that the keys should be immediately deleted. Alternatively, provide an integer value and the keys will instead be marked for future deletion by "expiring" after this many seconds, using Redis' EXPIRE command.

worker_stop_type

Passed to $worker_stop; can be one of "message", "kill" or "kill_local". The "kill" method requires that the workers are using a heartbeat, and "kill_local" requires that the workers are on the same machine as the controller. However, these may stop workers faster than "message", which will wait until any running task is finished.

timeout_worker_stop

A timeout to pass to the worker to respond to the request to stop. See worker_stop's timeout argument for details.


Method envir()

Register a function to create an environment when creating a worker. When a worker starts, it will run this function.

Usage
rrq_controller$envir(create, notify = TRUE)
Arguments
create

A function that will create an environment. It will be called with one parameter (an environment), in a fresh R session. The function rrq_envir() can be used to create a suitable function for the most common case (loading packages and sourcing scripts).

notify

Boolean, indicating if we should send a REFRESH message to all workers to update their environment.
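A minimal sketch of a hand-written create function follows. It assumes only what is stated above, namely that the function is called with a single environment argument. The names double_it and register_envir are hypothetical.

```r
# A create function receives one argument, the environment to populate.
create <- function(envir) {
  envir$double_it <- function(x) 2 * x
}

# Local check that the function populates an environment as intended.
e <- new.env()
create(e)
local_value <- e$double_it(21)

# Hypothetical helper: register `create` on a controller `obj`. With
# notify = TRUE a REFRESH message is sent to all workers.
register_envir <- function(obj) {
  obj$envir(create, notify = TRUE)
}
```

Because the function runs in a fresh R session on the worker, it should not rely on objects that exist only in the controller's session.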


Method enqueue()

Queue an expression

Usage
rrq_controller$enqueue(
  expr,
  envir = parent.frame(),
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  export = NULL
)
Arguments
expr

Any R expression, unevaluated

envir

The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.

queue

The queue to add the task to; if not specified the "default" queue (which all workers listen to) will be used. If you have configured workers to listen to more than one queue you can specify that here. Be warned that if you push jobs onto a queue with no worker, it will queue forever.

separate_process

Logical, indicating if the task should be run in a separate process on the worker. If TRUE, then the worker runs the task in a separate process using the callr package. This means that the worker environment is completely clean, so subsequent runs are not affected by preceding ones. The downside of this approach is a considerable overhead in starting the external process and transferring data back.

timeout_task_run

Optionally, a maximum allowed running time, in seconds. This parameter only has an effect if separate_process is TRUE. If given, then if the task takes longer than this time it will be stopped and the task status set to TIMEOUT.

depends_on

Vector or list of IDs of tasks which must have completed before this job can be run. Once all of those tasks have run successfully, this task will be added to the queue. If any of them fails, this task will be removed from the queue.

export

Optionally a list of variables to export for the calculation. If given then no automatic analysis of the expression is done. It should be either a named list (name being the variable name, value being the value) or a character vector of variables that can be found immediately within envir. Use this where you have already done analysis of the expression (e.g., with the future package / globals) or where you want to avoid moving large objects through Redis that will be available on the remote workers due to how you have configured your worker environment.
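The arguments above might be combined as in the following sketch. It assumes that obj is an rrq_controller with workers listening on the default queue; enqueue_demo is a hypothetical helper name.

```r
# Hypothetical helper showing three ways of queuing an expression.
enqueue_demo <- function(obj) {
  a <- 10
  # `a` is found by analysing the expression and is sent along with it.
  id1 <- obj$enqueue(sum(1 + a))

  # Explicit export: no automatic analysis is done, so every variable the
  # expression needs must be listed.
  x <- c(1, 2, 3)
  id2 <- obj$enqueue(mean(x), export = list(x = x))

  # Queued only once id1 and id2 have completed successfully. The run
  # timeout has an effect because separate_process is TRUE.
  id3 <- obj$enqueue(Sys.time(),
                     separate_process = TRUE,
                     timeout_task_run = 60,
                     depends_on = c(id1, id2))
  c(id1, id2, id3)
}
```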


Method enqueue_()

Queue an expression

Usage
rrq_controller$enqueue_(
  expr,
  envir = parent.frame(),
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  export = NULL
)
Arguments
expr

Any R expression, quoted; use this to use $enqueue in a programmatic context where you want to construct expressions directly (e.g., bquote(log(.(x)), list(x = 10)))

envir

The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.

queue

The queue to add the task to; if not specified the "default" queue (which all workers listen to) will be used. If you have configured workers to listen to more than one queue you can specify that here. Be warned that if you push jobs onto a queue with no worker, it will queue forever.

separate_process

Logical, indicating if the task should be run in a separate process on the worker (see ⁠$enqueue⁠ for details).

timeout_task_run

Optionally, a maximum allowed running time, in seconds (see ⁠$enqueue⁠ for details).

depends_on

Vector or list of IDs of tasks which must have completed before this job can be run. Once all of those tasks have run successfully, this task will be added to the queue. If any of them fails, this task will be removed from the queue.

export

Optionally a list of variables to export for the calculation. See ⁠$enqueue⁠ for details.
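A short sketch of the programmatic use mentioned for expr: the expression is built with bquote(), checked locally, and then handed to $enqueue_(). The helper enqueue_quoted is a hypothetical name, and obj is assumed to be an rrq_controller.

```r
# Build the expression log(10) by substituting the value of x.
expr <- bquote(log(.(x)), list(x = 10))

# The quoted expression can be inspected or evaluated locally first.
expr_text <- deparse(expr)
local_value <- eval(expr)

# Hypothetical helper: queue an already-quoted expression.
enqueue_quoted <- function(obj, expr) {
  obj$enqueue_(expr)
}
```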


Method lapply()

Apply a function over a list of data. This is equivalent to using ⁠$enqueue()⁠ over each element in the list.

Usage
rrq_controller$lapply(
  X,
  FUN,
  ...,
  dots = NULL,
  envir = parent.frame(),
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  timeout_task_wait = NULL,
  time_poll = 1,
  progress = NULL,
  delete = FALSE,
  error = FALSE
)
Arguments
X

A list of data to apply our function against

FUN

A function to be applied to each element of X

...

Additional arguments to FUN

dots

As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.

envir

The environment to use to try and find the function

queue

The queue to add the tasks to (see ⁠$enqueue⁠ for details).

separate_process

Logical, indicating if the task should be run in a separate process on the worker (see ⁠$enqueue⁠ for details).

timeout_task_run

Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).

depends_on

Vector or list of IDs of tasks which must have completed before this job can be run. Once all of those tasks have run successfully, this task will be added to the queue. If any of them fails, this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.

timeout_task_wait

Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using bulk_wait. If not given, falls back on the controller's timeout_task_wait (see ⁠$new()⁠)

time_poll

Optional time with which to "poll" for completion (default is 1s, see ⁠$task_wait()⁠ for details)

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

delete

Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.

error

Optional logical, indicating if an error in the task should throw. Like ⁠$task_result()⁠ the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.


Method lapply_()

The "standard evaluation" version of ⁠$lapply()⁠. This differs in how the function is found and how dots are passed. With this version, both are passed by value; this may create more overhead on the redis server as the values of the variables will be copied over rather than using their names if possible.

Usage
rrq_controller$lapply_(
  X,
  FUN,
  ...,
  dots = NULL,
  envir = parent.frame(),
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  timeout_task_wait = NULL,
  time_poll = 1,
  progress = NULL,
  delete = FALSE,
  error = FALSE
)
Arguments
X

A list of data to apply our function against

FUN

A function to be applied to each element of X

...

Additional arguments to FUN

dots

As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.

envir

The environment to use to try and find the function

queue

The queue to add the tasks to (see ⁠$enqueue⁠ for details).

separate_process

Logical, indicating if the task should be run in a separate process on the worker (see ⁠$enqueue⁠ for details).

timeout_task_run

Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).

depends_on

Vector or list of IDs of tasks which must have completed before this job can be run. Once all of those tasks have run successfully, this task will be added to the queue. If any of them fails, this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.

timeout_task_wait

Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using bulk_wait. If not given, falls back on the controller's timeout_task_wait (see ⁠$new()⁠)

time_poll

Optional time with which to "poll" for completion (default is 1s, see ⁠$task_wait()⁠ for details)

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

delete

Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.

error

Optional logical, indicating if an error in the task should throw. Like ⁠$task_result()⁠ the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.


Method enqueue_bulk()

Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call when used with a data.frame argument, which is typically what is provided. Unlike $lapply(), which applies FUN to each element of X, $enqueue_bulk() applies FUN over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b, this should feel intuitive.

Usage
rrq_controller$enqueue_bulk(
  X,
  FUN,
  ...,
  dots = NULL,
  envir = parent.frame(),
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  timeout_task_wait = NULL,
  time_poll = 1,
  progress = NULL,
  delete = FALSE,
  error = FALSE
)
Arguments
X

Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.

FUN

A function

...

Additional arguments to add to every call to FUN

dots

As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.

envir

The environment to use to try and find the function

queue

The queue to add the tasks to (see ⁠$enqueue⁠ for details).

separate_process

Logical, indicating if the task should be run in a separate process on the worker (see ⁠$enqueue⁠ for details).

timeout_task_run

Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).

depends_on

Vector or list of IDs of tasks which must have completed before this job can be run. Once all of those tasks have run successfully, this task will be added to the queue. If any of them fails, this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.

timeout_task_wait

Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using bulk_wait. If not given, falls back on the controller's timeout_task_wait (see ⁠$new()⁠)

time_poll

Optional time with which to "poll" for completion (default is 1s, see ⁠$task_wait()⁠ for details)

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

delete

Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.

error

Optional logical, indicating if an error in the task should throw. Like ⁠$task_result()⁠ the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.
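The row-wise behaviour described for $enqueue_bulk() can be mimicked locally to see what each task receives. The first part below is plain R and only an analogy; the function f, the data X and the helper bulk_remote are made up for illustration, and obj is assumed to be an rrq_controller.

```r
# A function whose arguments match the column names of the data.frame.
f <- function(a, b) a + b
X <- data.frame(a = 1:3, b = c(10, 20, 30))

# Local analogue: turn each row into a named list of arguments and call
# f once per row with do.call.
rows <- lapply(seq_len(nrow(X)), function(i) as.list(X[i, , drop = FALSE]))
local_result <- lapply(rows, function(args) do.call(f, args))

# Hypothetical helper: the equivalent bulk submission, one task per row.
bulk_remote <- function(obj) {
  obj$enqueue_bulk(X, f)
}
```

Checking the local analogue first is a cheap way to confirm that the column names line up with the arguments of FUN before any tasks are queued.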


Method enqueue_bulk_()

The "standard evaluation" version of ⁠$enqueue_bulk()⁠. This differs in how the function is found and how dots are passed. With this version, both are passed by value; this may create more overhead on the redis server as the values of the variables will be copied over rather than using their names if possible.

Usage
rrq_controller$enqueue_bulk_(
  X,
  FUN,
  ...,
  dots = NULL,
  envir = parent.frame(),
  queue = NULL,
  separate_process = FALSE,
  timeout_task_run = NULL,
  depends_on = NULL,
  timeout_task_wait = NULL,
  time_poll = 1,
  progress = NULL,
  delete = FALSE,
  error = FALSE
)
Arguments
X

Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.

FUN

A function

...

Additional arguments to add to every call to FUN

dots

As an alternative to ..., you can provide the dots as a list of additional arguments. This may be easier to program against.

envir

The environment to use to try and find the function

queue

The queue to add the tasks to (see ⁠$enqueue⁠ for details).

separate_process

Logical, indicating if the task should be run in a separate process on the worker (see ⁠$enqueue⁠ for details).

timeout_task_run

Optionally, a maximum allowed running time, in seconds (see the timeout_task_run argument of $enqueue for details).

depends_on

Vector or list of IDs of tasks which must have completed before this job can be run. Once all of those tasks have run successfully, this task will be added to the queue. If any of them fails, this task will be removed from the queue. Dependencies are applied to all tasks added to the queue.

timeout_task_wait

Optional timeout, in seconds, after which an error will be thrown if all tasks have not completed. If given as 0, then we return a handle that can be used to check for tasks using bulk_wait. If not given, falls back on the controller's timeout_task_wait (see ⁠$new()⁠)

time_poll

Optional time with which to "poll" for completion (default is 1s, see ⁠$task_wait()⁠ for details)

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

delete

Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.

error

Optional logical, indicating if an error in the task should throw. Like ⁠$task_result()⁠ the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.


Method bulk_wait()

Wait for a group of tasks

Usage
rrq_controller$bulk_wait(
  x,
  timeout = NULL,
  time_poll = 1,
  progress = NULL,
  delete = FALSE,
  error = FALSE,
  follow = NULL
)
Arguments
x

An object of class rrq_bulk, as created by ⁠$lapply()⁠

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion (default is 1s, see ⁠$task_wait()⁠ for details)

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

delete

Optional logical, indicating if the tasks should be immediately deleted after collection, preventing buildup of lots of content in your Redis database.

error

Optional logical, indicating if an error in the task should throw. Like ⁠$task_result()⁠ the default is not to throw, giving you back an rrq_task_error object for each failing task. If error = TRUE we throw on error instead.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
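A sketch of submitting without blocking and collecting later, using the timeout_task_wait = 0 behaviour described for $lapply(). It assumes that obj is an rrq_controller with workers available; bulk_demo is a hypothetical helper name.

```r
# Hypothetical helper: submit a bulk job, then collect the results later.
bulk_demo <- function(obj) {
  # timeout_task_wait = 0 returns a handle straight away rather than waiting.
  grp <- obj$lapply(1:10, sqrt, timeout_task_wait = 0)

  # ... other work could happen here while the workers run the tasks ...

  # Block for up to 30 seconds, then delete the tasks once collected.
  obj$bulk_wait(grp, timeout = 30, delete = TRUE)
}
```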


Method task_list()

List ids of all tasks known to this rrq controller

Usage
rrq_controller$task_list()

Method task_exists()

Test whether the tasks with ids task_ids are known to this rrq controller

Usage
rrq_controller$task_exists(task_ids)
Arguments
task_ids

Character vector of task ids to check for existence.


Method task_status()

Return a character vector of task statuses. The name of each element corresponds to a task id, and the value will be one of the possible statuses ("PENDING", "COMPLETE", etc).

Usage
rrq_controller$task_status(task_ids = NULL, follow = NULL)
Arguments
task_ids

Optional character vector of task ids for which you would like statuses. If not given (or NULL) then the status of all task ids known to this rrq controller is returned.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.


Method task_progress()

Retrieve task progress, if set. This will be NULL if progress has never been registered, otherwise whatever value was set - can be an arbitrary R object.

Usage
rrq_controller$task_progress(task_id)
Arguments
task_id

A single task id for which the progress is wanted.


Method task_overview()

Provide a high level overview of task statuses for a set of task ids, being the count in major categories of PENDING, RUNNING, COMPLETE and ERROR.

Usage
rrq_controller$task_overview(task_ids = NULL)
Arguments
task_ids

Optional character vector of task ids for which you would like the overview. If not given (or NULL) then the status of all task ids known to this rrq controller is used.


Method task_position()

Find the position of one or more tasks in the queue.

Usage
rrq_controller$task_position(
  task_ids,
  missing = 0L,
  queue = NULL,
  follow = NULL
)
Arguments
task_ids

Character vector of tasks to find the position for.

missing

Value to return if the task is not found in the queue. A task will take value missing if it is running, complete, errored, deferred etc., and a positive integer if it is in the queue, indicating its position (with 1 being the next task to run).

queue

The name of the queue to query (defaults to the "default" queue).

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.


Method task_preceeding()

List the tasks in front of task_id in the queue. If the task is missing from the queue this will return NULL. If the task is next in the queue this will return an empty character vector.

Usage
rrq_controller$task_preceeding(task_id, queue = NULL, follow = NULL)
Arguments
task_id

Task to find the position for.

queue

The name of the queue to query (defaults to the "default" queue).

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.


Method task_result()

Get the result for a single task (see ⁠$tasks_result⁠ for a method for efficiently getting multiple results at once). Returns the value of running the task if it is complete, and an error otherwise.

Usage
rrq_controller$task_result(task_id, error = FALSE, follow = NULL)
Arguments
task_id

The single id for which the result is wanted.

error

Logical, indicating if we should throw an error if a task was not successful. By default (error = FALSE), in the case of the task result returning an error we return an object of class rrq_task_error, which contains information about the error. Passing error = TRUE simply calls stop() on this error if it is returned.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
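With the default error = FALSE, a failed task comes back as an rrq_task_error object, so the caller has to check for it. The following is one possible pattern, assuming that obj is an rrq_controller; safe_result is a hypothetical helper name.

```r
# Hypothetical helper: return the task value, or NULL (with a message) if
# the task failed.
safe_result <- function(obj, task_id) {
  res <- obj$task_result(task_id, error = FALSE)
  if (inherits(res, "rrq_task_error")) {
    message("Task ", task_id, " did not complete successfully")
    return(NULL)
  }
  res
}
```

Passing error = TRUE instead would turn the same failure into an R error, which suits scripts that should stop at the first failed task.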


Method tasks_result()

Get the results of a group of tasks, returning them as a list.

Usage
rrq_controller$tasks_result(task_ids, error = FALSE, follow = NULL)
Arguments
task_ids

A vector of task ids for which the task result is wanted.

error

Logical, indicating if we should throw an error if the task was not successful. See ⁠$task_result()⁠ for details.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.


Method task_wait()

Poll for a task to complete, returning the result when completed. If the task has already completed this is roughly equivalent to task_result. See ⁠$tasks_wait⁠ for an efficient way of doing this for a group of tasks.

Usage
rrq_controller$task_wait(
  task_id,
  timeout = NULL,
  time_poll = 1,
  progress = NULL,
  error = FALSE,
  follow = NULL
)
Arguments
task_id

The single id that we will wait for

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed. If not given, falls back on the controller's timeout_task_wait (see ⁠$new()⁠)

time_poll

Optional time with which to "poll" for completion. By default this will be 1 second; this is the time that each request for a completed task may block for (however, if the task is finished before this, the actual time waited for will be less). Increasing this will reduce the responsiveness of your R session to interrupting, but will cause slightly less network load. Values less than 1s are not currently supported as this requires a very recent Redis server.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

error

Logical, indicating if we should throw an error if the task was not successful. See ⁠$task_result()⁠ for details. Note that an error is always thrown if not all tasks are fetched in time.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.


Method tasks_wait()

Poll for a group of tasks to complete, returning the result as a list when completed. If the tasks have already completed this is roughly equivalent to tasks_result.

Usage
rrq_controller$tasks_wait(
  task_ids,
  timeout = NULL,
  time_poll = 1,
  progress = NULL,
  error = FALSE,
  follow = NULL
)
Arguments
task_ids

A vector of task ids to poll for

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed. If not given, falls back on the controller's timeout_task_wait (see ⁠$new()⁠)

time_poll

Optional time with which to "poll" for completion (default is 1s, see ⁠$task_wait()⁠ for details)

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

error

Logical, indicating if we should throw an error if the task was not successful. See ⁠$task_result()⁠ for details. Note that an error is always thrown if not all tasks are fetched in time.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.


Method task_delete()

Delete one or more tasks

Usage
rrq_controller$task_delete(task_ids, check = TRUE)
Arguments
task_ids

Vector of task ids to delete

check

Logical indicating if we should check that the tasks are not running. Deleting running tasks is unlikely to result in desirable behaviour.


Method task_cancel()

Cancel a single task. If the task is PENDING it will be unqueued and the status set to CANCELLED. If RUNNING then the task will be stopped if it was set to run in a separate process (i.e., queued with separate_process = TRUE). Dependent tasks will be marked as IMPOSSIBLE.

Usage
rrq_controller$task_cancel(task_id, wait = TRUE, timeout_wait = 10)
Arguments
task_id

Id of the task to cancel

wait

Wait for the task to be stopped, if it was running.

timeout_wait

Maximum time, in seconds, to wait for the task to be cancelled by the worker.

Returns

Nothing if successfully cancelled; otherwise throws an error giving the task id and status, e.g., "Task 123 is not running (MISSING)".
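
Because $task_cancel() takes a single task id and signals failure by throwing an error, cancelling a batch needs a loop with error handling. A minimal sketch, assuming obj is an existing rrq_controller object; cancel_many is a hypothetical helper and not part of rrq:

```r
# Hypothetical helper (not part of rrq): cancel several tasks one at a
# time, recording which cancellations succeeded.
# `obj` is assumed to be an rrq_controller object.
cancel_many <- function(obj, task_ids, timeout_wait = 10) {
  vapply(task_ids, function(id) {
    tryCatch({
      obj$task_cancel(id, wait = TRUE, timeout_wait = timeout_wait)
      TRUE
    }, error = function(e) FALSE)
  }, logical(1))
}
```

The result is a logical vector named by task id, so failed cancellations can be picked out with names(res)[!res].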


Method task_data()

Fetch internal data about a task from Redis (expert use only).

Usage
rrq_controller$task_data(task_id)
Arguments
task_id

The id of the task


Method task_info()

Return information about a task. This currently includes information about where a task is (or was) running and information about any retry chain, but will expand in future. The format of the output here is subject to change (and will probably get a nice print method) but the values present in the output will be included in any future update.

Usage
rrq_controller$task_info(task_id)
Arguments
task_id

The id of the task to fetch information about


Method task_times()

Fetch times for tasks at points in their life cycle. For each task returns the time of submission, starting and completion (not necessarily successfully; this includes errors and interruptions). If a task has not reached a point yet (e.g., submitted but not run, or running but not finished) the time will be NA. Times are returned in unix timestamp format in UTC; you can use redux::redis_time_to_r to convert them to a POSIXt object.

Usage
rrq_controller$task_times(task_ids, follow = NULL)
Arguments
task_ids

Task ids to fetch times for.

follow

Optional logical, indicating if we should follow any redirects set up by doing ⁠$task_retry⁠. If not given, falls back on the value passed into the queue, the global option rrq.follow, and finally TRUE. Set to FALSE if you want to return information about the original task, even if it has been subsequently retried.
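
Since the times are plain unix timestamps, queueing and running durations can be computed by subtraction. The sketch below uses made-up data rather than a live queue: the matrix shape and the column names submit, start and complete are assumptions about what $task_times() returns, and the conversion uses base R rather than redux.

```r
# Illustrative data only: the shape and column names (submit, start,
# complete) are assumptions about what $task_times() returns.
times <- matrix(
  c(1700000000, 1700000005, 1700000065,
    1700000010, 1700000020, NA),
  nrow = 2, byrow = TRUE,
  dimnames = list(c("task_a", "task_b"), c("submit", "start", "complete")))

# Seconds spent queued and running; NA where a task has not finished
waiting <- times[, "start"] - times[, "submit"]
running <- times[, "complete"] - times[, "start"]

# Base-R conversion of a unix timestamp to a UTC date-time
submitted <- as.POSIXct(times[, "submit"], origin = "1970-01-01", tz = "UTC")
```

Here task_b is still running, so its running time is NA.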


Method task_retry()

Retry a task (or set of tasks). Typically this is after failure (e.g., ERROR, DIED or similar) but you can retry even successfully completed tasks. Once retried, methods that retrieve information about a task (e.g., ⁠$task_status()⁠, ⁠$task_result()⁠) will behave differently depending on the value of their follow argument. See vignette("fault-tolerance") for more details.

Usage
rrq_controller$task_retry(task_ids)
Arguments
task_ids

Task ids to retry.
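
A common pattern is to retry only the tasks that failed. A minimal sketch, assuming obj is an existing rrq_controller object and that $task_status() accepts a vector of ids and returns a character vector of statuses in the same order (its signature is not shown in this section). retry_failed is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): retry only tasks that failed.
# Assumes obj$task_status() accepts a vector of ids and returns a
# character vector of statuses in the same order.
retry_failed <- function(obj, task_ids, failed = c("ERROR", "DIED")) {
  status <- obj$task_status(task_ids)
  to_retry <- task_ids[status %in% failed]
  if (length(to_retry) > 0) {
    obj$task_retry(to_retry)
  }
  # Return the ids that were submitted for retry
  invisible(to_retry)
}
```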


Method queue_length()

Returns the number of tasks in the queue (waiting for an available worker).

Usage
rrq_controller$queue_length(queue = NULL)
Arguments
queue

The name of the queue to query (defaults to the "default" queue).


Method queue_list()

Returns the keys in the task queue.

Usage
rrq_controller$queue_list(queue = NULL)
Arguments
queue

The name of the queue to query (defaults to the "default" queue).


Method queue_remove()

Remove task ids from a queue.

Usage
rrq_controller$queue_remove(task_ids, queue = NULL)
Arguments
task_ids

Task ids to remove

queue

The name of the queue to query (defaults to the "default" queue).


Method deferred_list()

Return deferred tasks and what they are waiting on. Note that these are returned in an arbitrary order; tasks will be added to the queue as their dependencies are satisfied.

Usage
rrq_controller$deferred_list()

Method worker_len()

Returns the number of active workers

Usage
rrq_controller$worker_len()
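
Combined with $queue_length(), the worker count gives a rough measure of backlog. A minimal sketch, assuming obj is an existing rrq_controller object and that both methods return a single number; backlog_per_worker is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): queued tasks per active worker.
# Returns NA if there are no active workers.
backlog_per_worker <- function(obj, queue = NULL) {
  n_workers <- obj$worker_len()
  if (n_workers == 0) {
    return(NA_real_)
  }
  obj$queue_length(queue) / n_workers
}
```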

Method worker_list()

Returns the ids of active workers

Usage
rrq_controller$worker_list()

Method worker_list_exited()

Returns the ids of workers known to have exited

Usage
rrq_controller$worker_list_exited()

Method worker_info()

Returns a list of information about active workers (or exited workers if worker_ids includes them).

Usage
rrq_controller$worker_info(worker_ids = NULL)
Arguments
worker_ids

Optional vector of worker ids. If NULL then all active workers are used.


Method worker_status()

Returns a character vector of current worker statuses

Usage
rrq_controller$worker_status(worker_ids = NULL)
Arguments
worker_ids

Optional vector of worker ids. If NULL then all active workers are used.
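
The character vector returned here can be tabulated against the statuses listed in the worker lifecycle section. A minimal sketch, assuming obj is an existing rrq_controller object; status_summary is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): count workers in each status.
# Using a factor with fixed levels keeps zero counts in the output.
status_summary <- function(obj) {
  known <- c("IDLE", "BUSY", "PAUSED", "EXITED", "LOST")
  table(factor(obj$worker_status(), levels = known))
}
```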


Method worker_log_tail()

Returns the last (few) elements in the worker log. The log will be returned as a data.frame of entries worker_id (the worker id), child (the process id, an integer, where logs come from a child process from a task queued with separate_process = TRUE), time (the time in Redis when the event happened; see redux::redis_time to convert this to an R time), command (the worker command) and message (the message corresponding to that command).

Usage
rrq_controller$worker_log_tail(worker_ids = NULL, n = 1)
Arguments
worker_ids

Optional vector of worker ids. If NULL then all active workers are used.

n

Number of elements to select, the default being the single last entry. Use Inf or 0 to indicate that you want all log entries
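
Because the log is an ordinary data.frame, it can be filtered with base R. The sketch below uses made-up data with the documented columns; the command and message values are placeholders, and the use of NA in child for entries that did not come from a child process is an assumption.

```r
# Illustrative data only: a data.frame with the documented columns. The
# command and message values here are made up for the example, and NA
# in `child` for non-child entries is an assumption.
log <- data.frame(
  worker_id = c("w1", "w1", "w2", "w2"),
  child = c(NA, NA, NA, 1234L),
  time = c(1700000000.1, 1700000002.5, 1700000001.0, 1700000003.2),
  command = c("CMD_A", "CMD_B", "CMD_A", "CMD_B"),
  message = c("", "task_1", "", "task_2"),
  stringsAsFactors = FALSE)

# Most recent entry per worker
latest <- do.call(rbind, lapply(split(log, log$worker_id), function(d) {
  d[which.max(d$time), ]
}))

# Entries that came from a child process (separate_process = TRUE)
from_child <- log[!is.na(log$child), ]
```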


Method worker_delete_exited()

Cleans up workers known to have exited

Usage
rrq_controller$worker_delete_exited(worker_ids = NULL)
Arguments
worker_ids

Optional vector of worker ids. If NULL then rrq looks for exited workers.


Method worker_stop()

Stop workers.

Usage
rrq_controller$worker_stop(
  worker_ids = NULL,
  type = "message",
  timeout = 0,
  time_poll = 0.05,
  progress = NULL
)
Arguments
worker_ids

Optional vector of worker ids. If NULL then all active workers will be stopped.

type

The strategy used to stop the workers. Can be message, kill or kill_local (see details).

timeout

Optional timeout, in seconds (only has an effect if type is message). If greater than zero, we first poll for up to this many seconds for each worker to acknowledge the STOP message, and then wait up to this many seconds again for the worker to exit. This means that the function may take up to 2 * timeout seconds to return.

time_poll

If type is message and timeout is greater than zero, this is the polling interval used between Redis calls. Increasing this reduces network load but decreases the ability to interrupt the process.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

Details

The type parameter indicates the strategy used to stop workers, and interacts with other parameters. The strategies used by the different values are:

  • message, in which case a STOP message will be sent to the worker, which it will receive after finishing any currently running task (if BUSY; IDLE workers will stop immediately).

  • kill, in which case a kill signal will be sent via the heartbeat (if the worker is using one). This will kill the worker even if it is currently working on a task, eventually leaving that task with a status of DIED.

  • kill_local, in which case a kill signal is sent using operating system signals, which requires that the worker is on the same machine as the controller.
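
The strategies above can be combined: ask politely first, then escalate. A minimal sketch, assuming obj is an existing rrq_controller object and that the workers run a heartbeat (required for type = "kill"); stop_then_kill is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): ask workers to stop politely,
# then kill any that are still listed as active.
# `obj` is assumed to be an rrq_controller object.
stop_then_kill <- function(obj, worker_ids = NULL, timeout = 10) {
  if (is.null(worker_ids)) {
    worker_ids <- obj$worker_list()
  }
  # Whether a timed-out message stop raises an error is not stated on
  # this page, so guard against it either way.
  tryCatch(
    obj$worker_stop(worker_ids, type = "message", timeout = timeout,
                    progress = FALSE),
    error = function(e) NULL)
  # Anything still active after the polite stop gets killed
  remaining <- intersect(worker_ids, obj$worker_list())
  if (length(remaining) > 0) {
    obj$worker_stop(remaining, type = "kill")
  }
  invisible(remaining)
}
```

Killing a BUSY worker leaves its task as DIED, so the returned ids indicate which workers may have left tasks to retry.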


Method worker_detect_exited()

Detects exited workers through a lapsed heartbeat

Usage
rrq_controller$worker_detect_exited()

Method worker_process_log()

Return the contents of a worker's process log, if it is located on the same physical storage (including network storage) as the controller. This will generally behave for workers started with rrq_worker_spawn but may require significant care otherwise.

Usage
rrq_controller$worker_process_log(worker_id)
Arguments
worker_id

The worker for which the log is required


Method worker_config_save()

Save a worker configuration, which can be used to start workers with a set of options with the CLI. These correspond to arguments to rrq_worker.

Usage
rrq_controller$worker_config_save(name, config, overwrite = TRUE)
Arguments
name

Name for this configuration

config

A worker configuration, created by rrq_worker_config()

overwrite

Logical, indicating if an existing configuration with this name should be overwritten if it exists. If FALSE, then the configuration is not updated, even if it differs from the version currently saved.

Returns

Invisibly, a boolean indicating if the configuration was updated.


Method worker_config_list()

Return names of worker configurations saved by ⁠$worker_config_save⁠

Usage
rrq_controller$worker_config_list()

Method worker_config_read()

Return the value of a worker configuration saved by $worker_config_save

Usage
rrq_controller$worker_config_read(name)
Arguments
name

Name of the configuration


Method worker_load()

Report on worker "load" (the number of workers being used over time). Returns an object of class worker_load, for which a mean method exists (this method is a work in progress and the interface may change).

Usage
rrq_controller$worker_load(worker_ids = NULL)
Arguments
worker_ids

Optional vector of worker ids. If NULL then all active workers are used.


Method message_send()

Send a message to workers. Sending a message returns a message id, which can be used to poll for a response with the other ⁠message_*⁠ methods.

Usage
rrq_controller$message_send(command, args = NULL, worker_ids = NULL)
Arguments
command

A command, such as PING, PAUSE; see the Messages section of the Details for all messages.

args

Arguments to the command, if supported

worker_ids

Optional vector of worker ids to send the message to. If NULL then the message will be sent to all active workers.


Method message_has_response()

Detect if a response is available for a message

Usage
rrq_controller$message_has_response(
  message_id,
  worker_ids = NULL,
  named = TRUE
)
Arguments
message_id

The message id

worker_ids

Optional vector of worker ids. If NULL then all active workers are used (note that this may differ from the set of workers that the message was sent to!)

named

Logical, indicating if the return vector should be named
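
With named = TRUE the result can be used to see which workers have not yet replied. A minimal sketch, assuming obj is an existing rrq_controller object and that the return value is a logical vector named by worker id; pending_workers is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): which of the given workers
# have not yet responded to a message?
# Assumes $message_has_response(named = TRUE) returns a logical vector
# named by worker id.
pending_workers <- function(obj, message_id, worker_ids) {
  done <- obj$message_has_response(message_id, worker_ids, named = TRUE)
  names(done)[!done]
}
```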


Method message_get_response()

Get response to messages, waiting until the message has been responded to.

Usage
rrq_controller$message_get_response(
  message_id,
  worker_ids = NULL,
  named = TRUE,
  delete = FALSE,
  timeout = 0,
  time_poll = 0.05,
  progress = NULL
)
Arguments
message_id

The message id

worker_ids

Optional vector of worker ids. If NULL then all active workers are used (note that this may differ from the set of workers that the message was sent to!)

named

Logical, indicating if the return value should be named by worker id.

delete

Logical, indicating if messages should be deleted after retrieval

timeout

Integer, representing seconds to wait until the response has been received. An error will be thrown if a response has not been received in this time.

time_poll

If timeout is greater than zero, this is the polling interval used between Redis calls. Increasing this reduces network load but increases the time that may be waited for.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
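
Because the default worker_ids = NULL is resolved separately by each call, it is safer to capture the worker ids once and pass them to both the send and the fetch. A minimal sketch, assuming obj is an existing rrq_controller object; ping_workers is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): ping a fixed set of workers and
# collect their replies.
ping_workers <- function(obj, timeout = 5) {
  # Capture the worker ids once so the same set is used for sending and
  # for collecting, even if workers appear or exit in between.
  ids <- obj$worker_list()
  message_id <- obj$message_send("PING", worker_ids = ids)
  obj$message_get_response(message_id, worker_ids = ids, named = TRUE,
                           delete = TRUE, timeout = timeout,
                           progress = FALSE)
}
```

An error is thrown if any of the workers fails to respond within the timeout, as described for the timeout argument above.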


Method message_response_ids()

Return ids for messages with responses for a particular worker.

Usage
rrq_controller$message_response_ids(worker_id)
Arguments
worker_id

The worker id


Method message_send_and_wait()

Send a message and wait for responses. This is a helper function around message_send and message_get_response.

Usage
rrq_controller$message_send_and_wait(
  command,
  args = NULL,
  worker_ids = NULL,
  named = TRUE,
  delete = TRUE,
  timeout = 600,
  time_poll = 0.05,
  progress = NULL
)
Arguments
command

A command, such as PING, PAUSE; see the Messages section of the Details for all messages.

args

Arguments to the command, if supported

worker_ids

Optional vector of worker ids to send the message to. If NULL then the message will be sent to all active workers.

named

Logical, indicating if the return value should be named by worker id.

delete

Logical, indicating if messages should be deleted after retrieval

timeout

Integer, representing seconds to wait until the response has been received. An error will be thrown if a response has not been received in this time.

time_poll

If timeout is greater than zero, this is the polling interval used between Redis calls. Increasing this reduces network load but increases the time that may be waited for.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
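
As an illustration, the EVAL message can be used to query something from every worker in one call. A minimal sketch, assuming obj is an existing rrq_controller object and that the response is a list named by worker id with one value per worker; worker_pids is a hypothetical helper, not part of rrq.

```r
# Hypothetical helper (not part of rrq): ask each active worker for its
# process id by evaluating an expression on the worker.
# Assumes the response is a list named by worker id.
worker_pids <- function(obj, timeout = 10) {
  res <- obj$message_send_and_wait("EVAL", "Sys.getpid()",
                                   timeout = timeout, progress = FALSE)
  vapply(res, as.integer, integer(1))
}
```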


richfitz/rrq documentation built on March 10, 2024, 12:03 a.m.