R/db.R

Defines functions has.spaces is.sql is.sql.get is.sql.send is.table.name list.sub auto.table.name db

Documented in auto.table.name db has.spaces is.sql is.sql.get is.sql.send is.table.name list.sub

# technical ---------------------------------------------------------------

#' @title has.spaces
#' @description test if character has spaces, accept vector, return scalar logical.
#' @keywords internal
has.spaces <- function(x){
  r = grepl(" ", x)
  if(length(unique(r)) > 1){
    stop("Provided character vector should be a vector of table names (must not contains spaces) or vector of sql statements (must contains spaces). Now it contains both types (some contains spaces, the others are not contains spaces).")
  }
  return(unique(r))
}

#' @title is.sql
#' @description test if character is.sql (has spaces), accept vector, return scalar logical.
#' @keywords internal
is.sql <- function(x){
  r = has.spaces(x)
  return(r)
}

#' @title is.sql.get
#' @description test if character is.sql.get, accept vector, return scalar logical.
#' @keywords internal
is.sql.get <- function(x){
  r = toupper(substr(x,1,7))=="SELECT "
  if(length(unique(r)) > 1){
    stop("Provided character vector of sql statements should start with 'SELECT ' for all the elements or for none of them.")
  }
  return(unique(r))
}

#' @title is.sql.send
#' @description test if character is.sql.send, accept vector, return scalar logical.
#' @keywords internal
is.sql.send <- function(x){
  r = toupper(substr(x,1,7))!="SELECT "
  if(length(unique(r)) > 1){
    stop("Provided character vector of sql statements should start with 'SELECT ' for all the elements or for none of them.")
  }
  return(unique(r))
}

#' @title is.table.name
#' @description test if character is.table.name (no spaces), accept vector, return scalar logical.
#' @keywords internal
is.table.name <- function(x){
  r = !has.spaces(x)
  return(r)
}

#' @title list.sub
#' @description Take \code{i} element of the list \code{x} if not present or NULL then return \code{fill}. Used in \code{db} for \dots argument processing.
#' @keywords internal
list.sub <- function(x, i, fill=NULL){
  if(length(x)<i || is.null(x[[i]])) r = fill
  else r = x[[i]]
  return(r)
}

# db helpers --------------------------------------------------------------

#' @title Auto table name generate
#' @description Automatic table name generation based on first ncol and first nchar of those cols, all pasted, suffix added as timestamp*1e3.
#' @param x character names of data.table.
#' @param ncol integer number of cols to use.
#' @param nchar integer number of characters of each col to use.
#' @export auto.table.name
auto.table.name <- function(x, ncol=getOption("dwtools.db.auto.table.name.ncol"), nchar=getOption("dwtools.db.auto.table.name.nchar")){
  col.names <- gsub(",","_",gsub(" ","_",tolower(substring(na.omit(x[1:ncol]), 1, nchar))))
  now <- as.POSIXlt(Sys.time())
  suffix <- paste0(as.character(now,"%Y%m%d%H%M"),sprintf("%05d", trunc(now$sec*1e3)))
  if(nchar(suffix)<17) paste0(suffix,paste(rep("0",17-nchar(suffix)),collapse=""))
  paste(paste(col.names,collapse="_"),suffix,sep="_")
}

# db -------------------------------------------------------------------

#' @title Simple database interface
#' @description Common db interface for DBI, RJDBC, RODBC and other custom defined off-memory storage.
#' @param x data.table (to save in db) or character of table names or character of sql commands.
#' @param \dots if \code{x} is data.table then \dots expects character table names and character connection names else \dots expects only character connection names.
#' @param key character or integer, character vector to be used to set key or integer columns position to setkey.
#' @param .db.preprocess logical.
#' @param .db.postprocess logical.
#' @param .db.conns list of connections uniquely named.
#' @param .db.dict data.table db interface dictionary.
#' @param .db.batch.action character action name, use only when no recycling required, no action detection required, no timing required.
#' @param timing logical measure timing, make timings for each query in case of vectorized input, read \link{timing}.
#' @param verbose integer, if greater than 0 then print debugging messages.
#' @details Function is designed to be slim and chainable in data.table \code{`[`} operator. It accept vectorized input for all combinations of character arguments, see \emph{Multiple tables/connections} section.
#' \itemize{
#' \item \code{dbWriteTable} - \code{x} is data.table: \code{db(DT,"my_tab")}
#' \item \code{dbReadTable} - \code{x} character table name: \code{db("my_tab")}
#' \item \code{dbGetQuery} - \code{x} character with spaces and starts with \code{"SELECT "}: \code{db("SELECT col1 FROM my_tab1")}
#' \item \code{dbSendQuery} - \code{x} character with spaces and \strong{not} starts with \code{"SELECT "}: \code{db("UPDATE my_tab1 SET col1 = NULL")}
#' }
#' @note In case of \emph{get} and \emph{send} actions any semicolon \emph{;} sign as the last char will be removed from query. When \emph{write} is used it will also set \emph{tablename} attribute by reference to a data.table which was written to database, because it is done by reference it will alter input data.table also and overwrite any previous \emph{tablename} attribute, you can always use \code{newDT <- db(copy(DT),NULL)} to keep both.
#' @return In case of \strong{write / read / get} the data.table object (possibly with some extra attributes). In case of \strong{send} action the send query results.
#' @section Multiple tables/connections:
#' Table names, sql commands, connection names can be character vectors. It allows processing into multiple connections and tables at once. The list of results will be returned, except the \emph{write} action where single data.table will be always returned (for chaining). It will be named by the connection names, so if the connecion name was recycled (e.g. \code{db(c("my_tab1","my_tab2"))}) then there will be duplicated names in the resulted list.
#' @section Limitations:
#' Table names must not contains spaces (which are accepted by some db vendors).\cr
#' SQL send statements should contains spaces, e.g. sqlite \code{.tables} command needs to be written as \code{db("SELECT * FROM sqlite_master WHERE type='table'")}.\cr
#' Below are the per driver name limitations:
#' \itemize{
#' \item \code{csv}: No \strong{get} and \strong{send} actions. Extension \emph{.csv} is automatically added to provided table name character (or to \link{auto.table.name} if table name was not provided).
#' \item \code{JDBC}: Might not support \emph{append} for all jdbc drivers.
#' }
#' @section Auto table name:
#' If writing to db and table name is missing or NULL then the \link{auto.table.name} will be used, auto generated tablename can be catched for later use by \code{(attr(DT,"tablename",TRUE)}, read note section.
#' @section DB interface dictionary:
#' If you read/write to non-default schema you should use \code{"my_schema1.my_tab1"} table names, it will be translated to expected format for target db, e.g. for postgres: \code{c("my_schema1","my_tabl1")}.\cr
#' SQL statements are of course not unified but most of the syntax is already common across different db.\cr
#' There are preprocessing and postprocessing functions available per defined db driver. Those functions can be used for seemless integration in case if write/read to db lose classes of the data. This gives R ability to act as data hub and gain value as ETL tool.\cr
#' You can add new db interfaces by extending \link{db_dict}. Pull Requests are welcome.
#' @seealso \link{dbCopy}, \link{timing}
#' @export
#' @example tests/example-db.R
db <- function(x, ..., key,
               .db.preprocess = getOption("dwtools.db.preprocess"),
               .db.postprocess = getOption("dwtools.db.postprocess"),
               .db.conns = getOption("dwtools.db.conns"),
               .db.dict = getOption("dwtools.db.dict"),
               .db.batch.action = getOption("dwtools.db.batch.action"),
               timing = getOption("dwtools.timing"),
               verbose = getOption("dwtools.verbose")){
  
  ####
  ###   Start by folding braclets
  ####
  ## 
  ## For vectorized input this function will call self again in lapply
  ## When using timing=TRUE this allows to retain and log substituted expressions
  ## for each of the elements in input vector separately
  ## 
  
  if(is.null(.db.batch.action)){
    
    ### validate input
    
    if(missing(x)){
      stop("x argument must be provided to db function")
    } # stop on missing 'x'
    if(is.null(.db.conns) || length(.db.conns)==0){
      if(!getOption("dwtools.db.silent.drvName.csv",FALSE)) warning("You should define 'dwtools.db.conns' option to route db requests, `options('dwtools.db.conns'=list(source1=source1,source2=source2))`, read ?db examples. It will use csv connection now. You can suppress the warning by setting `options('dwtools.db.silent.drvName.csv'=TRUE)`")
      .db.conns = list(csv1 = list(drvName = "csv"))
    } # warning if connections not defined, set csv connection
    if(is.list(.db.conns) && !is.list(.db.conns[[1]])){
      stop("Correct 'dwtools.db.conns' option should be list of uniquely named lists, one for each connection, even if there is only one. Fix your connections definition, see examples.")
    } # stop on incorrect 'dwtools.db.conns'
    if(!is.character(x) && !is.data.table(x)){
      stop("Argument 'x' should be single data.table or character (can be vector) sql statement or table name.")
    } # stop when 'x' is not character or data.table
    
    ### catch dots
    
    dots = list(...) # magic ui decoder
    
    ### recycle
    
    ## write:
    # 1 DT save to 1 table in 1 conn
    # 1 DT save to 1 table in X conns
    # 1 DT save to X tables in X conns
    # 1 DT save to X tables in 1 conn
    ## read:
    # 1 table in 1 conn
    # 1 table in X conns
    # X tables in X conns
    # X tables in 1 conn
    ## get/send:
    # 1 sql in 1 conn
    # 1 sql in X conns
    # X sqls in X conns
    # X tables in 1 conn
    name <- NULL
    if(missing(key)) key <- NULL
    if(is.data.table(x)){
      action = "write"
      name = list.sub(x=dots,i=1,fill=NULL)
      conn.name = list.sub(x=dots,i=2,fill=names(.db.conns[1]))
      if(!is.null(name) && length(conn.name)==1 && length(name)>1) conn.name = rep(conn.name,length(name))
      if(!is.null(name) && length(conn.name)>1 && length(name)==1) name = rep(name,length(conn.name))
      if(is.null(name)) name = vector(mode="list",length(conn.name))
    } # write
    else if(is.character(x) && is.table.name(x)){
      action = "read"
      conn.name = list.sub(x=dots,i=1,fill=names(.db.conns[1]))
      if(length(conn.name)==1 && length(x)>1) conn.name = rep(conn.name,length(x))
      if(length(conn.name)>1 && length(x)==1) x = rep(x,length(conn.name))
    } # read
    else if(is.character(x) && is.sql(x) && is.sql.get(x)){
      action = "get"
      conn.name = list.sub(x=dots,i=1,fill=names(.db.conns[1]))
      x <- vapply(x, function(x) if(substr(x,nchar(x),nchar(x))==";") x <- substr(x,1,nchar(x)-1) else x, "", USE.NAMES=FALSE)
      if(length(conn.name)==1 && length(x)>1) conn.name = rep(conn.name,length(x))
      if(length(conn.name)>1 && length(x)==1) x = rep(x,length(conn.name))
    } # get
    else if(is.character(x) && is.sql(x) && is.sql.send(x)){
      action = "send"
      conn.name = list.sub(x=dots,i=1,fill=names(.db.conns[1]))
      x <- vapply(x, function(x) if(substr(x,nchar(x),nchar(x))==";") x <- substr(x,1,nchar(x)-1) else x, "", USE.NAMES=FALSE)
      if(length(conn.name)==1 && length(x)>1) conn.name = rep(conn.name,length(x))
      if(length(conn.name)>1 && length(x)==1) x = rep(x,length(conn.name))
    } # send
    else{
      stop("Unsupported input, `db` expects `x` as data.table / character sql statement / character table name.")
    } # else error
    
    ### skip batch processing for scalar input
    
    if(!isTRUE(timing) && verbose == 0L && length(conn.name)==1L) .db.batch.action <- action
    
  } # run only once for db(), skip on vectorized input
  else{ # single process
    # same as above but no validate input and no recycle
    dots = list(...) # magic ui decoder
    if(.db.batch.action=="write"){
      action = "write"
      name = list.sub(x=dots,i=1,fill=NULL)
      conn.name = list.sub(x=dots,i=2,fill=names(.db.conns[1]))
    }
    else if(.db.batch.action %in% c("read","get","send")){
      action = "read"
      name = NULL
      conn.name = list.sub(x=dots,i=1,fill=names(.db.conns[1]))
    }
  } # run only for vectorized input, or on demand by param
  
  ### batch processing
  
  if(is.null(.db.batch.action)){
    pretty_log_on_timing <- function(i, x, name, conn.name, key, .timing, verbose){
      action <- getOption("dwtools.db.batch.action")
      if(action=="write" && is.null(name[[i]])) name[[i]] <- auto.table.name(names(x))
      r <- eval(bquote(
        switch(action,
               write = invisible(timing(db(x,.(name[[i]]),.(conn.name[[i]])), nrow(x), tag=paste("db",action,name[[i]],conn.name[[i]],sep=getOption("dwtools.tag.sep",";")), .timing=.timing, verbose=verbose)),
               read = if(is.null(key)){
                 timing(db(.(x[[i]]),.(conn.name[[i]])), NA_integer_, tag=paste("db",action,x[[i]],conn.name[[i]],sep=getOption("dwtools.tag.sep",";")), .timing=.timing, verbose=verbose)[]
               } else {
                 timing(db(.(x[[i]]),.(conn.name[[i]]), key=.(key)), NA_integer_, tag=paste("db",action,x[[i]],conn.name[[i]],sep=getOption("dwtools.tag.sep",";")), .timing=.timing, verbose=verbose)[]
               },
               get = if(is.null(key)){
                 timing(db(.(x[[i]]),.(conn.name[[i]])), NA_integer_, tag=paste("db",action,x[[i]],conn.name[[i]],sep=getOption("dwtools.tag.sep",";")), .timing=.timing, verbose=verbose)[]
                } else {
                  timing(db(.(x[[i]]),.(conn.name[[i]]), key=.(key)), NA_integer_, tag=paste("db",action,x[[i]],conn.name[[i]],sep=getOption("dwtools.tag.sep",";")), .timing=.timing, verbose=verbose)[]
                },
               send = invisible(timing(db(.(x[[i]]),.(conn.name[[i]])), NA_integer_, tag=paste("db",action,x[[i]],conn.name[[i]],sep=getOption("dwtools.tag.sep",";")), .timing=.timing, verbose=verbose)),
               stop("not supported action, reset `options('dwtools.db.batch.action'=NULL)` may help, read ?db"))
      ))
      # for batch processing on write the single process should not return DT, whole batch must return single DT, so it will return tablename of DT, yet retain correct out_n in timings
      if(action=="write") r <- name[[i]]
      return(r)
    }
    r <- devtools::with_options( # using options it is possible to have cleaner expression field in timing logs
      new = c("dwtools.db.batch.action"=action, # used in inner db(), NOT ONLY in pretty_log_on_timing where setting options would be redundant
              "dwtools.timing"=FALSE),
      code = lapply(setNames(seq_len(length(conn.name)),conn.name), pretty_log_on_timing, x=x, name=name, conn.name=conn.name, key=key, .timing=timing, verbose=verbose)
    )
    if(action=="write"){
      tbls <- unlist(r)
      r <- x
      setattr(r,"tablename",tbls)
    }
    else if(length(r) == 1L){ # single process but with timing
      r <- r[[1]]
    }
    return(if(action %in% c("write","send")) invisible(r) else r)
  }
  
  ### single processing
  
  # write: 1 DT save to 1 table in 1 conn
  # read: 1 table in 1 conn
  # get/send: 1 sql in 1 conn
  
  .db.conn = .db.conns[[conn.name]]
  if(.db.batch.action=="write"){ # write
    if(is.list(name)) name <- name[[1]]
    if(is.null(name)) name <- auto.table.name(names(x))
    r = .db.dict[.(.db.conn$drvName), write[[1]](conn = .db.conn$conn, name = tablename[[1]](name), value = if(.db.preprocess) preprocess[[1]](x) else x)]
    if(is.null(getOption("dwtools.db.batch.action"))) setattr(x,"tablename",setNames(name,conn.name)) # only for non batch process
  } # write
  else if(.db.batch.action=="read"){ # read
    x = .db.dict[.(.db.conn$drvName), read[[1]](conn = .db.conn$conn, name = tablename[[1]](x))]
    if(is.data.frame(x)){
      setDT(x)
      if(.db.postprocess) x = .db.dict[.(.db.conn$drvName), postprocess[[1]]](x)
    }
  } # read
  else if(.db.batch.action=="get"){ # get
    x = .db.dict[.(.db.conn$drvName), get[[1]]](conn = .db.conn$conn, statement = x)
    if(is.data.frame(x)){
      setDT(x)
      if(.db.postprocess) x = .db.dict[.(.db.conn$drvName), postprocess[[1]]](x)
    }
  } # get
  else if(.db.batch.action=="send"){ # send
    x = .db.dict[.(.db.conn$drvName), send[[1]](conn = .db.conn$conn, statement = x)] # here x may not be a data.table but also a non-table send query results
    if(is.data.frame(x)){
      setDT(x)
      if(.db.postprocess) x = .db.dict[.(.db.conn$drvName), postprocess[[1]]](x)
    }
  } # send
  
  # setkey
  if(!missing(key) && !is.null(key) && .db.batch.action %in% c("read","get") && is.data.table(x)){
    if(is.numeric(key)) nosetkey <- (max(key) > length(x))
    else if(is.character(key)) nosetkey <- !all(key %in% names(x))
    if(nosetkey){
      warning("key argument provided to db function attempt to setkey on non existing columns")
    } else {
      if(is.numeric(key)) key <- names(x)[as.integer(key)]
      setkeyv(x,key)[]
    }
  }
  
  return(if(.db.batch.action %in% c("write","send")) invisible(x) else x)
}

# db migration --------------------------------------------------------

#' @title Copy tables between databases
#' @param source.table.name character vector of tables names to copy from source connection
#' @param source.conn.name character scalar
#' @param target.table.name character vector of tables names to copy to target connection
#' @param target.conn.name character scalar
#' @param timing logical measure timing
#' @param verbose integer status messages
#' @seealso \link{db}, \link{timing}
#' @export
#' @examples
#' # see the last example in ?db
dbCopy <- function(source.table.name, source.conn.name, target.table.name, target.conn.name, timing=getOption("dwtools.timing"), verbose=getOption("dwtools.verbose")){
  stopifnot(length(source.conn.name)==1 && length(target.conn.name)==1)
  stopifnot(length(source.table.name)==length(target.table.name))
  # do one copy
  dbCopy.one <- function(source.table.name, source.conn.name, target.table.name, target.conn.name, timing, verbose){
    db(
      db(source.table.name, source.conn.name, timing=timing, verbose=verbose),
      target.table.name,
      target.conn.name,
      timing=timing,
      verbose=verbose
    )
  }
  # batch copy
  mx = mapply(dbCopy.one, source.table.name, target.table.name, MoreArgs = list(source.conn.name=source.conn.name, target.conn.name=target.conn.name, timing=timing, verbose=verbose-1), SIMPLIFY = FALSE)
  invisible(mx)
}
jangorecki/dwtools documentation built on May 18, 2019, 12:24 p.m.