##################################################
# Version 3 db wrappers
# database options are set in ~\workspace\.my.cnf
# Group values include:
# remoteharshadb = Harsha Intake
# EPA_harshadb = EPA enterprise harshadb instance
# EPA_chl_wsd_admin = EPA enterprise chl_wsd admin
# EPA_chl_wsd_user = EPA enterprise chl_wsd user
# EPA_chl_wsd_read = EPA enterprise chl_wsd reader
##################################################
#' Wrapper to connect to db
#'
#' Connect to database using RMariaDB. May want to add secure credentials method.
#' @param Group Group identifier found in .my.cnf
#' @param defaultFile file where Group credentials are found
#' @param verbose provide feedback to user
#' @export
dbConnect_ <- function(Group = "EPA_harshadb",
defaultFile = file.path(path$ws, ".my.cnf"),
verbose = FALSE, ...){
# Make Connection
con <- suppressWarnings(DBI::dbConnect(RMariaDB::MariaDB(),
default.file = defaultFile,
groups = Group))
# If connection is valid
if(!DBI::dbIsValid(con)){
message("Invalid connection")
stop()
}else{
if(verbose) message("Connection Succeeded")
con
}
}
#' Function to get data from sql statement
#'
#' Submit sql and get results from database using RMariaDB. May want to add secure credentials method.
#' @param sql sql statement
#' @param Group Group identifier found in .my.cnf
#' @param returnClass class of return object. Should be one of data.table, tibble, or data.frame.
#' @param defaultFile file where Group credentials are found
#' @param verbose provide feedback to user
#' @export
get_sql3 <- function(sql, Group = "EPA_harshadb",
returnClass = "data.table",
defaultFile = file.path(gfuns::sg("ws"), ".my.cnf"),
verbose = FALSE, ...){
# Set on exit
# Set on exit
on.exit(expr = {
if(exists("con")){
if(inherits(con, "DBIConnection")) suppressWarnings(DBI::dbDisconnect(con))
rm(con)
}
})
tryCatch({
con <- gfuns::dbConnect_(defaultFile = defaultFile,
Group = Group)
# If connection is valid
if(!DBI::dbIsValid(con)){
message("Invalid connection")
stop()
}else{
if(verbose) print(con)
DBI::dbGetQuery(con, statement = sql) %>%
switch(returnClass,
data.table = data.table::as.data.table(.),
tibble = tibble::as_tibble(.),
.)
}
}, error = function(e) print(e))
}
#' Function to send sql statement
#'
#' Submit sql statement using RMariaDB. If dataF given, a the SQL statement will be parameterized. May want to add secure credentials method.
#' @param dataF If given, data frame of key values to be used in parameterized statement.
#' @param tName table name
#' @param returnClass class of return object. Should be one of data.table, tibble, or data.frame.
#' @param sql sql statement. defaults to select
#' @param Group Group identifier found in .my.cnf
#' @param defaultFile file where Group credentials are found
#' @param verbose provide feedback to user
#' @param CLIENT_MULTI_STATEMENTS_ logical to use CLIENT_MULTI_STATEMENTS
#' @export
send_sql3 <- function(dataF = NULL,
tName = NULL,
returnClass = "data.table",
sql = "SELECT * FROM ",
Group = "EPA_harshadb",
defaultFile = file.path(gfuns::sg("ws"), ".my.cnf"),
verbose = FALSE,
CLIENT_MULTI_STATEMENTS_ = TRUE, ...){
# dataF <- gfuns::get_sql3("SELECT * from sample_id;") %>%
# tail(10)
# Set on exit
on.exit(expr = {
if(exists("con")){
if(inherits(con, "DBIConnection")) suppressWarnings(DBI::dbDisconnect(con))
rm(con)
}
if(exists("rs")){
if(inherits(rs, "DBIResult")) suppressWarnings(DBI::dbClearResult(rs))
rm(rs)
}
})
# If sql is a data set, run parameterized query, else run sql as query
if(inherits(dataF, "data.frame")){
# Check for tName
if(is.null(tName)){
stop("gfuns::send_sql3: tName cannot be null for parameterized statement")
}
# Establish Fields
fields <- names(dataF)
# Get primary key(s)
PK <- get_PrimaryKeys(tName, Group = Group)
# Make sure sql has all primary keys
if(!all(PK %in% fields)){
stop(paste(PK[!PK %in% fields], "gfuns::send_sql3: While attempting parameterized sql, not all Primary Keys in table"))
}
# Arrange dataF
dataF_ <-
dataF %>%
dplyr::select(dplyr::one_of(PK)) %>%
# Arrange dataF
as.list() %>%
# Remove names for anonymous
unname()
# Create Select statement
sql <- paste0(sql, tName, " WHERE ",
paste0("`", PK, "`", collapse = "= ? AND "), " = ?;")
if(verbose) print(dim(dataF))
if(verbose) print(head(dataF))
if(verbose) print(sql)
}
# open the connection using user, passsword, etc., as
tryCatch({
con <- gfuns::dbConnect_(defaultFile = defaultFile,
Group = Group)
# If connection is valid continue
if(DBI::dbIsValid(con)){
if(verbose) print(con)
# If parameterized statement
if(!is.null(dataF)){
# Send statements
rs <- DBI::dbSendQuery(con, statement = sql)
# Bind
DBI::dbBind(rs, dataF_)
result <-
DBI::dbFetch(rs) %>%
switch(returnClass,
data.table = data.table::as.data.table(.),
tibble = tibble::as_tibble(.),
.)
if(DBI::dbHasCompleted(rs)){
if(verbose) message("gfuns::send_sql3: Statment succeeded.")
DBI::dbClearResult(rs)
return(result)
}else{
message("gfuns::send_sql3: Statement failed.")
return(NULL)
}
} else{
# dbExecute statment
rs <- DBI::dbExecute(con, sql)
# give results
if(verbose) message(rs, " rows affected")
}
}else{
message("Connection to DB failed")
return(NULL)
}
}, error = function(e) print(e))
}
#
# dataF_ <-
# dataF[, c("Sample_ID", "CoC_ID")] %>%
# as.list()
# names(dataF_) <- NULL
# rs <- DBI::dbSendQuery(con, paste("SELECT * FROM", tName, "WHERE Sample_ID = ? And CoC_ID = ? AND isnull(QA_CoC"))
# DBI::dbBind(rs, dataF_)
# DBI::dbFetch(rs)
# DBI::dbClearResult(rs)
#' Function to create a new db table from dataframe or append dataframe to existing db table.
#'
#' Submit data to be appended to database table using RMariaDB. Output 0 or 1 based on success if verbose = TRUE May want to add secure credentials method.
#' @param dataF data frame to be appended
#' @param tName table to appand data to
#' @param Group Group identifier found in .my.cnf
#' @param Append boolean defaults to TRUE. If True, dataF is appended to existing table. If FALSE and tName does not exist, a new table is created using dataF.
#' @param defaultFile file where Group credentials are found
#' @param header_ logical whether first row is header
#' @param rowNames boolean passed to dbWriteTable, defaults to FALSE. If True, row names are translated into column row_names.
#' @param verbose provide feedback to user
#' @export
append_sql3 <- function(dataF, tName, Append = TRUE,
Group = "EPA_harshadb",
defaultFile = file.path(gfuns::sg("ws"), ".my.cnf"),
header_ = FALSE, rowNames = FALSE, verbose = FALSE,
...){
# Testing
if(FALSE){
dataF <- samples[1]$Template[[1]]
tName <- "chl_analysis_template"
Append <- !FALSE
Group <- "EPA_chl_wsd_admin"
defaultFile <- file.path(gfuns::sg("ws"), ".my.cnf")
header_ <- FALSE
rowNames <- FALSE
verbose <- !FALSE
}
# Set on exit
# Set on exit
on.exit(expr = {
if(exists("con")){
if(inherits(con, "DBIConnection")) suppressWarnings(DBI::dbDisconnect(con))
rm(con)
}
if(exists("rs")){
if(inherits(rs, "DBIResult")) suppressWarnings(DBI::dbClearResult(rs))
rm(rs)
}
})
# open the connection using user, passsword, etc., as
tryCatch({
con <- gfuns::dbConnect_(defaultFile = defaultFile,
Group = Group)
# If connection is valid continue
if(DBI::dbIsValid(con)){
if(verbose) print(con)
# Does tName exist
tNameExists <-
DBI::dbExistsTable(con, tName)
# Make sure table exists
if(!tNameExists & Append == TRUE){
stop("Attempting to append to ", tName, " which does not exist")
}
# If Append = TRUE, append the table
# Else, create the table
# overwrite = False is default so should fail if record exists
rs <-
DBI::dbWriteTable(conn = con, name = tName,
value = dataF, append = Append,
header = header_, row.names = rowNames)
# Return logical if verbose otherwise quit silently
if(verbose){
message(ifelse(Append, "Append to ", "Create "), tName, ifelse(rs, " succeeded", " failed"))
}
}else{
stop("Connection to DB failed")
}
}, error = function(e) print(e))
}
#' Function to update db table
#'
#' Update database table. Records with extant keys will be updated those without will not be inserted using RMariaDB. Output 0 or 1 based on success if verbose = TRUE May want to add secure credentials method.
#' @param dataF data frame to be appended
#' @param tName table to appand data to
#' @param Group Group identifier found in .my.cnf
#' @param defaultFile file where Group credentials are found
#' @param verbose provide feedback to user
#' @export
update_sql3 <- function(dataF, tName, key = NULL,
Group = "EPA_harshadb",
defaultFile = file.path(gfuns::sg("ws"), ".my.cnf"),
verbose = FALSE, ...)
{
# Set on exit
# Set on exit
on.exit(expr = {
if(exists("con")){
if(inherits(con, "DBIConnection")) suppressWarnings(DBI::dbDisconnect(con))
rm(con)
}
if(exists("rs")){
if(inherits(rs, "DBIResult")) suppressWarnings(DBI::dbClearResult(rs))
rm(rs)
}
})
# Make sure key is not NULL
if(is.null(key)){
message("key cannot be NULL")
stop()
}
# Check if key is a primary key
PK <- get_PrimaryKeys(tName, Group = Group)
if(!all(key %in% PK)){
message(paste(key[!key %in% PK], "Is not a Primary Key"))
return(NULL)
}
# Establish Fields
fields <- names(dataF)
# Establish Fields to be updated
Ufields <- fields[!fields %in% key]
# Arrange dataF
dataF <-
dataF %>%
dplyr::select(dplyr::one_of(Ufields), dplyr::one_of(key))# %>%
# cbind(., .[,names(.) %in% Ufields])
# Arrange dataF
dataF_ <-
dataF %>%
as.list() %>%
# Remove names for anonymous
unname()
# Create UPDATE statement
sql_ <- paste0("UPDATE ", tName, " set ",
paste0("`", Ufields, "`=?", collapse = ", "),
" WHERE ",
paste0("`", key, "`=?", collapse = " AND "),
";")
if(verbose) print(dim(dataF))
if(verbose) print(head(dataF))
if(verbose) print(sql_)
# Try UPDATE
rowsAffected <-
tryCatch({
# Open the connection
con <- gfuns::dbConnect_(defaultFile = defaultFile,
Group = Group)
# If connection is valid continue
if(DBI::dbIsValid(con)){
if(verbose) print(con)
# Make sure table exists
if(!DBI::dbExistsTable(con, tName)){
message(paste(tName, "does not exist"))
stop()
}
# Make sure fields in table
tFields <- DBI::dbListFields(con, tName)
if(!all(fields %in% tFields)){
message(paste("Fields",
paste(fields[!fields %in% tFields], collapse = ", "),
"not in table"))
}
# DBI::dbExecute(con, sql_, param = dataF_)
rs <- DBI::dbSendStatement(con, sql_)
DBI::dbBind(rs, dataF_)
rowsAffected <- DBI::dbGetRowsAffected(rs)
if(verbose) message(paste(rowsAffected, "Rows Updated"))
DBI::dbClearResult(rs)
rowsAffected
} else{
message("Connection to DB failed")
stop()
}
}, error = function(e, ...){
message("Update Failed")
print(e)
})
# Return rowsAffected
rowsAffected
}
#' Function to insert or update on duplicate key
#'
#' Insert data using parameterized statement. If duplicate key exists perform Update. Records with extant keys will be updated those without will be inserted using RMariaDB.
#'
#' Return value is number of rows affected
#' @param dataF data frame to be appended
#' @param tName table to appand data to
#' @param Group Group identifier found in .my.cnf
#' @param defaultFile file where Group credentials are found
#' @param verbose provide feedback to user
#' @export
upsert_sql3 <- function(dataF, tName, key = NULL,
Group = "EPA_harshadb",
defaultFile = file.path(gfuns::sg("ws"), ".my.cnf"),
verbose = FALSE, ...)
{
# Set on exit
# Set on exit
on.exit(expr = {
if(exists("con")){
if(inherits(con, "DBIConnection")) suppressWarnings(DBI::dbDisconnect(con))
rm(con)
}
if(exists("rs")){
if(inherits(rs, "DBIResult")) suppressWarnings(DBI::dbClearResult(rs))
rm(rs)
}
})
# Make sure key is not NULL
if(is.null(key)){
message("key cannot be NULL")
return(NULL)
}
# Check if key is a primary key
PK <- get_PrimaryKeys(tName, Group = Group)
if(!(all(key %in% PK) | all(key %in% PK))){
if(!all(key %in% PK)){
message(paste(key[!key %in% PK], "Is not a Primary Key"))
} else if(!all(key %in% PK)){
message(paste(PK[!PK %in% key], "Primary Key is missing"))
}
return(NULL)
}
# Establish Fields
fields <- names(dataF)
# Establish Fields to be updated
Ufields <- fields[!fields %in% key]
# Arrange dataF
dataF <-
dataF %>%
dplyr::select(dplyr::one_of(Ufields), dplyr::one_of(key)) #%>%
# cbind(., .[,names(.) %in% Ufields])
# Arrange dataF
dataF_ <-
dataF %>%
as.list() %>%
# Remove names for anonymous
unname()
# Create Insert statement
sql_ <- paste("INSERT INTO", tName, "(",
paste0("`", c(Ufields, key), "`", collapse = ", "), ")",
paste0("Values(", paste0(rep("?", length(c(Ufields, key))), collapse = ", "), ")"),
"ON DUPLICATE KEY UPDATE",
paste0("`", Ufields, "`=VALUES(?)", collapse = ", "),
";")
if(verbose) print(dim(dataF))
if(verbose) print(head(dataF))
if(verbose) print(sql_)
# Try UPDATE
rowsAffected <-
tryCatch({
# Open the connection
con <- gfuns::dbConnect_(defaultFile = defaultFile,
Group = Group)
# If connection is valid continue
if(DBI::dbIsValid(con)){
if(verbose) print(con)
# Make sure table exists
if(!DBI::dbExistsTable(con, tName)){
message(paste(tName, "does not exist"))
stop()
}
# Make sure fields in table
tFields <- DBI::dbListFields(con, tName)
if(!all(fields %in% tFields)){
message(paste("Fields",
paste(fields[!fields %in% tFields], collapse = ", "),
"not in table"))
}
# DBI::dbExecute(con, sql_, param = dataF_)
rs <- DBI::dbSendStatement(con, sql_)
DBI::dbBind(rs, dataF_)
rowsAffected <- DBI::dbGetRowsAffected(rs)
if(verbose) message(paste(rowsAffected, "Rows Updated"))
DBI::dbClearResult(rs)
rowsAffected
}else{
message("Connection to DB failed")
stop()
}
}, error = function(e, ...){
message("Update Failed")
print(e)
})
# Return value
rowsAffected
}
#' Function to insert values into db table
#'
#' By default, uses parameterized INSERT IGNORE statement with option to replace if exists.
#'
#' INSERT IGNORE will allow inserts if primary key does not exist and ignores inserts where it does preventing failure of statement when values exist.
#' Note: REPLACE makes sense only if a table has a PRIMARY KEY or UNIQUE index. Otherwise, it becomes equivalent to INSERT, because there is no index to be used to determine whether a new row duplicates another. replaceifexists = TRUE is slower.
#' @param dataF data frame to be appended
#' @param tName table to appand data to
#' @param Group Group identifier found in .my.cnf
#' @param defaultFile file where Group credentials are found
#' @param verbose provide feedback to user
#' @export
# insert_sql3(dataF = sample.sID, replaceifexists = FALSE, Group = "EPA_harshadb")
insert_sql3 <- function(dataF, tName, replaceifexists = FALSE,
Group = "EPA_harshadb",
defaultFile = file.path(gfuns::sg("ws"), ".my.cnf"),
verbose = FALSE, ...)
{
# Test
if(!TRUE){
tName <- "chl_sample_log_dev"
replaceifexists <- FALSE
Group <- "EPA_chl_wsd_admin"
defaultFile <- file.path(gfuns::sg("ws"), ".my.cnf")
verbose <- T
CoCdir <- normalizePath("L:\\Priv\\Cin\\NRMRL\\Chlorophyll\\CoC")
files <- matrix(list.files(CoCdir, pattern = ".+xlsm$", full.names = TRUE),
ncol = 1, dimnames = list(NULL, "files")) %>%
data.table::as.data.table() %>%
.[!grepl("2016|Date", files)]
dataF <- readxl::read_excel(files$files[1], sheet = 1) %>%
data.table::as.data.table() %>%
.[, Type := ifelse(grepl("sample", Type), "Sample", Type)] %>%
.[, c("Sample_ID", "Project", "Collection_Date", "CoC_ID")]
}
# Set on exit
# Set on exit
on.exit(expr = {
if(exists("con")){
if(inherits(con, "DBIConnection")) suppressWarnings(DBI::dbDisconnect(con))
rm(con)
}
if(exists("rs")){
if(inherits(rs, "DBIResult")) suppressWarnings(DBI::dbClearResult(rs))
rm(rs)
}
})
# Establish Fields
fields <- names(dataF)
# Arrange dataF
dataF_ <-
dataF %>%
as.list() %>%
# Remove names for anonymous
unname()
# Create sql statement
# If replaceifexists use replace otherwise select
if(replaceifexists){
# Create REPLACE statement
sql_ <- paste0("REPLACE INTO ", tName, " (",
paste0("`", fields, "`", collapse = ", "),
") VALUES(",
paste(rep("?", length(fields)), collapse = ","),
")")
}else{
# Create SELECT statement
sql_ <- paste0("INSERT IGNORE INTO ", tName, " (",
paste0("`", fields, "`", collapse = ", "),
") VALUES(",
paste(rep("?", length(fields)), collapse = ","),
")")
}
# Feedback
if(verbose){
print(sql_)
print(utils::head(dataF))
}
# Try INSERT
rowsAffected <-
tryCatch({
# Open the connection
con <- gfuns::dbConnect_(defaultFile = defaultFile,
Group = Group)
# If connection is valid continue
if(DBI::dbIsValid(con)){
if(verbose) print(con)
# Make sure table exists
if(!DBI::dbExistsTable(con, tName)){
message(paste(tName, "does not exist"))
stop()
}
# Make sure fields in table
tFields <- DBI::dbListFields(con, tName)
if(!all(fields %in% tFields)){
message(paste("Fields",
paste(fields[!fields %in% tFields], collapse = ", "),
"not in table"))
stop()
}
rs <- DBI::dbSendStatement(con, sql_)
DBI::dbBind(rs, dataF_)
rowsAffected <- DBI::dbGetRowsAffected(rs)
if(verbose) message(paste(rowsAffected, "Rows Affected"))
DBI::dbClearResult(rs)
rowsAffected
}else{
message("Connection to DB failed")
stop()
}
}, error = function(e, ...){
message("Insert Failed")
print(e)
if(DBI::dbIsValid(con)){
}
})
rowsAffected
}
#' Get primary keys from MySQL table
#'
#' Function will request table information from MySQL database information_schema.table_constraints table.
#' @param tName table name for which requesting primary keys
#' @param Group Group name linked to my.cnf with connection information
#' @return Returns a one column (COLUMN_NAME) data.table with primary key fields.
#' @export
get_PrimaryKeys <- function(tName, Group = "EPA_harshadb"){
get_sql3(paste0('SELECT k.COLUMN_NAME
FROM information_schema.table_constraints t
LEFT JOIN information_schema.key_column_usage k
USING(constraint_name,table_schema,table_name)
WHERE t.constraint_type="PRIMARY KEY"
AND t.table_schema=DATABASE()
AND t.table_name="', tName, '";'),
Group = Group)[["COLUMN_NAME"]]
# Alternative statement:
# SELECT COLUMN_NAME
# FROM information_schema.KEY_COLUMN_USAGE
# WHERE TABLE_NAME = 'sample_id'
# AND CONSTRAINT_NAME = 'PRIMARY';
}
# get_sql3("SELECT @@global.time_zone, @@session.time_zone")
# get_sql3("select from_unixtime(86400-3600)")
# get_sql3("select timediff(now(),convert_tz(now(),@@session.time_zone,'+00:00'));")
# get_sql3("select time_format(timediff(now(),convert_tz(now(),@@session.time_zone,'+00:00')),'%H%i');")
# get_sql3("SELECT @@system_time_zone;")
# get_sql3("SELECT TIMEDIFF(NOW(), UTC_TIMESTAMP);")
# tzTest <-
# data.table::data.table(datetime = Sys.time()) %>%
# .[, datetimeChar := format(datetime, "%F %X %z")]
# tzTest <-
# data.table::rbindlist(
# list(tzTest,
# data.table::copy(tzTest)[, `:=` (
# datetime = lubridate::force_tz(datetime, tzone = "EST"),
# datetimeChar = format(lubridate::force_tz(datetime, tzone = "EST"), "%F %X %z"))]),
# use.names = T, fill = T)
#
# gfuns::insert_sql3(tzTest, "tzTest")
#
# tzTest_ <-
# get_sql3("select * from tzTest;")
#
# str(tzTest)
# lubridate::tz(tzTest_$datetime[1])
# lubridate::force_tz(tzTest_$datetime, tzone = "EST")
#
# sample_id <- get_sql3("Select SampleDateTime, TimeZone, SampleDateTimeChar from sample_id")
# str(sample_id)
# sample_id
# lubridate::tz(sample_id[, .(SampleDateTime)])
# lubridate::force_tz(sample_id[, .(SampleDateTime)], tzone = "EST")
# lubridate::with_tz(sample_id[, .(SampleDateTime)], tzone = "Etc/GMT+5")
# lubridate::with_tz(sample_id[, .(SampleDateTime)], tzone = "Etc/GMT-5")
# as.POSIXct(sample_id[, SampleDateTimeChar]) %>% head()
#
# sample_id[, SampleDateTime_ := as.POSIXct(paste0(SampleDateTimeChar, "00"), format = "%F %X %z", tz = "Etc/GMT+5")]
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.