#' sql_connect
#'
#' Connects to and disconnects from the SQL database identified by the `tessilake.tessitura`
#' connection string in config.yml, and updates the RStudio Connections pane.
#'
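#' @details
#' Configuration is read with [config::get()] from `config.yml`. A minimal sketch of the
#' relevant entries (the DSN name, encoding, and depth paths below are illustrative,
#' not required values):
#'
#' ```yaml
#' default:
#'   tessilake:
#'     tessitura: "Tessitura"           # ODBC DSN used by sql_connect()
#'     tessitura.encoding: "latin1"     # optional, passed through to DBI::dbConnect()
#'     depths:                          # cache levels used by read_sql()
#'       shallow: "~/tessilake/shallow"
#'       deep: "~/tessilake/deep"
#' ```
#'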
#' @return invisible NULL
#' @importFrom rlang ns_env env_bind %||%
#'
#' @examples
#' \dontrun{
#' tessilake:::db$db
#' sql_connect()
#' tessilake:::db$db
#' sql_disconnect()
#' }
sql_connect <- function() {
  if (is.null(db$db)) {
    tryCatch(
      {
        args <- list(
          drv = odbc::odbc(),
          dsn = config::get("tessilake")[["tessitura"]],
          encoding = config::get("tessilake")[["tessitura.encoding"]] %||% ""
        )

        # dbConnect() registers its own Connections-pane task callback; snapshot
        # the callback list so anything it adds can be removed afterwards
        callbacks <- getTaskCallbackNames()
        db$db <- do.call(DBI::dbConnect, args)
        removeTaskCallback(setdiff(getTaskCallbackNames(), callbacks))

        # notify the RStudio Connections pane directly instead
        if ("odbc.version" %in% names(attributes(db$db))) {
          odbc:::on_connection_opened(db$db, "tessilake::sql_connect()")
        }
      },
      error = function(e) {
        message(e$message)
        stop("Database connection failed, please set the `tessilake.tessitura` configuration in config.yml to a valid database DSN.")
      }
    )
  }
  invisible(NULL)
}
#' @describeIn sql_connect Tear down the SQL connection
sql_disconnect <- function() {
  if (!is.null(db$db)) {
    DBI::dbDisconnect(db$db)
    db$db <- NULL
  }
  invisible(NULL)
}

# package-local environment holding the active DBI connection handle
db <- new.env(parent = emptyenv())
#' read_sql
#'
#' Executes a database query and caches the result locally as a Feather file and remotely
#' as a Parquet file. Cache storage locations are configured by the `tessilake.depths` option;
#' the database connection is an ODBC DSN whose name is set by the `tessilake.tessitura` option.
#'
#' @param query character query to run on the database.
#' @param name name of the query, defaults to the SHA1 hash of the query.
#' @param select character vector of columns to select from the database
#' @param primary_keys primary keys of the query, used for incremental updates
#' @param date_column date column of the query showing when each row was last updated, used for incremental updates
#' @param freshness the returned data will be at least this fresh
#' @param incremental whether or not to load data incrementally, default is `TRUE`
#' @inheritDotParams write_cache overwrite
#' @return an Apache [arrow::Table]
#' @importFrom arrow arrow_table
#' @importFrom checkmate assert_character
#' @importFrom dplyr across arrange tbl sql summarise
#' @importFrom dbplyr tbl_sql
#' @importFrom digest sha1
#' @importFrom lubridate tz force_tz is.POSIXct
#' @importFrom rlang .data
#' @importFrom utils head
#' @importFrom purrr iwalk
#' @export
#'
#' @examples
#' \dontrun{
#' read_sql("select * from T_CUSTOMER", "t_customer",
#' primary_keys = "customer_no",
#' date_column = "last_update_dt"
#' )
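#'
#' # A sketch of forcing a full refresh: with zero freshness every cache level
#' # counts as stale, and `overwrite` is forwarded to write_cache()
#' read_sql("select * from T_CUSTOMER", "t_customer",
#'   freshness = as.difftime(0, units = "days"),
#'   overwrite = TRUE
#' )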
#' }
read_sql <- function(query, name = digest::sha1(query),
                     select = NULL,
                     primary_keys = NULL, date_column = NULL,
                     freshness = as.difftime(7, units = "days"),
                     incremental = TRUE, ...) {
  assert_character(query, len = 1)
  if (!is.null(date_column)) assert_character(date_column, max.len = 1)
  if (!is.null(primary_keys)) assert_character(primary_keys, min.len = length(date_column))

  sql_connect()

  # build the query with dplyr and sort by primary keys and date column for faster updating
  table <- tbl(db$db, sql(query)) %>% arrange(across(all_of(c(primary_keys, date_column))))

  # rewrite the caches only when every cache depth is older than the freshness window
  test_mtime <- Sys.time() - freshness
  depths <- names(config::get("tessilake")[["depths"]])
  mtimes <- purrr::map_vec(depths, \(depth) cache_get_mtime(name, depth, "tessi"))

  if (all(mtimes < test_mtime)) {
    write_cache(table,
      table_name = name, type = "tessi", incremental = incremental,
      primary_keys = primary_keys, date_column = date_column, partition = FALSE, ...
    )
  }

  read_cache(table_name = name, type = "tessi", select = select)
}
#' read_sql_table
#'
#' @param table_name character, table name without schema.
#' @param schema character, database schema. Default is `dbo`.
#' @param select character vector of columns to select from the database
#' @param primary_keys character vector, primary keys of the table. If not given, `read_sql_table`
#' will attempt to identify the primary keys from the SQL metadata.
#' @param date_column character, date column of the table showing the last date the row was updated.
#' Defaults to "last_update_dt" if it exists in the table.
#' @param freshness the returned data will be at least this fresh
#' @param incremental whether or not to load data incrementally, default is `TRUE`
#' @inheritDotParams write_cache overwrite
#' @describeIn read_sql Reads a table or view from a SQL database and caches it using `read_sql`.
#' @importFrom dplyr filter select collect
#' @importFrom data.table rbindlist
#' @importFrom DBI dbListTables
#' @importFrom checkmate assert_names
#' @importFrom rlang expr maybe_missing
#' @importFrom stringr str_split
#' @return an Apache [arrow::Table].
#' @export
#'
#' @examples
#' \dontrun{
#' read_sql_table("T_CUSTOMER")
#' }
read_sql_table <- function(table_name, schema = "dbo",
                           select = NULL,
                           primary_keys = NULL, date_column = NULL,
                           freshness = as.difftime(7, units = "days"),
                           incremental = TRUE, ...) {
  # declare NSE column names so that R CMD check doesn't complain
  table_schema <- constraint_type <- character_maximum_length <- column_name <- NULL

  assert_character(table_name, len = 1)
  assert_character(schema, len = 1, null.ok = TRUE)
  assert_character(date_column, len = 1, null.ok = TRUE)

  sql_connect()

  available_tables <- list(
    dbo = list(table_name = dbListTables(db$db, schema_name = "dbo")),
    BI = list(table_name = dbListTables(db$db, schema_name = "BI"))
  ) %>%
    rbindlist(idcol = "schema")

  # drop anything after the first space (e.g. a table hint) when checking existence
  table_name_part <- str_split(table_name, " ", n = 2)[[1]][[1]]

  if (available_tables[
    eval(expr(schema == !!schema & table_name == !!table_name_part)),
    .N
  ] == 0) {
    stop(paste("Table", paste(schema, table_name, sep = "."), "doesn't exist."))
  }
  available_columns <- read_sql(
    query = "select c.table_schema, c.table_name, c.column_name, cc.constraint_type, c.character_maximum_length
               from INFORMATION_SCHEMA.COLUMNS c
               left join (select cc.*, CONSTRAINT_TYPE
                            from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE cc
                            join INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                              on tc.CONSTRAINT_NAME = cc.CONSTRAINT_NAME and CONSTRAINT_TYPE = 'PRIMARY KEY') cc
                 on c.TABLE_NAME = cc.TABLE_NAME and c.TABLE_SCHEMA = cc.TABLE_SCHEMA and c.COLUMN_NAME = cc.COLUMN_NAME",
    name = "available_columns",
    freshness = freshness
  ) %>%
    filter(table_schema == !!schema & table_name == !!table_name_part) %>%
    collect()
  if (!is.null(select)) assert_names(select, subset.of = available_columns$column_name)
  if (!is.null(primary_keys)) assert_names(primary_keys, subset.of = available_columns$column_name)
  if (!is.null(date_column)) assert_names(date_column, subset.of = available_columns$column_name)

  # get primary key info from the table metadata if it wasn't supplied
  if (is.null(primary_keys) && "PRIMARY KEY" %in% available_columns$constraint_type) {
    primary_keys <- filter(available_columns, constraint_type == "PRIMARY KEY")$column_name
  }
  # default the date column to last_update_dt when incremental keys are available
  if (is.null(date_column) && !is.null(primary_keys) && "last_update_dt" %in% available_columns$column_name) {
    date_column <- "last_update_dt"
  }
  # arrange columns so that the "long data" (varchar(max), character_maximum_length == -1)
  # comes last, and drop columns whose names contain "encrypt"; see
  # https://stackoverflow.com/questions/60757280/result-fetchresptr-n-nanodbc-nanodbc-cpp2966-07009-microsoftodbc-dri
  max_cols <- available_columns %>% filter(character_maximum_length == -1)
  non_max_cols <- available_columns %>% filter((character_maximum_length != -1 | is.na(character_maximum_length)) &
    !grepl("encrypt", column_name))
  cols <- paste(c(non_max_cols$column_name, max_cols$column_name), collapse = ",")

  # build the table query
  read_sql(
    query = paste("select", cols, "from", paste(schema, table_name, sep = "."), "with (nolock)"),
    name = paste(c(schema, table_name_part), collapse = "."),
    primary_keys = primary_keys,
    date_column = maybe_missing(date_column),
    select = select,
    freshness = freshness,
    incremental = incremental, ...
  )
}