base::date()

library("dplyr")
library("rquery")
library("wrapr")

my_db <- DBI::dbConnect(RPostgreSQL::PostgreSQL(),
                        host = 'localhost',
                        port = 5432,
                        user = 'johnmount',
                        password = '')

# configure rquery options
dbopts <- rq_connection_tests(
  my_db,
  # seeing a bug here in RPostgreSQL, possibly on CREATE queries
  overrides = c(use_DBI_dbExistsTable = FALSE,
                use_DBI_dbRemoveTable = FALSE))
print(dbopts)
options(dbopts)

base::date()
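Note: if a local PostgreSQL instance is not available, the same set-up can in principle be sketched against an in-memory SQLite connection instead. This is an untested substitution (it assumes the RSQLite package is installed, and SQLite's window-function support varies by version, so some later steps may not translate):

# hypothetical alternative: an in-memory SQLite database
my_db <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
dbopts <- rq_connection_tests(my_db)  # let rquery probe the adapter's capabilities
options(dbopts)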
base::date()

# build up medium sized example data
nSubj <- 100000
nIrrelCol <- 500

d_local <- data.frame(subjectID = sort(rep(seq_len(nSubj), 2)),
                      surveyCategory = c('withdrawal behavior',
                                         'positive re-framing'),
                      stringsAsFactors = FALSE)
d_local$assessmentTotal <- sample.int(10, nrow(d_local), replace = TRUE)
irrel_col_1 <- paste("irrelevantCol", sprintf("%07g", 1), sep = "_")
d_local[[irrel_col_1]] <- runif(nrow(d_local))

d_small <- rquery::rq_copy_to(my_db, 'd_small',
                              d_local,
                              overwrite = TRUE,
                              temporary = TRUE)
rm(list = "d_local")
# cdata::qlook(my_db, d_small$table_name)

base::date()
base::date()

# add in irrelevant columns
# simulates performing a calculation against a larger data mart
assignments <-
  vapply(2:nIrrelCol,
         function(i) {
           paste("irrelevantCol", sprintf("%07g", i), sep = "_")
         },
         character(1)) :=
  vapply(2:nIrrelCol,
         function(i) {
           paste(irrel_col_1, "+", i)
         },
         character(1))

d_large <- d_small %.>%
  extend_se(., assignments) %.>%
  materialize(my_db, .,
              overwrite = TRUE,
              temporary = TRUE)
rm(list = "d_small")
# cdata::qlook(my_db, d_large$table_name)

# build dplyr reference
d_large_tbl <- tbl(my_db, d_large$table_name)

# rquery view of table
rquery::rq_nrow(my_db, d_large$table_name)
length(column_names(d_large))

base::date()
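As a quick sanity check that the materialized table is as expected, we can peek at a couple of rows directly through DBI (a minimal optional check; the commented-out qlook() above gives a richer summary):

# look at the first two rows of the wide table
DBI::dbGetQuery(my_db,
                paste("SELECT * FROM", d_large$table_name, "LIMIT 2"))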
Define and demonstrate pipelines:
base::date()

system.time({
  scale <- 0.237

  rquery_pipeline <- d_large %.>%
    extend_nse(.,
               probability := exp(assessmentTotal * scale)) %.>%
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID')
})

# special debug-mode limits all sources to 1 row.
# not correct for windowed calculations or joins -
# but lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline,
                                       source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline))))

base::date()
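Before timing, we can also print the single SQL statement rquery compiles the whole pipeline down to:

# inspect the one SQL query generated for the entire pipeline
cat(to_sql(rquery_pipeline, my_db))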
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID)
})

# full run
system.time(nrow(as.data.frame(dplyr_pipeline)))

base::date()
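The analogous (optional) check on the dplyr side prints the nested SQL dbplyr builds for the same calculation:

# inspect the SQL dplyr/dbplyr generates for the pipeline
dplyr_pipeline %>% show_query()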
rquery's materialize_node() works with rquery's column narrowing calculations.
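For example, we can ask a pipeline which source columns it actually depends on (a small sketch using rquery's columns_used(); this narrowing is why the roughly 500 irrelevant columns never have to be moved):

# report only the source columns the pipeline actually needs
columns_used(rquery_pipeline)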
base::date()

system.time({
  scale <- 0.237

  rquery_pipeline_cached <- d_large %.>%
    extend_nse(.,
               probability := exp(assessmentTotal * scale)) %.>%
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    materialize_node(., "tmp_res") %.>%  # <- insert caching node into pipeline prior to narrowing
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID')

  sql_list <- to_sql(rquery_pipeline_cached, my_db)
})

for(i in seq_along(sql_list)) {
  print(paste("step", i))
  cat(format(sql_list[[i]]))
  cat("\n")
}

# special debug-mode limits all sources to 1 row.
# not correct for windowed calculations or joins -
# but lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached,
                                       source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached))))

base::date()
The introduction of a dplyr::compute() node (with the intent of speeding things up through caching) can be expensive.
base::date()

system.time({
  scale <- 0.237

  d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>%  # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID) %>%
    as.data.frame() %>%  # force calculation, and bring back result
    nrow()
})

base::date()
Now, let's show how/where erroneous pipelines are debugged in each system.
In rquery, many user errors are caught during pipeline construction, independent of the database.
base::date()

system.time({
  scale <- 0.237

  # rquery catches the error during pipeline definition,
  # prior to sending it to the database or Spark data system.
  rquery_pipeline_late_error <- d_large %.>%
    extend_nse(.,
               probability := exp(assessmentTotal * scale)) %.>%
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'ZubjectIDZZZ')  # <- error non-existent column
})

base::date()
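To make the construction-time nature of the failure explicit, here is a minimal sketch that captures the error; note no SQL is generated and nothing is sent to the database:

# the error is raised while building the operator tree itself
tryCatch(
  d_large %.>% orderby(., 'ZubjectIDZZZ'),
  error = function(e) conditionMessage(e))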
With dplyr, user errors are mostly caught when the command is analyzed on the remote data system.
base::date()

system.time({
  scale <- 0.237

  # dplyr accepts an incorrect pipeline
  dplyr_pipeline_late_error <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ)  # <- error non-existent column
})

# dplyr will generate (incorrect) SQL from the incorrect pipeline
cat(dbplyr::remote_query(dplyr_pipeline_late_error))

# Fortunately, the database's query analyzer does catch the error quickly
# in this case.
system.time(nrow(as.data.frame(dplyr_pipeline_late_error)))

base::date()
However, the remote command analysis can happen after many expensive steps if there is also a dplyr::compute() node breaking up the steps. This can make debugging arbitrarily slow and expensive.
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline_c <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale) /
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>%  # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ) %>%  # <- error non-existent column
    as.data.frame() %>%  # force calculation, and bring back result
    nrow()
})

base::date()
The above matters for large projects. As with all coding problems, it isn't enough to be someone who would "never make such a mistake or be so careless"; one would have to be part of a team where nobody ever makes such a mistake.
base::date()

DBI::dbDisconnect(my_db)

base::date()