In this note we show how to speed up work in R by partitioning data and using process-level parallelization. We demonstrate the technique with three different R packages: rqdatatable, data.table, and dplyr. The methods shown will also work with base R and other packages.
For each of the above packages we speed up work by using wrapr::execute_parallel, which in turn uses wrapr::partition_tables to partition unrelated data.frame rows and then distributes them to different processors to be executed. rqdatatable::ex_data_table_parallel conveniently bundles all of these steps together when working with rquery pipelines.
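The partition-then-distribute idea can be sketched in base R with the parallel package. This is only an illustration of the idea, not the wrapr implementation: the toy table d, the column grp, the function f, and the cluster name cl_demo are all assumptions introduced for this example.

```r
library("parallel")

# Hypothetical toy table; `grp` plays the role of the grouping column.
d <- data.frame(
  id = 1:6,
  grp = c("a", "a", "b", "b", "c", "c"),
  x = c(1, 2, 3, 4, 5, 6),
  stringsAsFactors = FALSE)

# Split rows so that all rows of a group land in the same partition.
parts <- split(d, d$grp)

# Per-partition work: here just a group total.
f <- function(part) {
  data.frame(grp = part$grp[[1]],
             total = sum(part$x),
             stringsAsFactors = FALSE)
}

# Send each partition to a worker process, then re-assemble the pieces.
cl_demo <- parallel::makeCluster(2)
res_parts <- parallel::parLapply(cl_demo, parts, f)
parallel::stopCluster(cl_demo)

res <- do.call(rbind, res_parts)
rownames(res) <- NULL
print(res)
```

Because each group is processed whole, the combined result matches what a single-process group-wise calculation would give.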
The user specifies the partitioning by preparing a grouping column that tells the system which sets of rows must be kept together for the calculation to be correct. We will try to demonstrate everything with simple code examples and minimal discussion.
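One way to sanity-check a grouping column is to confirm that every key maps to exactly one group, so that no key is split across partitions. The helper groups_respect_keys below is hypothetical (it is not part of wrapr or rqdatatable), and the default column names key and key_group are assumed for illustration.

```r
# Hypothetical helper: TRUE when every key value maps to exactly one group.
groups_respect_keys <- function(d, key_col = "key", group_col = "key_group") {
  # distinct (key, group) pairs; a key appearing twice is in two groups
  pairs <- unique(d[, c(key_col, group_col), drop = FALSE])
  !any(duplicated(pairs[[key_col]]))
}

ok_tbl <- data.frame(
  key = c("k1", "k1", "k2"),
  key_group = c("1", "1", "2"),
  stringsAsFactors = FALSE)

bad_tbl <- data.frame(
  key = c("k1", "k1", "k2"),
  key_group = c("1", "2", "2"),  # k1 is split across groups "1" and "2"
  stringsAsFactors = FALSE)

ok_result <- groups_respect_keys(ok_tbl)
bad_result <- groups_respect_keys(bad_tbl)
print(c(ok = ok_result, bad = bad_result))
```

A check like this could be run on each table before handing it to a parallel executor.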
Keep in mind: unless the pipeline steps have non-trivial cost, the overhead of partitioning and distributing the work may overwhelm any parallel speedup. Also, data.table itself already seems to exploit some thread-level parallelism (notice user time is greater than elapsed time). That being said, in this note we will demonstrate a synthetic example where computation is expensive due to a blow-up in an intermediate join step.
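To get a feel for the overhead point, one can time a deliberately cheap task both ways. The sketch below uses only base R and the parallel package; the names xs, cheap, and cl_demo are assumptions for this example, and the measured times will vary by machine, so no particular numbers are claimed.

```r
library("parallel")

cl_demo <- parallel::makeCluster(2)

xs <- as.list(1:100)
cheap <- function(x) x + 1   # trivial per-item work

# system.time() reports user, system, and elapsed time
t_serial <- system.time(r_serial <- lapply(xs, cheap))
t_parallel <- system.time(r_parallel <- parallel::parLapply(cl_demo, xs, cheap))

parallel::stopCluster(cl_demo)

print(t_serial)
print(t_parallel)
# both routes compute the same answer
print(identical(r_serial, r_parallel))
```

With work this cheap, the cost of shipping data to and from the worker processes is likely to dominate, which is why the example in this note uses an expensive join.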
First we set up our execution environment and example (some details: OSX 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (Late 2015 model) with 8GB RAM and hybrid disk drive).
knitr::opts_chunk$set(fig.width=12, fig.height=8, fig.retina = 2)
library("rqdatatable")
library("microbenchmark")
library("ggplot2")
library("WVPlots")
suppressPackageStartupMessages(library("dplyr"))
base::date()
R.version.string
parallel::detectCores()
packageVersion("parallel")
packageVersion("rqdatatable")
packageVersion("rquery")
packageVersion("dplyr")
ncore <- parallel::detectCores()
print(ncore)
cl <- parallel::makeCluster(ncore)
print(cl)
set.seed(2362)
mk_example <- function(nkey, nrep, ngroup = 20) {
  keys <- paste0("key_", seq_len(nkey))
  key_group <- sample(as.character(seq_len(ngroup)),
                      length(keys), replace = TRUE)
  names(key_group) <- keys
  key_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  key_table$data <- runif(nrow(key_table))
  instance_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  instance_table$id <- seq_len(nrow(instance_table))
  instance_table$info <- runif(nrow(instance_table))
  # groups should be no finer than keys
  key_table$key_group <- key_group[key_table$key]
  instance_table$key_group <- key_group[instance_table$key]
  list(key_table = key_table,
       instance_table = instance_table)
}
dlist <- mk_example(10, 10)
data <- dlist$instance_table
annotation <- dlist$key_table
rquery and rqdatatable can implement a non-trivial calculation as follows.
# possible data lookup: find rows that
# have lookup data <= info
optree <- local_td(data) %.>%
  natural_join(.,
               local_td(annotation),
               jointype = "INNER",
               by = "key") %.>%
  select_rows_nse(., data <= info) %.>%
  pick_top_k(.,
             k = 1,
             partitionby = "id",
             orderby = "data",
             reverse = "data",
             keep_order_column = FALSE) %.>%
  orderby(., "id")
cat(format(optree))
res1 <- ex_data_table(optree)
head(res1)
nrow(res1)
Or we could try a theta-join, which reduces production of intermediate rows.
# possible data lookup: find rows that
# have lookup data <= info
optree_theta <- local_td(data) %.>%
  theta_join_se(.,
                local_td(annotation),
                jointype = "INNER",
                expr = "key == key && info >= data") %.>%
  select_rows_nse(., data <= info) %.>%
  pick_top_k(.,
             k = 1,
             partitionby = "id",
             orderby = "data",
             reverse = "data",
             keep_order_column = FALSE) %.>%
  orderby(., "id")
cat(format(optree_theta))
res_theta <- ex_data_table(optree_theta)
head(res_theta)
nrow(res_theta)
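The row blow-up that a theta-join avoids can be seen on a toy example. The sketch below uses base R merge() purely to count rows; it does not show how rquery or data.table execute joins, and the tables left and right with their values are made up for illustration.

```r
# Hypothetical toy tables: 2 keys, 3 rows per key on each side.
left <- data.frame(
  key = rep(c("a", "b"), each = 3),
  info = c(0.1, 0.5, 0.9, 0.2, 0.4, 0.6),
  stringsAsFactors = FALSE)
right <- data.frame(
  key = rep(c("a", "b"), each = 3),
  data = c(0.3, 0.6, 0.8, 0.1, 0.3, 0.5),
  stringsAsFactors = FALSE)

# An equi-join on key alone pairs every left row with every right row
# sharing its key: 3 * 3 rows per key.
joined <- merge(left, right, by = "key")
n_intermediate <- nrow(joined)

# Only some of those pairs satisfy the extra condition data <= info.
kept <- joined[joined$data <= joined$info, , drop = FALSE]
n_kept <- nrow(kept)

print(c(intermediate = n_intermediate, kept = n_kept))
```

A join that applies the condition while matching only ever needs to produce the kept rows, and the gap between the two counts grows with the number of repetitions per key.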
And we can execute the operations in parallel.
parallel::clusterEvalQ(cl, library("rqdatatable"))
res2 <- ex_data_table_parallel(optree, "key_group", cl)
head(res2)
nrow(res2)
data.table can implement the same function.
library("data.table")
packageVersion("data.table")
# revised function from:
# http://www.win-vector.com/blog/2018/07/speed-up-your-r-work/#comment-66925
data_table_f <- function(data, annotation) {
  #setDT(data, key = c("key","info"))
  #setDT(annotation, key = c("key","data"))
  data <- data.table::as.data.table(data)
  annotation <- data.table::as.data.table(annotation)
  joined2 <- data[annotation,
                  on=.(key, info >= data),
                  .(id,
                    key,
                    info = x.info,
                    key_group.x = x.key_group,
                    data = i.data,
                    key_group.y = i.key_group),
                  allow.cartesian=TRUE,
                  nomatch = 0]
  setorder(joined2,data)
  joined2[joined2[,.I[.N], keyby = .(id)]$V1]
}
resdt <- data_table_f(data, annotation)
head(resdt)
nrow(resdt)
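The last two lines of data_table_f pick one row per id by sorting on data and then indexing with .I[.N]. A minimal sketch of that idiom on a toy table follows; the table dt and its values are arbitrary and only meant to show the mechanics.

```r
library("data.table")

# Hypothetical toy table: several candidate rows per id.
dt <- data.table(
  id = c(1L, 1L, 2L, 2L, 2L),
  data = c(0.2, 0.7, 0.5, 0.1, 0.3))

# Sort ascending by data, in place.
setorder(dt, data)

# Within each id, .I holds the row numbers of that group and .N its size,
# so .I[.N] is the row number of the group's last (largest data) row.
idx <- dt[, .I[.N], keyby = .(id)]$V1

# Keep just those rows: one row per id, carrying the largest data value.
top <- dt[idx]
print(top)
```

Using keyby rather than by returns the selected rows ordered by id.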
We can also run data.table in parallel using wrapr::execute_parallel.
parallel::clusterEvalQ(cl, library("data.table"))
parallel::clusterExport(cl, "data_table_f")
dt_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  data_table_f(data, annotation)
}
data_table_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data,
                  annotation = annotation),
    f = dt_f,
    partition_column = "key_group",
    cl = cl) %.>%
    data.table::rbindlist(.)
  data.table::setorderv(respdt, cols = "id")
  respdt
}
respdt <- data_table_parallel_f(data, annotation)
head(respdt)
nrow(respdt)
dplyr can also implement the example.
dplyr_pipeline <- function(data, annotation) {
  res <- data %>%
    inner_join(annotation, by = "key") %>%
    filter(data <= info) %>%
    group_by(id) %>%
    arrange(-data) %>%
    mutate(rownum = row_number()) %>%
    ungroup() %>%
    filter(rownum == 1) %>%
    arrange(id)
  res
}
resd <- dplyr_pipeline(data, annotation)
head(resd)
nrow(resd)
And we can use wrapr::execute_parallel to parallelize the dplyr solution.
parallel::clusterEvalQ(cl, library("dplyr"))
parallel::clusterExport(cl, "dplyr_pipeline")
dplyr_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  dplyr_pipeline(data, annotation)
}
dplyr_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data,
                  annotation = annotation),
    f = dplyr_f,
    partition_column = "key_group",
    cl = cl) %>%
    dplyr::bind_rows() %>%
    arrange(id)
}
respdplyr <- dplyr_parallel_f(data, annotation)
head(respdplyr)
nrow(respdplyr)
We can benchmark the various realizations.
dlist <- mk_example(300, 300)
data <- dlist$instance_table
annotation <- dlist$key_table
timings <- microbenchmark(
  data_table_parallel =
    nrow(data_table_parallel_f(data, annotation)),
  data_table =
    nrow(data_table_f(data, annotation)),
  rqdatatable_parallel =
    nrow(ex_data_table_parallel(optree, "key_group", cl)),
  rqdatatable =
    nrow(ex_data_table(optree)),
  rqdatatable_theta_parallel =
    nrow(ex_data_table_parallel(optree_theta, "key_group", cl)),
  rqdatatable_theta =
    nrow(ex_data_table(optree_theta)),
  dplyr_parallel =
    nrow(dplyr_parallel_f(data, annotation)),
  dplyr =
    nrow(dplyr_pipeline(data, annotation)),
  times = 10L)
saveRDS(timings, "Parallel_rqdatatable_timings.RDS")
print(timings)
# autoplot(timings)
timings <- as.data.frame(timings)
timings$seconds <- timings$time/1e+9
timings$method <- factor(timings$expr,
                         levels = rev(c("dplyr", "dplyr_parallel",
                                        "rqdatatable", "rqdatatable_parallel",
                                        "rqdatatable_theta", "rqdatatable_theta_parallel",
                                        "data_table", "data_table_parallel")))
ScatterBoxPlotH(timings,
                xvar = "seconds", yvar = "method",
                title="task duration by method")
# timings$is_parallel <- grepl("parallel", timings$expr)
# ScatterBoxPlotH(timings,
#                 xvar = "seconds", yvar = "method",
#                 title="task duration by method") +
#   facet_wrap(~is_parallel, ncol = 1, labeller = "label_both", scales = "free_y")
In these timings data.table is by far the fastest. Part of this is the inherent speed of data.table, and part is that data.table's non-equi join avoids a lot of expense (which is why theta-style joins are in fact interesting).
A reason dplyr sees a greater speedup relative to its own non-parallel implementation (yet does not beat data.table) is that data.table starts out already multi-threaded, so data.table is exploiting some parallelism even before we add the process-level parallelism (and hence sees less of a speedup, though it is fastest).
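The "speedup relative to its own non-parallel implementation" can be computed from a timings table as the ratio of median durations. The sketch below assumes a data frame with expr and time (in nanoseconds) columns, like the one produced above by as.data.frame(timings). The rows in toy are arbitrary placeholder values, not measurements from this note, and the method names m and m_parallel are hypothetical.

```r
# Placeholder timings (nanoseconds); NOT real measurements.
toy <- data.frame(
  expr = c("m", "m", "m_parallel", "m_parallel"),
  time = c(4e9, 6e9, 2e9, 3e9),
  stringsAsFactors = FALSE)

# Convert nanoseconds to seconds.
toy$seconds <- toy$time / 1e9

# Median duration per method, as a named vector.
med <- tapply(toy$seconds, toy$expr, median)

# Speedup of the parallel variant over its serial counterpart.
speedup <- med[["m"]] / med[["m_parallel"]]
print(speedup)
```

A ratio above 1 means the parallel variant was faster; comparing these ratios across methods is what the paragraph above describes.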
rquery pipelines exhibit superior performance on big data systems (Spark, PostgreSQL, Amazon Redshift, and hopefully soon Google BigQuery), and rqdatatable supplies a very good in-memory implementation of the rquery system based on data.table. rquery also speeds up solution development by supplying higher-order operators and early debugging features.
In this note we have demonstrated simple procedures to reliably parallelize any of rqdatatable, data.table, or dplyr.
Note: we did not include alternatives such as multidplyr or dtplyr in the timings, as they did not appear to work on this example.
The original rendering of this article can be found here, source code here, and raw timings here.
multidplyr does not appear to work on this example, so we could not include it in the timings.
# devtools::install_github("hadley/multidplyr")
library("multidplyr") # https://github.com/hadley/multidplyr
packageVersion("multidplyr")
multidplyr::set_default_cluster(cl)
head(dplyr_pipeline(data, annotation))
# example similar to https://github.com/hadley/multidplyr/blob/master/vignettes/multidplyr.Rmd
class(data)
datap <- multidplyr::partition(data, key_group)
head(datap)
class(datap)
class(annotation)
annotationp <- multidplyr::partition(annotation, key_group)
head(annotationp)
class(annotationp)
dplyr_pipeline(datap, annotationp)
dtplyr does not appear to work on this example, so we could not include it in the timings.
library("data.table")
library("dtplyr") # https://CRAN.R-project.org/package=dtplyr
packageVersion("dtplyr")
head(dplyr_pipeline(data, annotation))
class(data)
datadt <- data.table::as.data.table(data)
head(datadt)
class(datadt)
class(annotation)
annotationdt <- data.table::as.data.table(annotation)
head(annotationdt)
class(annotationdt)
dplyr_pipeline(datadt, annotationdt)
parallel::stopCluster(cl)
rm(list = "cl")