For some time we have been teaching R
users "when working with wide tables on Spark or on databases: narrow to the columns you really want to work with early in your analysis."
This issue arises because wide tables (200 to 1000 columns) are quite common in big-data analytics projects. Often these are "denormalized marts" that are used to drive many different projects. For any one project only a small subset of the columns may be relevant in a calculation.
The idea behind the advice is: working with fewer columns makes for quicker queries.
Some wonder is this really an issue or is it something one can ignore in the hope the downstream query optimizer fixes the problem. In this note we will show the effect is real.
library("microbenchmark") suppressPackageStartupMessages(library("dplyr")) library("rquery") my_db <- sparklyr::spark_connect(version='2.2.0', master = "local") nSubj <- 100000 nIrrelCol <- 1000 dL <- data.frame(subjectID = sort(rep(seq_len(nSubj),2)), surveyCategory = c( 'withdrawal behavior', 'positive re-framing'), stringsAsFactors = FALSE) dL$assessmentTotal <- sample.int(10, nrow(dL), replace = TRUE) for(i in seq_len(nIrrelCol)) { ni <- paste("irrelevantCol", sprintf("%07g", i), sep = "_") dL[[ni]] <- sample(letters, size = nrow(dL), replace = TRUE) } d <- rquery::dbi_copy_to(my_db, 'd', dL, temporary = TRUE, overwrite = FALSE) dL <- NULL # copy to Parquet to simulate large external data source dT <- dplyr::tbl(my_db, d$table_name) sparklyr::spark_write_parquet(dT, "perf_tmp", mode = 'overwrite') dplyr::db_drop_table(my_db, d$table_name) dT <- NULL d <- NULL # build new ref dT <- sparklyr::spark_read_parquet(my_db, 'dparq', "perf_tmp", memory = FALSE)
Let's set up our experiment. The data is a larger version of the problem from "Let’s Have Some Sympathy For The Part-time R User". We have expanded the number of subjects to r sprintf("%i",nSubj)
and added r sprintf("%i",nIrrelCol)
irrelevant columns to the example. We define a new function that uses dplyr
and Sparklyr
to compute the diagnoses. We vary if the table is first limited to columns of interest and if the results are brought back to R
.
scale <- 0.237 dplyr_run <- function(narrow, collect = FALSE) { dR <- dT if(narrow) { dR <- dR %>% select(subjectID, surveyCategory, assessmentTotal) } dR <- dR %>% group_by(subjectID) %>% mutate(probability = exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale))) %>% arrange(probability, surveyCategory) %>% filter(row_number() == n()) %>% ungroup() %>% rename(diagnosis = surveyCategory) %>% select(subjectID, diagnosis, probability) %>% arrange(subjectID) if(collect) { dR <- collect(dR) } else { dR <- compute(dR) } dR } head(dplyr_run(narrow=FALSE)) head(dplyr_run(narrow=TRUE))
We can get timings for variations of the function:
library("microbenchmark") timings <- microbenchmark(dplyr_run(narrow=FALSE), dplyr_run(narrow=TRUE), times = 20)
And then present the results:
print(timings) tdf <- as.data.frame(timings) # order the data tdf <- tdf %>% group_by(., expr) %>% mutate(., mtime = median(time)) %>% ungroup(.) tdf$expr <- reorder(tdf$expr, tdf$mtime) WVPlots::ScatterBoxPlotH(tdf, "time", "expr", pt_alpha=0.2, title="Execution times in NS")
Notice the times where we have not per-narrowed the table are indeed much slower.
The advice is confirmed: narrow to the columns of interest early in your analysis.
Of course, narrowing to the exact columns used can be difficult: it can involve inspecting an
arbitrarily long pipeline for column uses. That is part of why we are developing
a new R
query generator that automates that procedure: rquery
.
sparklyr::spark_disconnect(my_db)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.