futurenow: Create future that can resolve asynchronously in a parallel R...

View source: R/class-futurenow.R

Description

A multisession or multicore future that uses another R process for evaluation, but allows code to be run in the master session during the evaluation.

Usage

futurenow(
  expr,
  envir = parent.frame(),
  substitute = TRUE,
  listener.delay = 0.1,
  globals = TRUE,
  label = NULL,
  type = c("MultisessionFuture", "MulticoreFuture"),
  workers = availableCores(),
  ...
)

FutureNowFuture(
  expr = NULL,
  envir = parent.frame(),
  type = c("MultisessionFuture", "MulticoreFuture"),
  substitute = TRUE,
  globals = TRUE,
  packages = NULL,
  workers = NULL,
  listener.delay = 0.1,
  ...
)

register_name(name, .env = parent.frame())

run_in_master(
  expr,
  env = parent.frame(),
  substitute = TRUE,
  local_vars = FALSE
)

Arguments

expr

R expression

envir

Environment in which the future should be evaluated

substitute

Should expr be quoted? Default is TRUE

listener.delay

Time interval, in seconds, that the master session waits between handling connections from the future instances; default is 0.1

globals, label, ...

Passed to future::future()

type

The actual type of future to run, choices are "MultisessionFuture" and "MulticoreFuture"; default is "MultisessionFuture"

workers

Max number of parallel instances to run at the same time

packages

Packages for future instances to load

name

R symbol to register to the future processes

.env

Internally used

env

Environment in which to search for the variables named in local_vars

local_vars

Names of local variables in the future session to send along to the master session for evaluation; default is FALSE, meaning all variables are looked up in the main session

Details

In this package, the "master session" is the R session that schedules future processes. This is usually the main process that users interact with. The "slave sessions" or "future sessions" are the R processes in which asynchronous code is evaluated.

One of the issues with asynchronously evaluating R expressions in another process is data transfer. If the data is an external pointer, transfer is hard unless a forked process is used. If the data is large, transferring it costs both time (serialization takes extra time) and memory. Suppose a user wants to run a data pipeline A, B, and C, where only B requires handling the data. One can choose to run A asynchronously, then B in the main session, then C again asynchronously.
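As a rough illustration of the transfer cost described above, the following base-R sketch measures how many bytes a numeric vector occupies once serialized, which approximates what would have to be copied to a non-forked worker. The variable names are illustrative and not part of futurenow; timings depend on the machine.

```r
# Illustrative only: estimate the cost of shipping a vector to another process
x <- rnorm(1e6)

# serialize() to a raw vector approximates the payload sent to a worker
timing <- system.time(payload <- serialize(x, connection = NULL))

payload_bytes <- length(payload)
cat(sprintf("Serialized size: %.1f MB\n", payload_bytes / 1024^2))
cat(sprintf("Serialization time: %.3f s\n", timing[["elapsed"]]))

# Compare with a 1 MB limit such as options(future.globals.maxSize = 1024^2)
payload_bytes > 1024^2
```

A vector of one million doubles needs at least 8 bytes per element, so the payload is well above a 1 MB globals limit; keeping such data in the master session avoids this copy.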

Normally a future::future() instance only allows instructions to flow from the master process to the slave nodes. Communication in the reverse direction is missing or limited. This prevents the above procedure from running within one future session.

futurenow was created with this objective in mind. In futurenow, the above procedure is possible with run_in_master and register_name.

During the asynchronous evaluation, the function run_in_master sends the expression inside to the master session to evaluate. Once finished, variables are sent back to the future sessions via register_name. The variables sent back via register_name can then be used in future sessions as-is.

When run_in_master asks the master session to evaluate code, users can also choose which variables in the future session to send along with the instructions (via local_vars); see examples.

The other parts are exactly the same as other future objects.

Value

The functions futurenow and FutureNowFuture return future::future() instances with class 'FutureNowFuture'. The functions run_in_master and register_name return nothing.

An Issue with MulticoreFuture

The futurenow strategy has two internal types: MultisessionFuture and MulticoreFuture. MultisessionFuture spawns a future::multisession() process and MulticoreFuture spawns a forked future::multicore() process. While multisession works in any situation, multicore is faster since it uses a forked process that shares memory and does not need serialization. However, MulticoreFuture has some limitations. For example, it is only supported on 'Mac' and 'Linux' systems; if used improperly in 'RStudio', it could cause a fork bomb that harms the system (see also https://en.wikipedia.org/wiki/Fork_bomb).

When the MultisessionFuture type is chosen, a listener runs in the background and monitors requests from the future sessions. With MulticoreFuture, however, the listener could cause a fork bomb. Therefore, the listener does not run in the background, and some future processes may pause at stage B (see Details: the procedure that requires interaction with the main session) if run_in_master is called. In that case, the future instance might never be resolved unless the listener is triggered.

To trigger the listener manually, one only needs to try to collect the results: value(x), resolve(x), or result(x) will trigger the listener and collect the results. resolved(x) also triggers the listener, but does not block the main session. The listener is also triggered automatically when new future instances are spawned.
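The manual triggering described above can be sketched as a polling loop around resolved(). The helper wait_until_resolved is hypothetical and not part of futurenow or future. Passing type = "MulticoreFuture" through plan() is also an assumption, based on the type argument listed under Usage.

```r
# Hypothetical helper (not part of futurenow): poll a future so that each
# resolved() call gives the listener a chance to run, without blocking the
# master session for longer than `poll` seconds at a time.
wait_until_resolved <- function(f, poll = 0.1, timeout = 30) {
  started <- Sys.time()
  while (!future::resolved(f)) {
    elapsed <- as.numeric(difftime(Sys.time(), started, units = "secs"))
    if (elapsed > timeout) {
      stop("future was not resolved within ", timeout, " seconds")
    }
    Sys.sleep(poll)
  }
  invisible(f)
}

if (interactive() && requireNamespace("futurenow", quietly = TRUE)) {
  library(future)
  library(futurenow)

  # Assumption: `type` is forwarded by plan() to futurenow()
  plan(futurenow, workers = 2, type = "MulticoreFuture")

  f <- future({
    run_in_master({
      master_pid <- Sys.getpid()
      register_name(master_pid)
    })
    master_pid
  })

  wait_until_resolved(f)  # triggers the listener on every poll
  value(f)
}
```

Between polls the master session remains free to do other work; calling value(f) directly is simpler when blocking is acceptable.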

See Also

future::future(), future::plan()

Examples

if(interactive()){

  library(future)
  library(futurenow)

  plan(futurenow, workers = 2)

  # ------------------ Basic example ------------------
  plan(futurenow, workers = 2)
  f <- future({

    # Procedure A
    future_pid <- Sys.getpid()

    run_in_master({
      # Procedure B
      master_pid <- Sys.getpid()
      register_name(master_pid)
    })

    # Procedure C
    sprintf("Master process PID is %s, future process PID is %s",
            master_pid, future_pid)
  })

  value(f)

  # ------------------ Choose variables examples ------------------
  plan(futurenow, workers = 2)

  a <- 1
  f <- future({
    a <- 10
    run_in_master({
      b <- a + 1
      register_name(b)
    })
    b
  })
  value(f)   # a + 1 (a in master session)

  f <- future({
    a <- 10
    run_in_master({
      b <- a + 1
      register_name(b)

      # local_vars sends variables along with the instruction
    }, local_vars = 'a')
    b
  })
  value(f)   # a + 1 (a in future session)

  # ------------------ A practical example ------------------
  # Create a "large" dataset that will fail the future
  x <- rnorm(1e6)
  options(future.globals.maxSize = 1024^2)

  plan(futurenow, workers = 2)

  # This will **fail** because x exceeds `future.globals.maxSize`
  future({
    pid <- Sys.getpid()
    result <- pid + mean(x)
    result
  })

  # x is never transferred to future sessions
  fnow <- future({
    pid <- Sys.getpid()

    run_in_master({
      # This code will run in master
      mean_x <- mean(x)

      # register results back to future
      register_name(mean_x)
    })

    mean_x + pid

  })
  value(fnow)

  # ------------------ Progressbar in Shiny app ------------------
  library(shiny)
  plan(futurenow, workers = 2)

  ui <- fluidPage(
    actionButton("ok", "Run")
  )

  server <- function(input, output, session) {
    observeEvent(input$ok, {
      p <- Progress$new(session = session, min = 0, max = 10)

      futurenow_lapply(1:10, function(i){
        Sys.sleep(0.3)
        # inc progress bar
        futurenow::run_in_master({
          p$inc(amount = 1, message = 'Running item', detail = i)
        }, local_vars = 'i')
      })

      p$close()
    })
  }

  shinyApp(ui, server)

}

dipterix/futurenow documentation built on Dec. 31, 2020, 11:21 p.m.