For some time we have been teaching R users: "when working with wide tables on Spark or on databases, narrow to the columns you really want to work with early in your analysis."

This issue arises because wide tables (200 to 1000 columns) are quite common in big-data analytics projects. Often these are "denormalized marts" that are used to drive many different projects. For any one project only a small subset of the columns may be relevant in a calculation.

The idea behind the advice is: working with fewer columns makes for quicker queries.
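
As a minimal sketch of that advice (the connection my_db, the table name, and the column names below are placeholders, not part of this article's example): an early select() keeps the rest of a remote dplyr pipeline, and the SQL it generates, from dragging hundreds of unused columns through every intermediate step.

# Hypothetical illustration of "narrow early"; my_db and wide_fact_table are placeholders.
library("dplyr")
wide_tbl   <- tbl(my_db, "wide_fact_table")           # remote table with hundreds of columns
narrow_tbl <- wide_tbl %>%
  select(subjectID, surveyCategory, assessmentTotal)  # keep only what this project needs
# all further grouping, window functions, and joins work from narrow_tbl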

library("microbenchmark")
library("ggplot2")
library("dplyr")
library("rquery")

use_spark <- TRUE
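# set use_spark to FALSE to run the same experiment against a local PostgreSQL database instead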
if(use_spark) {
  conf <-  sparklyr::spark_config()
  conf$spark.yarn.am.cores <- 4
  conf$spark.executor.cores <- 4
  conf$spark.executor.memory <- "2G"
  conf$spark.yarn.am.memory <- "2G" 
  conf$`sparklyr.shell.driver-memory` <- "2G"
  conf$`sparklyr.shell.executor-memory` <- "2G"
  conf$`spark.yarn.executor.memoryOverhead` <- "2G"
  my_db <- sparklyr::spark_connect(version='2.2.0', 
                                   master = "local",
                                   config = conf)
} else {
  my_db <- DBI::dbConnect(RPostgreSQL::PostgreSQL(),
                          host = 'localhost',
                          port = 5432,
                          user = 'johnmount',
                          password = '')
}

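# build the wide example data: two survey rows per subject, plus many irrelevant columns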
nSubj <- 50000
nIrrelCol <- 500
dL <- data.frame(subjectID = sort(rep(seq_len(nSubj),2)),
                 surveyCategory = c(
                   'withdrawal behavior',
                   'positive re-framing'),
                 stringsAsFactors = FALSE)
dL$assessmentTotal <- sample.int(10, nrow(dL), replace = TRUE)
for(i in seq_len(nIrrelCol)) {
  ni <- paste("irrelevantCol", sprintf("%07g", i), sep = "_")
  dL[[ni]] <- sample(letters, size = nrow(dL), replace = TRUE)
}

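# copy the data to the database; dR is the rquery table handle, dT the dplyr handle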
dR <- rquery::dbi_copy_to(my_db, 'd',
                 dL,
                 temporary = TRUE, 
                 overwrite = TRUE)
dT <- dplyr::tbl(my_db, "d")

dL <- NULL

Let's set up our experiment. The data is a larger version of the problem from "Let’s Have Some Sympathy For The Part-time R User". We have expanded the number of subjects to 50,000 and added 500 irrelevant columns to the example. We define a new function that uses dplyr and sparklyr to compute the diagnoses, and, for comparison, an rquery pipeline that performs the same calculation. We vary whether the table is first narrowed to the columns of interest and whether the intermediate result is materialized on the database (with compute()) before the final one-row answer is collected. First, here is the SQL dplyr generates with and without the narrowing select().

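# scaling constant for the softmax-style probability calculation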
scale <- 0.237
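
# SQL generated when we narrow to the columns of interest first: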
dT %>%
  select(subjectID, surveyCategory, assessmentTotal) %>%
  group_by(subjectID) %>%
  mutate(probability =
           exp(assessmentTotal * scale)/
           sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
  arrange(probability, surveyCategory) %>%
  filter(row_number() == n()) %>%
  ungroup() %>%
  rename(diagnosis = surveyCategory) %>%
  select(subjectID, diagnosis, probability) %>%
  arrange(subjectID) %>%
  dbplyr::remote_query(.) %>%
  cat

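# SQL generated when we do not narrow first: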
dT %>%
  group_by(subjectID) %>%
  mutate(probability =
           exp(assessmentTotal * scale)/
           sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
  arrange(probability, surveyCategory) %>%
  filter(row_number() == n()) %>%
  ungroup() %>%
  rename(diagnosis = surveyCategory) %>%
  select(subjectID, diagnosis, probability) %>%
  arrange(subjectID) %>%
  dbplyr::remote_query(.) %>%
  cat

dplyr_run <- function(narrow, compute) {
  dR <- dT
  if(narrow) {
    dR <- dR %>%
      select(subjectID, surveyCategory, assessmentTotal)
  }
  dR <- dR %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() 
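  # optionally materialize the intermediate result as a table on the database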
  if(compute) {
    dR <- compute(dR)
  }
  dR %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID) %>% 
    head(n=1) %>%
    collect() %>%
    as.data.frame()
}


dplyr_run(narrow=FALSE, compute=FALSE)

dplyr_run(narrow=TRUE, compute=FALSE)

For comparison, here is the same calculation written as an rquery operator tree, along with the SQL rquery generates for it:

optree <- dR %.>%
  extend_nse(.,
             probability %:=%
               exp(assessmentTotal * scale)/
               sum(exp(assessmentTotal * scale)),
             count %:=% count(1),
             partitionby = 'subjectID') %.>%
  extend_nse(.,
             rank %:=% row_number(),
             partitionby = 'subjectID',
             orderby = c('probability', 'surveyCategory'))  %.>%
  rename_columns(., 'diagnosis' %:=% 'surveyCategory') %.>%
  select_rows_nse(., rank == count) %.>%
  select_columns(., c('subjectID', 
                      'diagnosis', 
                      'probability')) %.>%
  orderby(., 'subjectID')

cat(to_sql(optree, my_db))


rquery_run <- function() {
  optree <- dR %.>%
    extend_nse(.,
               probability %:=%
                 exp(assessmentTotal * scale)/
                 sum(exp(assessmentTotal * scale)),
               count %:=% count(1),
               partitionby = 'subjectID') %.>%
    extend_nse(.,
               rank %:=% row_number(),
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'))  %.>%
    rename_columns(., 'diagnosis' %:=% 'surveyCategory') %.>%
    select_rows_nse(., rank == count) %.>%
    select_columns(., c('subjectID', 
                        'diagnosis', 
                        'probability')) %.>%
    orderby(., 'subjectID', limit = 1)
  execute(my_db, optree)
}

rquery_run()

We can get timings for variations of the function:

library("microbenchmark")

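# time all four dplyr variations and the rquery pipeline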
timings <- microbenchmark(dplyr_run(narrow=FALSE, compute=FALSE), 
                          dplyr_run(narrow=TRUE, compute=FALSE),
                          dplyr_run(narrow=FALSE, compute=TRUE), 
                          dplyr_run(narrow=TRUE, compute=TRUE),
                          rquery_run())

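# release the handles and shut down the database connection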
dT <- NULL
dR <- NULL
if(use_spark) {
  sparklyr::spark_disconnect(my_db)
} else {
  DBI::dbDisconnect(my_db)
}
my_db <- NULL

And then present the results:

print(timings)

autoplot(timings)

tdf <- as.data.frame(timings)

# order the data
tdf <- tdf %>%
  group_by(., expr) %>%
  mutate(., mtime = median(time)) %>%
  ungroup(.)

tdf$expr <- reorder(tdf$expr, tdf$mtime)
WVPlots::ScatterBoxPlotH(tdf, "time", "expr",  
                         pt_alpha=0.2,
                         title="Execution times in nanoseconds")

The necessity of the narrowing step seems to be absent on Spark when we look only at the time to produce the final (one-row) result, though the effect would certainly be present if one inspected or materialized an intermediate table.
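
For example (a sketch only, not run here, since the connection has already been closed above; it assumes my_db and dT are still live), one could compare the cost of materializing the wide versus the narrowed intermediate table directly:

# Sketch: compare materializing the full-width table versus the narrowed one.
# Assumes the Spark connection my_db and the handle dT are still open.
system.time(dplyr::compute(dT))                          # all ~503 columns
system.time(dplyr::compute(dplyr::select(
  dT, subjectID, surveyCategory, assessmentTotal)))      # just the 3 columns used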

Of course, narrowing to the exact columns used can be difficult: it can involve inspecting an arbitrarily long pipeline for column uses. That is part of why we are developing a new R query generator that automates that procedure: rquery.
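
For example, rquery can report which columns of the source table a pipeline actually uses; a small sketch, assuming the optree built above is still in scope and that the column-analysis helper columns_used() is available in the installed rquery version:

# Ask rquery which source columns the pipeline actually touches.
rquery::columns_used(optree)
# For the pipeline above this should be just:
#   subjectID, surveyCategory, assessmentTotal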


