Running on a c4.8xlarge EC2 instance.

library("microbenchmark")
library("dplyr")
library("rquery")
library("WVPlots")
library("ggplot2")
library("seplyr")

conf <- sparklyr::spark_config()
conf$spark.yarn.am.cores <- 16
conf$spark.executor.cores <- 16
conf$spark.executor.memory <- "8G"
conf$spark.yarn.am.memory <- "8G"
conf$`sparklyr.shell.driver-memory` <- "8G"
conf$`sparklyr.shell.executor-memory` <- "8G"
conf$`spark.yarn.executor.memoryOverhead` <- "8G"
my_db <- sparklyr::spark_connect(version='2.2.0', 
                                 master = "local",
                                 config = conf)

dbopts <- dbi_connection_preferences(my_db)
print(dbopts)
options(dbopts)

nSubj <- 200000
nIrrelCol <- 1000

# two survey rows per subject
dL <- data.frame(subjectID = sort(rep(seq_len(nSubj), 2)),
                 surveyCategory = c(
                   'withdrawal behavior',
                   'positive re-framing'),
                 stringsAsFactors = FALSE)
dL$assessmentTotal <- sample.int(10, nrow(dL), replace = TRUE)
# pad the frame with irrelevant columns to simulate a wide table
for(i in seq_len(nIrrelCol)) {
  ni <- paste("irrelevantCol", sprintf("%07d", i), sep = "_")
  dL[[ni]] <- sample(letters, size = nrow(dL), replace = TRUE)
}
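
As a quick sanity check (a hedged sketch, using only names defined above), the frame should now be nIrrelCol + 3 columns wide:

# subjectID, surveyCategory, assessmentTotal, plus the irrelevant columns
stopifnot(ncol(dL) == nIrrelCol + 3)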

d <- rquery::dbi_copy_to(my_db, 'd',
                         dL,
                         overwrite = TRUE)
dL <- NULL

# copy to Parquet to simulate large external data source
dT <- dplyr::tbl(my_db, d$table_name)
sparklyr::spark_write_parquet(dT, "perf_tmp", mode = 'overwrite')
dplyr::db_drop_table(my_db, d$table_name)
dT <- NULL
d <- NULL

# build new refs
dT <- sparklyr::spark_read_parquet(my_db, 'dparq', "perf_tmp", memory = FALSE)
d <- rquery::dbi_table(my_db, 'dparq')

Define and demonstrate pipelines:

DBI::dbGetQuery(my_db, paste("SELECT COUNT(1) FROM", d$table_name))
length(column_names(d))

sparklyr::sdf_nrow(dT)
sparklyr::sdf_ncol(dT)

scale <- 0.237
tng <- wrapr::mk_tmp_name_source("tmptab")
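
The probability column computed below is a per-subject softmax over assessmentTotal. A toy sketch of the link function for one subject's two rows (values are illustrative, not from the data):

x <- c(5, 2)  # two assessmentTotal values for a single subject
exp(x * scale) / sum(exp(x * scale))  # selection probabilities; they sum to 1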

rquery_run <- function(collect) {
  dq <- d %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale)/
                 sum(exp(assessmentTotal * scale)),
               count := count(1),
               partitionby = 'subjectID') %.>%
    extend_nse(.,
               rank := rank(),
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'))  %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_rows_nse(., rank == count) %.>%
    select_columns(., c('subjectID', 
                        'diagnosis', 
                        'probability')) %.>%
    orderby(., 'subjectID')
  sql <- to_sql(dq, my_db)
  if(collect) {
    dR <- DBI::dbGetQuery(my_db, sql)
  } else {
    # count and throw away the data
    sql <- paste("SELECT COUNT(1) FROM (", sql, ") ctab")
    dR <- DBI::dbGetQuery(my_db, sql)
  }
  dR
}
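
The SQL rquery generates can be inspected before it is sent to the database. A minimal sketch, re-using the d and my_db defined above (the shortened pipeline is illustrative, not the full query):

dq_demo <- d %.>%
  extend_nse(.,
             count := count(1),
             partitionby = 'subjectID')
cat(to_sql(dq_demo, my_db))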


dplyr_run <- function(narrow, collect) {
  dR <- dT
  if(narrow) {
    dR <- dR %>%
      select(subjectID, surveyCategory, assessmentTotal)
  }
  dR <- dR %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID)
  if(collect) {
    dR <- collect(dR)
  } else {
    # count and throw away the data (force calculation)
    dR <- as.data.frame(tally(dR))
  }
  dR
}
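
dplyr can likewise display the SQL it plans to send to Spark. A sketch of just the grouped-mutate step, using the dT and scale defined above:

dT %>%
  group_by(subjectID) %>%
  mutate(probability =
           exp(assessmentTotal * scale) /
           sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
  dplyr::show_query()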

head(rquery_run(collect=TRUE))

rquery_run(collect=FALSE) 


head(dplyr_run(narrow=FALSE, collect=TRUE))

dplyr_run(narrow=FALSE, collect=FALSE)

head(dplyr_run(narrow=TRUE, collect=TRUE))

dplyr_run(narrow=TRUE, collect=FALSE)

Get timings:

timings <- microbenchmark(rquery_run(collect=TRUE), 
                          rquery_run(collect=FALSE), 
                          dplyr_run(narrow=FALSE, collect=TRUE), 
                          dplyr_run(narrow=FALSE, collect=FALSE), 
                          dplyr_run(narrow=TRUE, collect=TRUE),
                          dplyr_run(narrow=TRUE, collect=FALSE))
saveRDS(timings, "PerfTest_timings.RDS")
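
The saved timings can be reloaded in a later session without re-running the benchmark (a small convenience sketch):

timings <- readRDS("PerfTest_timings.RDS")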

Present results:

print(timings)

autoplot(timings)

tdf <- as.data.frame(timings)

# order the data
tdf <- tdf %.>%
  group_by_se(., "expr") %.>%
  mutate_se(., qae(mtime := median(time))) %.>%
  ungroup(.)

tdf$expr <- reorder(tdf$expr, tdf$mtime)
WVPlots::ScatterBoxPlotH(tdf, "time", "expr",
                         pt_alpha = 0.2,
                         title = "Execution times in nanoseconds")
sparklyr::spark_disconnect(my_db)

