# R/db.R

# Defines functions sql_dialect parse_sql_date verify_table get_default_type dettl_db_args sqlite_enable_fk db_get_log_table db_connect

# Documented in db_connect db_get_log_table dettl_db_args parse_sql_date sql_dialect sqlite_enable_fk verify_table

#' Open a connection to one of the databases described in the yaml config.
#'
#' The driver and its arguments are looked up with \code{dettl_db_args} and
#' handed to \code{\link[DBI]{dbConnect}}. The new connection is then passed
#' through \code{sqlite_enable_fk}, which switches on foreign key constraints
#' when the connection is a \code{\link[RSQLite]{SQLite}} one.
#'
#' @param type The db to connect to, must match a db configured in db config.
#' @param path Path to directory containing yaml config.
#'
#' @return The open DBI connection.
#'
#' @keywords internal
#'
db_connect <- function(type, path) {
  cfg <- dettl_db_args(path, type)
  ## The instantiated driver goes first, followed by the configured arguments.
  connect_args <- c(list(cfg$driver()), cfg$args)
  con <- do.call(DBI::dbConnect, connect_args)
  sqlite_enable_fk(con)
  con
}

#' Look up the configured log table name for a database.
#'
#' Reads the database configuration via \code{dettl_db_args} and returns its
#' \code{log_table} entry.
#'
#' @param type The db to get the log table for, must match a db configured in
#' db config.
#' @param path Path to directory containing yaml config.
#'
#' @return The \code{log_table} value from the configuration of \code{type}.
#'
#' @keywords internal
#'
db_get_log_table <- function(type, path) {
  dettl_db_args(path, type)$log_table
}

#' Enable foreign key constraints for SQLite connections
#'
#' Foreign key constraints aren't enabled by default in SQLite. Ensure
#' they are enabled each time we connect to the db. Connections which are
#' not SQLite (including ones whose dialect is not recognised) are left
#' untouched.
#'
#' @param con The DB connection to enable foreign keys on.
#'
#' @return The result of \code{DBI::dbExecute} for SQLite connections,
#' otherwise \code{NULL} (invisibly).
#'
#' @keywords internal
sqlite_enable_fk <- function(con) {
  ## sql_dialect() returns NULL for connection classes it does not recognise,
  ## and `NULL == "sqlite"` is logical(0), which makes `if` fail with
  ## "argument is of length zero". identical() always yields a single
  ## TRUE/FALSE so unrecognised connections are simply skipped.
  if (identical(sql_dialect(con), "sqlite")) {
    DBI::dbExecute(con, "PRAGMA foreign_keys = ON")
  }
}

#' Build the driver and connection arguments for a configured database.
#'
#' Reads the dettl config at \code{path}, picks the entry for \code{type} and
#' turns its driver specification into the exported driver function. For the
#' SQLite driver a \code{dbname} is required, must not be empty or
#' \code{":memory:"}, and a relative \code{dbname} is made relative to the
#' config directory. Finally any vault secrets in the arguments are resolved.
#'
#' @param path Path to db config.
#' @param type The db type to get the args for, if null defaults to the first
#' configured database.
#'
#' @return A list with elements \code{driver} (the driver function),
#' \code{args} (the resolved connection arguments) and \code{log_table}.
#'
#' @keywords internal
#'
dettl_db_args <- function(path, type = NULL) {
  config <- dettl_config(path)
  if (is.null(type)) {
    type <- get_default_type(config)
  }

  db_cfg <- config$db[[type]]
  if (is.null(db_cfg)) {
    stop(sprintf("Cannot find config for database %s.", type))
  }
  ## The driver is looked up as an exported object: first element is the
  ## namespace, second element is the name within it.
  driver <- getExportedValue(db_cfg$driver[[1L]], db_cfg$driver[[2L]])

  uses_sqlite <- db_cfg$driver[[2]] == "SQLite"
  if (uses_sqlite) {
    sqlite_file <- db_cfg$args$dbname
    is_transient <- is.null(sqlite_file) || !nzchar(sqlite_file) ||
      tolower(sqlite_file) == ":memory:"
    if (is_transient) {
      stop("Cannot use a transient SQLite database with dettl")
    }
    if (is_relative_path(sqlite_file)) {
      ## Relative database files are located relative to the config directory
      db_cfg$args$dbname <- file.path(config$path, sqlite_file)
    }
  }

  ## Vault errors can be pretty cryptic e.g. see VIMC-4026 so
  ## provide some context for the error message before re-signalling it
  add_vault_context <- function(e) {
    e$message <- paste0("Failed to retrieve database info from vault:\n    ",
                        e$message)
    stop(e)
  }

  ## Secrets are resolved with the environment variables read from the
  ## config directory temporarily set.
  resolved_args <- withCallingHandlers(
    withr::with_envvar(
      envir_read(config$path),
      vaultr::vault_resolve_secrets(db_cfg$args, addr = config$vault_server)
    ),
    error = add_vault_context
  )

  list(
    driver = driver,
    args = resolved_args,
    log_table = db_cfg$log_table
  )
}

get_default_type <- function(config) {
  ## The default database is the first one named in the `db` section of the
  ## config.
  configured_dbs <- names(config$db)
  configured_dbs[[1]]
}

#' Check that a table of data is compatible with the DB schema.
#'
#' Stops with an error unless
#' * A table with name `table_name` exists in the DB
#' * All the columns in `table` exist in the DB table with name `table_name`
#' * Any `additional_columns` exist in the DB table with name `table_name`
#' * Every not null column of the DB table which is not a serial column is
#'   present in `table` and contains no missing values
#'
#' @param con The active DB connection to check the schema for.
#' @param table_name The name of the table to check.
#' @param table The table to check.
#' @param additional_columns Vector of additional column names to check exist
#' in DB table.
#' @param context_info Info to be logged should a check fail.
#' @param solution_text Text describing possible solution should error occur.
#'
#' @keywords internal
#'
# nolint start
verify_table <- function(con, table_name, table, additional_columns = NULL,
                         context_info = "", solution_text = "") {
#nolint end
  table_exists <- DBI::dbExistsTable(con, table_name)
  if (!table_exists) {
    stop(sprintf(
      "%s: Table '%s' is missing from db schema. %s",
      context_info, table_name, solution_text
    ))
  }

  schema_cols <- DBI::dbListFields(con, table_name)

  ## Columns of the data which the schema does not know about; only the
  ## first offender is reported.
  data_cols <- colnames(table)
  unknown_cols <- data_cols[!(data_cols %in% schema_cols)]
  if (length(unknown_cols) > 0) {
    stop(sprintf(
      "%s: Column '%s' in table '%s' but is missing from db schema.",
      context_info, unknown_cols[[1]], table_name
    ))
  }

  ## Same check for any extra columns the caller asked for.
  unknown_extras <- additional_columns[!(additional_columns %in% schema_cols)]
  if (length(unknown_extras) > 0) {
    stop(sprintf(
      "%s: Column '%s' is missing from db schema.",
      context_info, unknown_extras[[1]]))
  }

  constraints <- NotNullConstraints$new(con)
  required_cols <- constraints$not_nulls(table_name)
  for (col_name in required_cols) {
    ## Serial columns are autogenerated so may be absent from the data. Any
    ## other not null column must be present and fully populated.
    is_autogenerated <- constraints$is_serial(table_name, col_name)
    lacks_values <- if (col_name %in% colnames(table)) {
      any(is.na(table[[col_name]]))
    } else {
      TRUE
    }
    if (!is_autogenerated && lacks_values) {
      stop(sprintf(
        "%s: Column '%s' in table '%s' violates not null constraint - column missing or contains missing values.",
        context_info, col_name, table_name
      ))
    }
  }
}


#' Parse a date read from the SQL db.
#'
#' Parses the date according to the SQL driver used. For a SQLite
#' connection the value is converted with \code{as.POSIXct} (origin
#' 1970-01-01, UTC) and formatted as a string without fractional seconds.
#' For any other connection (e.g. Postgres, where the value is expected to
#' already be POSIXct) the date is returned unchanged.
#'
#' @param con Active db connection.
#' @param date The date.
#'
#' @return The parsed date: a formatted character value for SQLite,
#' otherwise \code{date} as supplied.
#'
#' @keywords internal
parse_sql_date <- function(con, date) {
  ## sql_dialect() returns NULL for connection classes it does not recognise;
  ## comparing that with `==` gives logical(0) and `if` would fail with
  ## "argument is of length zero". identical() always gives TRUE or FALSE.
  if (identical(sql_dialect(con), "sqlite")) {
    date <- format(as.POSIXct(date, origin = "1970-01-01", tz = "UTC"),
                   digits = 0)
  }
  date
}

#' Identify the SQL dialect of a connection from its class.
#'
#' @param con The connection to test.
#'
#' @return \code{"sqlite"} for a \code{SQLiteConnection},
#' \code{"postgresql"} for a \code{PqConnection}, and \code{NULL} for any
#' other object.
#'
#' @keywords internal
sql_dialect <- function(con) {
  if (inherits(con, "SQLiteConnection")) {
    return("sqlite")
  }
  if (inherits(con, "PqConnection")) {
    return("postgresql")
  }
  ## Connection class not recognised
  NULL
}
# vimc/dettl documentation built on Oct. 6, 2022, 2:13 p.m.