# R/pg.R

#' @name pg-package
#' @title pg - postgres utilities for R
#' @description Set of handy functions for working extensively with a postgres database. Includes transactional logging using the *logR* package. Read the `?pg` manual and use the functions similarly to their DBI counterparts.
#' @docType package
NULL

#' @name pg
#' @title pg* wrappers to DBI::db*
#' @description Many wrappers around `DBI::db*` functions. Newly added wrappers include `pgTruncateTable`, `pgExistsSchema`, `pgDropSchema` and `pgListTableColumns`. Follow the `tests/tests.R` script for a reproducible workflow with tests.
#' @param conn active connection to postgres database, by default `getOption("pg.conn")`.
#' @param .log logical default `getOption("pg.log",TRUE)` decides if call is logged using *logR*.
#' @param statement character scalar, SQL statement to be executed.
#' @param name character table name, typically provided as a `c(schema, table)` vector.
#' @param value data.table to be written to the database.
#' @param key character vector of columns to be used to set data.table key on the db results.
#' @param norows arbitrary object to be returned in case of a 0 row result from db.
#' @param stage_name character, staging schema-table name used for performing *Upsert*.
#' @param conflict_by character vector, will be used collapsed in `ON CONFLICT (conflict_by) DO ...`.
#' @param on_conflict character scalar to be sent as the `ON CONFLICT` postgres clause. The key column set for the conflict may be included here, but then it should not be provided via the `conflict_by` argument. Default `DO NOTHING`.
#' @param techstamp logical, decides if `dbWriteTable` will add technical metadata columns when saving each object to the db.
#' @param schema_name character vector of schema names to be checked by `pgExistsSchema` or dropped by `pgDropSchema`.
#' @param select character vector of column names to fetch from `information_schema.columns` table.
#' @param cascade logical use cascade drop while dropping object.
#' @param silent logical, catch potential errors, useful for potentially non-existing objects while dropping.
#' @param host character hostname/ip by default from ENV var `POSTGRES_HOST`
#' @param port character port by default from ENV var `POSTGRES_PORT`
#' @param dbname character database name by default from ENV var `POSTGRES_DB`
#' @param user character user name by default from ENV var `POSTGRES_USER`
#' @param password character password by default from ENV var `POSTGRES_PASSWORD`
#' @details
#' 1. Instead of passing the `conn` argument to each call, save the connection to an option with `options("pg.conn" = pgConnect())` and it will be reused.
#' 2. Logging is enabled by default; the table for logs needs to be created first, using `logR::logR_schema()`, potentially with custom-defined metadata columns. See `tests/tests.R` or the *logR* package documentation.
#' 3. Stamping data with a technical run id requires another table, which can be created using `create_run_table()`. This feature has to be plugged in as logR custom metadata values, see `tests/tests.R`.
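#'
#' A minimal workflow sketch is given in the examples below; it assumes a reachable postgres instance, logR tables already created, and uses an illustrative *sandbox* schema name.
#' @examples
#' \dontrun{
#' options("pg.conn" = pgConnect())   # connection reused by all pg* wrappers
#' pgGetVersion()
#' pgCreateSchema("sandbox")
#' pgWriteTable(c("sandbox","mtcars"), data.table::as.data.table(mtcars), techstamp = FALSE)
#' pgReadTable(c("sandbox","mtcars"))
#' pgListTableColumns("sandbox")
#' pgDropSchema("sandbox", cascade = TRUE)
#' pgDisconnect()
#' }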

#' @rdname pg
pgConnect = function(host = Sys.getenv("POSTGRES_HOST", "127.0.0.1"), port = Sys.getenv("POSTGRES_PORT", "5432"), dbname = Sys.getenv("POSTGRES_DB", "postgres"), user = Sys.getenv("POSTGRES_USER", "postgres"), password = Sys.getenv("POSTGRES_PASSWORD", "postgres")){
    dbConnect(PostgreSQL(), host = host, port = port, dbname = dbname, user = user, password = password)
}

#' @rdname pg
pgGetVersion = function(conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    pgGetQuery("SELECT version();",
               conn = conn,
               .log = .log)
}

#' @rdname pg
pgSendQuery = function(statement, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.character(statement), is.logical(.log), is.function(meta))
    invisible(logR(dbSendQuery(conn, statement),
                   alert = !silent,
                   silent = silent,
                   meta = meta(r_fun = "dbSendQuery", r_args = statement),
                   .log = .log))
}

#' @rdname pg
pgGetQuery = function(statement, key, norows, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.character(statement), is.logical(.log), is.function(meta))
    data = logR(dbGetQuery(conn, statement),
                silent = FALSE,
                meta = meta(r_fun = "dbGetQuery", r_args = statement),
                .log = .log)
    if(!missing(norows) && !nrow(data)) return(norows)
    if(!missing(key)) setDT(data, key = key)[] else setDT(data)[]
}
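# usage sketch (hypothetical query and table): returns a keyed data.table,
# or the `norows` object when the query matches no rows
# pgGetQuery("SELECT * FROM sandbox.cars WHERE mpg > 30;",
#            key = "mpg",
#            norows = data.table(mpg = numeric()))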

#' @rdname pg
pgWriteTable = function(name, value, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.function(meta), is.data.table(value))
    name = schema_table(name)
    if(techstamp){
        add_techstamp(value) # by meta cols ref
        on.exit(rm_techstamp(value))
    }
    invisible(logR(dbWriteTable(conn, name, value, row.names = FALSE, append = TRUE, match.cols = TRUE), # match.cols requires 0.4.1, a fork of CRAN's RPostgreSQL
                   in_rows = nrow(value),
                   silent = FALSE,
                   meta = meta(r_fun = "dbWriteTable", r_args = paste(name, collapse=".")),
                   .log = .log))
}

#' @rdname pg
pgReadTable = function(name, key, norows, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
    name = schema_table(name)
    data = logR(dbReadTable(conn, name),
                silent = FALSE,
                meta = meta(r_fun = "dbReadTable", r_args = paste(name, collapse=".")),
                .log = .log)
    if(!missing(norows) && !nrow(data)) return(norows)
    if(!missing(key)) setDT(data, key = key)[] else setDT(data)[]
}

#' @rdname pg
pgExistsTable = function(name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
    name = schema_table(name)
    logR(dbExistsTable(conn, name),
         silent = FALSE,
         meta = meta(r_fun = "dbExistsTable", r_args = paste(name, collapse=".")),
         .log = .log)
}

#' @rdname pg
pgExistsSchema = function(schema_name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.character(schema_name), length(schema_name)>0L)
    schema = pgGetQuery(sprintf("SELECT schema_name FROM information_schema.schemata WHERE schema_name IN (%s);", paste(paste0("'",schema_name,"'"), collapse=",")),
                        norows = data.table(schema_name = character(0)),
                        conn = conn,
                        .log = .log)
    schema_name %in% schema$schema_name
}
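# usage sketch: returns a logical vector aligned with the input, e.g. on a default
# postgres instance pgExistsSchema(c("public","nosuchschema")) gives TRUE FALSE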

#' @rdname pg
pgListTables = function(schema_name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
    if(!missing(schema_name)){
        in_schema = sprintf("and schemaname IN (%s)",paste(paste0("'",schema_name,"'"),collapse=","))
        logR(dbListTables(conn, in_schema),
             silent = FALSE,
             meta = meta(r_fun = "dbListTables", r_args = paste(schema_name, collapse=",")),
             .log = .log)
    } else {
        logR(dbListTables(conn),
             silent = FALSE,
             meta = meta(r_fun = "dbListTables"),
             .log = .log)
    }
}

#' @rdname pg
pgTruncateTable = function(name, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(silent))
    # TO DO support vectorized
    name = schema_table(name)
    pgSendQuery(sprintf("TRUNCATE TABLE %s;", paste(name, collapse=".")),
                silent = silent,
                conn = conn,
                .log = .log)
}

#' @rdname pg
pgCreateSchema = function(schema_name, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    sapply(schema_name,
           function(schema) pgSendQuery(sprintf("CREATE SCHEMA %s;", schema),
                                        silent = silent,
                                        conn = conn,
                                        .log = .log))
}

#' @rdname pg
pgListSchema = function(){
    stop("TO DO")
}

#' @rdname pg
pgListFields = function(name, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.logical(.log), is.function(meta))
    name = schema_table(name)
    logR(dbListFields(conn, name),
         silent = FALSE,
         meta = meta(r_fun = "dbListFields"),
         .log = .log)
}

#' @rdname pg
pgListTableColumns = function(schema_name, select = c("table_schema", "table_name", "column_name", "ordinal_position", "column_default", "is_nullable", "data_type", "character_maximum_length", "numeric_precision", "datetime_precision"), conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.character(select), length(select) > 0L, c("table_schema","table_name","ordinal_position") %in% select)
    sql = sprintf("SELECT %s FROM information_schema.columns WHERE table_schema != 'pg_catalog' AND table_schema != 'information_schema' %sORDER BY table_schema, table_name, ordinal_position;",
                  paste(select, collapse=", "),
                  if(!missing(schema_name)){
                      stopifnot(is.character(schema_name), length(schema_name)>0L)
                      sprintf("AND table_schema IN (%s) ", paste(paste0("'",schema_name,"'"), collapse=", "))
                  } else "")
    DT = pgGetQuery(sql, conn = conn, .log = .log)
    is_nullable = NULL # skip CRAN note for local DT var
    if("is_nullable" %in% select) DT[, is_nullable := c("YES" = TRUE, "NO" = FALSE)[is_nullable]]
    keys = c("table_schema","table_name","ordinal_position")
    setkeyv(DT, if(any(keys %in% select)) keys[keys %in% select])
}
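# usage sketch (hypothetical schema): column metadata restricted to chosen schemas;
# `select` must keep the key columns table_schema, table_name, ordinal_position
# pgListTableColumns(schema_name = "sandbox",
#                    select = c("table_schema","table_name","column_name",
#                               "ordinal_position","data_type"))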

#' @rdname pg
pgDropSchema = function(schema_name, cascade = FALSE, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.character(schema_name), length(schema_name)>0L, is.logical(cascade), is.logical(silent))
    invisible(sapply(
        setNames(nm = schema_name),
        function(schema) pgSendQuery(sprintf("DROP SCHEMA %s%s;", schema, if(cascade) " CASCADE" else ""),
                                     silent = silent,
                                     conn = conn,
                                     .log = .log)
    ))
}

#' @rdname pg
pgRemoveTable = function(name, cascade = FALSE, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    meta = getOption("logR.meta")
    stopifnot(!is.null(conn), is.logical(.log), is.character(name), is.logical(cascade), is.logical(silent), is.function(meta))
    name = schema_table(name)
    # local dbRemoveTable to allow cascade
    dbRemoveTable = function(conn, name, cascade = FALSE){
        # https://github.com/tomoakin/RPostgreSQL/blob/35f4d7e4510992bee9b06a886eedac21f8688ebf/RPostgreSQL/R/PostgreSQL.R#L191
        if(RPostgreSQL::dbExistsTable(conn, name)){
            rc <- try(RPostgreSQL::dbGetQuery(conn, sprintf("DROP TABLE %s%s;", RPostgreSQL::postgresqlTableRef(name), if(cascade) " CASCADE" else "")))
            !inherits(rc, RPostgreSQL:::ErrorClass)
        }
        else FALSE
    }
    logR(dbRemoveTable(conn, name, cascade),
         alert = !silent,
         silent = silent,
         meta = meta(r_fun = "dbRemoveTable", r_args = paste(name, collapse=".")),
         .log = .log)
}

#' @rdname pg
pgDropTable = function(name, cascade = FALSE, silent = FALSE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.character(name), is.logical(cascade), is.logical(silent))
    # detect if name is a single c(schema, table) pair; otherwise process a vector of already collapsed "schema.table" names, similarly to pgDropSchema, but here the detection is tricky
    vec = FALSE # TO DO
    if(!vec) name = setNames(list(name), paste(name, collapse="."))
    sapply(name, pgRemoveTable, cascade = cascade, silent = silent, conn = conn, .log = .log)
}

#' @rdname pg
pgSendUpsert = function(stage_name, name, conflict_by, on_conflict = "DO NOTHING", techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    cols = pgListFields(stage_name)
    cols = setdiff(cols, c("run_id","r_timestamp")) # drop techstamp columns to get a clean column list; fresh values will be appended below, if requested
    # sql
    insert_into = sprintf("INSERT INTO %s.%s (%s)", name[1L], name[2L], paste(if(techstamp) c(cols, c("run_id","r_timestamp")) else cols, collapse=", "))
    select = sprintf("SELECT %s", paste(cols, collapse=", "))
    if(techstamp) select = sprintf("%s, %s::INTEGER run_id, '%s'::TIMESTAMPTZ r_timestamp", select, get_run_id(), format(Sys.time(), "%Y-%m-%d %H:%M:%OS"))
    from = sprintf("FROM %s.%s", stage_name[1L], stage_name[2L])
    if(!missing(conflict_by)) on_conflict = paste(paste0("(",paste(conflict_by, collapse=", "),")"), on_conflict)
    on_conflict = paste("ON CONFLICT",on_conflict)
    sql = paste0(paste(insert_into, select, from, on_conflict), ";")
    pgSendQuery(sql, conn = conn, .log = .log)
}
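# sketch of the SQL generated for hypothetical tables whose columns are id, mpg:
# pgSendUpsert(stage_name = c("sandbox","tmp_cars"), name = c("sandbox","cars"),
#              conflict_by = "id", techstamp = FALSE)
# INSERT INTO sandbox.cars (id, mpg) SELECT id, mpg FROM sandbox.tmp_cars ON CONFLICT (id) DO NOTHING;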

#' @rdname pg
pgUpsertTable = function(name, value, conflict_by, on_conflict = "DO NOTHING", stage_name, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    name = schema_table(name)
    if(!missing(stage_name)){
        stage_name = schema_table(stage_name)
        drop_stage = FALSE
    } else {
        stage_name = name
        stage_name[2L] = paste("tmp", stage_name[2L], sep="_")
        drop_stage = TRUE
    }
    if(pgExistsTable(stage_name)) pgTruncateTable(name = stage_name, conn = conn, .log = .log)
    pgWriteTable(name = stage_name, value = value, techstamp = techstamp, conn = conn, .log = .log)
    on.exit(if(drop_stage) pgDropTable(stage_name, conn = conn, .log = .log))
    pgSendUpsert(stage_name = stage_name, name = name, conflict_by = conflict_by, on_conflict = on_conflict, techstamp = techstamp, conn = conn, .log = .log)
}
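# usage sketch (hypothetical table with a unique constraint on id): writes `DT`
# to a temporary staging table, merges it into sandbox.cars, then drops the stage
# pgUpsertTable(c("sandbox","cars"), value = DT, conflict_by = "id", techstamp = FALSE)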

#' @rdname pg
pgDisconnect = function(conn = getOption("pg.conn")){
    dbDisconnect(conn)
}

#' @rdname pg
pgExplain = function(statement, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    cat((r <- pgGetQuery(sprintf("EXPLAIN ANALYZE %s;", statement), conn = conn, .log = .log))[["QUERY PLAN"]],
        sep = "\n")
    invisible(r)
}
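# usage sketch (hypothetical table): prints the plan returned by EXPLAIN ANALYZE
# pgExplain("SELECT count(*) FROM sandbox.cars")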