### Not to be called by the users, so all functions start with a prefix of dot
### and thus are not exported.
### Database related utilities, using RPostgreSQL to connect
## -----------------------------------------------------------------------
.db.connect.rpostgresql <- function(host, user, dbname, password,
port)
{
if (is.null(.localVars$drv$rpostgresql)){
.localVars$drv$rpostgresql <- DBI::dbDriver("PostgreSQL", max.con=50)
}
n.db <- length(.localVars$db)
func <- getMethod("dbConnect", signature="PostgreSQLDriver",
where=as.environment("package:RPostgreSQL"))
db.connection <- func(.localVars$drv$rpostgresql,
host=host, user=user,
dbname=dbname,
password=password,
port = port)
if (length(.localVars$conn.id) == 0)
conn.id <- 1
else
conn.id <- max(.localVars$conn.id[,1]) + 1
.localVars$db[[n.db+1]] <- list(
conn = db.connection,
conn.id = conn.id,
host = host,
user = user,
port = port,
dbname = dbname, # database name
# which R package is used to connected to database
conn.pkg = "rpostgresql"
)
.localVars$conn.id <- rbind(.localVars$conn.id,
c(conn.id, n.db + 1))
.localVars$conn.type[["rpostgresql"]] <- c(
.localVars$conn.type[["rpostgresql"]],
conn.id)
return(conn.id) # return the connection ID
}
## -----------------------------------------------------------------------
.db.unloadDriver.rpostgresql <- function(drv)
{
func <- getMethod("dbUnloadDriver", signature="PostgreSQLDriver",
where=as.environment("package:RPostgreSQL"))
func(drv)
}
## -----------------------------------------------------------------------
.db.disconnect.rpostgresql <- function(idx)
{
func <- getMethod("dbDisconnect", signature="PostgreSQLConnection",
where=as.environment("package:RPostgreSQL"))
func(.localVars$db[[idx]]$conn)
}
## -----------------------------------------------------------------------
.db.sendQuery.rpostgresql <- function(query, idx)
{
func <- getMethod("dbSendQuery",
signature=c("PostgreSQLConnection", "character"),
where=as.environment("package:RPostgreSQL"))
func(.localVars$db[[idx]]$conn, query)
}
## ----------------------------------------------------------------------
.db.clearResult.rpostgresql <- function(res)
{
func <- getMethod("dbClearResult",
signature=c("PostgreSQLResult"),
where=as.environment("package:RPostgreSQL"))
func(res)
}
## -----------------------------------------------------------------------
.db.fetch.rpostgresql <- function(res, n)
{
func <- getMethod("fetch",
signature=c("PostgreSQLResult", "numeric"),
where=as.environment("package:RPostgreSQL"))
func(res, n)
}
## -----------------------------------------------------------------------
.db.getQuery.rpostgresql <- function(query, idx)
{
func <- getMethod("dbGetQuery",
signature=c("PostgreSQLConnection", "character"),
where=as.environment("package:RPostgreSQL"))
func(.localVars$db[[idx]]$conn, query)
}
## -----------------------------------------------------------------------
.db.listTables.rpostgresql <- function(idx)
{
func <- getMethod("dbListTables",
signature="PostgreSQLConnection",
where=as.environment("package:RPostgreSQL"))
func(.localVars$db[[idx]]$conn)
}
## -----------------------------------------------------------------------
.db.existsTable.rpostgresql <- function(table, idx)
{
func <- getMethod("dbExistsTable",
signature=c("PostgreSQLConnection", "character"),
where=as.environment("package:RPostgreSQL"))
func(.localVars$db[[idx]]$conn, table)
}
## -----------------------------------------------------------------------
.db.listFields.rpostgresql <- function(table, idx)
{
func <- getMethod("dbListFields",
signature=c("PostgreSQLConnection", "character"),
where=as.environment("package:RPostgreSQL"))
func(.localVars$db[[idx]]$conn, table)
}
## -----------------------------------------------------------------------
.db.buildTableDefinition <- function(dbObj, name, obj, field.types,
add.row.names, dist.str, is.temp, ...)
{
func <- getMethod("dbDataType",
signature=c("PostgreSQLObject", "ANY"),
where=as.environment("package:RPostgreSQL"))
if(!is.data.frame(obj))
obj <- as.data.frame(obj)
if(!is.null(add.row.names) && add.row.names){
obj <- cbind(row.names(obj), obj) ## can't use row.names= here
names(obj)[1] <- "row_names"
}
if(is.null(field.types)){
## the following mapping should be coming from some kind of table
## also, need to use converter functions (for dates, etc.)
field.types <- sapply(obj, func, dbObj = dbObj)
}
## need to create a new (empty) table
flds <- paste(RPostgreSQL::postgresqlQuoteId(names(field.types)),
field.types)
if (is.temp) tmp.str <- "temp"
else tmp.str <- ""
paste("CREATE", tmp.str, "TABLE",
RPostgreSQL::postgresqlTableRef(name), "\n(",
paste(flds, collapse=",\n\t"), "\n)", dist.str)
}
## -----------------------------------------------------------------------
.db.writeTable.rpostgresql <- function (table, r.obj, add.row.names,
overwrite, append, distributed.by,
is.temp,
idx, header, nrows = 50, sep = ",",
eol="\n", skip = 0, quote = "\"",
field.types, ...)
{
conn <- .localVars$db[[idx]]$conn
conn.id <- .localVars$db[[idx]]$conn.id
name <- table
value <- r.obj
func1 <- getMethod("dbClearResult",
signature="PostgreSQLResult",
where=as.environment("package:RPostgreSQL"))
func2 <- getMethod("dbWriteTable",
signature=c("PostgreSQLConnection", "character",
"character"),
where=as.environment("package:RPostgreSQL"))
func3 <- getMethod("dbWriteTable",
signature=c("PostgreSQLConnection", "character",
"data.frame"),
where=as.environment("package:RPostgreSQL"))
## only for GPDB
## This why this function is so complicated
dist.str <- .get.distributed.by.str(conn.id, distributed.by)
## need to create the table first
if (is.character(r.obj)) # create from file
{
new.con <- conn
if (is.temp) {
check.temp <- .db.existsTempTable(name, conn.id)
name <- check.temp[[2]]
}
if((!is.temp && .db.existsTable(name, conn.id)) ||
(is.temp && check.temp[[1]]))
{
if(overwrite)
{
if(!.db.removeTable(name, conn.id))
{
warning(paste("table", name,
"couldn't be overwritten"))
return(FALSE)
}
}
else if(!append)
{
warning(
paste("table", name,
"exists in database: aborting dbWriteTable"))
return(FALSE)
}
}
## compute full path name (have R expand ~, etc)
fn <- file.path(dirname(r.obj), basename(r.obj))
if(missing(header) || missing(add.row.names))
{
f <- file(fn, open="r")
if (skip>0) readLines(f, n=skip)
txtcon <- textConnection(readLines(f, n=2))
flds <- count.fields(txtcon, sep)
close(txtcon)
close(f)
nf <- length(unique(flds))
}
if(missing(header)) header <- nf==2
if(missing(add.row.names))
{
if(header)
add.row.names <- if(nf==2) TRUE else FALSE
else
add.row.names <- FALSE
}
new.table <- !.db.existsTable(name, conn.id)
if(new.table)
{
## need to init table, say, with the first nrows lines
d <- read.table(fn, sep=sep, header=header, skip=skip,
nrows=nrows, ...)
if (missing(field.types)) field.types <- NULL
sql <- .db.buildTableDefinition(new.con, table, d,
field.types, add.row.names,
dist.str, is.temp)
rs <- try(.db.sendQuery(sql, conn.id))
if(is(rs, .err.class)){
warning("could not create table: aborting postgresqlImportFile")
return(FALSE)
}
else
func1(rs$res)
}
else if(!append)
{
warning(sprintf(
"table %s already exists -- use append=TRUE?", name))
}
fmt <- paste("COPY %s FROM '%s' ","WITH DELIMITER AS '%s' ",
if(!is.null(quote)) "CSV HEADER QUOTE AS '%s'", sep="")
if(is.null(quote))
sql <- sprintf(fmt, name,fn, sep)
else
sql <- sprintf(fmt, name,fn, sep, quote)
rs <- try(.db.sendQuery(sql, conn.id))
if(inherits(rs, .err.class)){
warning("could not load data into table")
return(FALSE)
}
TRUE
}
else # create table from a data frame --------------------------------
{
if (is.temp) check.temp <- .db.existsTempTable(table, conn.id)
if((!is.temp && .db.existsTable(name, conn.id)) ||
(is.temp && check.temp[[1]]))
{
if (overwrite) {
if (!.db.removeTable(name, conn.id))
{
warning(paste("table", name,
"couldn't be overwritten"))
return(FALSE)
}
}
else if (!append)
{
warning(
paste("table", name,
"exists in database: aborting assignTable"))
return(FALSE)
}
}
else
{
if (missing(field.types)) field.types <- NULL
sql <- .db.buildTableDefinition(conn, table, r.obj,
field.types, add.row.names,
dist.str, is.temp)
rs <- try(.db.sendQuery(sql, conn.id))
if (is.temp) name <- (.db.existsTempTable(table,
conn.id))[[2]]
if(is(rs, .err.class))
{
warning("could not create table: aborting assignTable")
return(FALSE)
}
else
{
func1(rs$res)#######
}
}
## After the table has been created, one can append data to it
func3(conn = .localVars$db[[idx]]$conn,
name = name, value = value,
row.names = add.row.names,
overwrite = overwrite, append = TRUE)
}
}
## -----------------------------------------------------------------------
.db.readTable.rpostgresql <- function (table, row.names, idx)
{
func <- getMethod("dbReadTable",
signature=c("PostgreSQLConnection", "character"),
where=as.environment("package:RPostgreSQL"))
func(conn = .localVars$db[[idx]]$conn,
name = table, row.names = row.names)
}
## -----------------------------------------------------------------------
.db.removeTable.rpostgresql <- function (table, idx)
{
func <- getMethod("dbRemoveTable",
signature=c("PostgreSQLConnection", "character"),
where=as.environment("package:RPostgreSQL"))
func(conn = .localVars$db[[idx]]$conn,
name = table)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.