R/isi_get_data.R

Defines functions isiData isConnParams isConnString getIsConn

Documented in getIsConn isConnParams isConnString isiData

#' Connect to ISI Database
#'
#' Provides convenience features to make read-only connection to ISI databases.
#'
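#' @param db Name of the database. It selects the stored connection parameters
#'   and, in `isiData()`, is also used as the database name in the generated
#'   query.
#' @param tbl Name of the table (in the `dbo` schema) to read.
#' @param wh Optional SQL condition, written without the `WHERE` keyword, used
#'   to filter the rows. `NULL` (the default) reads the whole table.
#' @param max_rows Optional cap on the number of rows to return. `NULL` (the
#'   default) returns every row.
#' @param cnll Connection parameters as returned by `isConnParams()`: a list
#'   with a single element, named after the database, that holds the named
#'   connection settings.
#'
#' @return `isiData()` returns a `data.table` with the fetched rows.
#'   `isConnParams()` returns a one-element list named after `db`.
#'   `isConnString()` returns a single `name=value;name=value` string.
#'   `getIsConn()` returns the result of `RODBC::odbcDriverConnect()`.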
#'
#' @import RODBC
#' @import data.table
#' @importFrom stringr str_c str_to_lower
#' @importFrom secret get_secret
#'
#' @name isi_get_data
NULL

#' @describeIn isi_get_data Read the rows of `tbl` in database `db` into a
#'   `data.table`, optionally filtered by `wh` and capped at `max_rows` rows.
#'   The connection is opened with `readOnlyOptimize = TRUE` and is closed when
#'   the function exits.
#' @export
isiData <- function(db = "morpheus", tbl = "MainLog", wh = NULL, max_rows = NULL){

  ## GET CONNECTION STRING AND MAKE OBJECT
  cnll <- isConnParams(db)
  txt  <- isConnString(cnll)
  cn   <- RODBC::odbcDriverConnect(txt, readOnlyOptimize = TRUE, rows_at_time = 3000)

  ## odbcDriverConnect() REPORTS FAILURE BY RETURNING -1 (PLUS A WARNING), NOT BY
  ## RAISING AN ERROR, SO FAIL HERE WITH A CLEAR MESSAGE
  if(!inherits(cn, "RODBC"))
    stop("could not connect to db: ", db, call. = FALSE)

  on.exit(RODBC::odbcClose(cn), add = TRUE)

  ## GET DESIRED TABLE's COLUMN NAMES
  cnam <- RODBC::sqlColumns(cn, tbl)[["COLUMN_NAME"]]

  ## BUILD QUERY WITH OPTIONAL wh ARGUMENT
  ## AN EXPLICIT if() IS USED BECAUSE A ZERO-LENGTH ARGUMENT TO str_c() CAN
  ## COLLAPSE THE WHOLE RESULT TO character(0), DEPENDING ON THE stringr VERSION
  ## NOTE(review): db, tbl AND wh ARE PASTED INTO THE SQL TEXT UNESCAPED, SO THEY
  ## MUST COME FROM TRUSTED CODE, NEVER DIRECTLY FROM END-USER INPUT
  query <- stringr::str_c("SELECT * FROM [", db, "].[dbo].[", tbl, "]")
  if(!is.null(wh))
    query <- stringr::str_c(query, " WHERE ", wh)
  print_stamp(query)


  ## CHECK QUERY AGAINST CONNECTION FOR ERRORS
  stopifnot(RODBC::odbcQuery(cn, query) == 1)

  ## ONE-ROW, ALL-NA CHARACTER TEMPLATE. IT SUPPLIES THE COLUMN NAMES AND, SINCE
  ## rbindlist() COERCES EACH COLUMN TO THE HIGHEST TYPE AMONG ITS INPUTS, KEEPS
  ## THE RESULT COLUMNS CHARACTER AS BEFORE. IT IS DROPPED BEFORE RETURNING
  template <- as.data.table(matrix(NA_character_, nrow = 1, ncol = length(cnam)))
  setnames(template, cnam)

  chunks     <- list(template) # BATCHES ARE COLLECTED HERE AND BOUND ONCE AT THE END
  iter       <- 0              # INIT TO TRACK FETCH ITERATIONS
  total_rows <- 0              # INIT TO TRACK DATA ROWS ACTUALLY RECEIVED

  nRows <- attr(cn, "rows_at_time") # BATCH SIZE REQUESTED PER FETCH

  repeat{

    iter <- iter + 1

    ## GET DATA AND SET DT
    iRes <- RODBC::odbcFetchRows(channel = cn,
                                 max = nRows,
                                 buffsize = 3000,
                                 nullstring = NA_character_,
                                 believeNRows = TRUE)
    setDT(iRes$data)

    # AN EMPTY BATCH MEANS THE RESULT SET IS EXHAUSTED
    count_received <- nrow(iRes$data)
    if(count_received == 0)
      break

    chunks[[length(chunks) + 1L]] <- iRes$data
    total_rows <- total_rows + count_received

    ## STOP FETCHING AS SOON AS ENOUGH ROWS HAVE BEEN RECEIVED
    if(!is.null(max_rows) && total_rows >= max_rows)
      break

    # UPDATE THE USER AFTER EVERY 10 ITERATIONS
    if(iter %% 10 == 0)
      print(paste0("Total rows returned: ", total_rows/1000, "K"))

  }

  ## BIND EVERYTHING ONCE, THEN REMOVE THE TEMPLATE ROW
  outputDT <- rbindlist(chunks)[-1]

  ## APPLY THE CAP TO REAL DATA ROWS ONLY: THE TEMPLATE ROW IS ALREADY GONE AND
  ## NO PADDING ROWS ARE CREATED WHEN FEWER THAN max_rows ROWS EXIST
  if(!is.null(max_rows) && nrow(outputDT) > max_rows)
    outputDT <- outputDT[seq_len(max_rows)]

  return(outputDT)

}

#' @describeIn isi_get_data Look up the connection parameters stored for `db`.
#'   The private key is read from the path in option `is.key.path` and used to
#'   read the `cnList` secret from the vault in option `secret.vault`. Stops
#'   when `db` is `NULL` or has no entry in `cnList`.
#' @export
isConnParams <- function(db = NULL){
  # Fail fast, before the key or the vault is touched
  if(is.null(db)){
    stop("provide db argument", call. = FALSE)
  }

  # Decrypt the list of all configured connections
  private_key <- openssl::read_key(getOption("is.key.path"))
  vault_path  <- getOption("secret.vault")
  all_conns   <- get_secret("cnList", vault = vault_path, key = private_key)

  # Pick out the entry for the requested database
  db_params <- all_conns[[db]]
  if(is.null(db_params)){
    stop("db not configured... no connection string found", call. = FALSE)
  }

  # Wrap the entry in a one-element list named after the database
  result <- list(db_params)
  names(result) <- db
  result
}

#' @describeIn isi_get_data Build a connection string from `cnll` by joining
#'   each parameter as `name=value`, separated by `;`.
#' @export
isConnString <- function(cnll){
  # cnll holds a single element named after the database; unwrap it
  params <- cnll[[names(cnll)]]
  keys   <- names(params)

  # name=value pairs, collapsed into one ';'-separated string
  stringr::str_c(keys, "=", params, collapse = ";")
}


#' @describeIn isi_get_data Open a connection to `db` and return it. The
#'   connection is opened with `rows_at_time = 2000` and
#'   `readOnlyOptimize = TRUE`; this function does not close it.
#' @export
getIsConn <- function(db=NULL){
  # Resolve the stored parameters first; this stops if db is NULL or unknown
  params <- isConnParams(db)

  RODBC::odbcDriverConnect(
    connection       = isConnString(params),
    rows_at_time     = 2000,
    readOnlyOptimize = TRUE
  )
}
bfatemi/isdata documentation built on May 14, 2019, 11:15 a.m.