Each controller object supports only one type of worker configuration, which you set in advance. However, different controllers may have different types of workers, and crew supports controller groups to coordinate among these worker types. With third-party launcher plugins from other packages, this mechanism lets you, for example, send some tasks to GPU-capable or high-memory workers while other tasks go to low-spec workers.
We demonstrate with a controller of fully persistent workers which always stay running and a controller of semi-persistent workers which terminate after completing four tasks. We create controller objects with names.
library(crew)
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(name = "semi-persistent", tasks_max = 4L)
crew uses a different TCP port for each controller you run, so please do not create hundreds of controllers. Please see the subsection on ports in the README.
We put these controller objects into a new controller group object.
group <- crew_controller_group(persistent, transient)
This controller group has a global start() method to initialize both controllers.
group$start()
You can choose which worker pool receives each task.
group$push(name = "my task", command = sqrt(4), controller = "semi-persistent")
The controller group also supports global methods for wait(), pop(), and terminate(). These methods operate on all controllers at once by default, but the controllers argument allows you to select a subset of controllers to act on. Below in pop(), the controller column of the output indicates which controller ran the task.
group$wait(controllers = "semi-persistent")
group$pop()
#> # A tibble: 1 × 13
#>   name    command result    status error  code trace warnings seconds  seed
#>   <chr>   <chr>   <list>    <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 my task sqrt(4) <dbl [1]> succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
The map() method applies one command across a set of inputs in the style of functional programming, and the controller argument chooses which controller receives the tasks.
group$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6),
  controller = "persistent"
)
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
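The tibble above keeps the numeric results inside a list column, so the values themselves are not visible. As a rough illustration of what each task computes, the base R sketch below evaluates the same command once per element of iterate, with data and globals held constant. It only mimics the arithmetic and is not how crew dispatches work; the names task_inputs, constants, and results are introduced here purely for illustration.

```r
# Inputs mirror the group$map() call above.
task_inputs <- list(a = c(1, 3), b = c(2, 4)) # one task per element (iterate)
constants <- c(list(c = 5), list(d = 6))      # data and globals, shared by all tasks

results <- vapply(
  seq_along(task_inputs$a),
  function(i) {
    # Collect the i-th value of each iterated input plus the shared constants,
    # then evaluate the command against those values.
    env <- list2env(c(lapply(task_inputs, `[[`, i), constants))
    eval(quote(a + b + c + d), envir = env)
  },
  numeric(1)
)

results
#> [1] 14 18
```

The first task sees a = 1 and b = 2, the second sees a = 3 and b = 4, and both see c = 5 and d = 6.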
The controller group has a summary() method which aggregates the summaries of one or more controllers.
group$summary()
#> # A tibble: 2 × 8
#>   controller      tasks seconds success error crash cancel warning
#>   <chr>           <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 persistent          2       0       2     0     0      0       0
#> 2 semi-persistent     1       0       1     0     0      0       0
When you are finished, please call terminate() with no arguments to terminate all controllers in the controller group.
group$terminate()
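One way to make sure the cleanup always happens is to register it with on.exit(), so the controllers shut down even if a task or later step throws an error. The sketch below is a minimal illustration that reuses the calls shown above; the wrapper function run_with_group() is a hypothetical name, and the example assumes the crew package is installed and can launch local workers.

```r
library(crew)

# Hypothetical wrapper: start a group, run one task, and always clean up.
run_with_group <- function() {
  group <- crew_controller_group(
    crew_controller_local(name = "persistent"),
    crew_controller_local(name = "semi-persistent", tasks_max = 4L)
  )
  group$start()
  # Terminate every controller in the group when the function exits,
  # whether it returns normally or stops with an error.
  on.exit(group$terminate(), add = TRUE)
  group$push(name = "my task", command = sqrt(4), controller = "semi-persistent")
  group$wait(controllers = "semi-persistent")
  group$pop()
}

task <- run_with_group()
task$result[[1]]
```

Registering terminate() right after start() keeps the two calls paired, which matters because each controller holds a TCP port while it is running.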
As described in the introduction vignette, a worker may crash if it runs out of memory or encounters a system-level issue. The controller lets you retry the task, but pop() throws an error if the task's worker crashed more than crashes_max times in a row.
Controller groups and backup controllers let you configure exactly how tasks are retried.
Each controller can designate a backup controller to run tasks that cause too many crashes in the original controller.
With backups, you can connect a whole chain of controllers with increasing resources.
If you put all these controllers in a controller group, then you can use group-level push() and pop() methods without having to know which controller actually ran the task.
Consider the following example. We define three different controllers that use one another as backups. The default controller uses the medium_memory controller as a backup, and the medium_memory controller uses the high_memory controller as a backup.
library(crew)
library(crew.cluster) # https://wlandau.github.io/crew.cluster/index.html
high_memory <- crew_controller_slurm(
  name = "high_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 64),
  crashes_max = 2
)
medium_memory <- crew_controller_slurm(
  name = "medium_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 64),
  crashes_max = 3,
  backup = high_memory
)
default <- crew_controller_slurm(
  name = "default",
  crashes_max = 4,
  backup = medium_memory
)
We put all three controllers in a controller group.
The default controller is listed first, so tasks pushed with group$push() automatically run in the default controller unless you select another controller or a backup takes over.
group <- crew_controller_group(default, medium_memory, high_memory)
Consider a task that sometimes exhausts the available memory of its SLURM worker:
group$push(command = my.package::run_heavy_task(), name = "heavy_task")
If the task exhausts available memory and crashes its worker, then pop() informs you:
task <- group$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name       result    status error                  code controller
#>   <chr>      <list>    <chr>  <chr>                 <int> <chr>
#> 1 heavy_task <lgl [1]> crash  19 | Connection reset    19 default
When you resubmit the task with group$push(), it runs on the default controller again. But if you keep resubmitting it and it crashes 4 times in a row (from crashes_max = 4), then the next push() sends it to the backup medium_memory controller. If the next 3 pushes also crash the task's worker in the medium_memory controller, then subsequent pushes use the high_memory controller, which is the backup of medium_memory. If the high_memory controller successfully completes the task, the next group$pop() returns a successful result.
task <- group$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name       result    status  error  code controller
#>   <chr>      <list>    <chr>   <chr> <int> <chr>
#> 1 heavy_task <lgl [1]> success NA        0 high_memory
group$summary() counts the number of times each controller attempted the task, with crashes tallied in the crash column.
group$summary()
#> # A tibble: 3 × 8
#>   controller    tasks seconds success error crash cancel warning
#>   <chr>         <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 default           4       0       0     0     4      0       0
#> 2 medium_memory     3       0       0     0     3      0       0
#> 3 high_memory       1       0       1     0     0      0       0
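The attempt counts in this summary follow directly from the escalation rule described above. The base R sketch below is a simplified model of that rule, not crew's actual implementation: route_task() and task_crashes_on() are hypothetical helpers, and the sketch assumes the task crashes on every controller except high_memory.

```r
# Crash allowances and backup links copied from the controller definitions.
crashes_max <- c(default = 4, medium_memory = 3, high_memory = 2)
backup <- c(default = "medium_memory", medium_memory = "high_memory", high_memory = NA)

# Assumption for illustration: only high_memory has enough memory for the task.
task_crashes_on <- function(controller) controller != "high_memory"

# Simulate repeated pushes of one task and record which controller ran each attempt.
route_task <- function(first = "default", max_pushes = 20L) {
  crashes <- c(default = 0, medium_memory = 0, high_memory = 0)
  controller <- first
  attempts <- character(0)
  for (push in seq_len(max_pushes)) {
    # Move down the backup chain once a controller has used up its allowance.
    while (crashes[[controller]] >= crashes_max[[controller]]) {
      if (is.na(backup[[controller]])) stop("no backup controller left")
      controller <- backup[[controller]]
    }
    attempts <- c(attempts, controller)
    if (!task_crashes_on(controller)) break # the task succeeded
    crashes[[controller]] <- crashes[[controller]] + 1
  }
  attempts
}

attempts <- route_task()
# Number of attempts per controller, in chain order.
counts <- vapply(names(crashes_max), function(x) sum(attempts == x), integer(1))
counts
```

Under these assumptions the model makes 4 attempts on default, 3 on medium_memory, and 1 on high_memory, which matches the tasks column of the summary.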
If you use a controller group like this one in a targets pipeline (with tar_option_set(controller = group) in _targets.R), then tar_make() will automatically retry crashed tasks using the configuration in the controller group. This behavior requires targets version 1.10.0.9002 or later.
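As a rough sketch of what such a _targets.R file might look like, the example below builds a two-step backup chain and hands the group to targets. It uses local controllers as stand-ins for the SLURM controllers so it does not depend on a cluster, and it assumes crew_controller_local() accepts the same crashes_max and backup arguments. The target name heavy_result is hypothetical, and my.package::run_heavy_task() is the placeholder function from the example above.

```r
# _targets.R (illustrative sketch, not a tested pipeline)
library(targets)
library(crew)

# Stand-in controllers: on a real cluster these would be
# crew.cluster controllers with different memory requests.
high_memory <- crew_controller_local(name = "high_memory", crashes_max = 2)
default <- crew_controller_local(
  name = "default",
  crashes_max = 4,
  backup = high_memory
)

# Pass the controller group object itself to targets.
tar_option_set(controller = crew_controller_group(default, high_memory))

list(
  # Hypothetical target that runs the memory-hungry task.
  tar_target(heavy_result, my.package::run_heavy_task())
)
```

Listing the default controller first keeps the cheaper workers as the first choice, with the backup used only after repeated crashes.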