knitr::opts_chunk$set(collapse = TRUE, comment = "#>")
set.seed(1014)

multidplyr is a backend for dplyr that spreads work across multiple processes. Like all dplyr backends, it allows you to use the dplyr verbs that you're already familiar with, but alters the underlying computational model to transparently support multi-process parallelism.

This vignette will show you the basics of multidplyr using the nycflights13 dataset.

library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)

Creating a cluster

To start using multidplyr you must create a cluster. Here I used two cores because it's the maximum permitted by CRAN, but I suggest that you use more. For best performance, I recommend using 1 or 2 fewer than the total number of cores on your computer, which you can detect with parallel::detectCores() (leaving at least 1 core free means that you should still be able to use your computer for other tasks while your computation is running).

cluster <- new_cluster(2)
cluster

(In the examples, you'll also see the use of default_cluster(); this is designed specifically for the constraints of R CMD check, so I don't recommend using it in your own code.)

A cluster consists of multiple R processes created by callr. When multiple processes are running at the same time, your operating system will take care of spreading the work across multiple cores.

Add data

There are two ways to get data to the workers in cluster:

partition()

partition() is useful if you have a single in-memory data frame. For example, take nycflights13::flights. This dataset contains information for about ~300,000 flights departing New York City in 2013. We group it by destination, then partition it:

flights1 <- flights %>% group_by(dest) %>% partition(cluster)
flights1

partition() splits flights1 into roughly equal subsets on each worker, ensuring that all rows in a group are transfered to the same worker. The result is a party_df, or partitioned data frame.

Direct loading

partition() is simple to call, but it's relatively expensive because it copies a lot of data between processes. An alternative strategy is for each worker to load the data (from files) it needs directly.

To show how that might work, I'll first split flights up by month and save as csv files:

path <- tempfile()
dir.create(path)

flights %>% 
  group_by(month) %>% 
  group_walk(~ vroom::vroom_write(.x, sprintf("%s/month-%02i.csv", path, .y$month)))

Now we find all the files in the directory, and divide them up so that each worker gets (approximately) the same number of pieces:

files <- dir(path, full.names = TRUE)
cluster_assign_partition(cluster, files = files)

Then we read in the files on each worker and use party_df() to create a partitioned dataframe:

cluster_send(cluster, flights2 <- vroom::vroom(files))

flights2 <- party_df(cluster, "flights2")
flights2

dplyr verbs

Once you have a partitioned data frame, you can operate on it with the usual dplyr verbs. To bring the data back to the interactive process, use collect():

flights1 %>% 
  summarise(dep_delay = mean(dep_delay, na.rm = TRUE)) %>% 
  collect()

For this size of data and a simple transformation, using a local cluster actually makes performance much worse!

by_dest <- flights %>% group_by(dest)

# Local computation
system.time(by_dest %>% summarise(mean(dep_delay, na.rm = TRUE)))

# Remote: partitioning
system.time(flights2 <- flights %>% partition(cluster))
# Remote: computation
system.time(flights3 <- flights2 %>% summarise(mean(dep_delay, na.rm = TRUE)))
# Remote: retrieve results
system.time(flights3 %>% collect())

That's because of the overhead associated with sending the data to each worker and retrieving the results at the end. For basic dplyr verbs, multidplyr is unlikely to give you significant speed ups unless you have 10s or 100s of millions of data points (and in that scenario you should first try dtplyr, which uses data.table).

multipldyr might help, however, if you're doing more complex things. Let's see how that plays out when fitting a moderately complex model. We'll start by selecting a subset of flights that have at least 50 occurrences, and we'll compute the day of the year from the date:

daily_flights <- flights %>%
  count(dest) %>%
  filter(n >= 365)

common_dest <- flights %>% 
  semi_join(daily_flights, by = "dest") %>% 
  mutate(yday = lubridate::yday(ISOdate(year, month, day))) %>% 
  group_by(dest)

nrow(common_dest)

That leaves us with ~332,000 observations. Let's partition this smaller dataset:

by_dest <- common_dest %>% partition(cluster)
by_dest

Let's fit a smoothed generalised additive model to each destination, estimating how delays vary over the course of the year and within a day. Note that we need to use cluster_library() to load the mgcv package on every node. That takes around 3s:

cluster_library(cluster, "mgcv")
system.time({
  models <- by_dest %>% 
    do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})

Compared with around 5s doing it locally:

system.time({
  models <- common_dest %>% 
    group_by(dest) %>% 
    do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})

The cost of transmitting messages to the nodes is roughly constant, so the longer the task you're parallelising, the closer you'll get to a linear speed up.



hadley/multidplyr documentation built on Nov. 4, 2023, 3:21 a.m.