base::date()

library("dplyr")
library("rquery")
library("wrapr")


conf <-  sparklyr::spark_config()
conf$spark.yarn.am.cores <- 2
conf$spark.executor.cores <- 2
conf$spark.executor.memory <- "4G"
conf$spark.yarn.am.memory <- "4G"
conf$`sparklyr.shell.driver-memory` <- "4G"
conf$`sparklyr.shell.executor-memory` <- "4G"
conf$`spark.yarn.executor.memoryOverhead` <- "4G"
# conf$spark.yarn.am.cores <- 16
# conf$spark.executor.cores <- 16
# conf$spark.executor.memory <- "8G"
# conf$spark.yarn.am.memory <- "8G"
# conf$`sparklyr.shell.driver-memory` <- "8G"
# conf$`sparklyr.shell.executor-memory` <- "8G"
# conf$`spark.yarn.executor.memoryOverhead` <- "8G"
my_db <- sparklyr::spark_connect(version='2.2.0', 
                                 master = "local",
                                 config = conf)

# configure rquery options
dbopts <- rq_connection_tests(my_db)
print(dbopts)
options(dbopts)

base::date()
base::date()

# build up medium sized example data
nSubj <- 100000
nIrrelCol <- 500

d_local <- data.frame(subjectID = sort(rep(seq_len(nSubj),2)),
                 surveyCategory = c(
                   'withdrawal behavior',
                   'positive re-framing'),
                 stringsAsFactors = FALSE)
d_local$assessmentTotal <- sample.int(10, nrow(d_local), replace = TRUE)
irrel_col_1 <- paste("irrelevantCol", sprintf("%07g", 1), sep = "_")
d_local[[irrel_col_1]] <- runif(nrow(d_local))
d_small <- rquery::rq_copy_to(my_db, 'd_small',
                 d_local,
                 overwrite = TRUE, 
                 temporary = TRUE)
rm(list = "d_local")
# cdata::qlook(my_db, d_small$table_name)

base::date()
base::date()

# add in irrelevant columns
# simulates performing a calculation against a larger data mart
assignments <- 
  vapply(2:nIrrelCol, 
         function(i) {
           paste("irrelevantCol", sprintf("%07g", i), sep = "_")
         }, character(1)) := 
  vapply(2:nIrrelCol, 
         function(i) {
           paste(irrel_col_1, "+", i)
         }, character(1))
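# (Aside: wrapr's "names := values" operator above builds a named character
#  vector of column assignments; for example the first entry is
#    "irrelevantCol_0000002" = "irrelevantCol_0000001 + 2",
#  so each added irrelevant column is a simple expression over irrel_col_1.)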
d_large <- d_small %.>%
  extend_se(., assignments) %.>%
  materialize(my_db, ., 
              overwrite = TRUE,
              temporary = TRUE)
rm(list = "d_small")
# cdata::qlook(my_db, d_large$table_name)

# build dplyr reference
d_large_tbl <- tbl(my_db, d_large$table_name)

# rquery view of table
rquery::rq_nrow(my_db, d_large$table_name)
length(column_names(d_large))

# dplyr/tbl view of table
sparklyr::sdf_nrow(d_large_tbl)
sparklyr::sdf_ncol(d_large_tbl)

base::date()

Define and demonstrate pipelines:

base::date()

system.time({
  scale <- 0.237

  rquery_pipeline <- d_large %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale))  %.>% 
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID') 
})
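# Optional check (a side note, not part of the timing): rquery can report
# which source columns the pipeline actually needs, which is what powers
# its column narrowing over the 500+ column table.
columns_used(rquery_pipeline)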

# special debug-mode limits all sources to 1 row.
# not correct for windowed calculations or joins,
# but lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline, source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline))))

base::date()
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID)
})

# full run
system.time(nrow(as.data.frame(dplyr_pipeline)))
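# Optional: inspect the SQL dplyr/dbplyr generated for this pipeline
# (dbplyr::remote_query() is also used below to examine an erroneous pipeline).
cat(dbplyr::remote_query(dplyr_pipeline))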

base::date()

rquery's materialize_node() works with rquery's column narrowing calculations, so an explicit caching step can be added to the pipeline without giving up the narrowing optimization.

base::date()

system.time({
  scale <- 0.237

  rquery_pipeline_cached <- d_large %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale))  %.>% 
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    materialize_node(., "tmp_res") %.>%  # <- insert caching node into pipeline prior to narrowing
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'subjectID') 

  sql_list <- to_sql(rquery_pipeline_cached, my_db)
})

for(i in seq_along(sql_list)) {
  print(paste("step", i))
  cat(format(sql_list[[i]]))
  cat("\n")
}

# special debug-mode limits all sources to 1 row.
# not correct for windowed calculations or joins,
# but lets us at least see something execute quickly.
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached, source_limit = 1L))))

# full run
system.time(nrow(as.data.frame(execute(my_db, rquery_pipeline_cached))))

base::date()

In contrast, introducing a dplyr::compute() node (with the intent of speeding things up through caching) can be expensive.

base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline_c <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>%     # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID) %>%
    as.data.frame() %>%
    nrow()
})


base::date()

For larger examples the above dplyr pipeline often errors out at the compute() step with:

# Logs the following to the console and never seems to come back.
# *** caught segfault ***
# address 0x7fd368200000, cause 'memory not mapped'

Now let's show how and where erroneous pipelines are caught in each system.

In rquery, many user errors are caught during pipeline construction, independent of the database.

base::date()

system.time({
  scale <- 0.237

  # rquery catches the error during pipeline definition,
  # prior to sending it to the database or Spark data system.
  rquery_pipeline_late_error <- d_large %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale))  %.>% 
    normalize_cols(.,
                   "probability",
                   partitionby = 'subjectID') %.>%
    pick_top_k(.,
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'),
               reverse = c('probability', 'surveyCategory')) %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_columns(., qc(subjectID, diagnosis, probability)) %.>%
    orderby(., 'ZubjectIDZZZ') # <- error non-existent column
})
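# A hedged aside: outside of a notebook chunk that tolerates errors, one can
# confirm the failure is raised at construction time (before any SQL is
# generated) by wrapping the offending step, for example:
# tryCatch(
#   orderby(rquery_pipeline, 'ZubjectIDZZZ'),
#   error = function(e) print(e))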

base::date()

With dplyr, user errors are mostly caught only when the command is analyzed on the remote data system.

base::date()

system.time({
  scale <- 0.237

  # dplyr accepts an incorrect pipeline
  dplyr_pipeline_late_error <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ)  # <- error non-existent column
})

# dplyr will generate (incorrect) SQL from the incorrect pipeline
cat(dbplyr::remote_query(dplyr_pipeline_late_error))

# Fortunately, Spark's query analyzer does catch the error quickly
# in this case.
system.time(nrow(as.data.frame(dplyr_pipeline_late_error)))

base::date()
base::date()

system.time({
  scale <- 0.237

  dplyr_pipeline_c <- d_large_tbl %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale), na.rm = TRUE)) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    compute() %>%     # <- inopportune place to try to cache
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(ZubjectIDZZZ) %>% # <- error non-existent column
    as.data.frame() %>%
    nrow()
})

base::date()
base::date()
sparklyr::spark_disconnect(my_db)
base::date()

