# Setup: load the benchmarked/presentation packages and configure the session.
suppressPackageStartupMessages(library(dwtools))
# presentation pkgs
library(knitr)
library(microbenchmark)
# knitr chunk defaults: no caching, collapsed output, svg plots, stop on error
opts_chunk$set(cache=FALSE,collapse=TRUE, comment="#>", dev="svg", error=FALSE)
Sys.setenv(TZ="UTC") # fixed timezone so date handling is reproducible
options(scipen=100) # avoid scientific notation in printed numbers
global_start <- proc.time()[[3]] # elapsed-seconds stamp taken at document start

Benchmark meta information

Scope of the benchmark

Benchmark scenarios

SQL names used:

Tools tested

Index scenarios

Notes

There will be no cold/hot runs, all goes in a single session, but input dataset is re-populated each time to have a fresh dataset for each test.
Some of the combinations will not be available, e.g. sqlite does not support partition by, sqlite and postgres do not support pivot and unpivot, base R and dplyr do not support indices, and dplyr does not support cross join. Some others will use workarounds because of open issues in some packages; each such case is commented.
Any database interface will include the penalty of transferring data to/from R. The currently defined SQLite and postgres connections use native db drivers via DBI, so the data transfer penalty is minimized as much as possible. No tuning was made on the database side (e.g. autovacuum). When comparing DBI to ODBC it would be better to benchmark off-R, inside the databases, as ODBC may be a bottleneck.
Only time will be measured, not the memory.

Reproducibility notes

Benchmark can be easily reproduced by running Rmd file used in the post generation.
User should update connection details to postgres db (host, port, user, pass) or exclude postgres from benchmark by commenting the "psql" line below in Benchmark configuration.
In case of benchmarking big data it might be desired to setup environment on amazon ec2 (up to 244GB RAM), see references at the end of document for amazon EC2 reproduction script on clean Ubuntu AMI.

Contribution

Feel free to PR improvements to the benchmark.
Document will be updated by merging PR.
Additionally if you believe there is a common task related to ETL which was not covered here feel free to PR.
A PR must be runnable from within R + packages; any external dependency must be easily optional, as the already-included postgres is.

Benchmark configuration

Currently supported benchmark volumes range from 1e5 to 1e8, in steps of 0.5 in the exponent (1e3 is also supported).

# Benchmark volume: number of rows in the populated fact table.
N = 1e6
benchmark_eval = TRUE # useful while dev, to eval only single chunk

Database setup

Setup databases to test (comment "psql" line to exclude postgres benchmark):

# benchmark dbs
db_test = c("sqlite","sqlite_memory")
# optional dbs: uncomment the line below to include postgres in the benchmark
#db_test <- c(db_test,"psql")

Packages used

Loading packages:

# Load all packages required by the benchmark; append the postgres driver
# only when "psql" is among the tested databases.
pkgs <- c("dplyr","tidyr","data.table","RSQLite","dwtools","optiRum")
if(("psql" %in% db_test) && !("RPostgreSQL" %in% pkgs)) pkgs <- c(pkgs,"RPostgreSQL")
suppressPackageStartupMessages(sapply(pkgs, require, character.only=TRUE))
pkgsVersion(pkgs, c(ver = .libPaths()[1])) # report the package versions used
# sqlite: start from a fresh on-disk db file plus a separate in-memory db
if(file.exists("sqlite.db")) file.remove("sqlite.db")
sqlite = list(drvName="SQLite",dbname="sqlite.db"); sqlite$conn = dbConnect(SQLite(), dbname=sqlite$dbname)
sqlite_memory = list(drvName="SQLite",dbname=":memory:"); sqlite_memory$conn = dbConnect(SQLite(), dbname=sqlite_memory$dbname)
# register connections for dwtools::db()
options("dwtools.db.conns"=list(sqlite=sqlite,sqlite_memory=sqlite_memory))
if("psql" %in% db_test){
  # NOTE(review): update host/port/dbname/user/password to match your postgres setup
  psql <- list(drvName="PostgreSQL", host="localhost", port="5432", dbname="dwtools", user="dwtools")
  psql$conn <- dbConnect(PostgreSQL(), host=psql$host, port=psql$port, dbname=psql$dbname, user=psql$user, password="dwtools_pass")
  add.db.conns(psql=psql)
  all_tables <- c("db")
  # dynamic DROP TABLE: query the catalog for leftover benchmark tables, drop each
  db(paste0("SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = 'dwtools' AND table_name IN ('",paste(all_tables,collapse="','"),"')"),"psql")[
    ][,if(.N > 0) list(sql = paste0("DROP TABLE ",table_schema,".",table_name)) else data.table()
      ][,if(.N > 0) list(db(sql,"psql"))]
  invisible()
}
options("dwtools.timing.conn.name"=NULL) # always log to memory, can be changed to a desired db
options("dwtools.timing"=FALSE) # all timings will be prepared ad-hoc, no auto-timing
options("dwtools.timing.nano"=TRUE) # nanosecond-precision timing
options("dwtools.session"=as.integer(Sys.time())) # update session id for each run of Rmd
options("datatable.timing"=FALSE) # in case of using jangorecki/data.table@timing
options("datatable.auto.index"=FALSE) # disabled so indexing is controlled by the tests
options("datatable.showProgress"=FALSE)

Preview data

# Populate star-schema sample data of volume N; keep only the SALES fact
# table and the PRODUCT dimension.
X = dw.populate(N, S=1, scenario="star", setkey=FALSE)
X = X[names(X) %in% c("SALES","PRODUCT")]
kable(head(X$SALES)) # preview first rows of the fact table

Benchmark

# Fetch the `last` most recent entries from the dwtools timing log and present
# them sorted by elapsed time. The semicolon-delimited `tag` column is split
# into its descriptive components (test;scenario;indexing;environment;tool;fun)
# and the redundant `fun` component is dropped from the result.
#
# last       - integer, how many most recent timing entries to fetch
# trunc_expr - passed to get.timing; presumably truncates long expressions
#              for display - TODO confirm against dwtools::get.timing
showtiming <- function(last=1L,trunc_expr=FALSE){
  timelog <- get.timing(trunc_expr,last=last)
  # fix: strsplit's `fixed` argument is documented as a logical; the previous
  # string "TRUE" only worked through implicit coercion inside if()
  timelog[order(elapsed), list(expr,elapsed), by=setnames(rbindlist(lapply(strsplit(tag, ";", fixed=TRUE),as.list)),c("test","scenario","indexing","environment","tool","fun"))][,fun:=NULL]
}
# Print one representative expression per (environment, tool) pair, formatted
# as markdown with the expression wrapped in backticks (\x60). Operates on a
# copy so the input timing table is not modified by reference.
print_expr <- function(x){
  copy(x)[order(indexing,scenario),indx:=seq_len(.N), list(environment,tool)][indx==1L][ # filter only to first test in each tool
    ][,cat(tool,": \x60",expr,"\x60<br>",sep=""),keyby=list(environment,tool)]
}

# filter samples - valid for dw.populate(S=1) and N in {1e3, 1e5, 5e5, 1e6,
# 5e6, 1e7, 5e7, 1e8}
#
# Returns a nested list W of three predefined filters (named "1","2","3"),
# each mapping column name -> vector of values sampled per volume N so that
# every filter matches some rows:
#   filter "1": cust_code, prod_code, geog_code
#   filter "2": cust_code, geog_code, curr_code
#   filter "3": prod_code, geog_code
# Stops with an informative error for any other N.
whereList <- function(N){
  W <- list()
  if(N==1e3){
    W[["1"]][["cust_code"]] <- c("id048","id004","id088") # 1st filter
    W[["1"]][["prod_code"]] <- c(4L,4L,2L)
    W[["1"]][["geog_code"]] <- c("RI","NV","MN")
    W[["2"]][["cust_code"]] <- c("id079","id044","id044") # 2nd filter
    W[["2"]][["geog_code"]] <- c("NJ","HI","NH")
    W[["2"]][["curr_code"]] <- c("JPY","IRR","KRW")
    W[["3"]][["prod_code"]] <- c(5L,5L,2L) # 3rd filter
    W[["3"]][["geog_code"]] <- c("NV","NE","SC")
  }
  else if(N==1e5){
    W[["1"]][["cust_code"]] <- c("id028","id015","id081")
    W[["1"]][["prod_code"]] <- c(922L,232L,550L)
    W[["1"]][["geog_code"]] <- c("IN","NM","DE")
    W[["2"]][["cust_code"]] <- c("id012","id087","id006")
    W[["2"]][["geog_code"]] <- c("OH","NH","UT")
    W[["2"]][["curr_code"]] <- c("HRK","LTL","NOK")
    W[["3"]][["prod_code"]] <- c(140L,852L,276L)
    W[["3"]][["geog_code"]] <- c("VA","CO","MN")
  }
  else if(N==5e5){
    W[["1"]][["cust_code"]] <- c("id086","id007","id068")
    W[["1"]][["prod_code"]] <- c(1298L,3676L,3647L)
    W[["1"]][["geog_code"]] <- c("ND","MI","NJ")
    W[["2"]][["cust_code"]] <- c("id043","id096","id075")
    W[["2"]][["geog_code"]] <- c("HI","KY","AZ")
    W[["2"]][["curr_code"]] <- c("CHF","ARS","PPC")
    W[["3"]][["prod_code"]] <- c(2848L,4743L,2543L)
    W[["3"]][["geog_code"]] <- c("TX","LA","MN")
  }
  else if(N==1e6){
    W[["1"]][["cust_code"]] <- c("id088","id012","id073")
    W[["1"]][["prod_code"]] <- c(1402L,2707L,4461L)
    W[["1"]][["geog_code"]] <- c("RI","CO","NC")
    W[["2"]][["cust_code"]] <- c("id015","id008","id082")
    W[["2"]][["geog_code"]] <- c("TX","SD","MS")
    W[["2"]][["curr_code"]] <- c("GEL","SEK","XPM")
    W[["3"]][["prod_code"]] <- c(5728L,2090L,3591L)
    W[["3"]][["geog_code"]] <- c("IA","AK","MS")
  }
  else if(N==5e6){
    W[["1"]][["cust_code"]] <- c("id073","id058","id095")
    W[["1"]][["prod_code"]] <- c(26088L,14847L,40725L)
    W[["1"]][["geog_code"]] <- c("MS","NV","WY")
    W[["2"]][["cust_code"]] <- c("id046","id090","id062")
    W[["2"]][["geog_code"]] <- c("MI","NJ","IL")
    W[["2"]][["curr_code"]] <- c("NZD","MNT","XDG")
    W[["3"]][["prod_code"]] <- c(30974L,6470L,4264L)
    W[["3"]][["geog_code"]] <- c("WI","AZ","MA")
  }
  else if(N==1e7){
    W[["1"]][["cust_code"]] <- c("id088","id025","id072")
    W[["1"]][["prod_code"]] <- c(75932L,80108L,42139L)
    W[["1"]][["geog_code"]] <- c("TX","WY","AK")
    W[["2"]][["cust_code"]] <- c("id030","id017","id001")
    W[["2"]][["geog_code"]] <- c("WI","AZ","AR")
    W[["2"]][["curr_code"]] <- c("CNY","HUF","KZT")
    W[["3"]][["prod_code"]] <- c(27641L,22277L,66907L)
    W[["3"]][["geog_code"]] <- c("IN","VA","CT")
  }
  else if(N==5e7){
    W[["1"]][["cust_code"]] <- c("id010","id084","id081")
    W[["1"]][["prod_code"]] <- c(260063L,114119L,399766L)
    W[["1"]][["geog_code"]] <- c("ID","NE","CT")
    W[["2"]][["cust_code"]] <- c("id010","id053","id016")
    W[["2"]][["geog_code"]] <- c("MT","MN","WY")
    W[["2"]][["curr_code"]] <- c("UAH","GEL","MYR")
    W[["3"]][["prod_code"]] <- c(174978L,4455L,113433L)
    W[["3"]][["geog_code"]] <- c("NH","ME","CT")
  }
  else if(N==1e8){
    W[["1"]][["cust_code"]] <- c("id067","id062","id038")
    W[["1"]][["prod_code"]] <- c(431352L,930119L,493908L)
    W[["1"]][["geog_code"]] <- c("VT","WV","MO")
    W[["2"]][["cust_code"]] <- c("id007","id022","id004")
    W[["2"]][["geog_code"]] <- c("SC","AK","ME")
    W[["2"]][["curr_code"]] <- c("UAH","CAD","IRR")
    W[["3"]][["prod_code"]] <- c(532993L,951482L,347098L)
    W[["3"]][["geog_code"]] <- c("KY","IL","OK")
  }
  else {
    ## below queries can be helpful to find bigger samples for filtering
    # DT[,.N,by=c("cust_code","prod_code","geog_code")][order(-N)][1:3][,N:=NULL][,lapply(.SD,paste,collapse='","')]
    # DT[,.N,by=c("cust_code","geog_code","curr_code")][order(-N)][1:3][,N:=NULL][,lapply(.SD,paste,collapse='","')]
    # DT[,.N,by=c("prod_code","geog_code")][order(-N)][1:3][,N:=NULL][,lapply(.SD,paste,collapse='","')]
    stop("Filtering samples were prepared only for N = 1e3 and for N from 1e5 to 1e8 in steps of 0.5 in the exponent. Extend whereList() with valid filtering samples for this N.")
  }
  W
}
# Index column sets exercised by the create-index and select-where scenarios.
Idx = list(c("cust_code", "prod_code", "geog_code"), c("cust_code", 
"geog_code", "curr_code"), c("prod_code", "geog_code"))
# Each of below tests can be re-run separately
purge.timing() # clear the in-memory timing log before benchmarking

Expressions below each timing table are limited to one expression per tool. For detailed expressions for each tool, scenario and indexing see the vignettes Rmd script.

write data

# --- write data: csv (base), rds (base), and a db table per tested database ---
DT = copy(X$SALES)

if(file.exists("benchmark.csv")) invisible(file.remove("benchmark.csv"))
r0 = timing(write.table(DT,"benchmark.csv",sep=",",row.names=FALSE,quote=FALSE), N, paste("write","csv",NA_character_,"R","base","write.table",sep=";"))

if(file.exists("benchmark.rds")) invisible(file.remove("benchmark.rds"))
r0 = timing(saveRDS(DT,"benchmark.rds"), N, paste("write","rds",NA_character_,"R","base","saveRDS",sep=";"))

# drop any leftover table, then time a fresh write into each tested database
try(db("DROP TABLE db", db_test),silent = TRUE)
r4 = lapply(db_test, function(db_t){
  r = eval(bquote(timing(db(DT,"db",.(db_t)),N,paste("write","db",NA_character_,"DB",db_t,"db",sep=";"))))
})

# entries logged above: csv + rds + one per database
last = 2+length(db_test)
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

read data

# --- read data: fread vs read.table (csv), readRDS, and a db read per database ---
DT = copy(X$SALES)

if(file.exists("benchmark.csv")) invisible(file.remove("benchmark.csv"))
write.table(DT,"benchmark.csv",sep=",",row.names=FALSE,quote=FALSE)
# time_code comes back as character from csv; convert to Date for comparability
r1 = timing(fread("benchmark.csv")[,time_code:=as.Date(time_code)],
            N, paste("read","csv",NA_character_,"R","data.table","fread",sep=";"))

if(file.exists("benchmark.csv")) invisible(file.remove("benchmark.csv"))
write.table(DT,"benchmark.csv",sep=",",row.names=FALSE,quote=FALSE)
# read.table is given nrows/colClasses hints so the comparison is fair
r2 = timing(read.table("benchmark.csv",header=TRUE,sep=",",quote="",stringsAsFactors=FALSE,comment.char="",nrows=N,colClasses=unname(sapply(DT,class))),
            N, paste("read","csv",NA_character_,"R","base","read.table",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(r2))); rm(r2); invisible(file.remove("benchmark.csv"))

if(file.exists("benchmark.rds")) invisible(file.remove("benchmark.rds"))
saveRDS(DT,"benchmark.rds")
r3 = timing(readRDS("benchmark.rds"),
            N, paste("read","rds",NA_character_,"R","base","readRDS",sep=";"))
stopifnot(data.equal.data.table(r1,r3)); rm(r3); invisible(file.remove("benchmark.rds"))

# populate the "db" table in every tested database, then time reading it back
try(db("DROP TABLE db", db_test),silent = TRUE)
db(DT,"db",db_test)
r4 = lapply(db_test, function(db_t){ # timing of 4 sqls at once per each db_test
  r = eval(bquote(timing(db("db",.(db_t)),N,paste("read","db",NA_character_,"DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r))
})

rm(r1, r4)

# entries logged above: fread + read.table + readRDS + one per database
last = 3+length(db_test)
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

create index

The following indices will be tested:

# print each tested index column set
invisible(lapply(Idx, print))

Some tests will be done on other indices but only those for which above indices would be useless.
The data.table solution uses only a single key set during the tests; this reflects support for concurrent access to the same data.table.

# --- create index: single index vs multiple indices, R vs databases ---
DT = copy(X$SALES)
r = timing(idxv(DT, Idx[1L]),N,paste("create_index","single_index",NA_character_,"R","data.table-dwtools","idxv",sep=";"))
DT = copy(X$SALES)
r = timing(setkeyv(DT, Idx[[1L]]),N,paste("create_index","single_index",NA_character_,"R","data.table","setkeyv",sep=";"))
DT = copy(X$SALES)
try(db("DROP TABLE db", db_test),silent = TRUE)
db(DT,"db",db_test)
# CREATE INDEX statement for the first index column set only
create_index_sql <- sapply(Idx[1L], function(x) paste0("CREATE INDEX db_idx_",paste(x,collapse="_")," ON db (",paste(x,collapse=","),")"))
r = lapply(db_test, function(db_t){ # timing of 4 sqls at once per each db_test
  eval(bquote(timing(db(create_index_sql,.(db_t)),N,paste("create_index","single_index",NA_character_,"DB",db_t,"db",sep=";"))))
})
idx1_n <- 2+length(db_test) # timings logged by the single-index part

DT = copy(X$SALES)
r = timing(idxv(DT, Idx),N,paste("create_index","multiple_indices",NA_character_,"R","data.table-dwtools","idxv",sep=";"))
DT = copy(X$SALES)
try(db("DROP TABLE db", db_test),silent = TRUE)
db(DT,"db",db_test)
# CREATE INDEX statements for all three index column sets
create_index_sql <- sapply(Idx, function(x) paste0("CREATE INDEX db_idx_",paste(x,collapse="_")," ON db (",paste(x,collapse=","),")"))
r = lapply(db_test, function(db_t){ # timing of 4 sqls at once per each db_test
  eval(bquote(timing(db(create_index_sql,.(db_t)),N,paste("create_index","multiple_indices",NA_character_,"DB",db_t,"db",sep=";"))))
})
idxN_n <- 1+length(db_test) # timings logged by the multiple-indices part

last = idx1_n + idxN_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

select where

# --- select where (no index): base R vs dplyr vs data.table vs databases ---
# Filter values are spliced into the timed expressions via bquote so each
# logged expression is self-contained.
W <- whereList(N=N)

# NON INDEX

# base R data.frame logical subsetting; r1/r2/r3 are the reference results
DF = as.data.frame(copy(X$SALES))
r1 = eval(bquote(timing(DF[DF$cust_code%in%.(W[["1"]][["cust_code"]]) & DF$prod_code%in%.(W[["1"]][["prod_code"]]) & DF$geog_code%in%.(W[["1"]][["geog_code"]]),],
                        N, paste("where",paste(names(W[["1"]]),collapse=","),"no_index","R","base","[.data.frame",sep=";"))))
r2 = eval(bquote(timing(DF[DF$cust_code%in%.(W[["2"]][["cust_code"]]) & DF$geog_code%in%.(W[["2"]][["geog_code"]]) & DF$curr_code%in%.(W[["2"]][["curr_code"]]),],
                        N, paste("where",paste(names(W[["2"]]),collapse=","),"no_index","R","base","[.data.frame",sep=";"))))
r3 = eval(bquote(timing(DF[DF$prod_code%in%.(W[["3"]][["prod_code"]]) & DF$geog_code%in%.(W[["3"]][["geog_code"]]),],
                        N, paste("where",paste(names(W[["3"]]),collapse=","),"no_index","R","base","[.data.frame",sep=";"))))
stopifnot(nrow(r1)>0L,nrow(r2)>0L,nrow(r3)>0L) # filters validation

# dplyr::filter; each result is checked against the base R reference
DF = as.data.frame(copy(X$SALES))
r0 = eval(bquote(timing(filter(DF, cust_code%in%.(W[["1"]][["cust_code"]]) & prod_code%in%.(W[["1"]][["prod_code"]]) & geog_code%in%.(W[["1"]][["geog_code"]])),
                        N, paste("where",paste(names(W[["1"]]),collapse=","),"no_index","R","dplyr","filter",sep=";"))))
stopifnot(data.equal.data.table(setDT(r1),setDT(r0)))
r0 = eval(bquote(timing(filter(DF, cust_code%in%.(W[["2"]][["cust_code"]]) & geog_code%in%.(W[["2"]][["geog_code"]]) & curr_code%in%.(W[["2"]][["curr_code"]])),
                        N, paste("where",paste(names(W[["2"]]),collapse=","),"no_index","R","dplyr","filter",sep=";"))))
stopifnot(data.equal.data.table(setDT(r2),setDT(r0)))
r0 = eval(bquote(timing(filter(DF, prod_code%in%.(W[["3"]][["prod_code"]]) & geog_code%in%.(W[["3"]][["geog_code"]])),
                        N, paste("where",paste(names(W[["3"]]),collapse=","),"no_index","R","dplyr","filter",sep=";"))))
stopifnot(data.equal.data.table(setDT(r3),setDT(r0)))

# data.table vector scan (no key set)
DT = copy(X$SALES)
r0 = eval(bquote(timing(DT[cust_code%in%.(W[["1"]][["cust_code"]]) & prod_code%in%.(W[["1"]][["prod_code"]]) & geog_code%in%.(W[["1"]][["geog_code"]])],
                        N, paste("where",paste(names(W[["1"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r1,r0))
r0 = eval(bquote(timing(DT[cust_code%in%.(W[["2"]][["cust_code"]]) & geog_code%in%.(W[["2"]][["geog_code"]]) & curr_code%in%.(W[["2"]][["curr_code"]])],
                        N, paste("where",paste(names(W[["2"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r2,r0))
r0 = eval(bquote(timing(DT[prod_code%in%.(W[["3"]][["prod_code"]]) & geog_code%in%.(W[["3"]][["geog_code"]])],
                        N, paste("where",paste(names(W[["3"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r3,r0))

# databases: SELECT ... WHERE with IN lists, no index present yet
DT = copy(X$SALES)
try(db("DROP TABLE db", db_test),silent = TRUE)
db(DT,"db",db_test)
select_sql <- c(
  paste0("SELECT * FROM db WHERE cust_code IN ('",paste(W[[1]][["cust_code"]],collapse="','"),"') AND prod_code IN (",paste(W[[1]][["prod_code"]],collapse=","),") AND geog_code IN ('",paste(W[[1]][["geog_code"]],collapse="','"),"')"),
  paste0("SELECT * FROM db WHERE cust_code IN ('",paste(W[[2]][["cust_code"]],collapse="','"),"') AND geog_code IN ('",paste(W[[2]][["geog_code"]],collapse="','"),"') AND curr_code IN ('",paste(W[[2]][["curr_code"]],collapse="','"),"')"),
  paste0("SELECT * FROM db WHERE prod_code IN (",paste(W[[3]][["prod_code"]],collapse=","),") AND geog_code IN ('",paste(W[[3]][["geog_code"]],collapse="','"),"')")
)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  p = lapply(1:length(select_sql), function(i){
    r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("where",paste(names(W[[as.character(i)]]),collapse=","),"no_index","DB",db_t,"db",sep=";"))))
    stopifnot(data.equal.data.table({if(i==1L) r1 else if(i==2L) r2 else if(i==3L) r3 else stop("three filters defined")},r0))
    invisible()
  })
})
non_idx_n <- 3 * 3 + length(db_test)*length(select_sql) # timings logged so far

# INDEX

# data.table keyed join: CJ builds the cross product of the filter values and
# joins it against the key (Idx[[1]]); nomatch=0L drops non-matching rows
DT = copy(X$SALES)
setkeyv(DT, Idx[[1]])
r0 = eval(bquote(timing(DT[CJ(.(W[["1"]][["cust_code"]]),.(W[["1"]][["prod_code"]]),.(W[["1"]][["geog_code"]])),nomatch=0L],
                        N, paste("where",paste(names(W[["1"]]),collapse=","),"index","R","data.table","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r1,r0))

# dwtools CJI over precomputed indices; TRUE in a position presumably means
# "match all values" for that column - confirm against dwtools::CJI docs
DT = copy(X$SALES)
IDX = idxv(DT, Idx)
r0 = eval(bquote(timing(DT[CJI(IDX,.(W[["1"]][["cust_code"]]),.(W[["1"]][["prod_code"]]),.(W[["1"]][["geog_code"]]))],
                        N, paste("where",paste(names(W[["1"]]),collapse=","),"index","R","data.table-dwtools","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r1,r0))
r0 = eval(bquote(timing(DT[CJI(IDX,.(W[["2"]][["cust_code"]]),TRUE,.(W[["2"]][["geog_code"]]),TRUE,.(W[["2"]][["curr_code"]]))],
                        N, paste("where",paste(names(W[["2"]]),collapse=","),"index","R","data.table-dwtools","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r2,r0))
r0 = eval(bquote(timing(DT[CJI(IDX,TRUE,.(W[["3"]][["prod_code"]]),.(W[["3"]][["geog_code"]]))],
                        N, paste("where",paste(names(W[["3"]]),collapse=","),"index","R","data.table-dwtools","[.data.table",sep=";"))))
stopifnot(data.equal.data.table(r3,r0))

rm(IDX,r0)

# databases: reuse the table written in the non-index part, only add indices
#DT = copy(X$SALES)
#try(db("DROP TABLE db", db_test),silent = TRUE)
#db(DT,"db",db_test)
create_index_sql <- sapply(Idx, function(x) paste0("CREATE INDEX db_idx_",paste(x,collapse="_")," ON db (",paste(x,collapse=","),")"))
r = lapply(db_test, function(db_t){
  db(create_index_sql,db_t)
})

# same SELECTs as the non-index part, now with indices in place
select_sql <- c(
  paste0("SELECT * FROM db WHERE cust_code IN ('",paste(W[[1]][["cust_code"]],collapse="','"),"') AND prod_code IN (",paste(W[[1]][["prod_code"]],collapse=","),") AND geog_code IN ('",paste(W[[1]][["geog_code"]],collapse="','"),"')"),
  paste0("SELECT * FROM db WHERE cust_code IN ('",paste(W[[2]][["cust_code"]],collapse="','"),"') AND geog_code IN ('",paste(W[[2]][["geog_code"]],collapse="','"),"') AND curr_code IN ('",paste(W[[2]][["curr_code"]],collapse="','"),"')"),
  paste0("SELECT * FROM db WHERE prod_code IN (",paste(W[[3]][["prod_code"]],collapse=","),") AND geog_code IN ('",paste(W[[3]][["geog_code"]],collapse="','"),"')")
)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  p = lapply(1:length(select_sql), function(i){
    r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("where",paste(names(W[[as.character(i)]]),collapse=","),"index","DB",db_t,"db",sep=";"))))
    stopifnot(data.equal.data.table({if(i==1L) r1 else if(i==2L) r2 else if(i==3L) r3 else stop("three filters defined")},r0))
    invisible()
  })
})
idx_n <- 1 + 3 + length(db_test)*length(select_sql) # timings logged in the index part

rm(r1,r2,r3,r)

last = non_idx_n+idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

group by

# --- group by (no index): sum(amount), sum(value) over three grouping sets ---
# W is used here only to name the grouping columns in the timing tags.
W <- whereList(N=N)

# NON INDEX

# data.table grouped aggregation; r1/r2/r3 are the reference results
DT = copy(X$SALES)
r1 = timing(DT[,lapply(.SD,sum), by=c("cust_code","prod_code","geog_code"), .SDcols=c("amount","value")],
            N, paste("group",paste(names(W[["1"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))
r2 = timing(DT[,lapply(.SD,sum), by=c("cust_code","geog_code","curr_code"), .SDcols=c("amount","value")],
            N, paste("group",paste(names(W[["2"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))
r3 = timing(DT[,lapply(.SD,sum), by=c("prod_code","geog_code"), .SDcols=c("amount","value")],
            N, paste("group",paste(names(W[["3"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))

# # base R group removed due to low scalability
# DF = as.data.frame(copy(X$SALES))
# r0 = timing(aggregate(DF[c("amount","value")], by=as.list(DF[c("cust_code","prod_code","geog_code")]), FUN=sum),
#             N, paste("group",paste(names(W[["1"]]),collapse=","),"no_index","R","base","aggregate",sep=";"))
# stopifnot(data.equal.data.table(r1,setDT(r0)))
# r0 = timing(aggregate(DF[c("amount","value")], by=as.list(DF[c("cust_code","geog_code","curr_code")]), FUN=sum),
#             N, paste("group",paste(names(W[["2"]]),collapse=","),"no_index","R","base","aggregate",sep=";"))
# stopifnot(data.equal.data.table(r2,setDT(r0)))
# r0 = timing(aggregate(DF[c("amount","value")], by=as.list(DF[c("prod_code","geog_code")]), FUN=sum),
#             N, paste("group",paste(names(W[["3"]]),collapse=","),"no_index","R","base","aggregate",sep=";"))
# stopifnot(data.equal.data.table(r3,setDT(r0)))

# dplyr grouped aggregation, checked against the data.table reference
DF = as.data.frame(copy(X$SALES))
r0 = timing(DF %>% group_by(cust_code, prod_code, geog_code) %>% summarise_each(funs(sum), amount,value),
            N, paste("group",paste(names(W[["1"]]),collapse=","),"no_index","R","dplyr","select-group_by-summarise_each",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))
r0 = timing(DF %>% group_by(cust_code, geog_code, curr_code) %>% summarise_each(funs(sum), amount,value),
            N, paste("group",paste(names(W[["2"]]),collapse=","),"no_index","R","dplyr","select-group_by-summarise_each",sep=";"))
stopifnot(data.equal.data.table(r2,setDT(as.data.frame(r0))))
r0 = timing(DF %>% group_by(prod_code, geog_code) %>% summarise_each(funs(sum), amount,value),
            N, paste("group",paste(names(W[["3"]]),collapse=","),"no_index","R","dplyr","select-group_by-summarise_each",sep=";"))
stopifnot(data.equal.data.table(r3,setDT(as.data.frame(r0))))

# databases: GROUP BY over the same grouping sets
DT = copy(X$SALES)
try(db("DROP TABLE db", db_test),silent = TRUE)
db(DT,"db",db_test)
select_sql <- c(
  paste0("SELECT ",paste(names(W[[1]]),collapse=","),", sum(amount) as amount, sum(value) as value FROM db GROUP BY ",paste(names(W[[1]]),collapse=",")),
  paste0("SELECT ",paste(names(W[[2]]),collapse=","),", sum(amount) as amount, sum(value) as value FROM db GROUP BY ",paste(names(W[[2]]),collapse=",")),
  paste0("SELECT ",paste(names(W[[3]]),collapse=","),", sum(amount) as amount, sum(value) as value FROM db GROUP BY ",paste(names(W[[3]]),collapse=","))
)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  p = lapply(1:length(select_sql), function(i){
    r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("group",paste(names(W[[as.character(i)]]),collapse=","),"no_index","DB",db_t,"db",sep=";"))))
    rr = if(i==1L) r1 else if(i==2L) r2 else if(i==3L) r3 else stop("three filters defined")
    # if sums differ (presumably floating point noise), retry on values rounded to 2 dp
    if(!(data.equal.data.table(rr,r0))){
      rr[,value:=round(value,2)]
      r0[,value:=round(value,2)]
    }
    stopifnot(data.equal.data.table(rr,r0))
    invisible()
  })
})
non_idx_n <- 2*3 + length(db_test)*length(select_sql) # timings logged so far

# INDEX

# data.table grouped aggregation with the key set to the first grouping columns
DT = copy(X$SALES)
setkeyv(DT, Idx[[1]])
r0 = timing(DT[,lapply(.SD, sum), by=c("cust_code", "prod_code", "geog_code"), .SDcols=c("amount","value")],
            N, paste("group",paste(names(W[["1"]]),collapse=","),"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0))
rm(r0)

# databases: reuse the table written in the non-index part, only add indices
#DT = copy(X$SALES)
#try(db("DROP TABLE db", db_test),silent = TRUE)
#db(DT,"db",db_test)
create_index_sql <- sapply(Idx, function(x) paste0("CREATE INDEX db_idx_",paste(x,collapse="_")," ON db (",paste(x,collapse=","),")"))
r = lapply(db_test, function(db_t){
  db(create_index_sql,db_t)
})
# same GROUP BY SELECTs as the non-index part, now with indices in place
select_sql <- c(
  paste0("SELECT ",paste(names(W[[1]]),collapse=","),", sum(amount) as amount, sum(value) as value FROM db GROUP BY ",paste(names(W[[1]]),collapse=",")),
  paste0("SELECT ",paste(names(W[[2]]),collapse=","),", sum(amount) as amount, sum(value) as value FROM db GROUP BY ",paste(names(W[[2]]),collapse=",")),
  paste0("SELECT ",paste(names(W[[3]]),collapse=","),", sum(amount) as amount, sum(value) as value FROM db GROUP BY ",paste(names(W[[3]]),collapse=","))
)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  p = lapply(1:length(select_sql), function(i){
    r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("group",paste(names(W[[as.character(i)]]),collapse=","),"index","DB",db_t,"db",sep=";"))))
    rr = if(i==1L) r1 else if(i==2L) r2 else if(i==3L) r3 else stop("three filters defined")
    # if sums differ (presumably floating point noise), retry on values rounded to 2 dp
    if(!(data.equal.data.table(rr,r0))){
      rr[,value:=round(value,2)]
      r0[,value:=round(value,2)]
    }
    stopifnot(data.equal.data.table(rr,r0))
    invisible()
  })
})
idx_n <- 1+length(db_test)*length(select_sql) # timings logged in the index part

rm(r1,r2,r3,r)

last = non_idx_n+idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

partition by - aggregate

SQLite does not support `partition by` window functions, so it is excluded from this scenario.
Partition by using an aggregate function:

# regenerate the grouping definitions used for the partition-by scenario
W <- whereList(N=N)

# NON INDEX

# data.table "partition by + aggregate": add per-group sums as new columns
# with := while keeping all rows, one timing per grouping scenario
DT = copy(X$SALES)
r1 = timing(DT[,c("grp_amount","grp_value") := lapply(.SD,sum), by=list(cust_code,prod_code,geog_code), .SDcols=c("amount","value")],
            N, paste("partition_aggr",paste(names(W[["1"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))
DT = copy(X$SALES)
r2 = timing(DT[,c("grp_amount","grp_value") := lapply(.SD,sum), by=list(cust_code,geog_code,curr_code), .SDcols=c("amount","value")],
            N, paste("partition_aggr",paste(names(W[["2"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))
DT = copy(X$SALES)
r3 = timing(DT[,c("grp_amount","grp_value") := lapply(.SD,sum), by=list(prod_code,geog_code), .SDcols=c("amount","value")],
            N, paste("partition_aggr",paste(names(W[["3"]]),collapse=","),"no_index","R","data.table","[.data.table",sep=";"))

# dplyr "partition by + aggregate" equivalents: mutate_each() appends the
# per-group sums as new columns while keeping all rows, mirroring the
# data.table := results r1/r2/r3 computed above.
DF = as.data.frame(copy(X$SALES))
r0 = timing(DF %>% group_by(cust_code, prod_code, geog_code) %>% mutate_each(funs(sum), grp_amount=amount,grp_value=value),
            N, paste("partition_aggr",paste(names(W[["1"]]),collapse=","),"no_index","R","dplyr","select-group_by-mutate_each",sep=";"))
stopifnot(data.equal.data.table(setDT(r1),setDT(as.data.frame(r0))))
r0 = timing(DF %>% group_by(cust_code, geog_code, curr_code) %>% mutate_each(funs(sum), grp_amount=amount,grp_value=value),
            N, paste("partition_aggr",paste(names(W[["2"]]),collapse=","),"no_index","R","dplyr","select-group_by-mutate_each",sep=";"))
stopifnot(data.equal.data.table(setDT(r2),setDT(as.data.frame(r0))))
# fixed typo: `rp_amount` -> `grp_amount` so the output column name matches
# the data.table result r3 (the two other scenarios above already use
# `grp_amount`)
r0 = timing(DF %>% group_by(prod_code, geog_code) %>% mutate_each(funs(sum), grp_amount=amount,grp_value=value),
            N, paste("partition_aggr",paste(names(W[["3"]]),collapse=","),"no_index","R","dplyr","select-group_by-mutate_each",sep=";"))
stopifnot(data.equal.data.table(setDT(r3),setDT(as.data.frame(r0))))
rm(DF,r0)
# SQL window-function equivalent: sum(...) OVER (PARTITION BY ...).
# SQLite lacks window functions, so only the non-sqlite connections run.
if(length(db_test[!(db_test %like% "sqlite")])>0){
  DT = copy(X$SALES)
  try(db("DROP TABLE db", db_test[!(db_test %like% "sqlite")]),silent = TRUE)
  db(DT,"db",db_test[!(db_test %like% "sqlite")])
  select_sql <- c(
    paste0("SELECT *, sum(amount) over (partition by ",paste(names(W[[1]]),collapse=","),") as grp_amount, sum(value) over (partition by ",paste(names(W[[1]]),collapse=","),") as grp_value FROM db"),
    paste0("SELECT *, sum(amount) over (partition by ",paste(names(W[[2]]),collapse=","),") as grp_amount, sum(value) over (partition by ",paste(names(W[[2]]),collapse=","),") as grp_value FROM db"),
    paste0("SELECT *, sum(amount) over (partition by ",paste(names(W[[3]]),collapse=","),") as grp_amount, sum(value) over (partition by ",paste(names(W[[3]]),collapse=","),") as grp_value FROM db")
  )
  # exclude sqlite
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    p = lapply(1:length(select_sql), function(i){
      r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("partition_aggr",paste(names(W[[as.character(i)]]),collapse=","),"no_index","DB",db_t,"db",sep=";"))))
      rr = if(i==1L) r1 else if(i==2L) r2 else if(i==3L) r3 else stop("three filters defined")
      if(!(data.equal.data.table(rr,r0))){
        # rounding issues
        rr[,`:=`(value=trunc(value),grp_amount=trunc(grp_amount),grp_value=trunc(grp_value))]
        r0[,`:=`(value=trunc(value),grp_amount=trunc(grp_amount),grp_value=trunc(grp_value))]
      }
      stopifnot(data.equal.data.table(rr,r0))
      invisible()
    })
  })
}

# timings recorded: 3 data.table + 3 dplyr + 3 per non-sqlite database
non_idx_n <- 3 + 3 + 3*length(db_test[!(db_test %like% "sqlite")])

# INDEX

# keyed data.table version: group by the key set in the previous step
DT = copy(X$SALES)
setkeyv(DT, Idx[[1]])
r0 = timing(DT[,c("grp_amount","grp_value") := lapply(.SD, sum), by = key(DT), .SDcols=c("amount","value")],
            N, paste("partition_aggr",paste(names(W[["1"]]),collapse=","),"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0))
rm(r0,DT)

if(length(db_test[!(db_test %like% "sqlite")])>0){
  # tables already exist from the non-index section, only indexes are added
  #DT = copy(X$SALES)
  #try(db("DROP TABLE db", db_test[!(db_test %like% "sqlite")]),silent = TRUE)
  #db(DT,"db",db_test[!(db_test %like% "sqlite")])
  create_index_sql <- sapply(Idx, function(x) paste0("CREATE INDEX db_idx_",paste(x,collapse="_")," ON db (",paste(x,collapse=","),")"))
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){
    db(create_index_sql,db_t)
  })
  select_sql <- c(
    paste0("SELECT *, sum(amount) over (partition by ",paste(names(W[[1]]),collapse=","),") as grp_amount, sum(value) over (partition by ",paste(names(W[[1]]),collapse=","),") as grp_value FROM db"),
    paste0("SELECT *, sum(amount) over (partition by ",paste(names(W[[2]]),collapse=","),") as grp_amount, sum(value) over (partition by ",paste(names(W[[2]]),collapse=","),") as grp_value FROM db"),
    paste0("SELECT *, sum(amount) over (partition by ",paste(names(W[[3]]),collapse=","),") as grp_amount, sum(value) over (partition by ",paste(names(W[[3]]),collapse=","),") as grp_value FROM db")
  )
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    p = lapply(1:length(select_sql), function(i){
      r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("partition_aggr",paste(names(W[[as.character(i)]]),collapse=","),"index","DB",db_t,"db",sep=";"))))
      rr = if(i==1L) r1 else if(i==2L) r2 else if(i==3L) r3 else stop("three filters defined")
      if(!(data.equal.data.table(rr,r0))){
        # rounding issues
        rr[,`:=`(value=trunc(value),grp_amount=trunc(grp_amount),grp_value=trunc(grp_value))]
        r0[,`:=`(value=trunc(value),grp_amount=trunc(grp_amount),grp_value=trunc(grp_value))]
      }
      stopifnot(data.equal.data.table(rr,r0))
      invisible()
    })
  })
  rm(r1,r2,r3,r)
}
idx_n <- 1 + 3 * length(db_test[!(db_test %like% "sqlite")])

# print the timings collected in this "partition by - aggregate" scenario
last = non_idx_n + idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

partition by - non-aggregate

SQLite does not support `partition by` window functions, so it is excluded from this scenario.
Partition by using a non-aggregate (window) function:

W <- whereList(N=N)

# psql did not retain order of `order by` in case of duplicates: lag() over (partition by ... order by ...)
# possibly other sql db as well due to sql unordered nature
# populating a unique `order by` field within each partition so results are
# deterministic across all tools
DTunq = copy(X$SALES)
DTunq[,time_code:=as.Date(1:.N,origin="2015-01-01"),by="cust_code"]

# NON INDEX

# previous (ordered by time) value of sales by customer
DT = copy(DTunq)
r1 = timing(DT[order(time_code), c("value_lag_1") := shift(value, n=1, fill=NA, type="lag"), by="cust_code"],
            N, paste("partition_non_aggr","value_lag_1","no_index","R","data.table","[.data.table",sep=";"))
# next time and value of sales by customer
DT = copy(DTunq)
r2 = timing(DT[order(time_code), c(paste(c("time_code","value"),"lead_1",sep="_")) := shift(.SD, n=1, fill=NA, type="lead"), by="cust_code", .SDcols=c("time_code","value")],
            N, paste("partition_non_aggr","time_code_value_lead_1","no_index","R","data.table","[.data.table",sep=";"))
# dplyr equivalents of the two window operations above
DF = as.data.frame(copy(DTunq))
r0 = timing(DF %>% group_by(cust_code) %>% mutate(value_lag_1 = lag(order_by = time_code, value)),
            N, paste("partition_non_aggr","value_lag_1","no_index","R","dplyr","group_by-mutate-lag",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))
# label fixed: this case uses lead(), not lag(), so record the technique as
# "group_by-mutate-lead" to avoid a mislabelled benchmark row
r0 = timing(DF %>% group_by(cust_code) %>% mutate(time_code_lead_1 = lead(order_by = time_code, time_code), value_lead_1 = lead(order_by = time_code, value)),
            N, paste("partition_non_aggr","time_code_value_lead_1","no_index","R","dplyr","group_by-mutate-lead",sep=";"))
stopifnot(data.equal.data.table(setDT(r2),setDT(as.data.frame(r0))))
rm(DF,r0)

# sql: lag()/lead() OVER (PARTITION BY ... ORDER BY ...); SQLite excluded
# because it lacks window functions
if(length(db_test[!(db_test %like% "sqlite")])>0){
  DT = copy(DTunq)
  select_sql <- c(
    paste0("SELECT *, lag(value,1) over (partition by cust_code order by time_code asc) as value_lag_1 FROM db"),
    paste0("SELECT *, lead(time_code,1) over (partition by cust_code order by time_code asc) as time_code_lead_1, lead(value,1) over (partition by cust_code order by time_code asc) as value_lead_1 FROM db")
  )
  # exclude sqlite
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    try(db("DROP TABLE db",db_t),silent = TRUE)
    db(DT,"db",db_t)
    p = lapply(1:length(select_sql), function(i){
      r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("partition_non_aggr",if(i==1L) "value_lag_1" else if(i==2L) "time_code_value_lead_1","no_index","DB",db_t,"db",sep=";"))))
      stopifnot(data.equal.data.table({if(i==1L) r1 else if(i==2L) r2},r0))
      invisible()
    })
  })
}
# timings recorded: 2 data.table + 2 dplyr + 2 per non-sqlite database
non_idx_n <- 2 + 2 + 2*length(db_test[!(db_test %like% "sqlite")])

# INDEX

# keyed data.table: key on (time_code, cust_code) makes the explicit
# order(time_code) unnecessary inside the timed expression
DT = copy(DTunq)
setkey(DT,time_code,cust_code)
r0 = timing(DT[, c("value_lag_1") := shift(value, n=1, fill=NA, type="lag"), by="cust_code"],
            N, paste("partition_non_aggr","value_lag_1","index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0))
DT = copy(DTunq)
setkey(DT,time_code,cust_code)
r0 = timing(DT[, c(paste(c("time_code","value"),"lead_1",sep="_")) := shift(.SD, n=1, fill=NA, type="lead"), by="cust_code", .SDcols=c("time_code","value")],
            N, paste("partition_non_aggr","time_code_value_lead_1","index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r2,r0))

# sql
if(length(db_test[!(db_test %like% "sqlite")])>0){
  DT = copy(DTunq)
  select_sql <- c(
    paste0("SELECT *, lag(value,1) over (partition by cust_code order by time_code asc) as value_lag_1 FROM db"),
    paste0("SELECT *, lead(time_code,1) over (partition by cust_code order by time_code asc) as time_code_lead_1, lead(value,1) over (partition by cust_code order by time_code asc) as value_lead_1 FROM db")
  )
  # exclude sqlite; tables already exist from the non-index section
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    #try(db("DROP TABLE db",db_t),silent = TRUE)
    #db(DT,"db",db_t)
    db("CREATE INDEX db_idx_cust_code_time_code ON db (cust_code,time_code)",db_t)
    p = lapply(1:length(select_sql), function(i){
      r0 = eval(bquote(timing(db(.(select_sql[i]),.(db_t)),N,paste("partition_non_aggr",if(i==1L) "value_lag_1" else if(i==2L) "time_code_value_lead_1","index","DB",db_t,"db",sep=";"))))
      stopifnot(data.equal.data.table({if(i==1L) r1 else if(i==2L) r2},r0))
      invisible()
    })
  })
}
rm(r1,r2,r0,DT)
idx_n = 2 + 2*length(db_test[!(db_test %like% "sqlite")])

# print the timings collected in this "partition by - non-aggregate" scenario
last = non_idx_n + idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

inner join

# inner join scenario: fact table FT joined to half of PRODUCT (JN), so
# roughly half the rows match
FT <- copy(X$SALES)
JN <- copy(X$PRODUCT[sample(1:nrow(X$PRODUCT),nrow(X$PRODUCT)/2,FALSE)])

# NO INDEX

# base R merge vs dplyr inner_join on plain data.frames
df = as.data.frame(FT)
jn = as.data.frame(JN)
r1 = timing(merge(df, jn, by = "prod_code",all = FALSE),
            N, paste("inner_join",NA_character_,"no_index","R","base","merge",sep=";"))
r0 = timing(df %>% inner_join(jn, by="prod_code"),
            N, paste("inner_join",NA_character_,"no_index","R","dplyr","inner_join",sep=";"))
stopifnot(data.equal.data.table(setDT(r1),setDT(r0),ignore_col_order = TRUE))
rm(df,jn,r0)

# data.table: key only the lookup table inside the timed expression;
# setcolorder puts the join column first to match merge() output layout
dt <- copy(FT)
jn <- copy(JN)
r0 = timing(setkeyv(jn,"prod_code")[setcolorder(dt,c("prod_code",names(dt)[names(dt)!="prod_code"])),nomatch=0],
            N, paste("inner_join",NA_character_,"no_index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0,ignore_col_order = TRUE))
rm(r0,dt,jn)

# sql INNER JOIN; columns listed explicitly to disambiguate prod_code
select_sql <- paste0("SELECT cust_code,db.prod_code prod_code,geog_code,time_code,curr_code,amount,value,prod_name,prod_group_code,prod_group_name,prod_family_code,prod_family_name FROM db INNER JOIN jn ON db.prod_code = jn.prod_code")
r = lapply(db_test, function(db_t){
  try(db("DROP TABLE db", db_t),silent = TRUE)
  try(db("DROP TABLE jn", db_t),silent = TRUE)
  db(FT,"db",db_t)
  db(JN,"jn",db_t)
  r0 = eval(bquote(timing(db(.(select_sql),.(db_t)),N,paste("inner_join",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r0,r1,ignore_col_order = TRUE))
  invisible()
})
non_idx_n <- 3 + length(db_test)

# INDEX

# both sides keyed before timing, so only the join itself is measured
dt <- copy(FT)
jn <- copy(JN)
setkeyv(dt,"prod_code")
setkeyv(jn,"prod_code")
r0 = timing(dt[jn,nomatch=0],
            N, paste("inner_join",NA_character_,"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0,ignore_col_order = TRUE))

# tables already exist from the non-index section, only indexes are added
# try(db("DROP TABLE db", db_test),silent = TRUE)
# try(db("DROP TABLE jn", db_test),silent = TRUE)
# db(FT,"db",db_test)
# db(JN,"jn",db_test)
r = lapply(db_test, function(db_t){
  db("CREATE INDEX db_idx_prod_code ON db (prod_code)",db_t)
  db("CREATE INDEX jn_idx_prod_code ON jn (prod_code)",db_t)
})
select_sql <- paste0("SELECT cust_code,db.prod_code prod_code,geog_code,time_code,curr_code,amount,value,prod_name,prod_group_code,prod_group_name,prod_family_code,prod_family_name FROM db INNER JOIN jn ON db.prod_code = jn.prod_code")
r = lapply(db_test, function(db_t){
  r0 = eval(bquote(timing(db(.(select_sql),.(db_t)),N,paste("inner_join",NA_character_,"index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r0,r1,ignore_col_order = TRUE))
  invisible()
})
rm(r,FT,JN,dt,jn)
idx_n = 1 + length(db_test)

# print the timings collected in this "inner join" scenario
last = non_idx_n + idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

outer join (left join)

# left outer join scenario: all fact rows kept, unmatched lookup columns NA
FT <- copy(X$SALES)
JN <- copy(X$PRODUCT[sample(1:nrow(X$PRODUCT),nrow(X$PRODUCT)/2,FALSE)])

# NO INDEX

df = as.data.frame(FT)
jn = as.data.frame(JN)
r1 = timing(merge(df, jn, by = "prod_code",all.x = TRUE, sort=FALSE),
            N, paste("left_join",NA_character_,"no_index","R","base","merge",sep=";"))
r0 = timing(df %>% left_join(jn, by="prod_code"),
            N, paste("left_join",NA_character_,"no_index","R","dplyr","left_join",sep=";"))
stopifnot(data.equal.data.table(setDT(r1),setDT(r0),ignore_col_order = TRUE))
rm(df,jn,r0)

# data.table: jn[dt] with nomatch=NA is a left join of dt onto jn
dt <- copy(FT)
jn <- copy(JN)
r0 = timing(setkeyv(jn,"prod_code")[setcolorder(dt,c("prod_code",names(dt)[names(dt)!="prod_code"])),nomatch=NA],
            N, paste("left_join",NA_character_,"no_index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0,ignore_col_order = TRUE))
rm(r0,dt,jn)

# sql LEFT JOIN
select_sql <- paste0("SELECT cust_code,db.prod_code prod_code,geog_code,time_code,curr_code,amount,value,prod_name,prod_group_code,prod_group_name,prod_family_code,prod_family_name FROM db LEFT JOIN jn ON db.prod_code = jn.prod_code")
r = lapply(db_test, function(db_t){
  try(db("DROP TABLE db", db_t),silent = TRUE)
  try(db("DROP TABLE jn", db_t),silent = TRUE)
  db(FT,"db",db_t)
  db(JN,"jn",db_t)
  r0 = eval(bquote(timing(db(.(select_sql),.(db_t)),N,paste("left_join",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r0,r1,ignore_col_order = TRUE))
  invisible()
})
non_idx_n = 3+length(db_test)

# INDEX

# both sides keyed before timing, so only the join itself is measured
dt <- copy(FT)
jn <- copy(JN)
setkeyv(dt,"prod_code")
setkeyv(jn,"prod_code")
r0 = timing(jn[dt,nomatch=NA],
            N, paste("left_join",NA_character_,"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0,ignore_col_order = TRUE))

# tables already exist from the non-index section, only indexes are added
# try(db("DROP TABLE db", db_test),silent = TRUE)
# try(db("DROP TABLE jn", db_test),silent = TRUE)
# db(FT,"db",db_test)
# db(JN,"jn",db_test)
r = lapply(db_test, function(db_t){
  db("CREATE INDEX db_idx_prod_code ON db (prod_code)",db_t)
  db("CREATE INDEX jn_idx_prod_code ON jn (prod_code)",db_t)
})
select_sql <- paste0("SELECT cust_code,db.prod_code prod_code,geog_code,time_code,curr_code,amount,value,prod_name,prod_group_code,prod_group_name,prod_family_code,prod_family_name FROM db LEFT JOIN jn ON db.prod_code = jn.prod_code")
r = lapply(db_test, function(db_t){
  r0 = eval(bquote(timing(db(.(select_sql),.(db_t)),N,paste("left_join",NA_character_,"index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r0,r1,ignore_col_order = TRUE))
  invisible()
})
rm(r,FT,JN,dt,jn)
idx_n = 1+length(db_test)

# print the timings collected in this "left join" scenario
last = non_idx_n + idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

cross join

Cross joins are performed on smaller input datasets sized so that the cross product has nrow equal to N.

# Map the benchmark size N to the two input-table row counts `nr`; their
# cross product has exactly N rows. Branches are listed in ascending N.
# An unrecognised N intentionally leaves `nr` undefined so the cross join
# section fails loudly rather than running a mis-sized test.
if (N == 1e3) {
  nr <- c(10, 100)
} else if (N == 1e5) {
  nr <- c(100, 1000)
} else if (N == 5e5) {
  nr <- c(500, 1000)
} else if (N == 1e6) {
  nr <- c(1000, 1000)
} else if (N == 5e6) {
  nr <- c(5000, 1000)
} else if (N == 1e7) {
  nr <- c(10000, 1000)
} else if (N == 5e7) {
  nr <- c(10000, 5000)
} else if (N == 1e8) {
  nr <- c(10000, 10000)
}

# NON INDEX

# sample the two cross join inputs from SALES (with replacement)
DT = copy(X$SALES)
DT1 = DT[sample(1:nrow(DT),nr[1],TRUE)]
DT2 = DT[sample(1:nrow(DT),nr[2],TRUE)]
DF1 = as.data.frame(DT1)
DF2 = as.data.frame(DT2)
rm(DT)

# base R: merge with by=NULL performs a cartesian product
r1 = timing(merge(x = DF1, y = DF2, by = NULL),
            N, paste("cross_join",NA_character_,"no_index","R","base","merge",sep=";"))
rm(DF1,DF2)

# data.table via optiRum::CJ.dt; rename its i.* columns to merge's .x/.y
# suffixes before comparing
r0 = timing(CJ.dt(DT1,DT2),
            N, paste("cross_join",NA_character_,"no_index","R","data.table-optiRum","[.data.table",sep=";"))
stopifnot(data.equal.data.table(
  setDT(r1),
  setnames(r0,c("cust_code", "prod_code", "geog_code", "time_code", "curr_code","amount", "value", "i.cust_code", "i.prod_code", "i.geog_code","i.time_code", "i.curr_code", "i.amount", "i.value"),c("cust_code.x", "prod_code.x", "geog_code.x", "time_code.x", "curr_code.x", "amount.x", "value.x", "cust_code.y", "prod_code.y", "geog_code.y", "time_code.y", "curr_code.y", "amount.y", "value.y")),
  ignore_col_order = TRUE))
rm(r0)

# sql CROSS JOIN; db.value aliased value_tmp to avoid a duplicate column
# name, renamed back to merge's .x/.y layout before comparing
select_sql <- paste0("SELECT db.cust_code cust_code,db.prod_code prod_code,db.geog_code geog_code,db.time_code time_code,db.curr_code curr_code,db.amount amount,db.value value_tmp,jn.cust_code i_cust_code,jn.prod_code i_prod_code,jn.geog_code i_geog_code,jn.time_code i_time_code,jn.curr_code i_curr_code,jn.amount i_amount,jn.value i_value FROM db CROSS JOIN jn")
r = lapply(db_test, function(db_t){
  try(db("DROP TABLE db", db_t),silent = TRUE)
  try(db("DROP TABLE jn", db_t),silent = TRUE)
  db(DT1,"db",db_t)
  db(DT2,"jn",db_t)
  r0 = eval(bquote(timing(db(.(select_sql),.(db_t)),N,paste("cross_join",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  setnames(r0,c("cust_code", "prod_code", "geog_code", "time_code", "curr_code","amount", "value_tmp", "i_cust_code", "i_prod_code", "i_geog_code","i_time_code", "i_curr_code", "i_amount", "i_value"),c("cust_code.x", "prod_code.x", "geog_code.x", "time_code.x", "curr_code.x", "amount.x", "value.x", "cust_code.y", "prod_code.y", "geog_code.y", "time_code.y", "curr_code.y", "amount.y", "value.y"))
  stopifnot(data.equal.data.table(r0,r1,ignore_col_order = TRUE))
  invisible()
})
non_idx_n = 2+length(db_test)

rm(DT1,DT2)

# no index variant for cross join (indexes cannot help a cartesian product)
last = non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

lateral join

Lateral join scenario: top N rows per group, ordered by time.
SQLite does not support lateral joins.
Postgres supports them but scales badly without indices. You can re-include it in this test if you wish, but be aware that on 1e6 rows it was ~4655 times slower than the data.table approach.

# populating unique time_code, due to sql unordered nature, see why in "Partition by using non-aggregate function" section
DTunq = copy(X$SALES)
DTunq[,time_code:=as.Date(1:.N,origin="2015-01-01"),by="prod_code"]

# NO INDEX

# data.table: rank rows within prod_code by time_code, keep top 5 per group
DT = copy(DTunq)
r1 = timing(DT[order(time_code), indx := seq_len(.N), "prod_code"][indx <= 5][,indx:=NULL][],
            N, paste("lateral_join",NA_character_,"no_index","R","data.table","[.data.table",sep=";"))

# dplyr
# could not use top_n due to https://github.com/hadley/dplyr/issues/921
DF = as.data.frame(DTunq)
r0 = timing(DF %>% group_by(prod_code) %>% arrange(time_code) %>% slice(1:5),
            N, paste("lateral_join",NA_character_,"no_index","R","dplyr","group_by-arrange-slice",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))
rm(DF,r0)

# sql # exclude sqlite (no LATERAL) and postgres (does not scale without indices)
# fixed: elementwise `&` must be used to filter the db_test vector; the
# previous scalar `&&` inspected only the first elements and produced a
# wrong (typically empty) subset. The same filter is now applied to the
# lapply loop so it matches the guard and the timing count below.
lateral_db <- db_test[!(db_test %like% "sqlite") & db_test!="psql"]
if(length(lateral_db)>0){
  DT = copy(DTunq)
  REF = copy(X$PRODUCT)
  r = lapply(lateral_db, function(db_t){ # timing of each sql per each db_test
    try(db("DROP TABLE db", db_t),silent = TRUE)
    try(db("DROP TABLE ref", db_t),silent = TRUE)
    db(DT,"db",db_t)
    db(REF,"ref",db_t)
    r0 = eval(bquote(timing(db("SELECT l.* FROM ref r JOIN LATERAL (SELECT * FROM db WHERE prod_code = r.prod_code ORDER BY time_code ASC LIMIT 5) l ON TRUE",.(db_t)),
                              N,paste("lateral_join",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
    if(!data.equal.data.table(r0,r1)){
      # tolerate floating point rounding differences between db and R
      stopifnot(data.equal.data.table(
        r0[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))],
        r1[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))]
        ))
    }
    invisible()
  })
}
non_idx_n <- 1 + 1 + 1*length(lateral_db)

# INDEX

# keyed data.table: key on (time_code, prod_code) removes the explicit
# order() from the timed expression
DT = copy(DTunq)
setkey(DT, time_code, prod_code)
r0 = timing(DT[, indx := seq_len(.N), "prod_code"][indx <= 5][,indx:=NULL][],
            N, paste("lateral_join",NA_character_,"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0))

# with indices postgres scales acceptably, so only sqlite is excluded here
if(length(db_test[!(db_test %like% "sqlite")])>0){
  DT = copy(DTunq)
  REF = copy(X$PRODUCT)
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    try(db("DROP TABLE db", db_t),silent = TRUE)
    try(db("DROP TABLE ref", db_t),silent = TRUE)
    db(DT,"db",db_t)
    db(REF,"ref",db_t)
    db("CREATE INDEX db_idx_prod_code_time_code ON db (prod_code, time_code)",db_t)
    db("CREATE INDEX ref_idx_prod_code ON ref (prod_code)",db_t)
    r0 = eval(bquote(timing(db("SELECT l.* FROM ref r JOIN LATERAL (SELECT * FROM db WHERE prod_code = r.prod_code ORDER BY time_code ASC LIMIT 5) l ON TRUE",.(db_t)),
                              N,paste("lateral_join",NA_character_,"index","DB",db_t,"db",sep=";"))))
    if(!data.equal.data.table(r0,r1)){
      # tolerate floating point rounding differences between db and R
      stopifnot(data.equal.data.table(
        r0[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))],
        r1[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))]
        ))
    }
    invisible()
  })
}

idx_n <- 1 + 1*length(db_test[!(db_test %like% "sqlite")])

rm(DT,DTunq,r1,r0)

# print the timings collected in this "lateral join" scenario
last = non_idx_n+idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

update

# update scenario: zero out `value` for 5 randomly picked customers
DT = copy(X$SALES)
upd_cust_code = DT[,sample(cust_code,5,FALSE)]

# NO INDEX

# base R in-place replacement on a data.frame; r1 kept as the reference
DF = as.data.frame(copy(X$SALES))
r1 = timing(DF[DF$cust_code %in% upd_cust_code,"value"] <- 0,
            N, paste("update",NA_character_,"no_index","R","base","<-",sep=";"))

# dplyr mutate + replace
DF2 = as.data.frame(copy(X$SALES))
r0 = timing(DF2 <- mutate(DF2, value = replace(value, cust_code %in% upd_cust_code, 0)),
            N, paste("update",NA_character_,"no_index","R","dplyr","mutate",sep=";"))
stopifnot(data.equal.data.table(setDT(DF),setDT(as.data.frame(DF2))))

# data.table update by reference
r0 = timing(DT[cust_code %in% upd_cust_code, value := 0],
            N, paste("update",NA_character_,"no_index","R","data.table",":=",sep=";"))
stopifnot(data.equal.data.table(DF,DT))

# sql UPDATE ... WHERE cust_code IN (...); result read back for validation
DT = copy(X$SALES)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  try(db("DROP TABLE db", db_t),silent = TRUE)
  db(DT,"db",db_t)
  rUpd = eval(bquote(timing(db(paste0("UPDATE db SET value = 0 WHERE cust_code IN ('",paste(upd_cust_code,collapse="','"),"')"),.(db_t)),
                            N,paste("update",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  rDB = db("SELECT * FROM db",db_t)
  stopifnot(data.equal.data.table(rDB,DF))
  invisible()
})
non_idx_n <- 1 + 1 + 1 + 1*length(db_test)

# INDEX

# keyed data.table: binary-search update via DT[.(keys), ...]
DT = copy(X$SALES)
setkey(DT,cust_code)
r0 = timing(DT[.(upd_cust_code), value := 0],
            N, paste("update",NA_character_,"index","R","data.table",":=",sep=";"))
stopifnot(data.equal.data.table(DF,DT))

# sql update with an index on the filter column
DT = copy(X$SALES)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  try(db("DROP TABLE db", db_t),silent = TRUE)
  db(DT,"db",db_t)
  db("CREATE INDEX db_idx_cust_code ON db (cust_code)",db_t)
  rUpd = eval(bquote(timing(db(paste0("UPDATE db SET value = 0 WHERE cust_code IN ('",paste(upd_cust_code,collapse="','"),"')"),.(db_t)),
                            N,paste("update",NA_character_,"index","DB",db_t,"db",sep=";"))))
  rDB = db("SELECT * FROM db",db_t)
  stopifnot(data.equal.data.table(rDB,DF))
  invisible()
})
idx_n <- 1 + 1*length(db_test)

rm(DT,DF,r1,r0,r)

# print the timings collected in this "update" scenario
last = non_idx_n+idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

update from

The SQLite workaround for join-and-update does not scale well enough, so SQLite is excluded from this scenario.

# NO INDEX

# update-from scenario: multiply each sale's value by a per-product
# multiplier drawn from a reference table
DT = copy(X$SALES)
REF = copy(X$PRODUCT)
set.seed(1)
REF[,prod_value_mp := rnorm(.N,1,sd=0.10)]
REF_back = copy(REF)

# data.table: join on prod_code and update value by reference
r1 = timing(setkey(DT,prod_code)[setkey(REF,prod_code), value := value * prod_value_mp],
            N, paste("update_from",NA_character_,"no_index","R","data.table","setkey-[.data.table",sep=";"))

# tried prefiltering cols in REF but it is slower: DF <- left_join(DF, select(REF,prod_code,prod_value_mp), by = "prod_code") %>% mutate(value = value * prod_value_mp) %>% select(-prod_value_mp)
DF = as.data.frame(X$SALES)
REF = as.data.frame(REF_back)
r0 = timing(DF <- left_join(DF, REF, by = "prod_code") %>% mutate(value = value * prod_value_mp) %>% select(cust_code,prod_code,geog_code,time_code,curr_code,amount,value),
            N, paste("update_from",NA_character_,"no_index","R","dplyr","left_join-mutate-select",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(DF))))
rm(r0)

# sql UPDATE ... FROM; sqlite excluded (workaround does not scale)
if(length(db_test[!(db_test %like% "sqlite")])>0){
  DT = copy(X$SALES)
  REF = copy(REF_back)
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    try(db("DROP TABLE db", db_t),silent = TRUE)
    try(db("DROP TABLE ref", db_t),silent = TRUE)
    db(DT,"db",db_t)
    db(REF,"ref",db_t)
    rUpd = eval(bquote(timing(db("UPDATE db SET value = value * ref.prod_value_mp FROM ref WHERE db.prod_code = ref.prod_code",.(db_t)),
                              N,paste("update_from",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
    rDB = db("SELECT * FROM db",db_t)
    # tolerate floating point rounding differences between db and R
    if(!data.equal.data.table(rDB,r1)){
      stopifnot(data.equal.data.table(
        rDB[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))],
        r1[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))]
        ))
    }
    invisible()
  })
}
non_idx_n = 1 + 1 + 1*length(db_test[!(db_test %like% "sqlite")])

# INDEX

# keyed data.table: both tables keyed before timing
DT = copy(X$SALES)
REF = copy(REF_back)

setkey(DT,prod_code)
setkey(REF,prod_code)
r0 = timing(DT[REF, value := value * prod_value_mp],
            N, paste("update_from",NA_character_,"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0))

# sql UPDATE ... FROM with indexes on the join columns
if(length(db_test[!(db_test %like% "sqlite")])>0){
  DT = copy(X$SALES)
  REF = copy(REF_back)
  r = lapply(db_test[!(db_test %like% "sqlite")], function(db_t){ # timing of each sql per each db_test
    try(db("DROP TABLE db", db_t),silent = TRUE)
    try(db("DROP TABLE ref", db_t),silent = TRUE)
    db(DT,"db",db_t)
    db(REF,"ref",db_t)
    db("CREATE INDEX db_idx_prod_code ON db (prod_code)",db_t)
    db("CREATE INDEX ref_idx_prod_code ON ref (prod_code)",db_t)
    rUpd = eval(bquote(timing(db("UPDATE db SET value = value * ref.prod_value_mp FROM ref WHERE db.prod_code = ref.prod_code",.(db_t)),
                              N,paste("update_from",NA_character_,"index","DB",db_t,"db",sep=";"))))
    rDB = db("SELECT * FROM db",db_t)
      if(!data.equal.data.table(rDB,r1)){
      # tolerate floating point rounding differences between db and R
      stopifnot(data.equal.data.table(
        rDB[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))],
        r1[,list(cust_code,prod_code,geog_code,time_code,curr_code,amount,value=round(value,2))]
        ))
    }
    invisible()
  })
}
rm(REF,REF_back)
idx_n = 1 + 1*length(db_test[!(db_test %like% "sqlite")])

# print the timings collected in this "update from" scenario
last=non_idx_n+idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

order by

# order by scenario: full sort of SALES on all columns
DT = copy(X$SALES)

# NON INDEX

# base
DF = as.data.frame(DT)
r1 <-timing(DF[with(DF, order(cust_code,prod_code,geog_code,time_code,curr_code,amount,value)), ],
            N, paste("order",NA_character_,"no_index","R","base","[.data.frame-with",sep=";"))
# dt
r0 <- timing(DT[order(cust_code,prod_code,geog_code,time_code,curr_code,amount,value)],
             N, paste("order",NA_character_,"no_index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(setDT(r1),r0))
# dplyr
DF = as.data.frame(DT)
r0 <- timing(arrange(DF,cust_code,prod_code,geog_code,time_code,curr_code,amount,value),
             N, paste("order",NA_character_,"no_index","R","dplyr","arrange",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(r0)))
# sql ORDER BY on all columns
try(db("DROP TABLE db", db_test),silent = TRUE)
db(DT,"db",db_test)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  r0 = eval(bquote(timing(db(paste0("SELECT * FROM db ORDER BY ",paste(names(DT),collapse=',')),.(db_t)),
                          N,paste("order",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
non_idx_n <- 1 + 1 + 1 + 1*length(db_test)

rm(DF)

# INDEX
# key the data.table on every column, then sorting is already done
setkeyv(DT,names(DT))
r0 <- timing(DT[order(cust_code,prod_code,geog_code,time_code,curr_code,amount,value)],
             N, paste("order",NA_character_,"index","R","data.table","[.data.table",sep=";"))
stopifnot(data.equal.data.table(r1,r0))

# tables already exist from the non-index section, only the index is added
#DT = copy(X$SALES)
#try(db("DROP TABLE db", db_test),silent = TRUE)
#db(DT,"db",db_test)
r = lapply(db_test, function(db_t){
  db(paste0("CREATE INDEX db_idx_all ON db (",paste(names(DT),collapse=","),")"),db_t)
})
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  r0 = eval(bquote(timing(db(paste0("SELECT * FROM db ORDER BY ",paste(names(DT),collapse=',')),.(db_t)),
                          N,paste("order",NA_character_,"index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
idx_n <- 1 + 1*length(db_test)

rm(DT,r0,r1)

# print the timings collected in this "order by" scenario
last = idx_n+non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

pivot

The tested databases (sqlite, psql) do not play well with pivot-like statements:

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# NON INDEX

# dt: pivot geog_code into columns, summing value per curr_code
r1 <- timing(dcast.data.table(DT, curr_code ~ geog_code, value.var = "value", fun=sum, na.rm=TRUE),
             N, paste("pivot",NA_character_,"no_index","R","data.table","dcast.data.table",sep=";"))
# dplyr
DF = as.data.frame(DT)
# this should looks like: select(DF,-amount) %>% group_by(geog_code, curr_code) %>% summarise(value = sum(value)) %>% spread(geog_code, value)
# tidyr#41 workaround: extra group_by+summarise_each collapses NA cells produced by spread
r0 <- timing(select(DF,-amount) %>% group_by(geog_code,curr_code) %>% summarise(value=sum(value)) %>% spread(geog_code, value) %>% group_by(curr_code) %>% summarise_each(funs(sum(.,na.rm=TRUE))),
             N, paste("pivot",NA_character_,"no_index","R","dplyr-tidyr","select-group_by-summarise-spread-group_by-summarise_each",sep=";"))
rm(DF)
# tbl result converted via as.data.frame+setDT before comparison
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))

# sql, none psql and sqlite handle pivot well
non_idx_n <- 1 + 1
last = non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

unpivot

The tested databases (sqlite, psql) do not play well with unpivot-like statements:

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# NON INDEX

# dt: melt the two measures (amount, value) into long format
r1 <- timing(melt.data.table(DT, id.vars=c("cust_code","prod_code","geog_code","time_code","curr_code"), measure.vars=c("amount","value"), variable.name="measure"),
             N, paste("unpivot",NA_character_,"no_index","R","data.table","melt.data.table",sep=";"))
# dplyr
DF = as.data.frame(DT)
r0 <- timing(DF %>% gather(measure, value, -cust_code,-prod_code,-geog_code,-time_code,-curr_code),
             N, paste("unpivot",NA_character_,"no_index","R","dplyr-tidyr","gather",sep=";"))
rm(DF)
# fixed: convert tidyr result to data.table before comparing, consistently with
# the other dplyr-tidyr checks in this document (previously a plain data.frame
# was passed to data.equal.data.table)
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))
# timed non-index cases: dt + dplyr-tidyr
non_idx_n <- 1 + 1
last = non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

distinct

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# NON INDEX

# dt: unique over all columns
r1 <- timing(unique(DT,by=names(DT)),
             N, paste("distinct",NA_character_,"no_index","R","data.table","unique",sep=";"))
# # base R removed due to low scalability
# DF = as.data.frame(DT)
# r0 <- timing(unique(DF),
#              N, paste("distinct",NA_character_,"no_index","R","base","unique",sep=";"))
# stopifnot(data.equal.data.table(r1,setDT(r0)))
# dplyr
DF = as.data.frame(DT)
r0 <- timing(distinct(DF),
             N, paste("distinct",NA_character_,"no_index","R","dplyr","distinct",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(r0)))
# sql: table is (re)populated per db connection inside the loop here
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  try(db("DROP TABLE db", db_t),silent = TRUE)
  db(DT,"db",db_t)
  r0 = eval(bquote(timing(db("SELECT DISTINCT * FROM db",.(db_t)),
                          N,paste("distinct",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
# timed non-index cases: dt + dplyr + one per db
non_idx_n <- 1 + 1 + 1*length(db_test)

rm(DF)

# INDEX
# key on all columns so unique() can use the key
setkeyv(DT,names(DT))
r0 <- timing(unique(DT,by=names(DT)),
             N, paste("distinct",NA_character_,"index","R","data.table","unique",sep=";"))
stopifnot(data.equal.data.table(r1,r0))

# re-copy because setkeyv reordered DT in place
DT = copy(X$SALES)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  # db table still populated from the non-index loop above
  #try(db("DROP TABLE db", db_t),silent = TRUE)
  #db(DT,"db",db_t)
  db(paste0("CREATE INDEX db_idx_all ON db (",paste(names(DT),collapse=","),")"),db_t)
  r0 = eval(bquote(timing(db("SELECT DISTINCT * FROM db",.(db_t)),
                          N,paste("distinct",NA_character_,"index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
# timed index cases: dt + one per db
idx_n <- 1 + 1*length(db_test)

rm(DT,r0,r1)

# display timing summary for this test (index + non index cases)
last = idx_n+non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

union

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# two overlapping random halves of the data to make UNION meaningful
set.seed(1)
i1 = sample(1:nrow(DT),nrow(DT)/2, replace = FALSE)
i2 = sample(1:nrow(DT),nrow(DT)/2, replace = FALSE)
DT1 = DT[i1]
DT2 = DT[i2]
rm(DT,i1,i2)

# NON INDEX only

# dt: SQL UNION semantics = concatenate then deduplicate
r1 <- timing(unique(rbindlist(list(DT1,DT2))),
             N, paste("union",NA_character_,"no_index","R","data.table","unique-rbindlist",sep=";"))
DF1 = as.data.frame(DT1)
DF2 = as.data.frame(DT2)
# # base R removed due to low scalability
# r0 <-timing(unique(rbind(DF1,DF2)),
#             N, paste("union",NA_character_,"no_index","R","base","unique-rbind",sep=";"))
# stopifnot(data.equal.data.table(r1,setDT(r0)))
# dplyr, union performance issue reported: dplyr/issues/924, below is workaround
r0 <- timing(unique(bind_rows(DF1,DF2)),
             N, paste("union",NA_character_,"no_index","R","dplyr","unique-bind_rows",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))
rm(DF1,DF2)
# sql
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  try(db("DROP TABLE db1", db_t),silent = TRUE)
  try(db("DROP TABLE db2", db_t),silent = TRUE)
  db(DT1,"db1",db_t)
  db(DT2,"db2",db_t)
  r0 = eval(bquote(timing(db("SELECT * FROM db1 UNION SELECT * FROM db2",.(db_t)),
                          N,paste("union",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
# timed non-index cases: dt + dplyr + one per db
non_idx_n <- 1 + 1 + 1*length(db_test)

rm(DT1,DT2,r0,r1)

last = non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

union all

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# split into two disjoint halves; UNION ALL keeps duplicates so no overlap needed
DT1 = DT[1:(nrow(DT)/2)]
DT2 = DT[((nrow(DT)/2)+1):nrow(DT)]
rm(DT)

# NON INDEX only

# base
DF1 = as.data.frame(DT1)
DF2 = as.data.frame(DT2)
r1 <-timing(rbind(DF1,DF2),
            N, paste("union_all",NA_character_,"no_index","R","base","rbind",sep=";"))
# dt
r0 <- timing(rbindlist(list(DT1,DT2)),
             N, paste("union_all",NA_character_,"no_index","R","data.table","rbindlist",sep=";"))
# setDT converts r1 in place for this and the following comparisons
stopifnot(data.equal.data.table(setDT(r1),r0))
# dplyr
r0 <- timing(bind_rows(list(DF1,DF2)),
             N, paste("union_all",NA_character_,"no_index","R","dplyr","bind_rows",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(as.data.frame(r0))))
rm(DF1,DF2,r0)
# sql: populate both tables once across all configured db connections
try(db("DROP TABLE db1", db_test),silent = TRUE)
try(db("DROP TABLE db2", db_test),silent = TRUE)
db(DT1,"db1",db_test)
db(DT2,"db2",db_test)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  r0 = eval(bquote(timing(db("SELECT * FROM db1 UNION ALL SELECT * FROM db2",.(db_t)),
                          N,paste("union_all",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
# timed non-index cases: base + dt + dplyr + one per db
non_idx_n <- 1 + 1 + 1 + 1*length(db_test)

rm(DT1,DT2,r1)

last = non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

minus

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# two overlapping random halves so EXCEPT/setdiff removes a real subset
set.seed(1)
i1 = sample(1:nrow(DT),nrow(DT)/2, replace = FALSE)
i2 = sample(1:nrow(DT),nrow(DT)/2, replace = FALSE)
DT1 = DT[i1]
DT2 = DT[i2]
rm(DT,i1,i2)

# NON INDEX only

# dt
# NOTE: data.table:::setdiff_ is an unexported internal - see the note below this chunk
r1 <- timing(data.table:::setdiff_(DT1,DT2),
             N, paste("minus",NA_character_,"no_index","R","data.table","data.table:::setdiff_",sep=";"))
#stopifnot(data.equal.data.table(setDT(r1),r0))
# dplyr distinct
DF1 = as.data.frame(DT1)
DF2 = as.data.frame(DT2)
r0 <- timing(setdiff(DF1,DF2),
             N, paste("minus",NA_character_,"no_index","R","dplyr","setdiff",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(r0)))
rm(DF1,DF2)
# sql: populate both tables once across all configured db connections
try(db("DROP TABLE db1", db_test),silent = TRUE)
try(db("DROP TABLE db2", db_test),silent = TRUE)
db(DT1,"db1",db_test)
db(DT2,"db2",db_test)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  r0 = eval(bquote(timing(db("SELECT * FROM db1 EXCEPT SELECT * FROM db2",.(db_t)),
                          N,paste("minus",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
# timed non-index cases: dt + dplyr + one per db
non_idx_n <- 1 + 1 + 1*length(db_test)

rm(DT1,DT2,r0,r1)

last = non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]

Note that the data.table:::setdiff_ function is not exported yet, so be prepared for possible future changes (incl. the function name). It is likely to be exported soon.

invisible(showtiming(last=last)[,print_expr(.SD)])

intersect

# fresh copy of the input dataset for this test
DT = copy(X$SALES)

# two overlapping random halves so INTERSECT returns a real subset
set.seed(1)
i1 = sample(1:nrow(DT),nrow(DT)/2, replace = FALSE)
i2 = sample(1:nrow(DT),nrow(DT)/2, replace = FALSE)

# NON INDEX

# dt: dedupe both sides, key them, then inner join (nomatch=0L) = INTERSECT semantics
DT1 = DT[i1]
DT2 = DT[i2]
r1 <- timing(setkeyv(unique(DT1),names(DT1))[setkeyv(unique(DT2),names(DT2)),.SD,nomatch=0L,.SDcols=c(names(DT1))],
             N, paste("intersect",NA_character_,"no_index","R","data.table","unique-setkeyv-[.data.table",sep=";"))
# dplyr
# re-extract halves so dplyr sees fresh unkeyed input
DT1 = DT[i1]
DT2 = DT[i2]
DF1 = as.data.frame(DT1)
DF2 = as.data.frame(DT2)
r0 <- timing(intersect(DF1,DF2),
             N, paste("intersect",NA_character_,"no_index","R","dplyr","intersect",sep=";"))
stopifnot(data.equal.data.table(r1,setDT(r0)))
rm(DF1,DF2,i1,i2)
# sql: populate both tables once across all configured db connections
try(db("DROP TABLE db1", db_test),silent = TRUE)
try(db("DROP TABLE db2", db_test),silent = TRUE)
db(DT1,"db1",db_test)
db(DT2,"db2",db_test)
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  r0 = eval(bquote(timing(db("SELECT * FROM db1 INTERSECT SELECT * FROM db2",.(db_t)),
                          N,paste("intersect",NA_character_,"no_index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})
# timed non-index cases: dt + dplyr + one per db
non_idx_n <- 1 + 1 + 1*length(db_test)

# INDEX
setkeyv(DT1,names(DT1))
setkeyv(DT2,names(DT2))
# keyed inner join as intersect; result is timed but deliberately not validated
# against r1 here because, unlike the unique()-based non-index case, duplicates
# are not removed before joining
r0 <- timing(DT1[DT2,.SD,nomatch=0L,.SDcols=c(names(DT1))],
             N, paste("intersect",NA_character_,"index","R","data.table","[.data.table",sep=";"))

# sql
# db1/db2 are still populated from the non-index run above, hence commented out
# DT1 = DT[i1]
# DT2 = DT[i2]
# try(db("DROP TABLE db1", db_test),silent = TRUE)
# try(db("DROP TABLE db2", db_test),silent = TRUE)
# db(DT1,"db1",db_test)
# db(DT2,"db2",db_test)
r = lapply(db_test, function(db_t){
  db(paste0("CREATE INDEX db1_idx_all ON db1 (",paste(names(DT1),collapse=","),")"),db_t)
  # fixed: the second index must be created on db2, not db1 - previously db2
  # stayed unindexed, invalidating the "index" scenario for this benchmark
  db(paste0("CREATE INDEX db2_idx_all ON db2 (",paste(names(DT2),collapse=","),")"),db_t)
})
r = lapply(db_test, function(db_t){ # timing of each sql per each db_test
  r0 = eval(bquote(timing(db("SELECT * FROM db1 INTERSECT SELECT * FROM db2",.(db_t)),
                          N,paste("intersect",NA_character_,"index","DB",db_t,"db",sep=";"))))
  stopifnot(data.equal.data.table(r1,r0))
  invisible()
})

# timed index cases: dt + one per db
idx_n <- 1 + 1*length(db_test)

rm(DT,DT1,DT2,r0,r1)

last = idx_n + non_idx_n
showtiming(last=last)[,kable(.SD),.SDcols=-c("expr")]
invisible(showtiming(last=last)[,print_expr(.SD)])

Benchmark summary

# Final cleanup: drop the input data, remove benchmark artifact files,
# disconnect all configured db connections, unset the dwtools db option
# and delete the sqlite database file.
rm(X)
for (artifact in c("benchmark.rds","benchmark.csv")) {
  if (file.exists(artifact)) invisible(file.remove(artifact))
}
# note: renamed lambda arg to avoid shadowing the global `db_test`
invisible(lapply(getOption("dwtools.db.conns"), function(conn_def) dbDisconnect(conn_def[["conn"]])))
options("dwtools.db.conns"=NULL)
if (file.exists("sqlite.db")) invisible(file.remove("sqlite.db"))

Environment details

Update this section in case of publishing reproduction of benchmark.

# spec table of amazon EC2 R3 (memory optimized) instance types;
# prints (via kable) the row for the instance used in this run
amazon_ec2_R3 = data.table(model=c("r3.large","r3.xlarge","r3.2xlarge","r3.4xlarge","r3.8xlarge"),
                           vcpu=2^(1:5),
                           memory_gb=15.25*2^(0:4),
                           ssd_storage_n=c(1,1,1,1,2),
                           ssd_storage_gb=c(32,80,160,320,320),
                           key="model")[.("r3.xlarge"),print(kable(.SD))]
# full R session details for reproducibility
print(sessionInfo())
## Benchmark timings

# query benchmark results only from current session
dt = get.timing(FALSE)
saveRDS(dt,paste0("benchmark_",N,"_",getOption("dwtools.session"),".rds")) # store logs for easier merging multiple N timings
# TO DO some overview plots for multiple tests, yet single N:
# each tool vs mean
# dt <- dt[,c("test","scenario","indexing","environment","tool","fun") := rbindlist(lapply(strsplit(tag, ";", fixed=TRUE),as.list))
#          ][,.(elapsed),by = c("dwtools_session","in_n","test","scenario","indexing","environment","tool")]
# rm(dt)

Conclusions

R can cover ETL needs well and scales well. Its computing time and memory usage beat the competitors'.
The biggest limitation is the need to fit data in memory, but I believe it will not be an issue for most use cases (see the 2 000 000 000 rows data.table benchmark).
R's flexibility as a programming language greatly enhances the capabilities of such an ETL approach.

References

In terms of quality code references there are a few interesting topics:

Also, one of the best ways to learn is from courses by the authors of the tools:

Notable articles:

And good R books:

Reproduction script for amazon EC2:

# --- shell: run on a clean Ubuntu (trusty) EC2 instance ---
# add CRAN apt repository and install R, build deps, postgres and RStudio prerequisites
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E084DAB9
sudo add-apt-repository 'deb  http://cran.stat.ucla.edu/bin/linux/ubuntu trusty/'
sudo apt-get update
sudo apt-get -y install r-base-core libcurl4-gnutls-dev libxml2-dev htop git gdebi-core libapparmor1 postgresql libpq-dev
# create the postgres user/database expected by the benchmark configuration
sudo su postgres -c "psql -c \"CREATE USER dwtools WITH PASSWORD 'dwtools_pass';\""
sudo su postgres -c "createdb -O dwtools dwtools"
wget http://download2.rstudio.org/rstudio-server-0.98.1103-amd64.deb # get latest version from: http://www.rstudio.com/products/rstudio/download-server/
sudo gdebi rstudio-server-0.98.1103-amd64.deb
sudo ln -s /usr/lib/rstudio-server/bin/pandoc/pandoc /usr/local/bin # for rmarkdown from R console
sudo ln -s /usr/lib/rstudio-server/bin/pandoc/pandoc-citeproc /usr/local/bin
sudo useradd -m -p $(openssl passwd -1 rstudio_user_pass) rstudio_user # add RStudio user
# --- R: run in an R session after the shell setup above ---
install.packages(c("RCurl","devtools","knitr","microbenchmark","rmarkdown","DBI","dplyr","tidyr","RSQLite","RPostgreSQL","optiRum"), repos="http://cran.stat.ucla.edu")
library(devtools)
install_github("Rdatatable/data.table")
install_github("jangorecki/dwtools")
cat("Document produced in ",round(proc.time()[[3]] - global_start,2)," seconds.\n",sep="")


jangorecki/dwtools documentation built on May 18, 2019, 12:24 p.m.