crew_class_controller_group | R Documentation
R6 class for controller groups.
See crew_controller_group().
controllers: List of R6 controller objects.
relay: Relay object for event-driven programming on a downstream condition variable.
throttle: crew_throttle() object to orchestrate exponential
backoff in the relay and auto-scaling.
new(): Multi-controller constructor.
crew_class_controller_group$new(controllers = NULL, relay = NULL, throttle = NULL)
controllers: List of R6 controller objects.
relay: Relay object for event-driven programming on a downstream condition variable.
throttle: crew_throttle() object to orchestrate exponential
backoff in the relay and auto-scaling.
An R6 controller group object.
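if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  # Create two local controllers. With tasks_max = 1L, each worker of
  # the "transient" controller exits after a single task.
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  # Combine the controllers into a group and start them.
  group <- crew_controller_group(persistent, transient)
  group$start()
  # Submit a task to the "transient" controller, wait for it to finish,
  # and retrieve the result.
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  # Terminate the workers and disconnect the clients.
  group$terminate()
}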
validate(): Validate the controller group.
crew_class_controller_group$validate()
NULL (invisibly).
empty(): See if the controllers are empty.
crew_class_controller_group$empty(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
A controller is empty if it has no running tasks
or completed tasks waiting to be retrieved with pop().
TRUE if all the selected controllers are empty,
FALSE otherwise.
nonempty(): Check if the controller group is nonempty.
crew_class_controller_group$nonempty(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
A controller is empty if it has no running tasks
or completed tasks waiting to be retrieved with pop().
TRUE if the selected controllers are not all empty, FALSE otherwise.
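The relationship between empty() and nonempty() at the group level can be sketched in base R. This is only an illustration, not crew's implementation: the counts list and the helpers controller_empty(), group_empty(), and group_nonempty() are hypothetical, and the sketch assumes that nonempty() is the logical negation of empty().

```r
# Hypothetical model: each controller tracks unresolved tasks and
# resolved tasks that have not been popped yet.
counts <- list(
  persistent = c(unresolved = 0L, unpopped = 0L),
  transient = c(unresolved = 1L, unpopped = 0L)
)

# A single controller is empty if both counts are zero.
controller_empty <- function(x) all(x == 0L)

# Assumed group semantics: every selected controller must be empty.
group_empty <- function(counts, controllers = NULL) {
  if (is.null(controllers)) controllers <- names(counts)
  all(vapply(counts[controllers], controller_empty, logical(1L)))
}

# Assumed to be the negation of group_empty().
group_nonempty <- function(counts, controllers = NULL) {
  !group_empty(counts, controllers)
}

all_empty <- group_empty(counts)
persistent_empty <- group_empty(counts, controllers = "persistent")
any_nonempty <- group_nonempty(counts)
```

Here the "transient" controller still has one unresolved task, so the group as a whole is nonempty even though the "persistent" controller alone is empty.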
resolved(): Number of resolved mirai() tasks.
crew_class_controller_group$resolved(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
resolved() is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
Non-negative integer of length 1,
number of resolved mirai() tasks.
The return value is 0 if the condition variable does not exist
(i.e. if the client is not running).
unresolved(): Number of unresolved mirai() tasks.
crew_class_controller_group$unresolved(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
Non-negative integer of length 1,
number of unresolved mirai() tasks.
unpopped(): Number of resolved mirai() tasks available via pop().
crew_class_controller_group$unpopped(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
Non-negative integer of length 1,
number of resolved mirai() tasks available via pop().
saturated(): Check if a controller is saturated.
crew_class_controller_group$saturated(collect = NULL, throttle = NULL, controller = NULL)
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller: Character vector of length 1 with the controller name.
Set to NULL to select the default controller that push()
would choose.
A controller is saturated if the number of unresolved tasks
is greater than or equal to the maximum number of workers.
In other words, in a saturated controller, every available worker
has a task.
You can still push tasks to a saturated controller, but
tools that use crew, such as targets, may choose not to.
TRUE if the selected controller is saturated,
FALSE otherwise.
start(): Start one or more controllers.
crew_class_controller_group$start(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
NULL (invisibly).
started(): Check whether all the given controllers are started.
crew_class_controller_group$started(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
Under the hood, this checks whether the clients of all the given controllers are started.
TRUE if all the selected controllers are started, FALSE if any are not.
launch(): Launch one or more workers on one or more controllers.
crew_class_controller_group$launch(n = 1L, controllers = NULL)
n: Number of workers to launch in each controller selected.
controllers: Character vector of controller names.
Set to NULL to select all controllers.
NULL (invisibly).
scale(): Automatically scale up the number of workers if needed in one or more controller objects.
crew_class_controller_group$scale(throttle = TRUE, controllers = NULL)
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controllers: Character vector of controller names.
Set to NULL to select all controllers.
See the scale() method in individual controller classes.
Invisibly returns TRUE if there was any relevant
auto-scaling activity (new worker launches or worker
connection/disconnection events), FALSE otherwise.
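The effect of the throttle argument can be sketched with a small base R closure. This is a simplified fixed-interval illustration under stated assumptions, not crew's crew_throttle() object (which is described above as using exponential backoff). The names new_throttle(), should_scale(), and the injectable clock argument are hypothetical.

```r
# Hypothetical fixed-interval throttle, for illustration only.
# `clock` is injectable so the behavior can be checked without sleeping.
new_throttle <- function(
  seconds_interval,
  clock = function() as.numeric(Sys.time())
) {
  last <- -Inf # Time of the most recent accepted call.
  function(throttle = TRUE) {
    now <- clock()
    if (throttle && (now - last) < seconds_interval) {
      return(FALSE) # Too soon since the last accepted call: skip.
    }
    last <<- now
    TRUE # Go ahead with auto-scaling.
  }
}

fake_time <- 0
should_scale <- new_throttle(
  seconds_interval = 1,
  clock = function() fake_time
)
first <- should_scale() # Accepted: nothing happened before.
fake_time <- 0.5
second <- should_scale() # Skipped: only 0.5 seconds elapsed.
third <- should_scale(throttle = FALSE) # Accepted: throttling disabled.
fake_time <- 2
fourth <- should_scale() # Accepted: 1.5 seconds since the last accepted call.
```

With throttle = TRUE, repeated calls inside the interval become cheap no-ops, which is the sense in which throttling avoids overburdening the dispatcher.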
autoscale(): Run worker auto-scaling in a private later loop
every controller$client$seconds_interval seconds.
crew_class_controller_group$autoscale(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
NULL (invisibly).
descale(): Terminate the auto-scaling loop started by
controller$autoscale().
crew_class_controller_group$descale(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
NULL (invisibly).
crashes(): Report the number of consecutive crashes of a task, summed over all selected controllers in the group.
crew_class_controller_group$crashes(name, controllers = NULL)
name: Character string, name of the task to check.
controllers: Character vector of controller names.
Set to NULL to select all controllers.
See the crashes_max argument of crew_controller().
Number of consecutive crashes of the named task, summed over all the selected controllers in the group.
push(): Push a task to the head of the task list.
crew_class_controller_group$push(command, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, scale = TRUE, throttle = TRUE, name = NULL, save_command = NULL, controller = NULL)
command: Language object with R code to run.
data: Named list of local data objects in the evaluation environment.
globals: Named list of objects to temporarily assign to the
global environment for the task. See the reset_globals
argument of crew_controller_local().
substitute: Logical of length 1, whether to call
base::substitute() on the supplied value of the
command argument. If TRUE (default) then command is quoted
literally as you write it, e.g.
push(command = your_function_call()). If FALSE, then crew
assumes command is a language object and you are passing its
value, e.g. push(command = quote(your_function_call())).
substitute = TRUE is appropriate for interactive use,
whereas substitute = FALSE is meant for automated R programs
that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed argument of set.seed() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind argument of RNGkind() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc
argument of require().
seconds_timeout: Optional task timeout passed to the .timeout
argument of mirai::mirai() (after converting to milliseconds).
scale: Logical, whether to automatically scale workers to meet
demand. See the scale argument of the push() method of
ordinary single controllers.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
name: Character string, name of the task. If NULL,
a random name is automatically generated.
The task name must not conflict with an existing task
in the controller where it is submitted.
To reuse the name, wait for the existing task
to finish, then either pop() or collect() it
to remove it from its controller.
save_command: Deprecated on 2025-01-22
(crew version 0.10.2.9004).
controller: Character of length 1,
name of the controller to receive the task.
If NULL, the controller defaults to the
first controller in the list.
Invisibly returns the mirai object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise().
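The difference between substitute = TRUE and substitute = FALSE, described for the substitute argument above, can be illustrated in base R without crew. The helper capture_command() below is a hypothetical stand-in for the step where push() turns its command argument into a language object. It is a sketch of the general technique, not crew's source code.

```r
# Hypothetical stand-in for the command-capturing step of push().
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the code exactly as the caller typed it, without evaluating it.
    command <- base::substitute(command)
  }
  # Otherwise, assume the caller already supplied a language object.
  command
}

# Interactive style: write the code directly.
interactive_style <- capture_command(sqrt(4))

# Programmatic style: pass an already-quoted language object.
programmatic_style <- capture_command(quote(sqrt(4)), substitute = FALSE)

same_command <- identical(interactive_style, programmatic_style)
value <- eval(interactive_style)
```

Both calls yield the same unevaluated call to sqrt(), which is why substitute = FALSE suits programs that build commands ahead of time and pass them around as objects.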
walk(): Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
crew_class_controller_group$walk(command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL)
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over.
For example, to run function calls
f(x = 1, y = "a") and f(x = 2, y = "b"),
set command to f(x, y), and set iterate to
list(x = c(1, 2), y = c("a", "b")). The individual
function calls are evaluated as
f(x = iterate$x[[1]], y = iterate$y[[1]]) and
f(x = iterate$x[[2]], y = iterate$y[[2]]).
All the elements of iterate must have the same length.
If there are any name conflicts between iterate and data,
iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily
assign to the global environment for each task. This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals argument of crew_controller_local().
Objects in this list are treated as single
values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call
base::substitute() on the supplied value of the
command argument. If TRUE (default) then command is quoted
literally as you write it, e.g.
push(command = your_function_call()). If FALSE, then crew
assumes command is a language object and you are passing its
value, e.g. push(command = quote(your_function_call())).
substitute = TRUE is appropriate for interactive use,
whereas substitute = FALSE is meant for automated R programs
that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed argument of set.seed() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind argument of RNGkind() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc
argument of require().
seconds_timeout: Optional task timeout passed to the .timeout
argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of
iterate with names for the tasks. If names is supplied,
then iterate[[names]] must be a character vector.
save_command: Deprecated on 2025-01-22
(crew version 0.10.2.9004).
verbose: Logical of length 1, whether to print a progress bar when pushing tasks.
scale: Logical, whether to automatically scale workers to meet
demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controller: Character of length 1,
name of the controller to submit the tasks.
If NULL, the controller defaults to the
first controller in the list.
In contrast to walk(), map() blocks the local R session
and waits for all tasks to complete.
Invisibly returns a list of mirai task objects for the
newly created tasks. The order of tasks in the list matches the
order of data in the iterate argument.
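The way iterate and data combine, as described for those arguments above, can be sketched in base R. This runs everything locally and sequentially, so it only illustrates how the arguments of each call are assembled, not how crew dispatches tasks to workers. The function run_iterations(), the toy function f(), and the constants list are hypothetical.

```r
# Toy function standing in for the user's task.
f <- function(x, y) paste(x, y)

iterate <- list(x = c(1, 2), y = c("a", "b"))
# `y` conflicts with iterate on purpose to show the precedence rule.
constants <- list(y = "overridden", note = "held constant")

# Hypothetical local, sequential stand-in for walk() and map().
run_iterations <- function(command, iterate, data = list()) {
  enclos <- parent.frame() # Where functions such as f() are looked up.
  n <- length(iterate[[1L]])
  stopifnot(all(lengths(iterate) == n)) # All elements need equal length.
  lapply(seq_len(n), function(i) {
    values <- data
    # Elements of iterate take precedence over same-named elements of data.
    values[names(iterate)] <- lapply(iterate, function(v) v[[i]])
    eval(command, envir = values, enclos = enclos)
  })
}

results <- run_iterations(quote(f(x, y)), iterate, data = constants)
```

The first element of results comes from f(x = 1, y = "a") and the second from f(x = 2, y = "b"). The conflicting y in constants is never used.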
map(): Apply a single command to multiple inputs.
crew_class_controller_group$map(command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_interval = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, error = "stop", warnings = TRUE, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL)
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over.
For example, to run function calls
f(x = 1, y = "a") and f(x = 2, y = "b"),
set command to f(x, y), and set iterate to
list(x = c(1, 2), y = c("a", "b")). The individual
function calls are evaluated as
f(x = iterate$x[[1]], y = iterate$y[[1]]) and
f(x = iterate$x[[2]], y = iterate$y[[2]]).
All the elements of iterate must have the same length.
If there are any name conflicts between iterate and data,
iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily
assign to the global environment for each task. This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals argument of crew_controller_local().
Objects in this list are treated as single
values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call
base::substitute() on the supplied value of the
command argument. If TRUE (default) then command is quoted
literally as you write it, e.g.
push(command = your_function_call()). If FALSE, then crew
assumes command is a language object and you are passing its
value, e.g. push(command = quote(your_function_call())).
substitute = TRUE is appropriate for interactive use,
whereas substitute = FALSE is meant for automated R programs
that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed argument of set.seed() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind argument of RNGkind() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc
argument of require().
seconds_interval: Deprecated on 2025-01-17 (crew version
0.10.2.9003). Instead, the seconds_interval argument passed
to crew_controller_group() is used as seconds_max
in a crew_throttle() object which orchestrates exponential
backoff.
seconds_timeout: Optional task timeout passed to the .timeout
argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of
iterate with names for the tasks. If names is supplied,
then iterate[[names]] must be a character vector.
save_command: Deprecated on 2025-01-22
(crew version 0.10.2.9004).
error: Character vector of length 1, choice of action if a task has an error. Possible values:
"stop": throw an error in the main R session instead of returning
a value. In case of an error, the results from the last errored
map() are in the error field
of the controller, e.g. controller_object$error. To reduce
memory consumption, set controller_object$error <- NULL after
you are finished troubleshooting.
"warn": throw a warning. This allows the return value with
all the error messages and tracebacks to be generated.
"silent": do nothing special.
warnings: Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose: Logical of length 1, whether to print progress messages.
scale: Logical, whether to automatically scale workers to meet
demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controller: Character of length 1,
name of the controller to submit the tasks.
If NULL, the controller defaults to the
first controller in the list.
The idea comes from functional programming: for example,
the map() function from the purrr package.
A tibble of results and metadata: one row per task and
columns corresponding to the output of pop().
pop(): Pop a completed task from the results data frame.
crew_class_controller_group$pop(scale = TRUE, collect = NULL, throttle = TRUE, error = NULL, controllers = NULL)
scale: Logical, whether to automatically scale workers to meet
demand. See the scale argument of the pop() method of
ordinary single controllers.
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
error: NULL or character of length 1, choice of action if
the popped task threw an error. Possible values:
"stop": throw an error in the main R session instead of returning
a value.
"warn": throw a warning.
NULL or "silent": do not react to errors.
controllers: Character vector of controller names.
Set to NULL to select all controllers.
If there is no task to collect, return NULL. Otherwise,
return a one-row tibble with the same columns as pop()
for ordinary controllers.
collect(): Pop all available task results and return them in a tidy
tibble.
crew_class_controller_group$collect(scale = TRUE, throttle = TRUE, error = NULL, controllers = NULL)
scale: Logical of length 1,
whether to automatically call scale()
to auto-scale workers to meet the demand of the task load.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
error: NULL or character of length 1, choice of action if
the popped task threw an error. Possible values:
"stop": throw an error in the main R session instead of returning
a value.
"warn": throw a warning.
NULL or "silent": do not react to errors.
controllers: Character vector of controller names.
Set to NULL to select all controllers.
A tibble of results and metadata of all resolved tasks,
with one row per task. Returns NULL if there are no available
results.
promise(): Create a promises::promise() object to asynchronously
pop or collect one or more tasks.
crew_class_controller_group$promise(mode = "one", seconds_interval = 0.1, scale = NULL, throttle = NULL, controllers = NULL)
mode: Character of length 1, what kind of promise to create.
mode must be "one" or "all". Details:
 If mode is "one", then the promise is fulfilled (or rejected)
when at least one task is resolved and available to pop().
When that happens, pop() runs asynchronously, pops a result off
the task list, and returns a value.
If the task succeeded, then the promise
is fulfilled and its value is the result of pop() (a one-row
tibble with the result and metadata). If the task threw an error,
the error message of the task is forwarded to any error callbacks
registered with the promise.
 If mode is "all", then the promise is fulfilled (or rejected)
when there are no unresolved tasks left in the controller.
(Be careful: this condition is trivially met in the moment
if the controller is empty and you have not submitted any tasks,
so it is best to create this kind of promise only after you
submit tasks.)
When there are no unresolved tasks left,
collect() runs asynchronously, pops all available results
off the task list, and returns a value.
If the task succeeded, then the promise
is fulfilled and its value is the result of collect()
(a tibble with one row per task result). If any of the tasks
threw an error, then the first error message detected is forwarded
to any error callbacks registered with the promise.
seconds_interval: Positive numeric of length 1, delay in the
later::later() polling interval to asynchronously check if
the promise can be resolved.
scale: Deprecated on 2024-04-10 (version 0.9.1.9003)
and no longer used. Now, promise() always turns on auto-scaling
in a private later loop (if not already activated).
throttle: Deprecated on 2024-04-10 (version 0.9.1.9003)
and no longer used. Now, promise() always turns on auto-scaling
in a private later loop (if not already activated).
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Please be aware that pop() or collect() will happen
asynchronously at some unpredictable time after the promise object
is created, even if your local R process appears to be doing
something completely different. This behavior is highly desirable
in a Shiny reactive context, but please be careful as it may be
surprising in other situations.
A promises::promise() object whose eventual value will
be a tibble with results from one or more popped tasks.
If mode = "one", only one task is popped and returned (one row).
If mode = "all", then all the tasks are returned in a tibble
with one row per task (or NULL is returned if there are no
tasks to pop).
wait(): Wait for tasks.
crew_class_controller_group$wait(mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL)
mode: Character of length 1: "all" to wait for
all tasks in all controllers to complete, "one" to wait for
a single task in a single controller to complete. In this scheme,
the timeout limit is applied to each controller sequentially,
and a timeout is treated the same as a completed controller.
seconds_interval: Deprecated on 2025-01-17 (crew version
0.10.2.9003). Instead, the seconds_interval argument passed
to crew_controller_group() is used as seconds_max
in a crew_throttle() object which orchestrates exponential
backoff.
seconds_timeout: Timeout length in seconds waiting for results to become available.
scale: Logical of length 1, whether to call scale_later()
on each selected controller to schedule auto-scaling.
See the scale argument of the wait() method of
ordinary single controllers.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controllers: Character vector of controller names.
Set to NULL to select all controllers.
The wait() method blocks the calling R session and
repeatedly auto-scales workers for tasks that need them.
The function runs until it either times out or the condition
in mode is met.
A logical of length 1, invisibly. TRUE if the condition
in mode was met, FALSE otherwise.
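The blocking behavior and return value of wait() can be sketched as a generic polling loop in base R. This is a minimal sketch under stated assumptions, not crew's implementation: the function wait_for(), its condition callback, and the seconds_min and seconds_max arguments are hypothetical, and the doubling delay is only a rough picture of the exponential backoff mentioned above.

```r
# Hypothetical polling loop with capped exponential backoff.
wait_for <- function(
  condition,
  seconds_timeout = Inf,
  seconds_min = 0.001,
  seconds_max = 0.05
) {
  start <- proc.time()[["elapsed"]]
  delay <- seconds_min
  repeat {
    if (condition()) {
      return(invisible(TRUE)) # The condition was met.
    }
    if ((proc.time()[["elapsed"]] - start) >= seconds_timeout) {
      return(invisible(FALSE)) # Timed out before the condition was met.
    }
    Sys.sleep(delay)
    delay <- min(2 * delay, seconds_max) # Back off, up to a cap.
  }
}

# A condition that becomes TRUE on the third poll.
polls <- 0L
done <- wait_for(
  function() {
    polls <<- polls + 1L
    polls >= 3L
  },
  seconds_timeout = 5
)

# A condition that never becomes TRUE, so the loop times out.
timed_out <- wait_for(function() FALSE, seconds_timeout = 0.1)
```

In the real method, the condition depends on mode ("all" or "one") and the loop also auto-scales workers while it waits.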
push_backlog(): Push the name of a task to the backlog.
crew_class_controller_group$push_backlog(name, controller = NULL)
name: Character of length 1 with the task name to push to the backlog.
controller: Character vector of length 1 with the controller name.
Set to NULL to select the default controller that push_backlog()
would choose.
pop_backlog() pops the tasks that can be pushed
without saturating the controller.
NULL (invisibly).
pop_backlog(): Pop the task names from the head of the backlog which can be pushed without saturating the controller.
crew_class_controller_group$pop_backlog(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
Character vector of task names which can be pushed to the
controller without saturating it. If the controller is saturated,
character(0L) is returned.
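The interplay of push_backlog(), pop_backlog(), and saturation can be sketched with a small base R closure. This is an illustration only: new_backlog() and its unresolved argument are hypothetical, and the sketch assumes that the number of names released is the number of workers minus the number of unresolved tasks, following the definition of saturation given for saturated().

```r
# Hypothetical backlog for a single controller with a fixed worker limit.
new_backlog <- function(workers) {
  backlog <- character(0L)
  list(
    push_backlog = function(name) {
      backlog <<- c(backlog, name) # Append to the tail of the backlog.
      invisible(NULL)
    },
    pop_backlog = function(unresolved) {
      # Free capacity: tasks that could be pushed without saturating.
      n <- max(0L, workers - unresolved)
      n <- min(n, length(backlog))
      out <- utils::head(backlog, n)
      backlog <<- utils::tail(backlog, length(backlog) - n)
      out
    }
  )
}

queue <- new_backlog(workers = 2L)
for (name in c("a", "b", "c")) {
  queue$push_backlog(name)
}

# Saturated (2 unresolved tasks, 2 workers): nothing is released.
while_saturated <- queue$pop_backlog(unresolved = 2L)
# Idle: two names are released from the head of the backlog.
while_idle <- queue$pop_backlog(unresolved = 0L)
# One free worker: the last name is released.
remaining <- queue$pop_backlog(unresolved = 1L)
```

A caller would typically push() each released name as a real task, which keeps the controller busy without exceeding its worker limit.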
summary(): Summarize the workers of one or more controllers.
crew_class_controller_group$summary(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
A data frame of aggregated worker summary statistics
of all the selected controllers. It has one row per worker,
and the rows are grouped by controller.
See the documentation of the summary() method of the controller
class for specific information about the columns in the output.
pids(): Get the process IDs of the local process and the
mirai dispatchers (if started).
crew_class_controller_group$pids(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
An integer vector of process IDs of the local process and the
mirai dispatcher (if started).
terminate(): Terminate the workers and disconnect the client for one or more controllers.
crew_class_controller_group$terminate(controllers = NULL)
controllers: Character vector of controller names.
Set to NULL to select all controllers.
NULL (invisibly).
Other controller_group: 
crew_controller_group()
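if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  # Create two local controllers. With tasks_max = 1L, each worker of
  # the "transient" controller exits after a single task.
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  # Combine the controllers into a group and start them.
  group <- crew_controller_group(persistent, transient)
  group$start()
  # Submit a task to the "transient" controller, wait for it to finish,
  # and retrieve the result.
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  # Terminate the workers and disconnect the clients.
  group$terminate()
}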