base::date()

library("dplyr")
library("rquery")
library("wrapr")

my_db <- DBI::dbConnect(RPostgreSQL::PostgreSQL(),
                        host = 'localhost',
                        port = 5432,
                        user = 'johnmount',
                        password = '')

# configure rquery options
dbopts <- rq_connection_tests(
  my_db,
  # working around an apparent bug in RPostgreSQL, possibly on CREATE queries
  overrides = c(use_DBI_dbExistsTable = FALSE, use_DBI_dbRemoveTable = FALSE) 
)
print(dbopts)
options(dbopts)
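
As a quick sanity check (my sketch, using rquery's getDBOption() option accessor; not part of the original timings), we can confirm an override took effect:

# Sketch: confirm one of the overrides is now active for this connection.
rquery::getDBOption(my_db, "use_DBI_dbExistsTable")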

base::date()

# build up medium sized example data
nSubj <- 100000
nIrrelCol <- 500

d_local <- data.frame(subjectID = sort(rep(seq_len(nSubj),2)),
                 surveyCategory = c(
                   'withdrawal behavior',
                   'positive re-framing'),
                 stringsAsFactors = FALSE)
d_local$assessmentTotal <- sample.int(10, nrow(d_local), replace = TRUE)
irrel_col_1 <- paste("irrelevantCol", sprintf("%07g", 1), sep = "_")
d_local[[irrel_col_1]] <- runif(nrow(d_local))
d_small <- rquery::rq_copy_to(my_db, 'd_small',
                 d_local,
                 overwrite = TRUE, 
                 temporary = TRUE)
rm(list = "d_local")
# cdata::qlook(my_db, d_small$table_name)

base::date()

# add in irrelevant columns
# simulates performing a calculation against a larger data mart
assignments <- 
  vapply(2:nIrrelCol, 
         function(i) {
           paste("irrelevantCol", sprintf("%07g", i), sep = "_")
         }, character(1)) := 
  vapply(2:nIrrelCol, 
         function(i) {
           paste(irrel_col_1, "+", i)
         }, character(1))
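# Illustrative peek (not part of the original run): `assignments` is a
# named character vector mapping each new column name to a SQL expression,
# e.g. irrelevantCol_0000002 = "irrelevantCol_0000001 + 2".
head(assignments, 2)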
d_large <- d_small %.>%
  extend_se(., assignments) %.>%
  materialize(my_db, ., 
              overwrite = TRUE,
              temporary = TRUE)
rm(list = "d_small")
# cdata::qlook(my_db, d_large$table_name)

# build dplyr reference
d_large_tbl <- tbl(my_db, d_large$table_name)

# rquery view of table
rquery::rq_nrow(my_db, d_large$table_name)
length(column_names(d_large))

base::date()

Define and demonstrate pipelines:

base::date()

system.time({
  scale <- 0.237

  rquery_pipeline <- d_large %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale))  %.>% 
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID') 
})
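
Before running the pipeline, it can be rendered to SQL to inspect what will be sent to the database (the same to_sql() rendering step is used again later in this note; output elided here):

# Render the rquery pipeline to SQL without executing it.
cat(to_sql(rquery_pipeline, my_db))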

# The special debug mode limits all sources to 1 row.
# This is not correct for windowed calculations or joins,
# but it lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline, 
                                       source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline))))

base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID)
})
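
For comparison, the SQL dbplyr generates can be inspected with dbplyr::remote_query() (also used further below):

# Inspect the SQL dbplyr generates, without executing the pipeline.
cat(dbplyr::remote_query(dplyr_pipeline))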

# full run
system.time(nrow(as.data.frame(dplyr_pipeline)))

base::date()

rquery's materialize_node() works with rquery's column-narrowing calculations, so the cached intermediate result only carries the columns that later steps actually use.
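
A quick way to see the narrowing (a sketch using rquery's columns_used() inspector): ask a pipeline which columns it actually needs from each source table.

# The earlier pipeline only pulls the columns it actually uses from
# d_large, despite the ~500 irrelevant columns present in the table.
columns_used(rquery_pipeline)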

base::date()

system.time({
  scale <- 0.237

  rquery_pipeline_cached <- d_large %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale))  %.>% 
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    materialize_node(., "tmp_res") %.>%  # <- insert caching node into pipeline prior to narrowing
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID') 

  sql_list <- to_sql(rquery_pipeline_cached, my_db)
})

for(i in seq_along(sql_list)) {
  print(paste("step", i))
  cat(format(sql_list[[i]]))
  cat("\n")
}

# The special debug mode limits all sources to 1 row.
# This is not correct for windowed calculations or joins,
# but it lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached, source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached))))

base::date()

Introducing a dplyr::compute() node (with the intent of speeding things up through caching) can itself be expensive: the materialization writes out every column present at that stage, including all the irrelevant ones.

base::date()

system.time({
  scale <- 0.237

  d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>%     # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID) %>%
    as.data.frame() %>% # force calculation, and bring back result
    nrow()
})
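
A hypothetical variation (my sketch, not timed in the original note): moving compute() to after the select() caches only the three narrow result columns, avoiding materializing the hundreds of irrelevant ones.

# Sketch: cache only after the narrowing select(), so the materialized
# intermediate does not carry the irrelevant columns.
scale <- 0.237

dplyr_pipeline_cached_late <- d_large_tbl %>%
  group_by(subjectID) %>%
  mutate(probability =
           exp(assessmentTotal * scale)/
           sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
  arrange(probability, surveyCategory) %>%
  filter(row_number() == n()) %>%
  ungroup() %>%
  rename(diagnosis = surveyCategory) %>%
  select(subjectID, diagnosis, probability) %>%
  compute() %>%   # <- cache after narrowing; far less data to write
  arrange(subjectID)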

base::date()

Now, let's show how and where erroneous pipelines are caught and debugged in each system.

In rquery many user errors are caught during pipeline construction, independent of database.

base::date()

system.time({
  scale <- 0.237

  # rquery catches the error during pipeline definition,
  # prior to sending anything to the database or Spark data system.
  # (tryCatch() lets the script continue past the deliberate error.)
  tryCatch(
    rquery_pipeline_late_error <- d_large %.>%
      extend_nse(.,
                 probability :=
                   exp(assessmentTotal * scale))  %.>% 
      normalize_cols(.,
                     "probability",
                     partitionby = 'subjectID') %.>%
      pick_top_k(.,
                 partitionby = 'subjectID',
                 orderby = c('probability', 'surveyCategory'),
                 reverse = c('probability', 'surveyCategory')) %.>%
      rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
      select_columns(., qc(subjectID, diagnosis, probability)) %.>%
      orderby(., 'ZubjectIDZZZ'), # <- error: non-existent column
    error = function(e) print(e))
})

base::date()

With dplyr, user errors are mostly caught when the command is analyzed on the remote data system.

base::date()

system.time({
  scale <- 0.237

  # dplyr accepts an incorrect pipeline
  dplyr_pipeline_late_error <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ)  # <- error non-existent column
})

# dplyr will generate (incorrect) SQL from the incorrect pipeline
cat(dbplyr::remote_query(dplyr_pipeline_late_error))

# Fortunately, the database's query analyzer does catch the error quickly
# in this case. (tryCatch() lets the script continue past the deliberate error.)
system.time(tryCatch(nrow(as.data.frame(dplyr_pipeline_late_error)),
                     error = function(e) print(e)))

base::date()

However, the remote command analysis can happen only after many expensive steps when a dplyr::compute() node breaks the pipeline into stages. This can make debugging arbitrarily slow and expensive.

base::date()

system.time({
  scale <- 0.237

  # The deliberate error is only detected after the expensive compute() step.
  # (tryCatch() lets the script continue past it.)
  tryCatch(
    dplyr_pipeline_c <- d_large_tbl %>%
      group_by(subjectID) %>%
      mutate(probability =
               exp(assessmentTotal * scale)/
               sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
      arrange(probability, surveyCategory) %>%
      filter(row_number() == n()) %>%
      compute() %>%     # <- inopportune place to try to cache
      ungroup() %>%
      rename(diagnosis = surveyCategory) %>%
      select(subjectID, diagnosis, probability) %>%
      arrange(ZubjectIDZZZ) %>% # <- error: non-existent column
      as.data.frame() %>%  # force calculation, and bring back result
      nrow(),
    error = function(e) print(e))
})

base::date()

The above matters for large projects. As with all coding problems, it isn't enough to be someone who would "never make such a mistake or be so careless"; one would have to be part of a team where nobody would ever make such a mistake.

base::date()
DBI::dbDisconnect(my_db)
base::date()

