Interactive

This vignette discusses the interactive use of this package.

Suppose you have some large or unwieldy object in R which can conceptually be split into parts, such as the rows of a data frame or the elements of a list. You are analyzing this object, perhaps writing functions and debugging them as you go. The code takes too long to run. Most of what you're doing could happen in parallel after an appropriate split, but you don't want to worry about all the issues that can arise with parallel code when you're in the middle of your interactive analysis. You want a system to manage all the details of the parallelism for you, so that you can focus on the analysis. The parallel_evaluator (subsequently referred to as the 'evaluator') is a system to manage the details.

Basic Example

We'll start with a basic example.

library(autoparallel)

x = list(1:10, rnorm(10), rep(pi, 10))

do = makeParallel(x)
do

Calling parallelize split x into approximately equal parts so that one can run code in parallel using the resulting evaluator do.

Calling lapply on a list through do produces the same result as the base R case:

lapply(x, head)

do(lapply(x, head))

There's nothing special about the use of lapply() above. We can evaluate arbitrary code.

FEEDBACK: The current implementation for the parallel evaluator only looks for variables in the global environment, which is why I'm using <<- (because of how knitr evaluates). I could write a version that uses dynGet(), but this would be more complicated.

y <<- 20
z <<- 30
do(y + z, verbose = TRUE)

y + z uses the variables y and z. The evaluator detects this and sends them over, saving the user from having to do this manually.

Interactive

The evaluator is designed for interactively building functions and analysis on large data sets, or data sets that take too long to run. The interactive feature is sending variables, including functions, from the manager's global workspace to the parallel workers every time they are used. This allows us to see the results of the improved / debugged versions of the functions as we work on them.

# An analysis function
myfun <<- function(x) x[1:2]

do(lapply(x, myfun))

# Oops I actually need the first 4
myfun <<- function(x) x[1:4]

# Now we see the new results of myfun
do(lapply(x, myfun))

Limitations

autoparallel is not currently designed to work with multiple large objects. Rather, it was designed for a single large object to be distributed to the workers when the evaluator is created. The following code will be slow because it serializes a large (400 MB) object to each of the workers:

# Any large R object
big = 1:1e8

object.size(big)

# BAD IDEA: this sends `big` over every time
do(sum(big + x[[1]][1]))

FEEDBACK: I could check the size of the objects before exporting them, and handle it if they're too large.

Details

Under the hood, the evaluator is a closure with a couple attributes. The most notable attributes are the variable name and the cluster. We can inspect all this by printing the evaluator as a function.

print.function(do)

The default simplifying function is c(). We can also bring back results without simplifying.

do(lapply(x, head), simplify = FALSE)

do sent the code to 2 different R processes for evaluation, so we will always see a list of length 2 before the results are simplified.

Cleaning up

When finished it's a good idea to shut down the cluster. This also happens automatically when the R session is terminated.

stop_cluster(do)

Working with many files

A realistic example is working with many files simultaneously. The US Veterans Administration (VA) Court appeals are one such example. Each file contains the summary of an appeal. One can download a handful from the VA servers as follows:

# Used on my local machine only
datadir = "~/data/vets/appeals_sample"
datadir = "vets_appeals"
dir.create(datadir)

fnames = paste0("1719", 100:266, ".txt")
urls = paste0("https://www.va.gov/vetapp17/files3/", fnames)

Map(download.file, urls, fnames)

The file names themselves are small, so we can cheaply distribute them among the parallel workers.

filenames = list.files(datadir, full.names = TRUE)
length(filenames)

do = makeParallel(filenames)

The following code actually loads the data contained in the files and assigns the result into appeals on the cluster. It's efficient because the reads happen in parallel, rather than creating a bottleneck in the manager process. Furthermore, by having the workers do their own loading we do not have to serialize the data between processes.

do({
    appeals <- lapply(filenames, readLines)
    appeals <- sapply(appeals, paste, collapse = "\n")
    appeals <- enc2utf8(appeals)
    NULL
})

The braces along with the final NULL are necessary to avoid transferring the large data set from the workers back to the manager.

The code above only assigned appeals to the global environment of the workers. It does not exist in the manager process.

"appeals" %in% ls()

However, if we subsequently create a variable called appeals in the manager process then the evaluator will export it to the cluster, overriding the existing one.

ten <<- 10
do(ten + 1, verbose = TRUE)

The evaluator allows us to execute the same code that is used for serial R. Again, we see a vector of length 2 because we're running the code on chunks of the data residing in 2 R processes.

do(length(appeals))
do(class(appeals))

We may want to look more closely at those cases which have been remanded for further evidence. If they're a reasonably small subset we may choose to bring them back into the manager process for further non parallel analysis. This would be useful to see the warnings that may come from our code, for example.

# Check how many we're about to bring back
do(sum(grepl("REMAND", appeals)))

# Bring them back from the workers
remand <- do(appeals[grepl("REMAND", appeals)])

length(remand)

In summary, when working with larger data sets it's efficient to minimize the data movement. We avoided it in this case by only distributing the relatively small vector of file names and having each worker independently load the files that it needed, thus keeping the data in place on that worker.

stop_cluster(do)


clarkfitzg/makeParallel documentation built on Nov. 21, 2020, 2:35 a.m.