# R/tarpan2.R

#' Tarpan 2.0 Credentials
#'
#' tarpan2_user, tarpan2_password
#'
#' Provides decrypted Tarpan 2.0 credentials
#'
#'
#' @name tarpan2creds
NULL

#' tarpan2_user
#'
#' @rdname tarpan2creds
#'
#' @export
tarpan2_user <- function() {
  # Read the stored user entry from the package configuration and return the
  # result of passing it through tarpan_decrypt_value().
  stored_user <- .tarpan$config$user
  tarpan_decrypt_value(stored_user)
}

#' tarpan2_password
#'
#' @rdname tarpan2creds
#'
#' @export
tarpan2_password <- function() {
  # Read the stored password entry from the package configuration and return
  # the result of passing it through tarpan_decrypt_value().
  stored_password <- .tarpan$config$password
  tarpan_decrypt_value(stored_password)
}

#' tarpan2_query
#'
#' Sends a Tarpan 2.0 query to the specified connection or, if not supplied, to
#' the default connection created by \code{tarpan2_connect}. When
#' \code{dbGetQuery} returns \code{NULL}, \code{tarpan2_disconnect} is called
#' and the query is sent again, until \code{retries} is used up.
#'
#' Setting the option \code{tarpan2_show_queries} to \code{TRUE} prints every
#' query before it is sent.
#'
#' @param query The query to send to the database
#' @param con A connection created by \code{make_tarpan2_connection}, or any
#' other value accepted by \code{tarpan2_db_connection}
#' @param retries The number of retry attempts - defaults to 1.
#'
#' @return A \code{data.frame} built from the response of \code{dbGetQuery},
#' with every column converted to character and then passed through
#' \code{type.convert}
#' @export
tarpan2_query <- function(query, con = NULL, retries = 1) {
  if (getOption("tarpan2_show_queries", FALSE)) {
    cat(query, "\n")
  }

  pool <- tarpan2_db_connection(con)
  result <- dbGetQuery(conn = pool, statement = query)

  # Normal case: re-type every column and hand back a plain data.frame.
  if (!is.null(result)) {
    retyped <- map_df(result, ~type.convert(as.character(.x), as.is = TRUE))
    return(data.frame(retyped))
  }

  # A NULL result is treated as a broken connection.
  if (!(retries > 0)) {
    stop("Hit max number of retries and still receiving NULL from database.")
  }

  message("Reconnecting to Tarpan 2.0 due to broken connection.")
  tarpan2_disconnect()
  tarpan2_query(query = query, con = tarpan2_db_connection(con),
                retries = retries - 1)
}

#' tarpan2_db_connection
#'
#' Retrieves a database pool connection that has been created by
#' \code{make_tarpan2_connection}. If no database name supplied, the connection
#' created by \code{tarpan2_connect} is returned. If the named database has no
#' pool yet, one is created with \code{make_tarpan2_connection} and stored for
#' later calls.
#'
#' @param dbname One of: a \code{Pool} object (returned unchanged), a single
#' database name, or any non-character value such as \code{NULL} (selects the
#' default connection).
#'
#' @return A pool database connection. It is returned invisibly when looked up
#' by database name.
#' @export
tarpan2_db_connection <- function(dbname = NULL) {
  # An existing pool is passed straight through.
  if (inherits(dbname, "Pool")) {
    return(dbname)
  }

  # Anything that is not character (NULL included) selects the default pool,
  # connecting first when no default pool exists yet.
  if (!is.character(dbname)) {
    if (!inherits(.tarpan$tarpan2$pool, "Pool")) {
      tarpan2_connect()
    }

    return(.tarpan$tarpan2$pool)
  }

  # A character value is used as a list key and inside a scalar condition, so
  # it has to be exactly one non-missing string.
  if (length(dbname) != 1L || is.na(dbname)) {
    stop("dbname must be a single, non-missing database name or a Pool.")
  }

  if (!(dbname %in% names(.tarpan$tarpan2$pools))) {
    message(dbname, " not in available connections. Creating connection...")
    .tarpan$tarpan2$pools[[dbname]] <- make_tarpan2_connection(
      dbname = dbname,
      drv = PostgreSQL(),
      host = .tarpan$config$host,
      port = .tarpan$config$port,
      user = tarpan2_user(),
      password = tarpan2_password()
    )
  }

  invisible(.tarpan$tarpan2$pools[[dbname]])
}

#' tarpan2_show_databases
#'
#' Lists the non-template databases reported by \code{pg_database} for the
#' given connection.
#'
#' @param connection A connection created by \code{make_tarpan2_connection}
#'
#' @return A character vector of database names
#' @export
tarpan2_show_databases <- function(connection = NULL) {
  databases <- tarpan2_query(
    query = "SELECT datname FROM pg_database WHERE datistemplate = FALSE;",
    con = connection
  )

  databases$datname
}

#' tarpan2_projects
#'
#' Lists the databases of the default connection, leaving out "postgres" and
#' "sds_admin".
#'
#' @return A character vector of projects (project databases)
#' @export
tarpan2_projects <- function() {
  excluded_databases <- c("postgres", "sds_admin")
  all_databases <- tarpan2_show_databases()

  setdiff(all_databases, excluded_databases)
}

#' tarpan2_show_tables
#'
#' Shows the tables (views excluded) of one schema for a given connection
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param schema The table schema to limit by
#'
#' @return A character vector of table names
#' @export
tarpan2_show_tables <- function(con = NULL, schema = "public") {
  # {schema} is interpolated verbatim into the SQL text.
  tables_query <- glue(
    "SELECT table_name ",
    "FROM information_schema.tables ",
    "WHERE table_schema = '{schema}' ",
    "AND table_type != 'VIEW';"
  )

  tarpan2_query(query = tables_query, con = con)$table_name
}

#' tarpan2_show_schemas
#'
#' Lists the schemas (rows of \code{pg_catalog.pg_namespace}) for a given
#' connection
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#'
#' @return A character vector of schema names
#' @export
tarpan2_show_schemas <- function(con = NULL) {
  namespaces <- tarpan2_query(
    query = "SELECT nspname FROM pg_catalog.pg_namespace;",
    con = con
  )

  namespaces$nspname
}

#' tarpan2_show_variables
#'
#' Shows the variables (columns) of the specified tables
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param tables character - one or more table names
#' @param schema character - the schema to use, defaults to "public"
#'
#' @return A list of variables keyed by table name
#' @export
tarpan2_show_variables <- function(con = NULL, tables, schema = "public") {
  # Turn c("a", "b") into a', 'b so that it drops into IN ('...').
  in_tables <- paste(tables, collapse = "', '")

  columns_query <- glue(
    "SELECT column_name, table_name ",
    "FROM INFORMATION_SCHEMA.COLUMNS ",
    "WHERE table_name IN ('{in_tables}') ",
    "AND table_schema = '{schema}'"
  )
  columns <- tarpan2_query(con = con, query = columns_query)

  # One list element per table, each holding that table's column names.
  with(columns, split(column_name, table_name))
}

#' tarpan2_get_variables
#'
#' Gets variable data for specified variable(s) from specified table.
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param variables character - the variable(s) to fetch data for. Defaults to
#' "*" (all columns); must be named explicitly when \code{group_by} is used
#' @param table character - the name of the table containing specified
#' variables
#' @param schema character - the schema containing \code{table}, defaults to
#' "public"
#' @param n_vals numeric - the number of vals to retrieve. Defaults to NULL,
#' retrieving all values
#' @param where_clause A where clause (without the WHERE keyword) to append to
#' the query
#' @param group_by Any columns of \code{table} to group and order results by,
#' typically a time or location variable. Every entry of \code{variables} that
#' is not in \code{group_by} must then be an aggregate, e.g. sum(), avg()
#'
#' @return A dataframe with the specified variables as columns
#' @export
tarpan2_get_variables <- function(con = NULL, variables = "*", table,
                                  schema = "public", n_vals = NULL,
                                  where_clause = NULL, group_by = NULL) {

  if (is.character(group_by) && any(variables == "*")) {
    stop("specified_variables must be named for a group_by request. ",
         "specified_variables not in group_by must be part of an ",
         "aggregate request, e.g., sum(), avg(), min(), max()")
  }

  # The column lookup costs a database round trip, so only do it when there
  # is something to validate.
  if (!is.null(group_by)) {
    available_variables <- tarpan2_get_table_vars(con = con,
                                                  table_name = table,
                                                  table_schema = schema)

    if (!all(group_by %in% available_variables$var_name)) {
      stop(glue("group_by variables must be in the list ",
                "{paste0(available_variables$var_name, collapse = ', ')}. ",
                "Try using weight_by and weight inputs instead"))
    }
  }

  query <- glue("SELECT {paste0(variables, collapse = ', ')} ",
                "FROM {schema}.{table}")

  if (is.character(where_clause)) {
    query <- glue("{query} WHERE {where_clause}")
  }

  if (is.character(group_by)) {
    group_columns <- paste0(group_by, collapse = ", ")
    query <- glue("{query} GROUP BY {group_columns} ORDER BY {group_columns}")
  }

  if (is.numeric(n_vals) && n_vals > 0) {
    # Format explicitly so large limits are not rendered as e.g. 1e+05, and
    # interpolate the query as a value so braces inside it are not re-parsed
    # as glue expressions.
    limit <- format(round(n_vals), scientific = FALSE)
    query <- glue("{query} LIMIT {limit}")
  }

  tarpan2_query(query = query, con = con)
}

#' tarpan2_get_variables_weighted
#'
#' Gets variable data for specified variable(s) from specified table and applies
#' a weighted average on the variable "val" using the `group_by` input. The
#' query itself comes from one of the package SQL templates
#' ("get_weighted_variables", "get_weighted_variables_cfs" or
#' "get_weighted_variables_cfs_bc"), chosen from \code{model} and
#' \code{bc_method}.
#'
#' @inheritParams tarpan2_get_variables
#' @param variable The data variable handed to the template as \code{data_var}
#' @param variables Ignored for a weighted query; a note is printed when it is
#' supplied
#' @param model The model name. It is used to look up the available
#' \code{group_by} columns and is passed to the "get_weighted_variables"
#' template. Names matching "cfs...hindcast" select the cfs templates instead
#' @param schema Currently unused by this function
#' @param where_clause Optional where clause. Occurrences of "valid_time" are
#' replaced by "initial_t" and the clause is prefixed with "AND" before it is
#' handed to the template. Defaults to NULL (no extra condition)
#' @param group_by Required. Column(s) to group results by
#' @param weight_by Required. Name of column from `project_tbl` that is to be
#' used to group the weights, typically a spatial grouping, such as "sub_iso".
#' @param weight Name of column from `project_tbl` that is to be used to weight
#' the results, typically "branch_book". Must be used in combination with
#' `weight_by`
#' @param bc_method Passed to the "get_weighted_variables_cfs_bc" template;
#' only used for cfs hindcast models. Defaults to NULL
#' @param data_var Currently unused by this function
#'
#' @return A dataframe with the specified variables as columns
#' @export
tarpan2_get_variables_weighted <- function(con = NULL, variable,
                                           variables = NULL, model,
                                           schema = "public",
                                           where_clause = NULL,
                                           group_by = NULL,
                                           weight = "branch_book",
                                           weight_by = NULL, bc_method = NULL,
                                           data_var = NULL) {

  if (!is.null(variables))
    message("NOTE: specified_variables are ignored for a weighted query")

  if (is.null(group_by))
    stop("queries with weight and weight_by must also use group_by")

  if (is.null(weight))
    stop("queries with weight_by must include weight")

  # A NULL weight_by would be interpolated into the SQL template, and glue
  # returns a zero-length string whenever an interpolated value is NULL.
  if (is.null(weight_by))
    stop("queries with weight must include weight_by")

  # NOTE(review): the model name is used as both table name and schema here;
  # confirm this matches the database layout.
  model_variables <- tarpan2_get_table_vars(con = con,
                                            table_name = model,
                                            table_schema = model)

  if (!all(group_by %in% model_variables$var_name))
    stop(glue("group_by variable must be in the list ",
              "{paste0(model_variables$var_name, collapse = ', ')}"))

  project_variables <- tarpan2_get_table_vars(con = con,
                                              table_name = "project_tbl")

  if (!all(c(weight, weight_by) %in% project_variables$var_name))
    stop(glue("weight_by and weight variables must be in the list ",
              "{paste0(project_variables$var_name, collapse = ', ')}"))

  if (!is.null(where_clause)) {
    where_clause <- paste("AND",
                          gsub("valid_time", "initial_t", where_clause))
  } else {
    where_clause <- ""
  }

  is_cfs_hindcast <- grepl("cfs[^<>]*hindcast", model)

  if (is_cfs_hindcast && is.null(bc_method)) {
    tarpan2_sql_template(con = con,
                         template_name = "get_weighted_variables_cfs",
                         data_var = variable, group_by = group_by,
                         weight_by = weight_by, weight = weight,
                         where_clause = where_clause)
  } else if (is_cfs_hindcast) {
    tarpan2_sql_template(con = con,
                         template_name = "get_weighted_variables_cfs_bc",
                         data_var = variable, group_by = group_by,
                         weight_by = weight_by, weight = weight,
                         where_clause = where_clause, bc_method = bc_method)
  } else {
    tarpan2_sql_template(con = con,
                         template_name = "get_weighted_variables",
                         model = model, data_var = variable,
                         group_by = group_by, weight_by = weight_by,
                         weight = weight, where_clause = where_clause)
  }
}

#' tarpan2_get_table
#'
#' Retrieves all columns of a table by delegating to
#' \code{tarpan2_get_variables} with its default variables and schema.
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param table_name character - the name of the table to read
#' @param n_vals numeric - the number of vals to retrieve. Defaults to NULL,
#' retrieving all values
#'
#' @return The dataframe returned by \code{tarpan2_get_variables}
#' @export
tarpan2_get_table <- function(con = NULL, table_name, n_vals = NULL) {
  tarpan2_get_variables(
    con = con,
    table = table_name,
    n_vals = n_vals
  )
}

#' tarpan2_delete_table
#'
#' Deletes a table from a project database. Ability to delete table is
#' only available for database owners and superusers.
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param table_name character - the name of the table to be deleted. It is
#' inserted into the DROP TABLE statement as is
#'
#' @return \code{NULL}, invisibly
#' @export
tarpan2_delete_table <- function(con = NULL, table_name) {
  drop_statement <- glue("DROP TABLE {table_name};")

  # NOTE(review): tarpan2_query treats a NULL result from dbGetQuery as a
  # broken connection; confirm the driver in use does not return NULL for
  # statements such as DROP TABLE.
  tarpan2_query(con = con, query = drop_statement)

  invisible(NULL)
}

#' tarpan2_get_variable_range
#'
#' Gets the minimum and maximum of the specified variable from the specified
#' table
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param variable character - the variable to fetch a range for
#' @param table character - the name of the table containing specified
#' variables
#' @param schema character - the schema containing \code{table}, defaults to
#' "public"
#'
#' @return A dataframe with two columns: {variable}_min and {variable}_max
#' @export
tarpan2_get_variable_range <- function(con = NULL, variable, table,
                                       schema = "public") {
  range_query <- glue(
    "SELECT ",
    "min({variable}) as {variable}_min, ",
    "max({variable}) as {variable}_max ",
    "FROM {schema}.{table}"
  )

  tarpan2_query(con = con, query = range_query)
}

#' tarpan2_get_grid
#'
#' Gets a grid for a given set of bounds, data schema, table and buffer. The
#' bounds are widened by \code{buffer} on every side before the grid cells
#' overlapping them are selected.
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param bounds list - must have four named numeric values: north, south, east
#' and west
#' @param data_schema character - the schema holding \code{data_table}. For
#' schemas matching "cfs...hindcast" the query is also limited to one grid
#' configuration: "T382" for "cfs_hindcast" and "T574" for "cfs2_hindcast"
#' @param data_table character - the table holding the grid cells
#' @param buffer numeric - amount added to every side of \code{bounds},
#' defaults to 1
#'
#' @return The result of \code{tarpan2_query}, with the columns grid_id and
#' the text form of cellbnd
#' @export
tarpan2_get_grid <- function(con = NULL, bounds, data_schema, data_table,
                             buffer = 1) {

  if (any(grepl("cfs[^<>]*hindcast", data_schema))) {
    if (isTRUE(data_schema == "cfs_hindcast")) {
      data_config <- "T382"
    } else if (isTRUE(data_schema == "cfs2_hindcast")) {
      data_config <- "T574"
    } else {
      # Without this branch data_config would be undefined further down.
      stop("No grid configuration is known for hindcast schema '",
           paste(data_schema, collapse = ", "),
           "'. Known schemas: cfs_hindcast, cfs2_hindcast.")
    }

    data_config_clause <- glue("AND grid_config='{data_config}'")
  } else {
    # Must be an empty string, not NULL: glue returns a zero-length result
    # whenever an interpolated value is NULL, which would blank the query.
    data_config_clause <- ""
  }

  query <- glue("SELECT grid_id, ST_AsText(cellbnd) FROM ",
                "{data_schema}.{data_table} ",
                "WHERE cellbnd && ST_MakeEnvelope(",
                "{bounds$west - buffer}, {bounds$south - buffer}, ",
                "{bounds$east + buffer}, {bounds$north + buffer}, 4326) ",
                "{data_config_clause}")

  tarpan2_query(query = query, con = con)
}

# Reads the whole file at `file_path` into one string and sends it as a single
# query through tarpan2_query(), returning that call's result.
tarpan2_run_sql_file <- function(con = NULL, file_path) {
  # The file size in bytes is the upper bound on what readChar() may read.
  n_bytes <- file.info(file_path)$size
  sql_text <- readChar(file_path, n_bytes)

  tarpan2_query(con = con, query = sql_text)
}

# Names, without the ".sql" extension, of the files in the installed package's
# "sql" directory. The pattern is escaped and anchored so that only a literal
# trailing ".sql" is removed.
scripts <- sub(
  "\\.sql$", "",
  list.files(system.file("sql", package = "taRpan"))
)

#' tarpan2_sql_script
#'
#' Function that runs any scripts included in the installed package files.
#' Available scripts:
#' @eval paste("@description", paste0(scripts, collapse = "\n"))
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param script_name The name of a script file included in the installed
#' package files. "sql" file extension optional
#'
#' @return The result of running the script through \code{tarpan2_query}
#' @export
tarpan2_sql_script <- function(con = NULL, script_name) {
  # @TODO: Use transactions

  # List the scripts at call time, as the migration and template runners do,
  # so the check reflects the files that are actually installed.
  available <- sub(
    "\\.sql$", "",
    list.files(system.file("sql", package = "taRpan"))
  )

  # Compare without the extension so that "name" and "name.sql" both work.
  base_name <- sub("\\.sql$", "", as.character(script_name))

  if (!isTRUE(base_name %in% available))
    stop(script_name, " is not an available sql script. Available ",
         "scripts:\n", paste0(available, "\n"))

  script_file <- system.file("sql", paste0(base_name, ".sql"),
                             package = "taRpan")

  # system.file() returns "" when the file cannot be located.
  if (!nzchar(script_file))
    stop("Could not locate sql script file for ", script_name, ".")

  tarpan2_run_sql_file(con = con, file_path = script_file)
}

#' tarpan2_sql_migration
#'
#' Runs a set of migration scripts. Currently available migrations:
#' @eval paste("@description", paste0(list.files(system.file("migrations",
#'                                               package = "taRpan")), "\n"))
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param migration_name The name of the folder of migration scripts to run
#'
#' @return \code{NULL}, invisibly
#' @export
tarpan2_sql_migration <- function(con = NULL, migration_name) {
  # @TODO: Use transactions
  migrations_root <- system.file("migrations", package = "taRpan")
  migrations <- list.files(migrations_root)

  is_known <- isTRUE(migration_name %in% migrations)
  if (!is_known) {
    stop(migration_name, " is not an available migration. Available ",
         "migrations:\n", paste0(migrations, "\n"))
  }

  # Every file in the migration's folder is run, in list.files() order.
  migration_dir <- system.file("migrations", migration_name,
                               package = "taRpan")
  migration_files <- list.files(migration_dir, full.names = TRUE)

  map(migration_files, ~tarpan2_run_sql_file(con = con, file_path = .x))

  invisible(NULL)
}

# Newline-separated names, without the ".sql" extension, of the files in the
# installed package's "templates" directory. The pattern is escaped and
# anchored so that only a literal trailing ".sql" is removed.
templates <- paste0(
  sub("\\.sql$", "", list.files(system.file("templates", package = "taRpan"))),
  collapse = "\n"
)

#' tarpan2_sql_template
#'
#' Fills a template SQL file (with variables of the form {variable}). Currently
#' available templates:
#' @eval paste0("@description ", templates)
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param template_name The name of a template. See above. ".sql" file
#' extension optional
#' @param ... Named values, one for every {variable} in the specified
#' template. None of the values used by the template may be \code{NULL}
#'
#' @return The result of sending the filled template to \code{tarpan2_query}
#' @export
tarpan2_sql_template <- function(con = NULL, template_name, ...) {
  # @TODO: Use transactions
  inputs <- list(...)

  templates <- list.files(system.file("templates", package = "taRpan"))

  # Check the type before any string function touches the name; endsWith()
  # fails on non-character input.
  name_is_string <- is.character(template_name) &&
    length(template_name) == 1L && !is.na(template_name)

  if (name_is_string && !endsWith(template_name, ".sql"))
    template_name <- paste0(template_name, ".sql")

  if (!name_is_string || !(template_name %in% templates))
    stop("Must provide a valid template name. Available templates:\n",
         paste0(templates, "\n"))

  template_file <- system.file("templates", template_name, package = "taRpan")

  template <- readChar(template_file, file.info(template_file)$size)

  # Every {name} placeholder in the template, without the braces.
  placeholders <- regmatches(template, gregexpr("[{][^}]+[}]", template))[[1]]
  variables <- unique(gsub("[{}]", "", placeholders))

  missing_inputs <- setdiff(variables, names(inputs))
  if (length(missing_inputs) > 0)
    stop("Missing inputs for template ", sub("\\.sql$", "", template_name),
         ":\n", paste0(missing_inputs, "\n"))

  # glue returns a zero-length string as soon as one interpolated value is
  # NULL, which would send an empty query.
  is_null_input <- vapply(inputs, is.null, logical(1))
  null_inputs <- intersect(variables, names(inputs)[is_null_input])
  if (length(null_inputs) > 0)
    stop("NULL inputs for template ", sub("\\.sql$", "", template_name),
         ":\n", paste0(null_inputs, "\n"))

  query <- glue(template, .envir = as.environment(inputs))

  tarpan2_query(con = con, query = query)
}

#' tarpan2_convert_data_var
#'
#' Translates a data variable between the orig_name and std_name columns of
#' the model's sds_vars table. An original name is turned into its standard
#' name and a standard name into its original name; a name found in neither
#' column is returned unchanged with a message.
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param data_var character - the data variable to convert
#' @param model character - A model name from the sds models
#'
#' @return The converted variable name(s), or \code{data_var} itself when no
#' conversion is available
#' @export
tarpan2_convert_data_var <- function(con = NULL, data_var, model) {
  query <- glue("SELECT orig_name, std_name FROM {model}.sds_vars")
  var_tbl <- tarpan2_query(query = query, con = con)

  # The original-name column is checked first.
  if (data_var %in% var_tbl$orig_name) {
    return(var_tbl$std_name[var_tbl$orig_name == data_var])
  }

  if (data_var %in% var_tbl$std_name) {
    return(var_tbl$orig_name[var_tbl$std_name == data_var])
  }

  message("CONVERSION IS NOT AVAILABLE FOR VAR: ", data_var)
  data_var
}

#' tarpan2_get_table_vars
#'
#' Generates dataframe with table variables (e.g., sds_id, val) and data types
#' (e.g., varchar, float, integer)
#'
#' @param con A connection created by \code{make_tarpan2_connection}
#' @param table_name character - the name of the table containing specified
#' variables
#' @param table_schema character - the name of the schema containing specified
#' variables, default is `public`
#'
#' @return A dataframe with the columns var_name and var_class
#' @export
tarpan2_get_table_vars <- function(con = NULL, table_name,
                                   table_schema = "public") {
  # Both names are matched with LIKE, so "_" and "%" in them act as SQL
  # wildcards.
  columns_query <- glue(
    "SELECT column_name as var_name, data_type as var_class ",
    "FROM information_schema.columns ",
    "WHERE table_name LIKE '{table_name}' ",
    "AND table_schema LIKE '{table_schema}'"
  )

  tarpan2_query(con = con, query = columns_query)
}

#' tarpan2_time_switch
#'
#' Picks the time column for a model: "initial_t" for models whose name
#' contains the keyword "forecast", "valid_t" for all others
#'
#' @param model The name of a sds model
#'
#' @return "initial_t" or "valid_t"
#'
tarpan2_time_switch <- function(model) {
  is_forecast <- grepl("forecast", model)

  if (is_forecast) "initial_t" else "valid_t"
}
# GlobalParametrics/taRpan_readonly documentation built on May 13, 2019, 11:23 a.m.