For some time we have been teaching R
users "when working with wide tables on Spark or on databases: narrow to the columns you really want to work with early in your analysis."
This issue arises because wide tables (200 to 1000 columns) are quite common in big-data analytics projects. Often these are "denormalized marts" that are used to drive many different projects. For any one project only a small subset of the columns may be relevant in a calculation.
The idea behind the advice is: working with fewer columns makes for quicker queries.
library("microbenchmark") library("ggplot2") library("dplyr") library("rquery") use_spark <- FALSE
if(use_spark) { conf <- sparklyr::spark_config() conf$spark.yarn.am.cores <- 4 conf$spark.executor.cores <- 4 conf$spark.executor.memory <- "2G" conf$spark.yarn.am.memory <- "2G" conf$`sparklyr.shell.driver-memory` <- "2G" conf$`sparklyr.shell.executor-memory` <- "2G" conf$`spark.yarn.executor.memoryOverhead` <- "2G" my_db <- sparklyr::spark_connect(version='2.2.0', master = "local", config = conf) } else { my_db <- DBI::dbConnect(RPostgreSQL::PostgreSQL(), host = 'localhost', port = 5432, user = 'johnmount', password = '') } nSubj <- 50000 nIrrelCol <- 500 dL <- data.frame(subjectID = sort(rep(seq_len(nSubj),2)), surveyCategory = c( 'withdrawal behavior', 'positive re-framing'), stringsAsFactors = FALSE) dL$assessmentTotal <- sample.int(10, nrow(dL), replace = TRUE) for(i in seq_len(nIrrelCol)) { ni <- paste("irrelevantCol", sprintf("%07g", i), sep = "_") dL[[ni]] <- sample(letters, size = nrow(dL), replace = TRUE) } dR <- rquery::dbi_copy_to(my_db, 'd', dL, temporary = TRUE, overwrite = TRUE) dT <- dplyr::tbl(my_db, "d") dL <- NULL
Let's set up our experiment. The data is a larger version of the problem from "Let’s Have Some Sympathy For The Part-time R User". We have expanded the number of subjects to r sprintf("%i",nSubj)
and added r sprintf("%i",nIrrelCol)
irrelevant columns to the example. We define a new function that uses dplyr
and Sparklyr
to compute the diagnoses. We vary if the table is first limited to columns of interest and if the results are brought back to R
.
print(my_db) scale <- 0.237
dT %>% select(subjectID, surveyCategory, assessmentTotal) %>% group_by(subjectID) %>% mutate(probability = exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>% arrange(probability, surveyCategory) %>% filter(row_number() == n()) %>% ungroup() %>% rename(diagnosis = surveyCategory) %>% select(subjectID, diagnosis, probability) %>% arrange(subjectID) %>% dbplyr::remote_query(.) %>% cat dT %>% group_by(subjectID) %>% mutate(probability = exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>% arrange(probability, surveyCategory) %>% filter(row_number() == n()) %>% ungroup() %>% rename(diagnosis = surveyCategory) %>% select(subjectID, diagnosis, probability) %>% arrange(subjectID) %>% dbplyr::remote_query(.) %>% cat dplyr_run <- function(narrow) { dR <- dT if(narrow) { dR <- dR %>% select(subjectID, surveyCategory, assessmentTotal) } dR %>% group_by(subjectID) %>% mutate(probability = exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>% arrange(probability, surveyCategory) %>% filter(row_number() == n()) %>% ungroup() %>% rename(diagnosis = surveyCategory) %>% select(subjectID, diagnosis, probability) %>% arrange(subjectID) %>% head(n=1) %>% collect() %>% as.data.frame() } dplyr_run(narrow=FALSE) dplyr_run(narrow=TRUE)
optree <- dR %.>% extend_nse(., probability %:=% exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale)), count %:=% count(1), partitionby = 'subjectID') %.>% extend_nse(., rank %:=% row_number(), partitionby = 'subjectID', orderby = c('probability', 'surveyCategory')) %.>% rename_columns(., 'diagnosis' %:=% 'surveyCategory') %.>% select_rows_nse(., rank == count) %.>% select_columns(., c('subjectID', 'diagnosis', 'probability')) %.>% orderby(., 'subjectID') cat(to_sql(optree, my_db)) rquery_run <- function() { optree <- dR %.>% extend_nse(., probability %:=% exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale)), count %:=% count(1), partitionby = 'subjectID') %.>% extend_nse(., rank %:=% row_number(), partitionby = 'subjectID', orderby = c('probability', 'surveyCategory')) %.>% rename_columns(., 'diagnosis' %:=% 'surveyCategory') %.>% select_rows_nse(., rank == count) %.>% select_columns(., c('subjectID', 'diagnosis', 'probability')) %.>% orderby(., 'subjectID', limit = 1) execute(my_db, optree) } rquery_run()
We can get timings for variations of the function:
library("microbenchmark") timings <- microbenchmark(dplyr_run(narrow=FALSE), dplyr_run(narrow=TRUE), rquery_run()) dT <- NULL dR <- NULL
if(use_spark) { sparklyr::spark_disconnect(my_db) } else { DBI::dbDisconnect(my_db) } my_db <- NULL
And then present the results:
print(timings) autoplot(timings) tdf <- as.data.frame(timings) # order the data tdf <- tdf %>% group_by(., expr) %>% mutate(., mtime = median(time)) %>% ungroup(.) tdf$expr <- reorder(tdf$expr, tdf$mtime) WVPlots::ScatterBoxPlotH(tdf, "time", "expr", pt_alpha=0.2, title="Execution times in NS")
The necsissity of the narrowing effect seems to be not present on Spark when we look only at the final result. Though the effect would certainly be there if one inspected an intermediate table.
Of course, narrowing to the exact columns used can be difficult: it can involve inspecting an
arbitrarily long pipeline for column uses. That is part of why we are developing
a new R
query generator that automates that procedure: rquery
.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.