run_engine: # Execute bag of tasks parallely, on as many cores as the...

Description Usage Arguments Author(s) Examples

Description

This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process takes a task randomly in the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData).

Usage

1
2
3
run_engine(tasks, task_processor, debug = FALSE, starter_name = "~/.start_best_effort_jobs", 
    rm_starter = TRUE, log_dir = "log", bot_cache_dir = "cache", 
    nb_proc = NULL, ...)

Arguments

tasks

A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.

task_processor

A function that will be called for each task in the task list tasks.

debug

If TRUE no process will be forked, the list of tasks will be executed in the current process.

starter_name

Path to file that will be deleted after the execution of all tasks if rm_starter is set to TRUE.

rm_starter

If TRUE the file starter_name will be deleted after the execution of all tasks.

log_dir

Path to the log directory.

bot_cache_dir

the directory where task results are cached

nb_proc

If not NULL fix the number of core on which tasks must be computed.

...

Other arguments that will be passed to task_processor.

Author(s)

Florent Chuffart

Examples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# We define a basic task_processor
sum_a_b = function(task) {
  return(task$a + task$b)
}

# We define 9 tasks
tasks = list()
for (a in 1:3) {
  for (b in 4:6) {
    tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_")) 
  }
}

# We execute the 3 tasks
run_engine(tasks, sum_a_b)    

# We collect 9 task results
for (a in 1:3) {
  for (b in 4:6) {
    out_filename = paste("sum_a_b", a, b, sep="_")
    out_filename = paste("cache/", out_filename, ".RData", sep="")
    load(out_filename)
    print(task_result) 
  }
}

# Better way to do that
apply(t(tasks), 2, function(task) {
  out_filename = task[[1]]$out_filename
  out_filename = paste("cache/", out_filename, ".RData", sep="")
  load(out_filename)
  print(task_result) 
})

# Viewing statistics about the campain.
bot_stats()

fchuffar/bot documentation built on May 16, 2019, 12:06 p.m.