
PerfTest
================

2017-12-16

Define and demonstrate pipelines:

``` r
DBI::dbGetQuery(my_db, paste("SELECT COUNT(1) FROM", d$table_name))
##   count(1)
## 1    2e+05
length(column_names(d))
## [1] 1003
sparklyr::sdf_nrow(dT)
## [1] 2e+05
sparklyr::sdf_ncol(dT)
## [1] 1003
scale <- 0.237
tng <- cdata::makeTempNameGenerator("tmptab")
```
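
For reference, the handles used above are assumed to come from a setup along these lines (a sketch: the generation of the wide test table is not reproduced here, and the Spark-side table name `"d"` and the local master are assumptions):

``` r
library("sparklyr")
library("dplyr")
library("rquery")

# local Spark session used for the timings (assumed configuration)
my_db <- sparklyr::spark_connect(master = "local")

# dT: dplyr/sparklyr handle to the test table (2e5 rows, 1003 columns)
dT <- dplyr::tbl(my_db, "d")

# d: rquery table description of the same table
d <- rquery::dbi_table(my_db, "d")
```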

``` r
rquery_run <- function(collect) {
  dq <- d %.>%
    extend_nse(.,
               probability :=
                 exp(assessmentTotal * scale)/
                 sum(exp(assessmentTotal * scale)),
               count := count(1),
               partitionby = 'subjectID') %.>%
    extend_nse(.,
               rank := rank(),
               partitionby = 'subjectID',
               orderby = c('probability', 'surveyCategory'))  %.>%
    rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
    select_rows_nse(., rank == count) %.>%
    select_columns(., c('subjectID', 
                        'diagnosis', 
                        'probability')) %.>%
    order_by(., 'subjectID')
  if(collect) {
    dR <- DBI::dbGetQuery(my_db, to_sql(dq, my_db))
  } else {
    tnam <- tng()
    sql <- paste("CREATE TABLE", tnam,
                 "USING PARQUET OPTIONS ('compression'='snappy') AS",
                 to_sql(dq, my_db))
    # DDL statement, so dbExecute() rather than dbGetQuery()
    DBI::dbExecute(my_db, sql)
    dR <- rquery::dbi_table(my_db, tnam)
  }
  dR
}
```
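
Note the two execution modes: `collect = TRUE` brings the result back to R as a data.frame, while `collect = FALSE` materializes it as a Snappy-compressed Parquet table inside Spark and returns only a table description. rquery translates the whole pipeline into a single SQL statement, which can be inspected with `to_sql()`; a sketch, rebuilding the first pipeline stage outside the function:

``` r
dq1 <- d %.>%
  extend_nse(.,
             probability :=
               exp(assessmentTotal * scale)/
               sum(exp(assessmentTotal * scale)),
             count := count(1),
             partitionby = 'subjectID')
# print the SQL generated for this (partial) pipeline
cat(to_sql(dq1, my_db))
```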


``` r
dplyr_run <- function(narrow, collect) {
  dR <- dT
  if(narrow) {
    dR <- dR %>%
      select(subjectID, surveyCategory, assessmentTotal)
  }
  dR <- dR %>%
    group_by(subjectID) %>%
    mutate(probability =
             exp(assessmentTotal * scale)/
             sum(exp(assessmentTotal * scale))) %>%
    arrange(probability, surveyCategory) %>%
    filter(row_number() == n()) %>%
    ungroup() %>%
    rename(diagnosis = surveyCategory) %>%
    select(subjectID, diagnosis, probability) %>%
    arrange(subjectID)
  if(collect) {
    dR <- collect(dR)
  } else {
    dR <- compute(dR)
  }
  dR
}
```
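
Both pipelines implement the same per-subject scoring: a softmax of `assessmentTotal * scale` within each `subjectID`, keeping the highest-probability row (the real pipelines also break ties by `surveyCategory` order). A toy base-R check of that logic on hypothetical data:

``` r
scale <- 0.237  # as above
toy <- data.frame(
  subjectID       = c(1, 1, 2, 2),
  surveyCategory  = c("withdrawal behavior", "positive re-framing",
                      "withdrawal behavior", "positive re-framing"),
  assessmentTotal = c(5, 2, 3, 4))
# per-subject softmax of assessmentTotal * scale
toy$probability <- with(
  toy,
  ave(exp(assessmentTotal * scale), subjectID,
      FUN = function(x) x / sum(x)))
# keep the highest-probability row per subject
do.call(rbind, lapply(split(toy, toy$subjectID), function(g) {
  g[which.max(g$probability), ]
}))
```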

``` r
head(rquery_run(collect=TRUE))
##   subjectID           diagnosis probability
## 1         1 positive re-framing   0.7207128
## 2         2 withdrawal behavior   0.7207128
## 3         3 withdrawal behavior   0.8694381
## 4         4 withdrawal behavior   0.6163301
## 5         5 withdrawal behavior   0.5589742
## 6         6 withdrawal behavior   0.5000000
dR <- rquery_run(collect=FALSE) 
DBI::dbGetQuery(my_db, 
                paste("SELECT * FROM", dR$table_name, "LIMIT 6"))
##   subjectID           diagnosis probability
## 1         1 positive re-framing   0.7207128
## 2         2 withdrawal behavior   0.7207128
## 3         3 withdrawal behavior   0.8694381
## 4         4 withdrawal behavior   0.6163301
## 5         5 withdrawal behavior   0.5589742
## 6         6 withdrawal behavior   0.5000000
head(dplyr_run(narrow=FALSE, collect=TRUE))
## # A tibble: 6 x 3
##   subjectID diagnosis           probability
##       <int> <chr>                     <dbl>
## 1         1 positive re-framing       0.721
## 2         2 withdrawal behavior       0.721
## 3         3 withdrawal behavior       0.869
## 4         4 withdrawal behavior       0.616
## 5         5 withdrawal behavior       0.559
## 6         6 withdrawal behavior       0.500
head(dplyr_run(narrow=FALSE, collect=FALSE))
## # Source: lazy query [?? x 3]
## # Database: spark_connection
## # Ordered by: probability, surveyCategory, subjectID
##   subjectID diagnosis           probability
##       <int> <chr>                     <dbl>
## 1         1 positive re-framing       0.721
## 2         2 withdrawal behavior       0.721
## 3         3 withdrawal behavior       0.869
## 4         4 withdrawal behavior       0.616
## 5         5 withdrawal behavior       0.559
## 6         6 withdrawal behavior       0.500
head(dplyr_run(narrow=TRUE, collect=TRUE))
## # A tibble: 6 x 3
##   subjectID diagnosis           probability
##       <int> <chr>                     <dbl>
## 1         1 positive re-framing       0.721
## 2         2 withdrawal behavior       0.721
## 3         3 withdrawal behavior       0.869
## 4         4 withdrawal behavior       0.616
## 5         5 withdrawal behavior       0.559
## 6         6 withdrawal behavior       0.500
head(dplyr_run(narrow=TRUE, collect=FALSE))
## # Source: lazy query [?? x 3]
## # Database: spark_connection
## # Ordered by: probability, surveyCategory, subjectID
##   subjectID diagnosis           probability
##       <int> <chr>                     <dbl>
## 1         1 positive re-framing       0.721
## 2         2 withdrawal behavior       0.721
## 3         3 withdrawal behavior       0.869
## 4         4 withdrawal behavior       0.616
## 5         5 withdrawal behavior       0.559
## 6         6 withdrawal behavior       0.500
```
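
Since both implementations are intended to compute the same transform, a quick consistency check (a sketch, not part of the timed code):

``` r
res_rquery <- rquery_run(collect = TRUE)
res_dplyr  <- as.data.frame(dplyr_run(narrow = TRUE, collect = TRUE))
rownames(res_rquery) <- NULL
rownames(res_dplyr)  <- NULL
# same column values up to floating-point tolerance
stopifnot(isTRUE(all.equal(res_rquery, res_dplyr, tolerance = 1e-6)))
```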

Get timings:

``` r
library("microbenchmark")

timings <- microbenchmark(rquery_run(collect=TRUE), 
                          dplyr_run(narrow=FALSE, collect=TRUE), 
                          dplyr_run(narrow=TRUE, collect=TRUE),
                          rquery_run(collect=FALSE), 
                          dplyr_run(narrow=FALSE, collect=FALSE), 
                          dplyr_run(narrow=TRUE, collect=FALSE),
                          times = 10)
```

Present results:

library("seplyr")
print(timings)
## Unit: milliseconds
##                                        expr       min        lq     mean
##                  rquery_run(collect = TRUE)  890.9246  941.8151 1161.304
##   dplyr_run(narrow = FALSE, collect = TRUE) 2904.4212 3032.7618 3649.638
##    dplyr_run(narrow = TRUE, collect = TRUE) 1756.4779 1818.9597 2248.982
##                 rquery_run(collect = FALSE) 1129.4122 1154.4881 1495.734
##  dplyr_run(narrow = FALSE, collect = FALSE) 2502.9198 2518.5096 3061.362
##   dplyr_run(narrow = TRUE, collect = FALSE)  941.0073 1014.8044 1335.589
##    median       uq      max neval
##  1087.370 1246.242 1660.316    10
##  3690.537 3876.728 4846.509    10
##  2189.509 2621.193 3058.776    10
##  1566.103 1731.228 1934.375    10
##  2639.477 3272.162 5392.853    10
##  1247.096 1379.141 2756.205    10
tdf <- as.data.frame(timings)

# order the data
tdf <- tdf %.>%
  group_by_se(., "expr") %.>%
  mutate_se(., qae(mtime := median(time))) %.>%
  ungroup(.)

tdf$expr <- reorder(tdf$expr, tdf$mtime)
WVPlots::ScatterBoxPlotH(tdf, "time", "expr",
                         pt_alpha=0.2,
                         title="Execution times in nanoseconds")
```

My guess is that the timings show some consistent trends, plus a couple of irregularities (note the outlying maximum times in the table above).

``` r
sparklyr::spark_disconnect(my_db)
```

