#' @name pg-package
#' @title pg - postgres utilities for R
#' @description A set of handy functions for working extensively with a postgres database. Includes transactional logging using the *logR* package. Read the `?pg` manual; the functions are used similarly to their DBI counterparts.
#' @docType package
NULL
#' @name pg
#' @title pg* wrappers to DBI::db*
#' @description Wrappers around `DBI::db*` functions, plus new helpers such as `pgTruncateTable`, `pgExistsSchema`, `pgDropSchema` and `pgListTableColumns`. Follow the `tests/tests.R` script for a reproducible workflow with tests.
#' @param conn active connection to a postgres database, by default `getOption("pg.conn")`.
#' @param .log logical, default `getOption("pg.log",TRUE)`, decides if the call is logged using *logR*.
#' @param statement character scalar, SQL statement to be executed.
#' @param name character, table name.
#' @param value data.table to be written to the database.
#' @param key character vector of columns to be used to set data.table key on the db results.
#' @param norows arbitrary object to be returned in case the query gives a 0-row result.
#' @param stage_name character, staging schema-table name used for performing *Upsert*.
#' @param conflict_by character vector, will be used collapsed in `ON CONFLICT (conflict_by) DO ...`.
#' @param on_conflict character scalar to be sent as the `ON CONFLICT` clause content. The key column set for the conflict may be included here, but then it should not be provided via the `conflict_by` argument. Default `DO NOTHING`.
#' @param techstamp logical decides if `dbWriteTable` will add technical metadata on saving each object to db.
#' @param schema_name character vector of schema names to be checked by `pgExistsSchema` or dropped by `pgDropSchema`.
#' @param select character vector of column names to fetch from `information_schema.columns` table.
#' @param cascade logical use cascade drop while dropping object.
#' @param silent logical, catch potential errors; useful when dropping an object that may not exist.
#' @param host character hostname/ip by default from ENV var `POSTGRES_HOST`
#' @param port character port by default from ENV var `POSTGRES_PORT`
#' @param dbname character database name by default from ENV var `POSTGRES_DB`
#' @param user character user name by default from ENV var `POSTGRES_USER`
#' @param password character password by default from ENV var `POSTGRES_PASSWORD`
#' @details
#' 1. Instead of passing the `conn` arg to each query, save it to an option with `options("pg.conn" = pgConnect())` and it will be reused, as shown in the Examples section below.
#' 2. Logging is enabled by default; the table for logs needs to be created first using `logR::logR_schema()`, potentially with custom defined metadata columns. See `tests/tests.R` or the *logR* package documentation.
#' 3. Stamping data with a technical run id requires another table, which can be created using `create_run_table()`. This feature is wired in via logR custom metadata values, see `tests/tests.R`.
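#'
#' The example below is a minimal end-to-end sketch. It assumes a postgres instance reachable via the `POSTGRES_*` environment variables and that *logR* has already been configured as in `tests/tests.R` (log table created with `logR::logR_schema()` and the `logR.meta` option set); the `example` schema and `account` table are hypothetical names used only for illustration.
#' @examples
#' \dontrun{
#' # logR setup (log table and the logR.meta option) is assumed done, see tests/tests.R
#' # connect once and store the connection in an option so every pg* call reuses it
#' options("pg.conn" = pgConnect())
#'
#' # hypothetical schema and table used only for illustration
#' pgCreateSchema("example")
#' pgSendQuery("CREATE TABLE example.account (id INTEGER PRIMARY KEY, value TEXT);")
#' dt = data.table::data.table(id = 1:3, value = c("a","b","c"))
#' pgWriteTable(c("example","account"), dt, techstamp = FALSE)
#' pgReadTable(c("example","account"), key = "id")
#'
#' # upsert: update rows with a conflicting `id`, insert the rest
#' dt2 = data.table::data.table(id = 2:4, value = c("B","C","d"))
#' pgUpsertTable(c("example","account"), dt2, conflict_by = "id",
#'               on_conflict = "DO UPDATE SET value = EXCLUDED.value",
#'               techstamp = FALSE)
#'
#' # clean up and disconnect
#' pgDropSchema("example", cascade = TRUE)
#' pgDisconnect()
#' }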
#' @rdname pg
pgConnect = function(host = Sys.getenv("POSTGRES_HOST", "127.0.0.1"), port = Sys.getenv("POSTGRES_PORT", "5432"), dbname = Sys.getenv("POSTGRES_DB", "postgres"), user = Sys.getenv("POSTGRES_USER", "postgres"), password = Sys.getenv("POSTGRES_PASSWORD", "postgres")){
dbConnect(PostgreSQL(), host = host, port = port, dbname = dbname, user = user, password = password)
}
#' @rdname pg
pgGetVersion = function(conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
pgGetQuery("SELECT version();",
conn = conn,
.log = .log)
}
#' @rdname pg
pgSendQuery = function(statement, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.character(statement), is.logical(.log), is.function(meta))
invisible(logR(dbSendQuery(conn, statement),
alert = !silent,
silent = silent,
meta = meta(r_fun = "dbSendQuery", r_args = statement),
.log = .log))
}
#' @rdname pg
pgGetQuery = function(statement, key, norows, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.character(statement), is.logical(.log), is.function(meta))
data = logR(dbGetQuery(conn, statement),
silent = FALSE,
meta = meta(r_fun = "dbGetQuery", r_args = statement),
.log = .log)
if(!missing(norows) && !nrow(data)) return(norows)
if(!missing(key)) setDT(data, key = key)[] else setDT(data)[]
}
#' @rdname pg
pgWriteTable = function(name, value, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.function(meta), is.data.table(value))
name = schema_table(name)
if(techstamp){
add_techstamp(value) # by meta cols ref
on.exit(rm_techstamp(value))
}
invisible(logR(dbWriteTable(conn, name, value, row.names = FALSE, append = TRUE, match.cols = TRUE), # match.cols requires 0.4.1 - fork of the CRAN's RPostgreSQL
in_rows = nrow(value),
silent = FALSE,
meta = meta(r_fun = "dbWriteTable", r_args = paste(name, collapse=".")),
.log = .log))
}
#' @rdname pg
pgReadTable = function(name, key, norows, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
name = schema_table(name)
data = logR(dbReadTable(conn, name),
silent = FALSE,
meta = meta(r_fun = "dbReadTable", r_args = paste(name, collapse=".")),
.log = .log)
if(!missing(norows) && !nrow(data)) return(norows)
if(!missing(key)) setDT(data, key = key)[] else setDT(data)[]
}
#' @rdname pg
pgExistsTable = function(name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
name = schema_table(name)
logR(dbExistsTable(conn, name),
silent = FALSE,
meta = meta(r_fun = "dbExistsTable", r_args = paste(name, collapse=".")),
.log = .log)
}
#' @rdname pg
pgExistsSchema = function(schema_name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.character(schema_name), length(schema_name)>0L)
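# generated SQL has the following shape (illustrative, for schema_name = c("public","example")):
#   SELECT schema_name FROM information_schema.schemata
#   WHERE schema_name IN ('public','example');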
schema = pgGetQuery(sprintf("SELECT schema_name FROM information_schema.schemata WHERE schema_name IN (%s);", paste(paste0("'",schema_name,"'"), collapse=",")),
norows = data.table(schema_name = character(0)),
conn = conn,
.log = .log)
schema_name %in% schema$schema_name
}
#' @rdname pg
pgListTables = function(schema_name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
if(!missing(schema_name)){
in_schema = sprintf("and schemaname IN (%s)",paste(paste0("'",schema_name,"'"),collapse=","))
logR(dbListTables(conn, in_schema),
silent = FALSE,
meta = meta(r_fun = "dbListTables", r_args = paste(schema_name, collapse=",")),
.log = .log)
} else {
logR(dbListTables(conn),
silent = FALSE,
meta = meta(r_fun = "dbListTables"),
.log = .log)
}
}
#' @rdname pg
pgTruncateTable = function(name, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.logical(silent))
# TO DO support vectorized
name = schema_table(name)
pgSendQuery(sprintf("TRUNCATE TABLE %s;", paste(name, collapse=".")),
silent = silent,
conn = conn,
.log = .log)
}
#' @rdname pg
pgCreateSchema = function(schema_name, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
sapply(schema_name,
function(schema) pgSendQuery(sprintf("CREATE SCHEMA %s;", schema),
silent = silent,
conn = conn,
.log = .log))
}
#' @rdname pg
pgListSchema = function(){
stop("TO DO")
}
#' @rdname pg
pgListFields = function(name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
name = schema_table(name)
logR(dbListFields(conn, name),
silent = FALSE,
meta = meta(r_fun = "dbListFields"),
.log = .log)
}
#' @rdname pg
pgListTableColumns = function(schema_name, select = c("table_schema", "table_name", "column_name", "ordinal_position", "column_default", "is_nullable", "data_type", "character_maximum_length", "numeric_precision", "datetime_precision"), conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.character(select), length(select) > 0L, c("table_schema","table_name","ordinal_position") %in% select)
sql = sprintf("SELECT %s FROM information_schema.columns WHERE table_schema != 'pg_catalog' AND table_schema != 'information_schema' %sORDER BY table_schema, table_name, ordinal_position;",
paste(select, collapse=", "),
if(!missing(schema_name)){
stopifnot(is.character(schema_name), length(schema_name)>0L)
sprintf("AND table_schema IN (%s) ", paste(paste0("'",schema_name,"'"), collapse=", "))
} else "")
DT = pgGetQuery(sql, conn = conn, .log = .log)
is_nullable = NULL # skip CRAN note for local DT var
if("is_nullable" %in% select) DT[, is_nullable := c("YES" = TRUE, "NO" = FALSE)[is_nullable]]
keys = c("table_schema","table_name","ordinal_position")
setkeyv(DT, if(any(keys %in% select)) keys[keys %in% select])
}
#' @rdname pg
pgDropSchema = function(schema_name, cascade = FALSE, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.character(schema_name), length(schema_name)>0L, is.logical(cascade), is.logical(silent))
invisible(sapply(
setNames(nm = schema_name),
function(schema) pgSendQuery(sprintf("DROP SCHEMA %s%s;", schema, if(cascade) " CASCADE" else ""),
silent = silent,
conn = conn,
.log = .log)
))
}
#' @rdname pg
pgRemoveTable = function(name, cascade = FALSE, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
meta = getOption("logR.meta")
stopifnot(!is.null(conn), is.logical(.log), is.character(name), is.logical(cascade), is.logical(silent), is.function(meta))
name = schema_table(name)
# local dbRemoveTable to allow cascade
dbRemoveTable = function(conn, name, cascade = FALSE){
# https://github.com/tomoakin/RPostgreSQL/blob/35f4d7e4510992bee9b06a886eedac21f8688ebf/RPostgreSQL/R/PostgreSQL.R#L191
if(RPostgreSQL::dbExistsTable(conn, name)){
rc <- try(RPostgreSQL::dbGetQuery(conn, sprintf("DROP TABLE %s%s;", RPostgreSQL::postgresqlTableRef(name), if(cascade) " CASCADE" else "")))
!inherits(rc, RPostgreSQL:::ErrorClass)
}
else FALSE
}
logR(dbRemoveTable(conn, name, cascade),
alert = !silent,
silent = silent,
meta = meta(r_fun = "dbRemoveTable", r_args = paste(name, collapse=".")),
.log = .log)
}
#' @rdname pg
pgDropTable = function(name, cascade = FALSE, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.character(name), is.logical(cascade), is.logical(silent))
# TO DO: detect whether `name` is c(schema, table) or a vector of already collapsed "schema.table" names, so the call can be vectorized like pgDropSchema; the detection is the tricky part
vec = FALSE # TO DO
if(!vec) name = setNames(list(name), paste(name, collapse="."))
sapply(name, pgRemoveTable, cascade = cascade, silent = silent, conn = conn, .log = .log)
}
#' @rdname pg
pgSendUpsert = function(stage_name, name, conflict_by, on_conflict = "DO NOTHING", techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
cols = pgListFields(stage_name)
cols = setdiff(cols, c("run_id","r_timestamp")) # remove techstamp to have clean column list, as the fresh one will be used, if any
# sql
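# the generated SQL has the following shape (illustrative values, assuming
# conflict_by = "id" and techstamp = TRUE):
#   INSERT INTO schema.table (col1, col2, run_id, r_timestamp)
#   SELECT col1, col2, 1::INTEGER run_id, '2016-01-01 00:00:00.000'::TIMESTAMPTZ r_timestamp
#   FROM stage_schema.stage_table
#   ON CONFLICT (id) DO NOTHING;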
insert_into = sprintf("INSERT INTO %s.%s (%s)", name[1L], name[2L], paste(if(techstamp) c(cols, c("run_id","r_timestamp")) else cols, collapse=", "))
select = sprintf("SELECT %s", paste(cols, collapse=", "))
if(techstamp) select = sprintf("%s, %s::INTEGER run_id, '%s'::TIMESTAMPTZ r_timestamp", select, get_run_id(), format(Sys.time(), "%Y-%m-%d %H:%M:%OS"))
from = sprintf("FROM %s.%s", stage_name[1L], stage_name[2L])
if(!missing(conflict_by)) on_conflict = paste(paste0("(",paste(conflict_by, collapse=", "),")"), on_conflict)
on_conflict = paste("ON CONFLICT",on_conflict)
sql = paste0(paste(insert_into, select, from, on_conflict), ";")
pgSendQuery(sql, conn = conn, .log = .log)
}
#' @rdname pg
pgUpsertTable = function(name, value, conflict_by, on_conflict = "DO NOTHING", stage_name, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
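# upsert workflow: write `value` into a staging table, then run
# INSERT ... SELECT ... ON CONFLICT from the staging table into the target;
# when `stage_name` is not supplied, a "tmp_"-prefixed staging table is used and dropped afterwards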
stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
name = schema_table(name)
if(!missing(stage_name)){
stage_name = schema_table(stage_name)
drop_stage = FALSE
} else {
stage_name = name
stage_name[2L] = paste("tmp", stage_name[2L], sep="_")
drop_stage = TRUE
}
if(pgExistsTable(stage_name)) pgTruncateTable(name = stage_name, conn = conn, .log = .log)
pgWriteTable(name = stage_name, value = value, techstamp = techstamp, conn = conn, .log = .log)
on.exit(if(drop_stage) pgDropTable(stage_name, conn = conn, .log = .log))
pgSendUpsert(stage_name = stage_name, name = name, conflict_by = conflict_by, on_conflict = on_conflict, techstamp = techstamp, conn = conn, .log = .log)
}
#' @rdname pg
pgDisconnect = function(conn = getOption("pg.conn")){
dbDisconnect(conn)
}
#' @rdname pg
pgExplain = function(statement, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
cat((r <- pgGetQuery(sprintf("EXPLAIN ANALYZE %s;", statement), conn = conn, .log = .log))[["QUERY PLAN"]],
sep = "\n")
invisible(r)
}