`crew` is a distributed computing framework with a centralized interface and auto-scaling. A `crew` controller is an object in R which accepts tasks, returns results, and launches workers. Workers can be local processes, jobs on traditional clusters such as SLURM, or jobs on cloud services such as AWS Batch, depending on the launcher plugin of the controller.

A task is a piece of R code, such as an expression or a function call. A worker is a non-interactive R process that runs one or more tasks. When tasks run on workers, the local R session is free and responsive, and work gets done faster. For example, this vignette shows how `crew` and `mirai` work together to speed up Shiny apps.
First, create a controller object to manage tasks and workers.
```r
library(crew)
controller <- crew_controller_local(
  name = "example",
  workers = 2,
  seconds_idle = 10
)
```
Next, start the controller to create the `mirai` client. Later, when you are done with the controller, call `controller$terminate()` to clean up your resources.
```r
controller$start()
```
Use `push()` to submit a new task and `pop()` to return a completed task.
```r
controller$push(name = "get pid", command = ps::ps_pid())
```
As a side effect, the `push()`, `pop()`, and `scale()` methods also launch workers to run the tasks. If your controller uses transient workers and has a backlog of tasks, you may need to loop over `pop()` or `scale()` multiple times to make sure enough workers are always available.
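One way to handle that situation is a simple polling loop. The following is a sketch, not the only approach: each call to `pop()` also re-scales the workers, so a backlogged task eventually gets a worker and resolves.

```r
# Sketch: poll pop() until the task resolves. Each call to pop()
# also auto-scales the workers, so a backlogged task eventually runs.
task <- NULL
while (is.null(task)) {
  task <- controller$pop()
  Sys.sleep(0.1) # brief pause to avoid a busy spin
}
```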
```r
controller$pop() # No workers started yet and the task is not done.
#> NULL

task <- controller$pop() # Worker started, task complete.
task
#> # A tibble: 1 × 13
#>   name    command    result status error  code trace warnings seconds  seed
#>   <chr>   <chr>      <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 get pid ps::ps_pi… <int>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
```
Alternatively, `wait()` is a loop that repeatedly checks tasks and launches workers until all tasks complete.
```r
controller$wait(mode = "all")
```
The return value of the task is in the `result` column.
```r
task$result[[1]] # return value of the task
#> [1] 69631
```
Here is the full list of output in the `task` object returned by `pop()`:
* `name`: the task name.
* `command`: a character string with the R command.
* `result`: a list containing the return value of the R command. `NA` if the task failed.
* `status`: a character string. `"success"` if the task succeeded, `"cancel"` if the task was canceled with the `cancel()` controller method, `"crash"` if the worker running the task exited before completing the task, and `"error"` for any other kind of error. (See the "Crashes and retries" section below for more on crashes.)
* `error`: the first 2048 characters of the error message if the task status is not `"success"`, `NA` otherwise.
* `code`: an integer code denoting the specific exit status: `0` for successful tasks, `-1` for tasks with an error in the R command of the task, and another positive integer with an NNG status code if there is an error at the NNG/`nanonext` level. `nanonext::nng_error()` can interpret these codes.
* `trace`: the first 2048 characters of the text of the traceback if the task threw an error, `NA` otherwise.
* `warnings`: the first 2048 characters of the text of any warning messages the task may have generated, `NA` otherwise.
* `seconds`: number of seconds that the task ran.
* `seed`: the single integer originally supplied to `push()`, `NA` otherwise. The pseudo-random number generator state just prior to the task can be restored using `set.seed(seed = seed, kind = algorithm)`, where `seed` and `algorithm` are part of this output.
* `algorithm`: name of the pseudo-random number generator algorithm originally supplied to `push()`, `NA` otherwise. The pseudo-random number generator state just prior to the task can be restored using `set.seed(seed = seed, kind = algorithm)`, where `seed` and `algorithm` are part of this output.
* `controller`: name of the `crew` controller where the task ran.
* `worker`: name of the `crew` worker that ran the task.

If `seed` and `algorithm` are both non-missing in the output, then you can recover the pseudo-random number generator state of the task using `set.seed(seed = seed, kind = algorithm)`. However, it is recommended to supply `NULL` to these arguments in `push()`, in which case you will observe `NA` in the outputs. With `seed` and `algorithm` both `NULL`, the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams supported by `mirai::nextstream()`. See `vignette("parallel", package = "parallel")` for details.
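As a sketch of the opt-in behavior, suppose you supply `seed` and `algorithm` to `push()` (the specific values below are illustrative, not recommendations):

```r
# Illustrative use of task-level RNG settings supplied to push().
controller$push(
  name = "random",
  command = runif(1),
  seed = 123L,
  algorithm = "Mersenne-Twister"
)
controller$wait(mode = "all")
task <- controller$pop()

# Restore the task's RNG state locally and reproduce its result.
set.seed(seed = task$seed, kind = task$algorithm)
runif(1) # same value as task$result[[1]]
```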
The `map()` method of the controller supports functional programming similar to `purrr::map()` and `clustermq::Q()`. The arguments of `map()` are mostly the same as those of `push()`, but there is a new `iterate` argument to define the inputs of individual tasks. `map()` submits a whole collection of tasks, auto-scales the workers, waits for all the tasks to finish, and returns the results in a `tibble`.

Below, `map()` submits one task to compute `1 + 2 + 5 + 6` and another task to compute `3 + 4 + 5 + 6`. The lists and vectors inside `iterate` vary from task to task, while the elements of `data` and `globals` stay constant across tasks.
```r
results <- controller$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)
results
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>

as.numeric(results$result)
#> [1] 14 18
```
If at least one task in `map()` throws an error, the default behavior is to error out in the main session and not return the results. If that happens, the results are available in `controller$error`. To return the results instead of setting `controller$error`, regardless of error status, set `error = "warn"` or `error = "silent"` in `map()`. To conserve memory, consider setting `controller$error <- NULL` when you are done troubleshooting.
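For example, one way (a sketch, with an intentionally failing command) to capture results even when tasks error:

```r
# Sketch: tolerate task errors and inspect them afterwards.
results <- controller$map(
  command = stop("oops"),
  iterate = list(x = c(1, 2)),
  error = "warn" # return the results instead of stopping
)
results$status # "error" for the failed tasks
results$error  # first 2048 characters of each error message
controller$error <- NULL # free memory when done troubleshooting
```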
The `walk()` method is just like `map()`, but it does not wait for any tasks to complete. Instead, it returns control to the local R session immediately and lets you do other things while the tasks run in the background.
```r
controller$walk(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)
```
The `collect()` method pops all completed tasks. Put together, `walk()`, `wait(mode = "all")`, and `collect()` have the same overall effect as `map()`.
```r
controller$wait(mode = "all")
controller$collect()
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
```
However, there are subtle differences between the synchronous and asynchronous functional programming methods:

1. `map()` requires an empty controller to start with (no prior tasks). But with `walk()`, the controller can have any number of running or unpopped tasks beforehand.
2. `wait()` does not show a progress bar because it would be misleading if there are a lot of prior tasks. Because `map()` requires the controller to be empty initially (see (1)), it shows a progress bar while correctly representing the amount of work left to do.

The controller summary shows how many tasks were completed and popped, how many total seconds the workers spent running tasks, how many tasks threw warnings or errors, etc.
```r
controller$summary()
#> # A tibble: 1 × 8
#>   controller tasks seconds success error crash cancel warning
#>   <chr>      <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 example        5       0       5     0     0      0       0
```
Call `terminate()` on the controller after you finish using it. `terminate()` tries to close the `mirai` dispatcher and any workers that may still be running. It is important to free up these resources.
```r
controller$terminate()
```
The `mirai` dispatcher process should exit on its own, but if not, you can manually terminate the process with `ps::ps_kill(p = controller$client$dispatcher)` or call `crew_clean()` to terminate any dispatchers and local workers running on your local machine.
```r
crew_clean()
#> nothing to clean up
```
A `crew` controller creates different types of local processes. These include:

* Dispatchers: each controller has a special local process called a dispatcher. `mirai` needs this process to orchestrate tasks.
* Workers: processes that `crew` launches to run tasks. These may be local processes as in the case of `crew_controller_local()`, or they may be processes on different computers if you are using a third-party launcher plugin like `crew.cluster` or `crew.aws.batch`.
* Daemons: processes that `mirai` launches outside of `crew` to run tasks. Such processes may spawn automatically if you set the `processes` argument of e.g. `crew.aws.batch::crew_controller_aws_batch()` to a positive integer.

Usually these processes terminate themselves when the parent R session exits or the controller terminates, but under rare circumstances they may continue running. The "local monitor" in `crew` makes it easy to list and terminate any of these processes which may be running on your local computer. Example:
```r
monitor <- crew_monitor_local()
monitor$dispatchers() # List PIDs of all local {mirai} dispatcher processes.
#> [1] 31215
monitor$daemons()
#> integer(0)
monitor$workers()
#> [1] 57001 57002
monitor$terminate(pid = c(57001, 57002))
monitor$workers()
#> integer(0)
```
`crew_monitor_local()` only manages processes running on your local computer. To manage `crew` workers running on different computers, such as SLURM or AWS Batch, please familiarize yourself with the given computing platform, and consider using the monitor objects in the relevant third-party plugin packages such as `crew.cluster` or `crew.aws.batch`. Example: https://wlandau.github.io/crew.aws.batch/index.html#job-management.
As explained above, `push()`, `pop()`, and `wait()` launch new workers to run tasks. The number of new workers depends on the number of tasks at the time. In addition, workers can shut themselves down as work completes. In other words, `crew` automatically raises and lowers the number of workers in response to fluctuations in the task workload.
The most useful arguments for down-scaling, in order of importance, are:

* `seconds_idle`: shut down a worker if it spends too long waiting for a task.
* `tasks_max`: shut down a worker after it completes a certain number of tasks.
* `seconds_wall`: soft wall time of a worker.

Please tune these arguments to achieve the desired balance for auto-scaling. The two extremes of auto-scaling are `clustermq`-like persistent workers and `future`-like transient workers, and each is problematic in its own way.
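For instance, a controller could combine these arguments as follows (the values here are arbitrary examples, not recommendations):

```r
# Illustrative down-scaling settings (values are arbitrary examples).
controller <- crew_controller_local(
  name = "tuned",
  workers = 4,
  seconds_idle = 30,   # exit after 30 seconds without a task
  tasks_max = 100,     # exit after completing 100 tasks
  seconds_wall = 3600  # soft wall time of one hour
)
```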
Some controllers support local processes to launch and terminate workers asynchronously. For example, a cloud-based controller may need to make HTTP requests to launch and terminate workers on e.g. AWS Batch, and these time-consuming requests should happen in the background. Controllers that support this will have a `processes` argument to specify the number of local R processes to churn through worker launches and terminations. Set `processes = NULL` to disable async, which can be helpful for troubleshooting.
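As a hypothetical sketch (argument values are illustrative, and a real call would also need your AWS Batch configuration from the `crew.aws.batch` plugin):

```r
# Hypothetical cloud controller with asynchronous scaling:
# 4 local R processes handle worker launches and terminations.
controller <- crew.aws.batch::crew_controller_aws_batch(
  name = "cloud",
  workers = 16,
  processes = 4 # processes = NULL would disable async
)
```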
In rare cases, a worker may exit unexpectedly before it completes its current task. This could happen for any number of reasons: for example, the worker could run out of memory, the task could cause a segmentation fault, the AWS Batch spot instance could exit because of a spike in price, etc. To troubleshoot, it is best to consult the worker log files and `autometric` resource logs.
If a worker crashes, the task will return a status of `"crash"` in `pop()` (and `collect()` and `map()`).
You can simulate a crash for yourself:
```r
library(crew)
controller <- crew_controller_local(name = "my_controller", crashes_max = 5L)
controller$start()

# Submit a task and wait for it to start.
controller$push(command = Sys.sleep(300), name = "my_task")
Sys.sleep(2)

# Terminate the worker in the middle of the task.
controller$launcher$terminate_workers()

# Wait for the task to resolve.
controller$wait()

# Retrieve the result.
task <- controller$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name    result    status error                  code controller
#>   <chr>   <list>    <chr>  <chr>                 <int> <chr>
#> 1 my_task <lgl [1]> crash  19 | Connection reset    19 my_controller
```
In the event of a crash like this one, you can choose to abandon the workflow and troubleshoot, or you can choose to retry the task on a different (possibly new) worker. Simply push the task again and use the same task name.^[As of `targets` >= 1.10.0.9002, `targets` pipelines automatically retry tasks whose workers crash.]
```r
controller$push(command = Sys.sleep(300), name = "my_task")
```
The controller enforces a maximum number of retries, given by the `crashes_max` argument of `crew_controller_local()`. If a task's worker crashes more than `crashes_max` times in a row in the same controller, then a subsequent `pop()` (or `collect()` or `map()`) throws an informative error:
```r
controller$pop()
#> Error:
#> ! the crew worker of task 'my_task' crashed 6 consecutive time(s)
#>   in controller 'my_controller'.
#>   For details and advice, please see the crashes_max argument of
#>   crew::crew_controller(), as well as
#>   https://wlandau.github.io/crew/articles/risks.html#crashes and
#>   https://wlandau.github.io/crew/articles/logging.html.
```
With multiple controllers in a controller group, you can configure tasks to run in a different controller if the task crashes in the original controller `crashes_max` times. For details, see the controller group vignette.