R/db.R

Defines functions db_append db_insert db_refresh db_query db_close db_connect

Documented in db_append db_close db_connect db_insert db_query db_refresh

#' Connect to a database using YAML configs
#' @param db name of database, must be present in the file specified by \code{getOption('dbr.db_config_path')}
#' @param cache optional caching of the connection. If set to \code{TRUE}, the connection will be cached in the background and an all future \code{db_connect} calls will simply return that (even if called automatically from eg \code{db_query}) until the end of the R session or when caching on the \code{db} is disabled in a future \code{db_connect} call with explic \code{cache = FALSE}. See the examples for more details.
#' @param ... extra parameters passed to the database driver, even ones overriding the default values loaded from the YAML config
#' @importFrom DBI dbConnect dbDriver
#' @export
#' @seealso \code{\link{db_close}} \code{\link{db_query}}
#' @examples \dontrun{
#' ## create new connection
#' optbak <- options()
#' options('dbr.db_config_path' = system.file('example_db_config.yaml', package = 'dbr'))
#' con <- db_connect('sqlite')
#' str(con)
#' db_query('SELECT 42', 'sqlite')
#'
#' ## reusing the connection
#' str(db_connect('sqlite', cache = TRUE))
#' str(db_connect('sqlite'))
#' str(db_connect('sqlite'))
#' ## kill cached connection
#' db_close(db_connect('sqlite', cache = FALSE))
#'
#' ## restore options
#' options(optbak)
#' }
db_connect <- function(db, cache, ...) {

    cache <- ifelse(missing(cache), 'default', cache)

    if (exists(db, dbs)) {
        if (cache != FALSE) {
            return(dbs[[db]])
        }
        else {
            ## reset cached connection
            db_close(dbs[[db]])
            rm(list = db, envir = dbs)
        }
    }

    params <- db_config(db)

    ## override defaults
    extraparams <- list(...)
    for (i in seq_len(length(extraparams))) {
        params[[names(extraparams)[i]]] <- extraparams[[i]]
    }
    extralog <- ifelse(
        length(extraparams) > 0,
        paste0(' [', paste(paste(names(extraparams), extraparams, sep = '='), collapse = ', '), ']'),
        '')

    log_info(paste('Connecting to', db, extralog))
    con <- structure(do.call(dbConnect, params), db = db, cached = cache)

    ## cache connection
    if (isTRUE(cache)) {
        dbs[[db]] <- con
    }

    con

}


#' Close a database connection
#' @param db database object returned by \code{\link{db_connect}}
#' @importFrom DBI dbDisconnect
#' @export
#' @seealso \code{\link{db_connect}}
#' @note To close a cached connection, call \code{db_close} on an object returned by \code{db_connect(..., cache = FALSE)}
db_close <- function(db) {
    assert_attr(db, 'db')
    if (!isTRUE(attr(db, 'cached'))) {
        log_info(paste('Closing connection to', attr(db, 'db')))
        dbDisconnect(db)
    }
}


#' Execute an SQL query in a database
#' @param sql string
#' @param db database reference by name or object
#' @param ... passed to \code{sql_formatter}
#' @param sql_formatter function to be applied on \code{sql} potentially with \code{...}, eg using \code{glue} for string interpolation
#' @param output_format preferred output format that defaults to \code{data.frame}, but could be also \code{data.table} or \code{tibble} as well if the related R package is installed
#' @return data.frame with query metadata
#' @export
#' @importFrom DBI dbGetQuery
#' @importFrom logger log_info skip_formatter
#' @importFrom checkmate assert_string assert_function
#' @seealso \code{\link{db_connect}} \code{\link{db_refresh}}
#' @examples \dontrun{
#' options('dbr.db_config_path' = system.file('example_db_config.yaml', package = 'dbr'))
#' db_query('SELECT 42', 'sqlite')
#' db_query('SELECT {40 + 2}', 'sqlite')
#' }
db_query <- function(sql, db, ...,
                     sql_formatter = getOption('dbr.sql_formatter'),
                     output_format = getOption('dbr.output_format')) {

    if (!is.object(db)) {
        db <- db_connect(db)
        on.exit({
          db_close(db)
        })
    }

    assert_attr(db, 'db')
    assert_string(sql)
    assert_function(sql_formatter)

    sql <- do.call(sql_formatter, c(list(sql), list(...)), envir = parent.frame())

    log_info('Executing:**********')
    log_info(skip_formatter(sql))
    log_info('********************')

    start <- Sys.time()
    result_set <- dbGetQuery(db, sql)
    time_to_exec <- Sys.time() - start

    log_info('Finished in %s returning %s rows', format(time_to_exec, digits = 4), nrow(result_set))

    attr(result_set, 'when') <- start
    attr(result_set, 'db') <- attr(db, 'db')
    attr(result_set, 'time_to_exec') <- time_to_exec
    attr(result_set, 'statement') <-  sql

    ## convert to proffered output format
    if (output_format != 'data.frame') {
        result_set <- switch(
            output_format,
            'data.table' = data.table::setDT(result_set),
            'tibble' = tibble::as_tibble(result_set),
            stop('Unsupported output_format -- please use data.frame, data.table or tibble.'))
    }

    result_set

}


#' Refresh SQL query
#' @param x object returned by \code{db_query}
#' @seealso \code{\link{db_query}}
#' @importFrom checkmate assert_data_frame
#' @export
db_refresh <- function(x) {
    assert_data_frame(x)
    assert_attr(x, 'db')
    assert_attr(x, 'statement')
    with(attributes(x), db_query(statement, db))
}



#' Insert rows into a database table
#' @param df data.frame
#' @param table character vector of an optional schema and table name
#' @inheritParams db_close
#' @param ... further parameters passed to \code{dbWriteTable}, eg to modify \code{row.names} or \code{append} (depends on driver)
#' @seealso \code{RMySQL::\link[RMySQL]{dbWriteTable}}, \code{RPostgreSQL::\link[RPostgreSQL]{dbReadTable-methods}}
#' @importFrom DBI dbWriteTable
#' @importFrom checkmate assert_character
#' @export
#' @seealso \code{\link{db_append}}
#' @examples \dontrun{
#' options('db_config_path' = system.file('example_db_config.yaml', package = 'dbr'))
#' db_insert(mtcars, 'mtcars', 'sqlite')
#' db_append(mtcars, c('dm', 'mtcars'), 'sqlite')
#' }
db_insert <- function(df, table, db, ...) {

    if (!is.object(db)) {
        db <- db_connect(db)
        on.exit({
          db_close(db)
        })
    }

    assert_data_frame(df)
    assert_character(table, min.len = 1)
    assert_attr(db, 'db')

    log_info('Inserting %s rows into %s', nrow(df), paste(table, collapse = "."))
    dbWriteTable(conn = db, name = table, value = df, ...)

}


#' Append rows into a database table
#'
#' This is a wrapper around \code{\link{db_insert}} with the default parameters set to append to a table.
#' @inheritParams db_insert
#' @export
db_append <- function(df, table, db, ...) {
    ## check if it's Redshift, as COPY FROM stdin doesn't work there
    if (is.redshift(db)) {
        redshift_insert_via_copy_from_s3(df = df, table = table, db = db)
    } else {
        ## otherwise do a COPY FROM stdin
        db_insert(df, table, db, overwrite = FALSE, append = TRUE, row.names = FALSE, ...)
    }
}
daroczig/dbr documentation built on June 12, 2020, 2:47 p.m.