Running on a C4.8xlarge EC2 intance.
library("microbenchmark") library("dplyr") library("rquery") library("WVPlots") library("ggplot2") library("seplyr") conf <- sparklyr::spark_config() conf$spark.yarn.am.cores <- 16 conf$spark.executor.cores <- 16 conf$spark.executor.memory <- "8G" conf$spark.yarn.am.memory <- "8G" conf$`sparklyr.shell.driver-memory` <- "8G" conf$`sparklyr.shell.executor-memory` <- "8G" conf$`spark.yarn.executor.memoryOverhead` <- "8G" my_db <- sparklyr::spark_connect(version='2.2.0', master = "local", config = conf) dbopts <- dbi_connection_preferences(my_db) print(dbopts) options(dbopts) nSubj <- 200000 nIrrelCol <- 1000 dL <- data.frame(subjectID = sort(rep(seq_len(nSubj),2)), surveyCategory = c( 'withdrawal behavior', 'positive re-framing'), stringsAsFactors = FALSE) dL$assessmentTotal <- sample.int(10, nrow(dL), replace = TRUE) for(i in seq_len(nIrrelCol)) { ni <- paste("irrelevantCol", sprintf("%07g", i), sep = "_") dL[[ni]] <- sample(letters, size = nrow(dL), replace = TRUE) } d <- rquery::dbi_copy_to(my_db, 'd', dL, overwrite = TRUE) dL <- NULL # copy to Parquet to simulate large external data source dT <- dplyr::tbl(my_db, d$table_name) sparklyr::spark_write_parquet(dT, "perf_tmp", mode = 'overwrite') dplyr::db_drop_table(my_db, d$table_name) dT <- NULL d <- NULL # build new refs dT <- sparklyr::spark_read_parquet(my_db, 'dparq', "perf_tmp", memory = FALSE) d <- rquery::dbi_table(my_db, 'dparq')
Define and demonstrate pipelines:
DBI::dbGetQuery(my_db, paste("SELECT COUNT(1) FROM", d$table_name)) length(column_names(d)) sparklyr::sdf_nrow(dT) sparklyr::sdf_ncol(dT) scale <- 0.237 tng <- wrapr::mk_tmp_name_source("tmptab") rquery_run <- function(collect) { dq <- d %.>% extend_nse(., probability := exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale)), count := count(1), partitionby = 'subjectID') %.>% extend_nse(., rank := rank(), partitionby = 'subjectID', orderby = c('probability', 'surveyCategory')) %.>% rename_columns(., 'diagnosis' := 'surveyCategory') %.>% select_rows_nse(., rank == count) %.>% select_columns(., c('subjectID', 'diagnosis', 'probability')) %.>% orderby(., 'subjectID') sql <- to_sql(dq, my_db) if(collect) { dR <- DBI::dbGetQuery(my_db, sql) } else { # count and throw away the data sql <- paste("SELECT COUNT(1) FROM (", sql, ") ctab") dR <- DBI::dbGetQuery(my_db, sql) } dR } dplyr_run <- function(narrow, collect) { dR <- dT if(narrow) { dR <- dR %>% select(subjectID, surveyCategory, assessmentTotal) } dR <- dR %>% group_by(subjectID) %>% mutate(probability = exp(assessmentTotal * scale)/ sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>% arrange(probability, surveyCategory) %>% filter(row_number() == n()) %>% ungroup() %>% rename(diagnosis = surveyCategory) %>% select(subjectID, diagnosis, probability) %>% arrange(subjectID) if(collect) { dR <- collect(dR) } else { # count and throw away the data (force calculation) dR <- as.data.frame(tally(dR)) } dR } head(rquery_run(collect=TRUE)) rquery_run(collect=FALSE) head(dplyr_run(narrow=FALSE, collect=TRUE)) dplyr_run(narrow=FALSE, collect=FALSE) head(dplyr_run(narrow=TRUE, collect=TRUE)) dplyr_run(narrow=TRUE, collect=FALSE)
Get timings:
timings <- microbenchmark(rquery_run(collect=TRUE), rquery_run(collect=FALSE), dplyr_run(narrow=FALSE, collect=TRUE), dplyr_run(narrow=FALSE, collect=FALSE), dplyr_run(narrow=TRUE, collect=TRUE), dplyr_run(narrow=TRUE, collect=FALSE)) saveRDS(timings, "PerfTest_timings.RDS")
Present results:
print(timings) autoplot(timings) tdf <- as.data.frame(timings) # order the data tdf <- tdf %.>% group_by_se(., "expr") %.>% mutate_se(., qae(mtime := median(time))) %.>% ungroup(.) tdf$expr <- reorder(tdf$expr, tdf$mtime) WVPlots::ScatterBoxPlotH(tdf, "time", "expr", pt_alpha=0.2, title="Execution times in NS")
sparklyr::spark_disconnect(my_db)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.