#------------------------------------------------------------------------------#
#-------------------------------- yg::sqlite.r --------------------------------#
#------------------------- author: gyang274@gmail.com -------------------------#
#------------------------------------------------------------------------------#
#--------+---------+---------+---------+---------+---------+---------+---------#
#234567890123456789012345678901234567890123456789012345678901234567890123456789#
#------------------------------------------------------------------------------#
#------------------------------------ init ------------------------------------#
#------------------------------------------------------------------------------#
#' `%|%`: return the default y when x is NULL or a single NA, otherwise x
#' @param x value to test.
#' @param y fallback value returned when x is missing.
#' @return y when x is NULL or a length-one NA; x unchanged in every other
#' case (including zero-length vectors and vectors longer than one element).
`%|%` <- function(x, y) {
  # guard on length first: is.na() on a vector of length != 1 would feed a
  # non-scalar condition into `||`, which is an error in current R
  if (is.null(x) || (length(x) == 1L && isTRUE(is.na(x)))) y else x
}
#' `%+%`: join two strings (or character vectors) with no separator
#' @param stringX left-hand operand.
#' @param stringY right-hand operand.
#' @return result of paste0(stringX, stringY).
#' @note ggplot2 also defines `%+%`. Per the original author's note, loading
#' ggplot2 after this file makes string concatenation hit ggplot2::`%+%`,
#' which stops R. The commented variant below was kept as a possible merged
#' definition for that situation:
#' `%+%` <- function(x, y) {
#'
#'   if ( is.environment(x) && is.environment(y) ) {
#'
#'     return( ggplot2::`%+%`(x, y) )
#'
#'   } else {
#'
#'     return( paste0(x, y) )
#'
#'   }
#' }
`%+%` <- function(stringX, stringY) {
  paste0(stringX, stringY)
}
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
#--------------------------------- data.table ---------------------------------#
#------------------------------------------------------------------------------#
#' removeWS
#' Normalise whitespace: tabs and newlines become a single blank, then every
#' run of whitespace is collapsed into one blank.
#' @param x character vector.
#' @return character vector of the same length as x.
removeWS <- function(x) {
  no_tabs <- gsub('\t', ' ', x)
  no_newlines <- gsub('\n', ' ', no_tabs)
  gsub('\\s+', ' ', no_newlines)
}
#' CJ.dt
#' Cross join of two data.tables: every row of X is combined with every row
#' of Y by joining both on a temporary constant column `k`.
#' @param X a data.table.
#' @param Y a data.table.
#' @return the joined data.table with the helper column `k` removed.
CJ.dt <- function(X, Y) {
  stopifnot(is.data.table(X), is.data.table(Y))
  # local binding for the helper column name used inside `[` below
  # (presumably to silence "no visible binding" notes - confirm)
  k <- NULL
  # prepend the constant join column to local copies of both tables
  X <- X[, c(k = 1L, .SD)]
  Y <- Y[, c(k = 1L, .SD)]
  setkey(X, k)
  setkey(Y, NULL)
  crossed <- X[Y, allow.cartesian = TRUE]
  crossed[, k := NULL]
  return(crossed[])
}
#' dtFillNA
#' Replace NA values in a data.table by reference, choosing the fill value
#' from the class of each column.
#' @param dt data.table, modified through data.table::set().
#' @param fillNA fill value for numeric columns and for columns of any other
#' (unrecognised) class.
#' @param fillNA_I fill value for integer columns.
#' @param fillNA_C fill value for character columns.
#' @param jcolidx optional column positions to fill; takes precedence over
#' jcolnms.
#' @param jcolnms optional column names to fill; when both jcolidx and
#' jcolnms are NULL every column is filled.
#' @return dt.
dtFillNA <- function(dt, fillNA = 0, fillNA_I = 0L, fillNA_C = "",
                     jcolidx = NULL, jcolnms = NULL) {
  if (!is.null(jcolidx)) {
    # collapse (not sep) so the vector is reported as "1, 2, 3"
    message("fillNA on column index: ", paste(jcolidx, collapse = ", "), " ...\n")
    if (!all(jcolidx <= ncol(dt))) {
      stop("fillNA on column index: ", paste(jcolidx, collapse = ", "),
           " not all in dt column index: ", ncol(dt), "...\n")
    }
  } else if (!is.null(jcolnms)) {
    message("fillNA on column names: ", paste(jcolnms, collapse = ", "), " ...\n")
    if (!all(jcolnms %in% colnames(dt))) {
      stop("fillNA on column names: ", paste(jcolnms, collapse = ", "),
           " not all in dt column names: ", paste(colnames(dt), collapse = ", "), " ...\n")
    }
    jcolidx <- match(jcolnms, colnames(dt))
  } else {
    jcolidx <- seq_len(ncol(dt))
  }
  for (j in jcolidx) {
    # identical() keeps the comparison scalar even for multi-class columns
    # (e.g. POSIXct), which would otherwise yield a length-2 `if` condition
    col_class <- class(dt[[j]])
    na_rows <- which(is.na(dt[[j]]))
    if (identical(col_class, "integer")) {
      set(dt, na_rows, j, fillNA_I)
    } else if (identical(col_class, "numeric")) {
      set(dt, na_rows, j, fillNA)
    } else if (identical(col_class, "character")) {
      set(dt, na_rows, j, fillNA_C)
    } else {
      message("warning: column ", j, " is not integer, numeric or character.\n")
      set(dt, na_rows, j, fillNA)
    }
  }
  return(dt)
}
#' dtRfrhTb
#' Refresh the value columns of dt with the values found in tb, matching rows
#' on the id columns.
#' @param dt table to refresh.
#' @param tb table providing the new values.
#' @param id character vector of id column names present in both tables.
#' @param vd value columns to refresh; NULL means every non-id column the two
#' tables share.
#' @param nomatch when not NULL, value written into vd for dt rows whose id
#' is absent from tb; when NULL those rows are left untouched.
#' @param nofound when TRUE, tb rows whose id is absent from dt are appended.
#' @param id_unique when TRUE duplicated ids in dt or tb are an error,
#' otherwise only a message.
#' @param in_situ when FALSE dt is copied before it is modified.
#' @return the refreshed table.
dtRfrhTb <- function(dt, tb, id, vd = NULL, nomatch = NULL, nofound = FALSE,
                     id_unique = TRUE, in_situ = TRUE) {
  if ( in_situ ) {
    dt <- dt %>% `class<-`(c("data.table", "data.frame"))
  } else {
    dt <- copy(dt) %>% `class<-`(c("data.table", "data.frame"))
  }
  tb <- tb %>% `class<-`(c("data.table", "data.frame"))
  if ( ! all(c(id, vd) %in% colnames(dt)) ) { stop("dtRfrhTb: c(id, vd) not all in colnames(dt).") }
  if ( ! all(c(id, vd) %in% colnames(tb)) ) { stop("dtRfrhTb: c(id, vd) not all in colnames(tb).") }
  if ( is.null(vd) ) {
    vd <- setdiff(intersect(colnames(dt), colnames(tb)), id)
    if ( length(vd) == 0 ) {
      stop("dtRfrhTb: refresh no column.\n")
    }
  }
  # one string key per row; the unit-separator keeps ("a", "bc") and
  # ("ab", "c") distinct, which plain paste0() concatenation would not
  make_key <- function(d) {
    do.call(paste, c(unname(as.list(d)[id]), sep = "\037"))
  }
  dt_id <- make_key(dt)
  if ( length(unique(dt_id)) < nrow(dt) ) {
    if ( id_unique ) {
      stop("dtRfrhTb: none unique id in dt.\n")
    } else {
      message("dtRfrhTb: none unique id in dt.\n")
    }
  }
  tb_id <- make_key(tb)
  if ( length(unique(tb_id)) < nrow(tb) ) {
    if ( id_unique ) {
      stop("dtRfrhTb: none unique id in tb.\n")
    } else {
      message("dtRfrhTb: none unique id in tb.\n")
    }
  }
  # inspired by plyr::mapvalues: position in tb of every dt row (NA = absent)
  mapidx <- match(dt_id, tb_id)
  mapidxNA <- is.na(mapidx)
  # update on matched idx
  dt[!mapidxNA, (vd) := as.list(tb[mapidx[!mapidxNA], vd, with = FALSE])]
  # update on nomatch idx
  if ( !is.null(nomatch) ) {
    dt[mapidxNA, (vd) := nomatch]
  }
  # insert id in tb nofound in dt
  idx_found_tb <- sort(unique(mapidx))
  if ( length(idx_found_tb) != length(tb_id) && nofound ) {
    message("dtRfrhTb: insert id in tb not in dt into dt.\n")
    idx_nofound_tb <- setdiff(seq_along(tb_id), idx_found_tb)
    dt <- rbind(dt, tb[idx_nofound_tb, c(id, vd), with = FALSE],
                use.names = TRUE, fill = TRUE)
  }
  return(dt)
}
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
#----------------------- execute windows system command -----------------------#
#------------------------------------------------------------------------------#
#' executesc
#' Wrapper on system() that logs the call, its elapsed time and a hint when
#' the command reports a non-zero status.
#' @param sc command line passed to system().
#' @param ... further arguments forwarded to system() (e.g. intern = TRUE).
#' @return whatever system() returns: the exit status, or the captured output
#' when intern = TRUE.
#' @note use shell() on calling and kicking off batch files
executesc <- function(sc, ...) {
  # log the expression the caller wrote (not its value); deparse() keeps the
  # label readable when the argument is a call rather than a plain name
  sc_label <- paste(deparse(substitute(sc)), collapse = " ")
  message("execute system command: ", sc_label, " ...\n")
  .ptc <- proc.time()
  x <- system(command = sc, ...)
  # with intern = TRUE the result is the captured output and a failure is
  # only reported through the "status" attribute
  status <- if (is.numeric(x) && length(x) == 1L) x else attr(x, "status")
  if ( !is.null(status) && !is.na(status) && status != 0 ) {
    message("execute system command return none success info?\n")
  }
  .ptd <- proc.time() - .ptc
  message("execute system command: ", sc_label, " consumes ", .ptd[3], " ...\n")
  message("execute system command: ", sc_label, " ... done.\n")
  return(x)
}
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
#------------------------ bcp azure sql table or query ------------------------#
#------------------------------------------------------------------------------#
#' bcp_azure_table
#' Export table tb from db into file fn by running `bcp ... out`.
#' The connection must target the designated database directly, because
#' (per the original note) the azure sql server does not allow changing
#' database after login.
#' @param tb table name.
#' @param db list with srv, usr, pwd and dbn; an optional winAuth = TRUE
#' switches to windows authentication (-T) instead of usr/pwd.
#' @param fn output file.
#' @return value returned by executesc().
bcp_azure_table <- function(tb, db, fn) {
  winAuth <- db[["winAuth"]] %|% FALSE
  if ( winAuth ) {
    message("bcp_azure_table: connect sql server with windows authentication.\n")
    login <- paste0(" -d ", db[["dbn"]], " -T")
  } else {
    login <- paste0(" -U ", db[["usr"]], " -P ", db[["pwd"]], " -d ", db[["dbn"]])
  }
  sc <- paste0("bcp ", tb, " out ", fn, " -S ", db[["srv"]], login, " -c -r \r\n")
  executesc(sc)
}
#' bcp_azure_query
#' Export the result of query qy on db into file fn by running
#' `bcp "<query>" queryout`.
#' @param qy sql query text; it is wrapped in double quotes on the command line.
#' @param db list with srv, usr, pwd and dbn; an optional winAuth = TRUE
#' switches to windows authentication (-T) instead of usr/pwd.
#' @param fn output file.
#' @return value returned by executesc().
bcp_azure_query <- function(qy, db, fn) {
  winAuth <- db[["winAuth"]] %|% FALSE
  if ( winAuth ) {
    message("bcp_azure_query: connect sql server with windows authentication.\n")
    login <- paste0(" -d ", db[["dbn"]], " -T")
  } else {
    login <- paste0(" -U ", db[["usr"]], " -P ", db[["pwd"]], " -d ", db[["dbn"]])
  }
  sc <- paste0("bcp \"", qy, "\" queryout ", fn, " -S ", db[["srv"]], login, " -c -r \r\n")
  executesc(sc)
}
#' bcp_azure_inrdt
#' Copy an R table into db: create the target table, generate a bcp xml
#' format file, dump dt to a tab separated text file, load it with `bcp in`
#' and optionally create an index.
#' @param dt table to upload.
#' @param tb target table name.
#' @param db list with srv, usr, pwd and dbn; optional winAuth = TRUE uses
#' windows authentication.
#' @param qy_fmt optional create-table statement; when NULL one is built from
#' colnames(dt) and det_col_type(dt).
#' @param overwrite when TRUE a drop-if-exists statement is prepended.
#' @param id optional column name(s) to index after loading.
#' @param id_unique create a unique index when TRUE.
#' @return value returned by executesc() for the `bcp in` command.
bcp_azure_inrdt <- function(dt, tb, db, qy_fmt = NULL, overwrite = TRUE,
                            id = NULL, id_unique = TRUE) {
  winAuth <- db[["winAuth"]] %|% FALSE
  tb_full <- "[" %+% db[["dbn"]] %+% "]." %+% tb
  if ( is.null(qy_fmt) ) {
    qy_fmt <- removeWS(
      "create table " %+% tb_full %+% " (" %+%
        paste(paste(colnames(dt), det_col_type(dt), "NULL"), collapse = ", ") %+%
        ");"
    )
  }
  if ( overwrite ) {
    qy_fmt <- removeWS(
      "if object_id('" %+% tb_full %+% "', 'u') is not null " %+%
        "drop table " %+% tb_full %+% ";\n" %+%
        qy_fmt
    )
  }
  sqodbc_executeqy(qy = qy_fmt, db = db)
  # both scratch files live in the working directory; register their removal
  # up front so they are cleaned up even when a later step stops with an error
  fm_tmpt_file <- getwd() %+% gsub("\\\\", "/", tempfile(pattern = "fm", tmpdir = "", fileext = ".xml"))
  sc_tmpt_file <- getwd() %+% gsub("\\\\", "/", tempfile(pattern = "sc", tmpdir = "", fileext = ".txt"))
  on.exit({
    if ( file.exists(fm_tmpt_file) ) { file.remove(fm_tmpt_file) }
    if ( file.exists(sc_tmpt_file) ) { file.remove(sc_tmpt_file) }
  }, add = TRUE)
  # connection switches shared by the two bcp invocations
  if ( winAuth ) {
    login <- " -S " %+% db[["srv"]] %+%
      " -d " %+% db[["dbn"]] %+%
      " -T "
  } else {
    login <- " -S " %+% db[["srv"]] %+%
      " -U " %+% db[["usr"]] %+%
      " -P " %+% db[["pwd"]] %+%
      " -d " %+% db[["dbn"]]
  }
  fm <- "bcp " %+% tb %+% " format nul -c -x -f " %+% fm_tmpt_file %+% login
  executesc(fm)
  write.table(dt, file = sc_tmpt_file, col.names = FALSE, row.names = FALSE, quote = FALSE, na = "", sep = "\t")
  sc <- "bcp " %+% tb %+% " in " %+% sc_tmpt_file %+% " -f " %+% fm_tmpt_file %+% login
  xs_sc <- executesc(sc)
  if ( ! is.null(id) ) {
    # gsub("[[:punct:]]", "", tb) remove "." and all other special characters in tb
    qy_idx <- 'create ' %+% (if ( id_unique ) 'unique ' else ' ') %+%
      'index idx_' %+% gsub("[[:punct:]]", "", tb) %+% ' on ' %+% tb %+% '(' %+% paste0(id, collapse = ', ') %+% ');'
    sqodbc_executeqy(qy = qy_idx, db = db)
  }
  return(xs_sc)
}
#' det_col_type
#' Map every column of x to a sql column type.
#' character -> varchar(255), numeric -> float(16), integer -> int; any other
#' class (logical, factor, dates, ...) falls back to varchar(255) so the
#' generated create-table statement never contains the type "NA".
#' @param x data.frame / data.table (any list of columns).
#' @return unnamed character vector with one sql type per column, in column
#' order.
det_col_type <- function(x) {
  type_map <- c(character = "varchar(255)", numeric = "float(16)", integer = "int")
  # first class only: multi-class columns such as POSIXct would otherwise
  # break the character(1L) contract of vapply()
  r_type <- vapply(x, function(col) class(col)[1L], character(1L))
  s_type <- unname(type_map[r_type])
  s_type[is.na(s_type)] <- "varchar(255)"
  s_type
}
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
#---------------------- wrapper on rsqlite sql functions ----------------------#
#------------------------------------------------------------------------------#
#' sqlite_executeqs
#'
#' @description
#'
#' Run a (possibly multi-statement) sql script through the sqlite3 command
#' line shell.
#'
#' @details
#'
#' The script text is written to a scratch .sql file in the working directory
#' and executed with `sqlite3 <db> ".read <file>"`. The original author chose
#' this over dbGetQuery because dbGetQuery could not handle multiple line
#' queries containing "\n".
#'
#' @param qs character vector with the sql script.
#' @param db path of the sqlite database file.
#' @return NULL.
sqlite_executeqs <- function(qs, db) {
  qs_tmpt_file <- getwd() %+% gsub("\\\\", "/", tempfile(pattern = "qs", tmpdir = "", fileext = ".sql"))
  # remove the scratch file on every exit path, including errors
  on.exit(
    if ( file.exists(qs_tmpt_file) ) { file.remove(qs_tmpt_file) },
    add = TRUE
  )
  message("write qs into ", qs_tmpt_file, " ...\n")
  writeLines(qs, qs_tmpt_file)
  sc <- 'sqlite3 ' %+% db %+% ' ".read ' %+% qs_tmpt_file %+% '"'
  executesc(sc = sc)
  return(NULL)
}
#' sqlite_executeqy wrapper on dbGetQuery
#' @param qy sql query text.
#' @param db path of the sqlite database file.
#' @param ... further arguments forwarded to DBI::dbGetQuery().
#' @return result of DBI::dbGetQuery().
sqlite_executeqy <- function(qy, db, ...) {
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # register the disconnect before querying so a failing query cannot leak
  # the connection
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  xs <- DBI::dbGetQuery(.sqlite_db, qy, ...)
  return(xs)
}
#' sqlite_existsqtb wrapper on dbExistsTable
#' @param tb table name.
#' @param db path of the sqlite database file.
#' @return result of DBI::dbExistsTable().
sqlite_existsqtb <- function(tb, db) {
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # disconnect on every exit path, including errors
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  xs <- DBI::dbExistsTable(.sqlite_db, tb)
  return(xs)
}
#' sqlite_fetchsqtb wrapper on dbReadTable
#' @param tb table name.
#' @param db path of the sqlite database file.
#' @param ... further arguments forwarded to DBI::dbReadTable().
#' @return the table, with class c("data.table", "data.frame").
sqlite_fetchsqtb <- function(tb, db, ...) {
  message("fetch table: ", substitute(tb), " from ", substitute(db), " ...\n")
  .ptc <- proc.time()
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # disconnect on every exit path, including a failing read
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  dt <- DBI::dbReadTable(.sqlite_db, tb, ...)
  class(dt) <- c("data.table", "data.frame")
  .ptd <- proc.time() - .ptc
  message("fetch table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("fetch table: ", substitute(tb), " from ", substitute(db), " ... done.\n")
  return(dt)
}
#' sqlite_removestb wrapper on dbRemoveTable
#' @param tb table name.
#' @param db path of the sqlite database file.
#' @return result of DBI::dbRemoveTable().
sqlite_removestb <- function(tb, db) {
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # disconnect on every exit path, including errors
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  xs <- DBI::dbRemoveTable(.sqlite_db, tb)
  return(xs)
}
#' sqlite_uploadrdt wrapper on dbWriteTable
#' Write dt into table tb and optionally create (or rebuild) an index on the
#' id columns.
#' @param dt table to upload.
#' @param tb target table name.
#' @param db path of the sqlite database file.
#' @param id optional column name(s) to index; must all be columns of dt.
#' @param id_unique create a unique index when TRUE.
#' @param index_name index name; defaults to "idx_<tb>".
#' @param overwrite,append forwarded to DBI::dbWriteTable().
#' @param ... further arguments forwarded to DBI::dbWriteTable().
#' @return result of DBI::dbWriteTable().
sqlite_uploadrdt <- function(
  dt, tb, db, id = NULL, id_unique = TRUE, index_name = NULL,
  overwrite = TRUE, append = FALSE, ...
) {
  message("sqlite_uploadrdt: load table ", substitute(tb), " into ", substitute(db), " ...\n")
  .ptc <- proc.time()
  # validate the index columns before anything is written to the database
  if ( !is.null(id) && !all(id %in% colnames(dt)) ) {
    stop(paste(deparse(substitute(id)), collapse = " "), " not in ",
         paste(deparse(substitute(dt)), collapse = " "))
  }
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # on case any error stops call
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  xs <- DBI::dbWriteTable(
    conn = .sqlite_db, name = tb, value = dt,
    overwrite = overwrite, append = append,
    ...
  )
  if ( !is.null(id) ) {
    if ( is.null(index_name) ) {
      index_name <- paste0("idx", "_", tb)
    }
    # pragma index_info yields no rows for an unknown index; the result may
    # be a zero-row data.frame rather than NULL, so test both
    index_info <- DBI::dbGetQuery(
      conn = .sqlite_db, statement = paste0("pragma index_info('", index_name, "')")
    )
    if ( is.null(index_info) || NROW(index_info) == 0L ) {
      message("sqlite_uploadrdt: create index ", index_name, ".\n")
      DBI::dbExecute(
        conn = .sqlite_db,
        statement = paste0(
          'create ', if ( id_unique ) 'unique ' else ' ',
          'index ', index_name, ' on ', tb, '(', paste0(id, collapse = ', '), ');'
        )
      )
    } else {
      message("sqlite_uploadrdt: reindex ", index_name, " since index already exists.\n")
      DBI::dbExecute(
        conn = .sqlite_db, statement = paste0('reindex ', index_name, ';')
      )
    }
  }
  .ptd <- proc.time() - .ptc
  message("sqlite_uploadrdt: load table ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("sqlite_uploadrdt: load table ", substitute(tb), " into ", substitute(db), " ... done.\n")
  return(xs)
}
#' sqlite_uploadrdt_dtlist
#' @description
#' upload a list contains multiple dt into sqlite
#' a wrapper over yg::sqlite_uploadrdt(), called once per list element
#' TODO: (should) extend to list of list of dt[?]
#' TODO: (should) extend id into idlist [?] - [x]
#' @param dtlist list of tables to upload.
#' @param tblist character vector of target table names, one per element of
#' dtlist; NULL means use names(dtlist).
#' @param db path of the sqlite database file.
#' @param id,id_unique,index_name,overwrite,append,... forwarded unchanged to
#' sqlite_uploadrdt() for every table.
#' @return NULL.
sqlite_uploadrdt_dtlist <- function(
  dtlist, tblist, db, id = NULL,
  id_unique = TRUE, index_name = NULL,
  overwrite = TRUE, append = FALSE, ...
) {
  message("sqlite_uploadrdt_dtlist: load table list into ", substitute(db), " ...\n")
  .ptc <- proc.time()
  if ( is.null(tblist) ) tblist <- names(dtlist)
  if ( length(tblist) != length(dtlist) ) stop("sqlite_uploadrdt_dtlist: length(tblist) != length(dtlist)?\n")
  # positional loop: works for unnamed lists and needs no generated code
  for ( i in seq_along(dtlist) ) {
    dt <- dtlist[[i]]
    tb <- tblist[[i]]
    sqlite_uploadrdt(
      dt = dt, tb = tb, db = db, id = id, id_unique = id_unique,
      index_name = index_name, overwrite = overwrite, append = append, ...
    )
  }
  .ptd <- proc.time() - .ptc
  message("sqlite_uploadrdt_dtlist: load table list consumes ", .ptd[3], " seconds.\n")
  message("sqlite_uploadrdt_dtlist: load table list into ", substitute(db), " ... done.\n")
  return(NULL)
}
#' sqlite_refreshtb
#' refreshtb will update value when primary key exist,
#' and also insert new row when primary key not exist.
#' Rows are sent in batches as `insert or replace` statements executed
#' through sqlite_executeqs(); the table and its index are created first when
#' tb does not exist yet.
#' @param dt table with exactly the columns of tb, in the same order.
#' @param tb target table name.
#' @param id column name(s) used for the index when the table is created.
#' @param db path of the sqlite database file.
#' @param dt_coltp column type(s) used when the table has to be created;
#' defaults to "text".
#' @param id_unique create a unique index when TRUE.
#' @param batch_size number of rows per insert statement.
#' @return NULL.
#' @note values are written as single-quoted sql string literals with
#' embedded quotes doubled; NA is written as sql null.
sqlite_refreshtb <- function(
  dt, tb, id, db, dt_coltp = NULL, id_unique = TRUE,
  batch_size = 10000L
) {
  message("refresh table: ", substitute(tb), " in ", substitute(db), " ...\n")
  .ptc <- proc.time()
  stopifnot(batch_size >= 1)
  .nn <- nrow(dt)
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # disconnect on every exit path, including errors
  on.exit(DBI::dbDisconnect(conn = .sqlite_db), add = TRUE)
  if ( ! (tb %in% DBI::dbListTables(.sqlite_db)) ) {
    message("sqlite_refreshtb: create ", substitute(tb), " not in ", substitute(db), " ... \n")
    if ( is.null(dt_coltp) || (length(dt_coltp) == 1L && is.na(dt_coltp)) ) {
      dt_coltp <- "text"
    }
    # dbGetQuery cann't handle '\n' - execute first line limit 1
    .qs <- paste0(
      'create table ', tb, ' (', paste(colnames(dt), dt_coltp, collapse = ', '), ');\n',
      'create ', if ( id_unique ) 'unique ' else ' ',
      'index idx_', tb, ' on ', tb, '(', paste0(id, collapse = ', '), ');\n'
    )
    sqlite_executeqs(qs = .qs, db = db)
    message("sqlite_refreshtb: create ", substitute(tb), " not in ", substitute(db), " ... done.\n")
  }
  .nm <- DBI::dbListFields(.sqlite_db, tb)
  if ( length(colnames(dt)) != length(.nm) || ! all(colnames(dt) == .nm) ) {
    stop("dt and tb must have same name.\n")
  }
  if ( ! all(id %in% .nm) ) {
    stop("id not in tb\n")
  }
  # render one column as sql literals: single quotes, '' escaping, NA -> null
  sql_literal <- function(col) {
    txt <- as.character(col)
    ifelse(is.na(txt), "null", paste0("'", gsub("'", "''", txt, fixed = TRUE), "'"))
  }
  # work column-wise; apply() over rows would coerce through a padded matrix
  .cols <- as.list(as.data.frame(dt, stringsAsFactors = FALSE))
  .idx <- 0
  while ( .idx < .nn ) {
    .rows <- (.idx + 1):min(.idx + batch_size, .nn)
    .cells <- lapply(.cols, function(col) sql_literal(col[.rows]))
    .tuples <- paste0("(", do.call(paste, c(unname(.cells), sep = ", ")), ")")
    .qs <- paste0(
      'begin;\n',
      'insert or replace into ', tb,
      ' (', paste0(.nm, collapse = ", "), ') ',
      'values', paste0(.tuples, collapse = ", "), ';\n',
      'commit;\n'
    )
    sqlite_executeqs(qs = .qs, db = db)
    .idx <- min(.idx + batch_size, .nn)
  }
  .ptd <- proc.time() - .ptc
  message("refresh table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("refresh table: ", substitute(tb), " in ", substitute(db), " ... done.\n")
  return(NULL)
}
#' sqlite_subsetidx
#' fetch an subset of index table from sqlite db - inner join
#' The distinct id values of dt are looked up in tb in batches and the rows
#' found are merged back onto dt.
#' @param dt: a data.frame/data.table in R with column of key
#' @param tb: a full reference table in sqlite with key value
#' @param db path of the sqlite database file.
#' @param id name of the single key column shared by dt and tb.
#' @param batch_size number of id values per query.
#' @param all.dt,all.tb passed to merge() as all.x / all.y.
#' @return a data.table with value append to the R data.table
#' @note id is limit to single index. Columns fetched from tb are returned as
#' integer when their declared type is exactly "int" and as character
#' otherwise (limitation kept from the original implementation).
sqlite_subsetidx <- function(dt, tb, db, id, batch_size = 10000,
                             all.dt = FALSE, all.tb = FALSE) {
  message("subset index from table: ", substitute(tb), " in ", substitute(db), " ...\n")
  .ptc <- proc.time()
  stopifnot(length(id) == 1L, batch_size >= 1)
  class(dt) <- c("data.table", "data.frame")
  if ( ! (id %in% colnames(dt)) ) {
    stop("sqlite_subsetidx: id not in dt.\n")
  }
  dt_idx <- unique(dt[[id]])
  .nn <- length(dt_idx)
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # the connection was never closed before; release it on every exit path
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  .nm <- DBI::dbListFields(.sqlite_db, tb)
  .nc <- DBI::dbGetQuery(.sqlite_db, paste0('PRAGMA table_info("', tb, '")'))[["type"]]
  .is_int <- .nc == "int"
  # zero-row template fixing column names and types of the result, so an
  # empty lookup still returns a well-formed table
  .template <- lapply(.is_int, function(int_col) if ( int_col ) integer(0) else character(0))
  names(.template) <- .nm
  .template <- structure(.template, class = "data.frame", row.names = integer(0))
  # coerce a fetched chunk to the template types (columns follow tb order)
  coerce_chunk <- function(chunk) {
    for ( k in seq_along(.nm) ) {
      chunk[[k]] <- if ( .is_int[k] ) as.integer(chunk[[k]]) else as.character(chunk[[k]])
    }
    chunk
  }
  .starts <- if ( .nn > 0 ) seq(1, .nn, by = batch_size) else integer(0)
  .chunks <- lapply(.starts, function(.from) {
    .to <- min(.from + batch_size - 1, .nn)
    .xx <- dt_idx[.from:.to]
    # sql string literals are single-quoted; embedded ' is escaped by doubling
    .qy <- paste0(
      "select * from ", tb, " where ", id, " in ('",
      paste0(gsub("'", "''", .xx), collapse = "', '"), "')"
    )
    dt_rtn_chunk <- DBI::dbGetQuery(conn = .sqlite_db, statement = .qy)
    message("sqlite_subsetidx: bulk-match ", .to, " rows ...\n")
    coerce_chunk(dt_rtn_chunk)
  })
  # collecting chunks (instead of writing into a preallocated buffer) copes
  # with tb holding several rows per id
  dt_rtn <- do.call(rbind, c(list(.template), .chunks))
  class(dt_rtn) <- c("data.table", "data.frame")
  dtAgmt <- merge(dt, dt_rtn, by = id, all.x = all.dt, all.y = all.tb)
  class(dtAgmt) <- c("data.table", "data.frame")
  .ptd <- proc.time() - .ptc
  message("sqlite_subsetidx: consumes ", .ptd[3], " seconds ...\n")
  message("subset index from table: ", substitute(tb), " in ", substitute(db), " ... done.\n")
  return(dtAgmt)
}
#' sqlite_selectidx wrapper on dbGetQuery
#' select * from tb in db where id = id_value
#' @param tb table name.
#' @param db path of the sqlite database file.
#' @param id column name(s) to filter on.
#' @param id_value value(s) paired with id; several pairs are joined by "and".
#' @return the selected rows with class c("data.table", "data.frame").
sqlite_selectidx <- function(tb, db, id, id_value) {
  .sqlite_db <- DBI::dbConnect(DBI::dbDriver("SQLite"), db)
  # disconnect on every exit path, including a failing query
  on.exit(DBI::dbDisconnect(.sqlite_db), add = TRUE)
  # double to escape ' when query against sqlite sql
  .cond <- paste(id, "'" %+% gsub("'", "''", id_value) %+% "'", sep = " = ", collapse = " and ")
  xs <- DBI::dbGetQuery(.sqlite_db, "select * from " %+% tb %+% " where " %+% .cond %+% ";")
  class(xs) <- c("data.table", "data.frame")
  return(xs)
}
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
#----------------------- wrapper on rodbc sql functions -----------------------#
#------------------------------------------------------------------------------#
#' sqodbc_createcnn wrapper on odbcDriverConnect
#' Open an RODBC channel from a connection string built out of db; no windows
#' dsn configuration is needed.
#' @param db: list of [dsn] srv usr pwd and dbn; optional drv (defaults to
#' "ODBC Driver 11 for SQL Server") and winAuth (TRUE requests a trusted
#' connection instead of uid/pwd).
#' @return value of RODBC::odbcDriverConnect().
sqodbc_createcnn <- function(db) {
  winAuth <- db[["winAuth"]] %|% FALSE
  db[["drv"]] <- db[["drv"]] %|% "ODBC Driver 11 for SQL Server"
  # part common to both authentication modes
  target <- paste0(
    "driver=", db[["drv"]], ";",
    "server=", db[["srv"]], ";"
  )
  if ( winAuth ) {
    login <- paste0(
      "database=", db[["dbn"]], ";",
      "trusted_connection=yes"
    )
  } else {
    login <- paste0(
      "uid=", db[["usr"]], ";",
      "pwd=", db[["pwd"]], ";",
      "database=", db[["dbn"]]
    )
  }
  xc <- RODBC::odbcDriverConnect(connection = paste0(target, login))
  return(xc)
}
#' sqodbc_executeqy wrapper on sqlQuery
#' @param qy sql query text.
#' @param db: list of [dsn] srv usr pwd and dbn
#' @param ... further arguments forwarded to RODBC::sqlQuery().
#' @return whatever RODBC::sqlQuery() returns.
sqodbc_executeqy <- function(qy, db, ...) {
  message("execute query: ", substitute(qy), " on ", substitute(db), " ...\n")
  .ptc <- proc.time()
  .sql <- sqodbc_createcnn(db)
  # close the channel on every exit path, including errors
  on.exit(RODBC::odbcClose(.sql), add = TRUE)
  x <- RODBC::sqlQuery(channel = .sql, query = qy, ...)
  # a multi-element character result must not reach `||` directly, so the
  # "No Data" comparison is restricted to length-one results
  .is_ok <- is.data.frame(x) || length(x) == 0 ||
    (length(x) == 1L && isTRUE(x == "No Data"))
  if ( ! .is_ok ) {
    message("query returns none data and none success info?\n")
  }
  .ptd <- proc.time() - .ptc
  message("execute query: ", substitute(qy), " consumes ", .ptd[3], " seconds.\n")
  message("execute query: ", substitute(qy), " on ", substitute(db), " ... done.\n")
  return(x)
}
#' sqodbc_existsqtb
#' Check whether a table or view is listed by RODBC::sqlTables(); the
#' comparison ignores case.
#' @param tb table name, either "name" (schema dbo assumed) or "schema.name".
#' @param db: list of [dsn] srv usr pwd and dbn
#' @param tb_type object types accepted as a match.
#' @param ... currently unused.
#' @return TRUE when a matching object is listed, FALSE otherwise.
sqodbc_existsqtb <- function(tb, db, tb_type = c("table", "view"), ...) {
  message("exists table: ", substitute(tb), " on ", substitute(db), " ...\n")
  .ptc <- proc.time()
  tb_list <- unlist(strsplit(tb, split = ".", fixed = TRUE))
  if ( length(tb_list) == 1 ) {
    tb_name <- tb_list[1]
    tb_schm <- "dbo"
  } else if ( length(tb_list) == 2 ) {
    tb_name <- tb_list[2]
    tb_schm <- tb_list[1]
  } else {
    stop("exists table: tb name must have at most one . and specify db with dbn ...\n")
  }
  .sql <- sqodbc_createcnn(db)
  # close the channel on every exit path, including errors
  on.exit(RODBC::odbcClose(.sql), add = TRUE)
  xs <- RODBC::sqlTables(.sql)
  names(xs) <- tolower(names(xs))
  # lower-case both the catalogue and the requested name; previously only the
  # catalogue was lower-cased, so a mixed-case tb could never match
  .schem <- tolower(as.character(xs[["table_schem"]]))
  .name <- tolower(as.character(xs[["table_name"]]))
  .type <- tolower(as.character(xs[["table_type"]]))
  .hit <- .schem == tolower(tb_schm) &
    .name == tolower(tb_name) &
    .type %in% tolower(tb_type)
  xstb_b <- any(.hit, na.rm = TRUE)
  .ptd <- proc.time() - .ptc
  message("exists table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("exists table: ", substitute(tb), " on ", substitute(db), " ... done.\n")
  return( xstb_b )
}
#' sqodbc_fetchsqtb wrapper on sqlFetch
#' @param tb table name.
#' @param db list of [dsn] srv usr pwd and dbn.
#' @param ... further arguments forwarded to RODBC::sqlFetch().
#' @return whatever RODBC::sqlFetch() returns.
sqodbc_fetchsqtb <- function(tb, db, ...) {
  message("fetch table: ", substitute(tb), " from ", substitute(db), " ...\n")
  .ptc <- proc.time()
  .sql <- sqodbc_createcnn(db)
  # close the channel on every exit path, including a failing fetch
  on.exit(RODBC::odbcClose(.sql), add = TRUE)
  x <- RODBC::sqlFetch(channel = .sql, sqtable = tb, ...)
  if ( !is.data.frame(x) && length(x) != 0 ) {
    message("query returns none data and none success info?\n")
  }
  .ptd <- proc.time() - .ptc
  message("fetch table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("fetch table: ", substitute(tb), " from ", substitute(db), " ... done.\n")
  return(x)
}
#' sqodbc_uploadrdt wrapper on sqlSave
#' make sqodbc_uploadrdt wrapper on bcp_azure_inrdt?
#' Upload dt as table tb; an existing table is dropped first when overwrite
#' is TRUE and left alone otherwise. Optionally creates an index on id.
#' @param dt table to upload.
#' @param tb target table name.
#' @param db list of [dsn] srv usr pwd and dbn.
#' @param id optional column name(s) to index; must all be columns of dt.
#' @param id_unique create a unique index when TRUE.
#' @param rownames,colnames,safer,fast,... forwarded to RODBC::sqlSave().
#' @param overwrite drop and recreate tb when it already exists.
#' @return result of RODBC::sqlSave(), or NULL when the upload was skipped
#' because tb exists and overwrite is FALSE.
sqodbc_uploadrdt <- function(dt, tb, db, id = NULL, id_unique = TRUE,
                             rownames = FALSE, colnames = FALSE,
                             safer = TRUE, fast = TRUE, overwrite = TRUE, ...) {
  message("upload table: ", substitute(tb), " to ", substitute(db), " ...\n")
  .ptc <- proc.time()
  # validate the index columns before anything is dropped or written;
  # names() is used because the `colnames` argument shadows the function name
  if ( !is.null(id) && ! all( id %in% names(dt) ) ) {
    stop(paste(deparse(substitute(id)), collapse = " "), " not in ",
         paste(deparse(substitute(dt)), collapse = " "))
  }
  # defined up front so the skip branch has something to return
  x <- NULL
  e <- sqodbc_existsqtb(tb, db)
  if ( e && !overwrite ) {
    message("upload table: tb exists on db and not overwrite ...\n")
  } else {
    .sql <- sqodbc_createcnn(db)
    # close the channel on every exit path, including errors
    on.exit(RODBC::odbcClose(.sql), add = TRUE)
    if ( e ) { RODBC::sqlDrop(channel = .sql, sqtable = tb) }
    x <- RODBC::sqlSave(channel = .sql, dat = dt, tablename = tb, rownames = rownames,
                        colnames = colnames, safer = safer, fast = fast, ...)
    if ( !is.null(id) ) {
      RODBC::sqlQuery(channel = .sql, query = 'create ' %+% (if ( id_unique ) 'unique ' else ' ') %+%
        'index idx_' %+% gsub("[[:punct:]]", "", tb) %+% ' on ' %+% tb %+% '(' %+% paste0(id, collapse = ', ') %+% ');')
    }
    if ( x != 1 ) {
      message("upload table: query returns none data and none success info?\n")
    }
  }
  .ptd <- proc.time() - .ptc
  message("upload table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("upload table: ", substitute(tb), " to ", substitute(db), " ... done.\n")
  return(x)
}
#' sqodbc_removestb wrapper on sqlDrop
#' Drop table tb when sqodbc_existsqtb() reports it; otherwise only log that
#' it does not exist.
#' @param tb table name.
#' @param db list of [dsn] srv usr pwd and dbn.
#' @return value of RODBC::sqlDrop(), or NULL when the table was not found.
sqodbc_removestb <- function(tb, db) {
  message("remove table: ", substitute(tb), " on ", substitute(db), " ...\n")
  started <- proc.time()
  status <- NULL
  if ( ! sqodbc_existsqtb(tb, db) ) {
    message("remove table: ", substitute(tb), " not exists on ", substitute(db), " ...\n")
  } else {
    channel <- sqodbc_createcnn(db)
    status <- RODBC::sqlDrop(channel = channel, sqtable = tb, error = FALSE)
    RODBC::odbcClose(channel)
  }
  # a drop result of -1 is reported as a possible failure
  if ( !is.null(status) && status == -1 ) {
    message("query returns none data and none success info?\n")
  }
  elapsed <- proc.time() - started
  message("remove table: ", substitute(tb), " consumes ", elapsed[3], " seconds.\n")
  message("remove table: ", substitute(tb), " on ", substitute(db), " ... done.\n")
  return(status)
}
#' sqodbc_updatestb wrapper on sqlUpdate
#' sqlUpdate will update value when primary key exist,
#' but no insert or append when primary key not exist.
#' @param dt table holding the new values.
#' @param tb target table name.
#' @param id column name(s) passed to sqlUpdate as index.
#' @param db list of [dsn] srv usr pwd and dbn.
#' @param ... further arguments forwarded to RODBC::sqlUpdate().
#' @return result of RODBC::sqlUpdate().
sqodbc_updatestb <- function(dt, tb, id, db, ...) {
  message("update table: ", substitute(tb), " in ", substitute(db), " ...\n")
  .ptc <- proc.time()
  .sql <- sqodbc_createcnn(db)
  # close the channel on every exit path, including a failing update
  on.exit(RODBC::odbcClose(.sql), add = TRUE)
  x <- RODBC::sqlUpdate(channel = .sql, dat = dt, tablename = tb, index = id, ...)
  if ( x != 1 ) {
    message("query returns none data and none success info?\n")
  }
  .ptd <- proc.time() - .ptc
  message("update table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("update table: ", substitute(tb), " in ", substitute(db), " ... done.\n")
  return(x)
}
#' sqodbc_refreshtb
#' refreshtb mannual insert rows
#' refreshtb will update value when primary key exist,
#' and also insert new row when primary key not exist.
#' Rows are sent in batches through sqodbc_executeqy().
#' @param dt table with exactly the columns of tb, in the same order.
#' @param tb target table name.
#' @param id key column name(s); every other column is updated on conflict.
#' @param db list of [dsn] srv usr pwd and dbn.
#' @param batch_size number of rows per insert statement.
#' @return NULL.
#' @note values are written as single-quoted sql string literals with
#' embedded quotes doubled; NA is written as sql null.
#' NOTE(review): "on duplicate key update" looks like MySQL syntax while
#' sqodbc_createcnn() defaults to a SQL Server driver - confirm the target.
sqodbc_refreshtb <- function(dt, tb, id, db, batch_size = 10000) {
  message("refresh table: ", substitute(tb), " in ", substitute(db), " ...\n")
  .ptc <- proc.time()
  stopifnot(batch_size >= 1)
  .nn <- nrow(dt)
  .sql <- sqodbc_createcnn(db)
  # the channel is only needed for the column lookup; close it even on error
  .nm <- tryCatch(
    RODBC::sqlColumns(.sql, tb)[["COLUMN_NAME"]],
    finally = RODBC::odbcClose(.sql)
  )
  if ( length(colnames(dt)) != length(.nm) || ! all(colnames(dt) == .nm) ) {
    stop("dt and tb must have same name.\n")
  }
  # all(): id may name several columns
  if ( ! all(id %in% .nm) ) {
    stop("id not in tb\n")
  }
  .nk <- setdiff(.nm, id)
  # render one column as sql literals: single quotes, '' escaping, NA -> null
  sql_literal <- function(col) {
    txt <- as.character(col)
    ifelse(is.na(txt), "null", paste0("'", gsub("'", "''", txt, fixed = TRUE), "'"))
  }
  # work column-wise; apply() over rows would coerce through a padded matrix
  .cols <- as.list(as.data.frame(dt, stringsAsFactors = FALSE))
  .idx <- 0
  while ( .idx < .nn ) {
    .rows <- (.idx + 1):min(.idx + batch_size, .nn)
    .cells <- lapply(.cols, function(col) sql_literal(col[.rows]))
    .tuples <- paste0("(", do.call(paste, c(unname(.cells), sep = ", ")), ")")
    .qy <- paste0(
      "insert into ", tb,
      " (", paste0(.nm, collapse = ", "), ") ",
      "values ", paste0(.tuples, collapse = ", "), " ",
      "on duplicate key update ", paste0(sprintf("%s = values(%s)", .nk, .nk), collapse = ", ")
    )
    suppressMessages(sqodbc_executeqy(.qy, db))
    .idx <- min(.idx + batch_size, .nn)
  }
  .ptd <- proc.time() - .ptc
  message("refresh table: ", substitute(tb), " consumes ", .ptd[3], " seconds.\n")
  message("refresh table: ", substitute(tb), " in ", substitute(db), " ... done.\n")
  return(NULL)
}
#' sqodbc_subsetidx
#' fetch an subset of index table from sqodbc db - inner join
#' The distinct id values of dt are looked up in tb in batches and the rows
#' found are merged back onto dt.
#' @param dt: a data.frame/data.table in R with column of key
#' @param tb: a full reference table in sqodbc with key value
#' @param db list of [dsn] srv usr pwd and dbn.
#' @param id name of the single key column shared by dt and tb.
#' @param batch_size number of id values per query.
#' @param all.dt,all.tb passed to merge() as all.x / all.y.
#' @return a data.table with value append to the R data.table
#' @note id limit to single index: add capability to id multi index.
#' sqodbc_subsetidx is a simpler version of sqlite_subsetidx without checking
#' on type of each column of the fetched rows.
sqodbc_subsetidx <- function(dt, tb, db, id, batch_size = 10000,
                             all.dt = FALSE, all.tb = FALSE) {
  message("subset index from table: ", substitute(tb), " in ", substitute(db), " ...\n")
  .ptc <- proc.time()
  stopifnot(length(id) == 1L, batch_size >= 1)
  class(dt) <- c("data.table", "data.frame")
  if ( ! (id %in% colnames(dt)) ) {
    stop("sqodbc_subsetidx: id not in dt.\n")
  }
  # TODO: id limit to single index: add capability to id multi index
  dt_idx <- unique(dt[[id]])
  .nn <- length(dt_idx)
  .sql <- sqodbc_createcnn(db)
  # close the channel on every exit path, including errors
  on.exit(RODBC::odbcClose(.sql), add = TRUE)
  # run one select and insist on a data.frame: sqlQuery reports failures as a
  # character vector, which must not be row-bound into the result
  fetch_where <- function(.cond) {
    .qy <- paste0("select * from ", tb, " where ", .cond)
    .vv <- RODBC::sqlQuery(channel = .sql, query = .qy)
    if ( ! is.data.frame(.vv) ) {
      stop("sqodbc_subsetidx: query failed: ", paste(.vv, collapse = " "))
    }
    .vv
  }
  .starts <- if ( .nn > 0 ) seq(1, .nn, by = batch_size) else integer(0)
  .chunks <- lapply(.starts, function(.from) {
    .to <- min(.from + batch_size - 1, .nn)
    .xx <- dt_idx[.from:.to]
    # double to escape ' when query against azure sql
    .vv <- fetch_where(paste0(
      id, " in ('", paste0(gsub("'", "''", .xx), collapse = "', '"), "')"
    ))
    message("sqodbc_subsetidx: bulk-match ", .to, " rows ...\n")
    .vv
  })
  if ( length(.chunks) == 0L ) {
    # no id to look up: fetch zero rows just to obtain the column layout
    .chunks <- list(fetch_where("1 = 0"))
  }
  dt_rtn <- do.call(rbind, .chunks)
  class(dt_rtn) <- c("data.table", "data.frame")
  dtAgmt <- merge(dt, dt_rtn, by = id, all.x = all.dt, all.y = all.tb)
  class(dtAgmt) <- c("data.table", "data.frame")
  .ptd <- proc.time() - .ptc
  message("sqodbc_subsetidx: consumes ", .ptd[3], " seconds ...\n")
  message("subset index from table: ", substitute(tb), " in ", substitute(db), " ... done.\n")
  return(dtAgmt)
}
#' sqodbc_selectidx wrapper on sqlQuery
#' select * from tb in db where id = id_value
#' @param tb table name.
#' @param db list of [dsn] srv usr pwd and dbn.
#' @param id column name(s) to filter on.
#' @param id_value value(s) paired with id; several pairs are joined by "and".
#' @return query result with class c("data.table", "data.frame").
sqodbc_selectidx <- function(tb, db, id, id_value) {
  channel <- sqodbc_createcnn(db)
  # double to escape ' when query against azure sql
  quoted_value <- paste0("'", gsub("'", "''", id_value), "'")
  conditions <- paste(paste0(id, " = ", quoted_value), collapse = " and ")
  query <- paste0("select * from ", tb, " where ", conditions, ";")
  x <- RODBC::sqlQuery(channel = channel, query = query)
  class(x) <- c("data.table", "data.frame")
  RODBC::odbcClose(channel)
  return(x)
}
#------------------------------------------------------------------------------#
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.