```r
base::date()

library("dplyr")
library("rquery")
library("wrapr")

conf <- sparklyr::spark_config()
conf$spark.yarn.am.cores <- 2
conf$spark.executor.cores <- 2
conf$spark.executor.memory <- "4G"
conf$spark.yarn.am.memory <- "4G"
conf$`sparklyr.shell.driver-memory` <- "4G"
conf$`sparklyr.shell.executor-memory` <- "4G"
conf$`spark.yarn.executor.memoryOverhead` <- "4G"
# conf$spark.yarn.am.cores <- 16
# conf$spark.executor.cores <- 16
# conf$spark.executor.memory <- "8G"
# conf$spark.yarn.am.memory <- "8G"
# conf$`sparklyr.shell.driver-memory` <- "8G"
# conf$`sparklyr.shell.executor-memory` <- "8G"
# conf$`spark.yarn.executor.memoryOverhead` <- "8G"
my_db <- sparklyr::spark_connect(version = '2.2.0',
                                 master = "local",
                                 config = conf)

# configure rquery options
dbopts <- rq_connection_tests(my_db)
print(dbopts)
options(dbopts)

base::date()
```
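An optional sanity check on the connection (just a sketch, not part of the timings; spark_version() is sparklyr's standard accessor):

```r
# confirm we are connected and which Spark version we actually got
sparklyr::spark_version(my_db)
```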
```r
base::date()

# build up medium sized example data
nSubj <- 100000
nIrrelCol <- 500

d_local <- data.frame(subjectID = sort(rep(seq_len(nSubj), 2)),
                      surveyCategory = c('withdrawal behavior',
                                         'positive re-framing'),
                      stringsAsFactors = FALSE)
d_local$assessmentTotal <- sample.int(10, nrow(d_local), replace = TRUE)
irrel_col_1 <- paste("irrelevantCol", sprintf("%07g", 1), sep = "_")
d_local[[irrel_col_1]] <- runif(nrow(d_local))

d_small <- rquery::rq_copy_to(my_db, 'd_small',
                              d_local,
                              overwrite = TRUE,
                              temporary = TRUE)
rm(list = "d_local")
# cdata::qlook(my_db, d_small$table_name)

base::date()
```
```r
base::date()

# add in irrelevant columns
# simulates performing a calculation against a larger data mart
assignments <-
  vapply(2:nIrrelCol,
         function(i) {
           paste("irrelevantCol", sprintf("%07g", i), sep = "_")
         }, character(1)) :=
  vapply(2:nIrrelCol,
         function(i) {
           paste(irrel_col_1, "+", i)
         }, character(1))
d_large <- d_small %.>%
  extend_se(., assignments) %.>%
  materialize(my_db, .,
              overwrite = TRUE,
              temporary = TRUE)
rm(list = "d_small")
# cdata::qlook(my_db, d_large$table_name)

# build dplyr reference
d_large_tbl <- tbl(my_db, d_large$table_name)

# rquery view of table
rquery::rq_nrow(my_db, d_large$table_name)
length(column_names(d_large))

# dplyr/tbl view of table
sparklyr::sdf_nrow(d_large_tbl)
sparklyr::sdf_ncol(d_large_tbl)

base::date()
```
Define and demonstrate pipelines:
```r
base::date()

system.time({
  scale <- 0.237

  rquery_pipeline <- d_large %.>%
    extend_nse(.,
               probability := exp(assessmentTotal * scale)) %.>%
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID')
})

# special debug-mode limits all sources to 1 row.
# not correct for windowed calculations or joins,
# but lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline,
                                       source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline))))

base::date()
```
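One can also inspect the SQL rquery plans to send for this pipeline (a quick sketch, using the same to_sql() helper the caching example below relies on):

```r
# show the SQL rquery would run for the pipeline above
cat(to_sql(rquery_pipeline, my_db))
```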
```r
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability = exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID)
})

# full run
system.time(nrow(as.data.frame(dplyr_pipeline)))

base::date()
```
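The corresponding SQL on the dplyr side can be examined with dbplyr::remote_query(), the same helper the debugging example later in this note uses (a sketch):

```r
# show the SQL dplyr/dbplyr generates for the pipeline above
cat(dbplyr::remote_query(dplyr_pipeline))
```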
rquery materialize_node() works with rquery's column narrowing calculations.
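A quick way to see the narrowing (a sketch against the rquery_pipeline defined earlier; columns_used() is rquery's column-dependency calculator):

```r
# list which source columns the pipeline actually uses;
# rquery uses this to avoid moving the hundreds of irrelevant columns
columns_used(rquery_pipeline)
```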
```r
base::date()

system.time({
  scale <- 0.237

  rquery_pipeline_cached <- d_large %.>%
    extend_nse(.,
               probability := exp(assessmentTotal * scale)) %.>%
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    materialize_node(., "tmp_res") %.>% # <- insert caching node into pipeline prior to narrowing
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID')

  sql_list <- to_sql(rquery_pipeline_cached, my_db)
})

for(i in seq_along(sql_list)) {
  print(paste("step", i))
  cat(format(sql_list[[i]]))
  cat("\n")
}

# special debug-mode limits all sources to 1 row.
# not correct for windowed calculations or joins,
# but lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached,
                                       source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached))))

base::date()
```
And the introduction of a dplyr::compute() node (with the intent of speeding things up through caching) can be expensive.
```r
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline_c <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability = exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>% # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID) %>%
    as.data.frame() %>%
    nrow()
})

base::date()
```
For larger examples the above dplyr pipeline often errors out at the compute() step with:
```r
# Logs the following to the console and seems to never come back.
# *** caught segfault ***
# address 0x7fd368200000, cause 'memory not mapped'
```
Now, let's show how/where erroneous pipelines are debugged in each system.
In rquery many user errors are caught during pipeline construction, independent of the database.
```r
base::date()

system.time({
  scale <- 0.237

  # rquery catches the error during pipeline definition,
  # prior to sending it to the database or Spark data system.
  rquery_pipeline_late_error <- d_large %.>%
    extend_nse(.,
               probability := exp(assessmentTotal * scale)) %.>%
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'ZubjectIDZZZ') # <- error: non-existent column
})

base::date()
```
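Notice the check needs no live data system at all. A minimal sketch (the table description and the bad column name below are illustrative) of rquery catching the same class of error against a bare table description:

```r
# rquery can check a pipeline against a table description alone,
# with no database or Spark connection involved
td <- rquery::mk_td("d", c("subjectID", "surveyCategory", "assessmentTotal"))
tryCatch(
  td %.>% select_columns(., "not_a_column"),
  error = function(e) print(e)
)
```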
With dplyr user errors are mostly caught when the command is analyzed on the remote data system.
```r
base::date()

system.time({
  scale <- 0.237

  # dplyr accepts an incorrect pipeline
  dplyr_pipeline_late_error <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability = exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ) # <- error: non-existent column
})

# dplyr will generate (incorrect) SQL from the incorrect pipeline
cat(dbplyr::remote_query(dplyr_pipeline_late_error))

# Fortunately, Spark's query analyzer does catch the error quickly
# in this case.
system.time(nrow(as.data.frame(dplyr_pipeline_late_error)))

base::date()
```
```r
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline_c <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability = exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>% # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ) %>% # <- error: non-existent column
    as.data.frame() %>%
    nrow()
})

base::date()
```
```r
base::date()

sparklyr::spark_disconnect(my_db)

base::date()
```