LauraeEapply: Laurae's Parallel Environment Apply with Load Balancing

Description Usage Arguments Details Value Examples

Description

This function performs parallel::parLapply using an environment as the input list, with proper load balancing (OpenMP-like pragma guided/dynamic instead of pragma static in parallel::parLapply).

Usage

1
LauraeEapply(cl, x, fun, ...)

Arguments

cl

Type: cluster. It must be created by snow.

x

Type: environment.

fun

Type: function to parallelize.

...

More parameters to pass to the function.

Details

In contrast to base:eapply, the executed function remains in .GlobalEnv (or the environment in which it is executed). Actually, you could just do LauraeParallel::LauraeLapply(cl = cl, x = eapply(x, function(x) {x}), fun = fun, ...). Nothing new...

Pragma guided/dynamic works very well for long tasks which have dynamic computation time (such as machine learning tasks on large data). However, it becomes very poor when data is computed very quickly, as the overhead increases dramatically. With the default parallel::parLapply, data is chunked for each worker first, then submitted (static pragma), which makes it significantly faster on (not too) smaller data.

Please check the example to understand the difference between guided/dynamic pragma and static pragma scheduling for parallelization.

Value

A list of elements.

Examples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
library(parallel)
cl <- makeCluster(2)

# Set this to 1 for more realistic testing...
airman_speedtest <- 0.1

my_env <- new.env()
for (i in 1:6) {
  my_env[[as.character(i)]] <- i * airman_speedtest
}

# Guided/Dynamic scheduling: 12 Seconds
# Preparation: none
# 00s => start 1, start 2
# 01s => end 1, start 3
# 02s => end 2, start 4
# 04s => end 3, start 5
# 06s => end 4, start 6
# 08s => end 5
# 12s => end 6
system.time({LauraeEapply(cl, my_env, function(x) {
  Sys.sleep(x)
  return(x)
})})

# Static scheduling: 15 Seconds
# Preparation: chunk in two:
# -- Worker 1: 1, 2, 3
# -- Worker 2: 4, 5, 6
# 00s => start 1, start 4
# 01s => end 1, start 2
# 03s => end 2, start 3
# 04s => end 4, start 5
# 06s => end 3
# 09s => end 5, start 6
# 15s => end 6
system.time({parLapply(cl, eapply(my_env, function(x) {x}), function(x) {
  Sys.sleep(x)
  return(x)
})})

stopCluster(cl)
closeAllConnections()

# More comprehensive example with timeout, requires package R.utils

library(LauraeParallel)
library(parallel)
suppressPackageStartupMessages(library(R.utils))

cl <- makeCluster(2)

my_fun <- function(x) {
  Sys.sleep(x)
  return(x)
}
invisible(clusterEvalQ(cl = cl, expr = {
  suppressPackageStartupMessages(library(R.utils))
}))
clusterExport(cl = cl, "my_fun")

my_env_fast <- new.env()
for (i in 1:6) {
  my_env_fast[[as.character(i)]] <- i * 0.1
}

system.time({data <- LauraeEapply(cl, my_env_fast, function(x) {my_fun(x)})})
data

# Anything after 0.3 sec of run time or 100 sec of CPU time gets trashed
system.time({data <- LauraeEapply(cl, my_env_fast, function(x) {
  err <- try(R.utils::withTimeout(my_fun(x), timeout = 100, elapsed = 0.3, onTimeout = "error"))
  if (class(err) == "try-error") {
    return("ERROR")
  } else {
    return(err)
  }
})})
data

# Anything after 0.3 sec of run time or 0.3 sec of CPU time or 100 sec of run time gets trashed
system.time({data <- LauraeEapply(cl, my_env_fast, function(x) {
  err <- try(R.utils::withTimeout(my_fun(x), timeout = 100, cpu = 0.3, onTimeout = "error"))
  if (class(err) == "try-error") {
    return("ERROR")
  } else {
    return(err)
  }
})})
data

stopCluster(cl)
closeAllConnections()

Laurae2/LauraeParallel documentation built on May 28, 2019, 1:31 p.m.