John Mount, Win-Vector LLC 06/06/2018
We can work an example similar to the rquery example using a data.table back-end (rqdatatable).
Some details: macOS 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (late 2015 model) with 8 GB RAM and a hybrid disk drive.
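Throughout this note %.>% is wrapr's dot-arrow pipe (re-exported by rquery). A minimal illustration, assuming only that the wrapr package is installed:

library("wrapr")
5 %.>% sin(.)  # pipes the left-hand value in as "."
## [1] -0.9589243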
library("microbenchmark")
library("dplyr")
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library("dtplyr")
# https://github.com/WinVector/rqdatatable
library("rqdatatable") # devtools::install_github("WinVector/rqdatatable")
## Loading required package: rquery
print("R.version.string")
## [1] "R.version.string"
packageVersion("dplyr")
## [1] '0.7.5'
packageVersion("dtplyr")
## [1] '0.0.2'
packageVersion("dbplyr")
## [1] '1.2.1'
packageVersion("data.table")
## [1] '1.11.4'
packageVersion("rquery")
## [1] '0.5.0'
# data example
set.seed(2362)
mk_example <- function(nsubjects, nirrelcols) {
  d <- rbind(data.frame(subjectID = seq_len(nsubjects),
                        surveyCategory = "withdrawal behavior",
                        stringsAsFactors = FALSE),
             data.frame(subjectID = seq_len(nsubjects),
                        surveyCategory = "positive re-framing",
                        stringsAsFactors = FALSE))
  d <- d[order(d$subjectID, d$surveyCategory), , drop = FALSE]
  d$assessmentTotal <- rbinom(nrow(d), 10, 0.3)
  for(i in seq_len(nirrelcols)) {
    d[[paste0("irrelevantCol_", i)]] <- runif(nrow(d))
  }
  rownames(d) <- NULL
  d
}
dL <- mk_example(2, 0)
scale <- 0.237
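For intuition: the pipeline turns each subject's assessmentTotal into an unnormalized weight exp(assessmentTotal * scale) and then normalizes the weights within subject (a per-subject softmax). A small base-R illustration with hypothetical totals:

# hypothetical assessment totals for one subject's two survey categories
totals <- c(2, 3)
w <- exp(totals * 0.237)  # unnormalized weights
w / sum(w)                # normalized per-subject probabilities
## [1] 0.4410258 0.5589742

The 0.5589742 here is the same value reported for subject 2 in the result tables below, corresponding to a one-point gap in assessmentTotal between the two categories.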
# example rquery pipeline
rquery_pipeline <- local_td(dL) %.>%
  extend_nse(.,
             probability :=
               exp(assessmentTotal * scale)) %.>%
  normalize_cols(.,
                 "probability",
                 partitionby = 'subjectID') %.>%
  pick_top_k(.,
             k = 1,
             partitionby = 'subjectID',
             orderby = c('probability', 'surveyCategory'),
             reverse = c('probability', 'surveyCategory')) %.>%
  rename_columns(., c('diagnosis' = 'surveyCategory')) %.>%
  select_columns(., c('subjectID',
                      'diagnosis',
                      'probability')) %.>%
  orderby(., cols = 'subjectID')
Show the expanded form of the query tree.
cat(format(rquery_pipeline))
## table('dL';
##   subjectID,
##   surveyCategory,
##   assessmentTotal) %.>%
##  extend(.,
##   probability := exp(assessmentTotal * scale)) %.>%
##  extend(.,
##   probability := probability / sum(probability),
##   p= subjectID) %.>%
##  extend(.,
##   row_rank := rank(),
##   p= subjectID,
##   o= "probability" DESC, "surveyCategory" DESC) %.>%
##  select_rows(.,
##   row_rank <= 1) %.>%
##  rename(.,
##   c('diagnosis' = 'surveyCategory')) %.>%
##  select_columns(.,
##   subjectID, diagnosis, probability) %.>%
##  orderby(., subjectID)
Execute the rquery pipeline using data.table as the implementation.
ex_data_table(rquery_pipeline) %.>%
  knitr::kable(.)

| subjectID| diagnosis           | probability|
|---------:|:--------------------|-----------:|
|         1| positive re-framing |   0.6706221|
|         2| positive re-framing |   0.5589742|
Execute the rquery pipeline using PostgreSQL as the implementation.
# configure a database connection
my_db <- DBI::dbConnect(RPostgreSQL::PostgreSQL(),
                        host = 'localhost',
                        port = 5432,
                        user = 'johnmount',
                        password = '')
dbopts <- rquery::rq_connection_tests(my_db)
options(dbopts)
# build the shared handle
old_o <- options(list("rquery.rquery_db_executor" = list(db = my_db)))
# run the job
execute(dL, rquery_pipeline, allow_executor = FALSE) %.>%
  knitr::kable(.)

| subjectID| diagnosis           | probability|
|---------:|:--------------------|-----------:|
|         1| positive re-framing |   0.6706221|
|         2| positive re-framing |   0.5589742|
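For inspection, rquery can also render the same pipeline as SQL for the connected database; a short sketch using the my_db handle configured above:

# show the SQL rquery generates for this database
cat(to_sql(rquery_pipeline, my_db))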
dplyr pipeline.
scale <- 0.237
dplyr_pipeline <- . %>%
  select(subjectID, surveyCategory, assessmentTotal) %>% # narrow to columns of interest
  rename(diagnosis = surveyCategory) %>%
  mutate(probability = exp(assessmentTotal * scale)) %>%
  group_by(subjectID) %>%
  mutate(probability = probability / sum(probability, na.rm = TRUE)) %>%
  arrange(probability, diagnosis) %>%
  mutate(isDiagnosis = row_number() == n()) %>% # try to avoid grouped filtering overhead
  ungroup() %>%
  filter(isDiagnosis) %>%
  select(subjectID, diagnosis, probability) %>%
  arrange(subjectID)
dL %>%
  dplyr_pipeline %>%
  knitr::kable()

| subjectID| diagnosis           | probability|
|---------:|:--------------------|-----------:|
|         1| positive re-framing |   0.6706221|
|         2| positive re-framing |   0.5589742|
Try dtplyr.
data.table::as.data.table(dL) %>%
  dplyr_pipeline
## Error in rank(x, ties.method = "first", na.last = "keep"): argument "x" is missing, with no default
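The failing step appears to be the grouped row_number()/n() selection, which the dtplyr translation turns into a rank() call with a missing argument. As the shape of a possible workaround, a hypothetical rank-free variant (untested here, and it breaks ties differently: all maximal rows per subject are kept rather than one):

# hypothetical rank-free variant of the dplyr pipeline
dplyr_pipeline_max <- . %>%
  select(subjectID, surveyCategory, assessmentTotal) %>%
  rename(diagnosis = surveyCategory) %>%
  mutate(probability = exp(assessmentTotal * scale)) %>%
  group_by(subjectID) %>%
  mutate(probability = probability / sum(probability, na.rm = TRUE)) %>%
  filter(probability == max(probability)) %>%  # keep the maximal row(s) per subject
  ungroup() %>%
  select(subjectID, diagnosis, probability) %>%
  arrange(subjectID)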
Idiomatic data.table pipeline.
# improved code from:
# http://www.win-vector.com/blog/2018/01/base-r-can-be-fast/#comment-66746
data.table_function <- function(dL) {
  # data.table is paying for this copy in its timings (not quite fair),
  # so we will try to minimize it by narrowing columns.
  dDT <- data.table::as.data.table(dL[, c("subjectID", "surveyCategory", "assessmentTotal")])
  data.table::setnames(dDT, old = "surveyCategory", new = "diagnosis")
  dDT[, probability := exp(assessmentTotal * scale)]
  dDT[, probability := probability / sum(probability), subjectID]
  data.table::setorder(dDT, subjectID, probability, diagnosis)
  # after ordering, the last row per subject is the highest-probability row
  dDT <- dDT[, .SD[.N], subjectID]
  dDT[, assessmentTotal := NULL]
  data.table::setorder(dDT, subjectID)
}
data.table_function(dL) %.>%
  knitr::kable(.)

| subjectID| diagnosis           | probability|
|---------:|:--------------------|-----------:|
|         1| positive re-framing |   0.6706221|
|         2| positive re-framing |   0.5589742|
stats_aggregate_soln <- function(d) {
  d <- d[order(d$subjectID, d$surveyCategory), , drop = FALSE]
  # compute un-normalized probability
  d$probability <- exp(d$assessmentTotal * scale)
  # set up for selection
  dmax <- stats::aggregate(d$probability,
                           by = list(subjectID = d$subjectID),
                           FUN = max)
  maxv <- dmax$x
  names(maxv) <- dmax$subjectID
  # set up for normalization
  dsum <- stats::aggregate(d$probability,
                           by = list(subjectID = d$subjectID),
                           FUN = sum)
  sumv <- dsum$x
  names(sumv) <- dsum$subjectID
  # start selection
  d$maxv <- maxv[d$subjectID]
  d <- d[d$probability >= d$maxv, , drop = FALSE]
  # de-dup
  d$rownum <- seq_len(nrow(d))
  drow <- stats::aggregate(d$rownum,
                           by = list(subjectID = d$subjectID),
                           FUN = max)
  maxv <- drow$x
  names(maxv) <- drow$subjectID
  d$rmax <- maxv[d$subjectID]
  d <- d[d$rownum >= d$rmax, , drop = FALSE]
  # renormalize
  d$probability <- d$probability / sumv[d$subjectID]
  d <- d[, c("subjectID", "surveyCategory", "probability")]
  colnames(d)[[2]] <- "diagnosis"
  rownames(d) <- NULL
  d
}
Timings on a larger example.
nSubj <- 10000
dL <- mk_example(nSubj, 10)
# and an in-database copy
dR <- rquery::rq_copy_to(my_db, table_name = "dL", dL,
                         temporary = TRUE, overwrite = TRUE)
dRtbl <- dplyr::tbl(my_db, dR$table_name)
dplyr_round_trip <- function(dL) {
  # https://github.com/tidyverse/dplyr/issues/3026#issuecomment-339035129
  DBI::dbWriteTable(my_db, "dplyr_tmp",
                    select(dL, subjectID, surveyCategory, assessmentTotal),
                    overwrite = TRUE, temporary = TRUE)
  as.data.frame(dplyr_pipeline(dplyr::tbl(my_db, "dplyr_tmp")))
}
# show we are working on the new larger data and results agree
dLorig <- dL
ref <- as.data.frame(ex_data_table(rquery_pipeline))
# sensible consequences we can check
assertthat::assert_that(min(ref$probability)>=0.5)
## [1] TRUE
assertthat::assert_that(assertthat::are_equal(nSubj, nrow(ref)))
## [1] TRUE
assertthat::assert_that(assertthat::are_equal(ref$subjectID, seq_len(nSubj)))
## [1] TRUE
assertthat::assert_that(assertthat::are_equal(colnames(ref), c("subjectID", "diagnosis", "probability")))
## [1] TRUE
# from database version
c0 <- as.data.frame(execute(my_db, rquery_pipeline, allow_executor = FALSE))
assertthat::assert_that(assertthat::are_equal(ref, c0))
## [1] TRUE
# database round trip version
c1 <- as.data.frame(execute(dL, rquery_pipeline, allow_executor = FALSE))
assertthat::assert_that(assertthat::are_equal(ref, c1))
## [1] TRUE
c2 <- as.data.frame(dplyr_pipeline(dL))
assertthat::assert_that(assertthat::are_equal(ref, c2))
## [1] TRUE
c2b <- as.data.frame(dplyr_pipeline(dplyr::as.tbl(dL)))
assertthat::assert_that(assertthat::are_equal(ref, c2b))
## [1] TRUE
# from database version
c3 <- as.data.frame(dplyr_pipeline(dRtbl))
assertthat::assert_that(assertthat::are_equal(ref, c3))
## [1] TRUE
# database round trip version
# narrow by hand before copying to give all advantages.
c4 <- dplyr_round_trip(dL)
assertthat::assert_that(assertthat::are_equal(ref, c4))
## [1] TRUE
c5 <- as.data.frame(data.table_function(dL))
assertthat::assert_that(assertthat::are_equal(ref, c5))
## [1] TRUE
c6 <- stats_aggregate_soln(dL)
assertthat::assert_that(assertthat::are_equal(ref, c6))
## [1] TRUE
# confirm no side-effects back to original frame
assertthat::assert_that(assertthat::are_equal(dLorig, dL))
## [1] TRUE
rm(list = c("dL", "dLorig", "dR", "dRtbl",
            "ref", "c0", "c1", "c2", "c2b",
            "c3", "c4", "c5", "c6"))
all_timings <- NULL

# subject counts: 1, 2, and 5 times powers of ten, up to 5,000,000
sizes <- expand.grid(c(1, 2, 5), 10^(0:6)) %.>%
  (.$Var1 * .$Var2) %.>%
  sort(.)

for(nSubj in sizes) {
  # print("******")
  # print(paste("nSubj", nSubj))
  dL <- mk_example(nSubj, 10)
  # and a tbl version
  dLt <- dplyr::as.tbl(dL)
  # # and an in-database copy
  # dR <- rq_copy_to(my_db, table_name = "dL", dL,
  #                  temporary = TRUE, overwrite = TRUE)
  # dRtbl <- dplyr::tbl(my_db, dR$table_name)
  timings <- microbenchmark(
    times = 5L,
    rquery_database_round_trip = nrow(execute(dL, rquery_pipeline, allow_executor = FALSE)),
    # rquery_database_read = nrow(as.data.frame(execute(my_db, rquery_pipeline, allow_executor = FALSE))),
    rquery_data.table = nrow(ex_data_table(rquery_pipeline)),
    data.table = nrow(data.table_function(dL)),
    dplyr_data_frame = nrow(dplyr_pipeline(dL)),
    dplyr_tbl = nrow(dplyr_pipeline(dLt)),
    # dplyr_database_read = nrow(as.data.frame(dplyr_pipeline(dRtbl))),
    dplyr_database_round_trip = nrow(dplyr_round_trip(dL)),
    base_r_stats_aggregate = nrow(stats_aggregate_soln(dL)))
  # print(timings)
  timings <- as.data.frame(timings)
  timings$nrows <- nrow(dL)
  timings$ncols <- ncol(dL)
  all_timings <- rbind(all_timings, timings)
}
saveRDS(all_timings, "all_timings.RDS")
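The note does not show tear-down; a minimal cleanup sketch, restoring the options saved above in old_o and closing the connection:

# restore the rquery executor option and release the database handle
options(old_o)
DBI::dbDisconnect(my_db)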
Please see here for presentation and plots.
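As a rough idea of what such plots can look like, a hedged ggplot2 sketch over the saved timings (ggplot2 is an extra dependency, not loaded above; the definitive figures are in the linked write-up):

# sketch: plot run time by method and problem size on log-log axes
library("ggplot2")
all_timings <- readRDS("all_timings.RDS")
all_timings$seconds <- all_timings$time / 1e9  # microbenchmark reports nanoseconds
ggplot(all_timings, aes(x = nrows, y = seconds, color = expr)) +
  geom_smooth(se = FALSE) +
  scale_x_log10() +
  scale_y_log10() +
  ggtitle("task run time by method and number of rows")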