###############################################################################
## Project: ETLUtils
## Content: Utilities that work for ff objects
##
## Author: jan
## Creation date: Mar 25, 2012, 10:06:53 PM
## File: ffsql.R
###############################################################################
#' Read data from a DBI connection into an ffdf.
#'
#' Read data from a DBI connection into an \code{\link[ff]{ffdf}}. This can for example be used to import
#' large datasets from Oracle, SQLite, MySQL, PostgreSQL, Hive or other SQL databases into R. \cr
#'
#' Opens up the DBI connection using \code{DBI::dbConnect}, sends the query using \code{DBI::dbSendQuery} and \code{DBI::dbFetch}-es
#' the results in batches of next.rows rows. Heavily borrowed from \code{\link[ff]{read.table.ffdf}}.
#'
#' @param query the SQL query to execute on the DBI connection
#' @param dbConnect.args a list of arguments to pass to DBI's \code{\link[DBI]{dbConnect}} (like drv, dbname, username, password). See the examples.
#' @param dbSendQuery.args a list containing database-specific parameters which will be passed to \code{\link[DBI]{dbSendQuery}}.
#' Defaults to an empty list.
#' @param dbFetch.args a list containing optional database-specific parameters which will be passed to \code{\link[DBI]{dbFetch}}.
#' Defaults to an empty list.
#' @param x NULL or an optional ffdf object to which the read records are appended.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details and the example below.
#' @param nrows Number of rows to read from the query resultset. Default value of -1 reads in all rows.
#' @param first.rows chunk size (rows) to read for first chunk from the query resultset
#' @param next.rows chunk size (rows) to read sequentially for subsequent chunks from the query resultset. Currently, this must be specified.
#' @param levels optional specification of factor levels. A list with as names the names of the columns of the data.frame
#' fetched in the first.rows chunk, containing the levels of the factors.
#' @param appendLevels logical. A vector of permissions to expand levels for factor columns. See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param asffdf_args further arguments passed to \code{\link[ff]{as.ffdf}} (ignored if 'x' gives an ffdf object)
#' @param BATCHBYTES integer: bytes allowed for the size of the data.frame storing the result of reading one chunk.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param VERBOSE logical: TRUE to print timings for each processed chunk (default FALSE).
#' @param colClasses See documentation in \code{\link[ff]{read.table.ffdf}}
#' @param transFUN function applied to the data frame after each chunk is retrieved by \code{\link[DBI]{dbFetch}}
#' @param ... optional parameters passed on to transFUN
#' @return
#' An ffdf object, unless the query returns zero records, in which case the function will return the data.frame
#' returned by \code{\link[DBI]{dbFetch}}, possibly transformed by transFUN.
#' @export
#' @seealso \code{\link[ff]{read.table.ffdf}}, \code{\link{read.odbc.ffdf}}
#' @examples
#' require(ff)
#'
#' ##
#' ## Example query using data in sqlite
#' ##
#' require(RSQLite)
#' dbfile <- system.file("smalldb.sqlite3", package="ETLUtils")
#' drv <- dbDriver("SQLite")
#' query <- "select * from testdata limit 10000"
#' x <- read.dbi.ffdf(query = query, dbConnect.args = list(drv = drv, dbname = dbfile),
#' first.rows = 100, next.rows = 1000, VERBOSE=TRUE)
#' class(x)
#' x[1:10, ]
#'
#' ## show it is the same as getting the data directly using RSQLite
#' ## apart from character columns, which are stored as factors in ffdf objects
#' directly <- dbGetQuery(dbConnect(drv = drv, dbname = dbfile), query)
#' directly <- as.data.frame(as.list(directly), stringsAsFactors=TRUE)
#' all.equal(x[,], directly)
#'
#' ## show how to use the transFUN argument to transform the data before saving into the ffdf
#' ## and shows the use of the levels argument
#' query <- "select * from testdata limit 10"
#' x <- read.dbi.ffdf(query = query, dbConnect.args = list(drv = drv, dbname = dbfile),
#' first.rows = 100, next.rows = 1000, VERBOSE=TRUE, levels = list(a = rev(LETTERS)),
#' transFUN = function(x, subtractdays){
#' x$b <- as.Date(x$b)
#' x$b.subtractdaysago <- x$b - subtractdays
#' x
#' }, subtractdays=7)
#' class(x)
#' x[1:10, ]
#' ## note that the levels of column a are reversed because of the levels argument specified above
#' levels(x$a)
#'
#' ## show how to append data to an existing ffdf object
#' transformexample <- function(x, subtractdays){
#' x$b <- as.Date(x$b)
#' x$b.subtractdaysago <- x$b - subtractdays
#' x
#' }
#' dim(x)
#' x[,]
#' combined <- read.dbi.ffdf(query = query,
#' dbConnect.args = list(drv = drv, dbname = dbfile),
#' first.rows = 100, next.rows = 1000, x = x, VERBOSE=TRUE,
#' transFUN = transformexample, subtractdays=1000)
#' dim(combined)
#' combined[,]
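#'
#' ## show how to cap the total number of rows fetched using the nrows argument
#' ## (a small sketch; assumes testdata contains at least 500 rows)
#' capped <- read.dbi.ffdf(query = "select * from testdata",
#'  dbConnect.args = list(drv = drv, dbname = dbfile),
#'  first.rows = 100, next.rows = 1000, nrows = 500, VERBOSE = TRUE)
#' dim(capped)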
#'
#' ##
#' ## Example query using ROracle. Do try this at home with some larger data :)
#' ##
#' \dontrun{
#' require(ROracle)
#' query <- "select OWNER, TABLE_NAME, TABLESPACE_NAME, NUM_ROWS, LAST_ANALYZED from all_all_tables"
#' x <- read.dbi.ffdf(query=query,
#' dbConnect.args = list(drv = dbDriver("Oracle"),
#' user = "YourUser", password = "YourPassword", dbname = "Mydatabase"),
#' first.rows = 100, next.rows = 50000, nrows = -1, VERBOSE=TRUE)
#' }
read.dbi.ffdf <- function(
query = NULL,
dbConnect.args = list(drv=NULL, dbname = NULL, username = "", password = ""),
dbSendQuery.args = list(),
dbFetch.args = list(),
x = NULL, nrows = -1,
first.rows = NULL, next.rows = NULL, levels = NULL, appendLevels = TRUE,
asffdf_args = list(), BATCHBYTES = getOption("ffbatchbytes"), VERBOSE = FALSE, colClasses = NULL,
transFUN = NULL, ...){
cleanupConnection <- function(x){
if("resultset" %in% names(x)){
DBI::dbClearResult(res=x$resultset)
}
if("channel" %in% names(x)){
DBI::dbDisconnect(x$channel)
}
}
dbiinfo <- list()
##
## Connect to database
##
dbiinfo$channel <- do.call('dbConnect', dbConnect.args)
on.exit(cleanupConnection(dbiinfo))
append <- !is.null(x)
if(append && !inherits(x, "ffdf")){
stop("only ffdf objects can be used for appending (and skipping the first.row chunk)")
}
if(VERBOSE){
read.time <- 0
write.time <- 0
}
nrows <- as.integer(nrows)
N <- 0L
if(!append){
if(VERBOSE){
cat("read.dbi.ffdf ", N+1L, "..", sep="")
read.start <- proc.time()[3]
}
if(is.null(colClasses)){
colClasses <- NA
}
## Specify the size of the first number of rows to import
if (is.null(first.rows)){
if (is.null(next.rows)){
first.rows <- 1e3L
}else{
first.rows <- as.integer(next.rows)
}
}else{
first.rows <- as.integer(first.rows)
}
if (nrows>=0L && nrows<first.rows){
first.rows <- nrows
}
if (first.rows==1){
stop("first.rows must not be 1")
}
##
## Set up arguments for dbSendQuery
##
dbSendQuery.args$conn <- dbiinfo$channel
dbSendQuery.args$statement <- query
##
## Launch the query
##
dbiinfo$resultset <- do.call('dbSendQuery', dbSendQuery.args)
##
## Read in the first chunk
##
dbFetch.args$res <- dbiinfo$resultset
dbFetch.args$n <- first.rows
if(is.null(transFUN)){
dat <- convertCharacterToFactor(do.call("dbFetch", dbFetch.args))
}else{
dat <- convertCharacterToFactor(transFUN(do.call("dbFetch", dbFetch.args), ...))
}
n <- nrow(dat)
N <- n
## Recode levels if levels are specified
if(!is.null(levels)){
cnam <- colnames(dat)
lnam <- names(levels)
if(!is.list(levels) || is.null(lnam) || any(is.na(match(lnam,cnam)))){
stop("levels must be a list with names matching column names of the first data.frame read")
}
for (i in lnam){
dat[[i]] <- recodeLevels(dat[[i]], levels[[i]])
}
}
if (VERBOSE){
write.start <- proc.time()[3]
read.time <- read.time + (write.start - read.start)
cat(N, " (", n, ") dbi-read=", round(write.start-read.start, 3), "sec", sep="")
}
##
## If there are zero rows in the query, return a data.frame instead of an ffdf object
##
if(nrow(dat) == 0){
if (VERBOSE){
cat(" query returned 0 records\n", sep="")
cat(" dbi-read=", round(read.time, 3), "sec TOTAL=", round(read.time, 3), "sec\n", sep="")
}
return(dat)
}
##
## If there are non-zero rows in the query, convert to a filebased ffdf object
##
x <- do.call("as.ffdf", c(list(dat), asffdf_args))
##
## Fix ordered factors
##
colClasses <- repnam(colClasses, colnames(x), default=NA)
i.fix <- seq.int(length.out=ncol(dat))[!is.na(match(colClasses, "ordered"))]
for (i in i.fix){
virtual(x[[i]])$ramclass <- c("ordered","factor")
}
if (VERBOSE){
write.stop <- proc.time()[3]
write.time <- write.time + (write.stop - write.start)
cat(" ffdf-write=", round(write.stop-write.start, 3), "sec\n", sep="")
}
}
##
## Fetch the other rows
##
if (append || N==first.rows){
##
## Define the number of rows to fetch in each run depending on the BATCHBYTES maximum
##
k <- ncol(x)
col.names <- colnames(x)
colClasses <- sapply(seq.int(length.out=ncol(x)), function(i)colClass(x[[i]]))
if (is.null(next.rows)){
recordsize <- sum(.rambytes[vmode(x)])
next.rows <- BATCHBYTES %/% recordsize
if (next.rows<1L){
next.rows <- 1L
warning("single record does not fit into BATCHBYTES")
}
}else{
next.rows <- as.integer(next.rows)
}
next.nrows <- next.rows
appendLevels <- repnam(appendLevels, col.names, default=TRUE)
if(any(appendLevels)){
i.fac <- seq.int(length.out=k)
if(append == TRUE){
i.fac <- i.fac[appendLevels & sapply(i.fac, function(i) is.factor(x[[i]]))]
}else{
i.fac <- i.fac[appendLevels & sapply(i.fac, function(i) is.factor(dat[[i]]))]
}
}
if(append == TRUE){
## Set up arguments for dbSendQuery & launch the query
dbSendQuery.args$conn <- dbiinfo$channel
dbSendQuery.args$statement <- query
dbiinfo$resultset <- do.call('dbSendQuery', dbSendQuery.args)
}
##
## Loop - fetching the subsequent data in batches
##
while(TRUE){
if (nrows>=0L && N+next.rows > nrows){
next.nrows <- nrows - N
}
if (next.nrows<1L){
break
}
if (VERBOSE){
cat("read.dbi.ffdf ", N+1L,"..", sep="")
read.start <- proc.time()[3]
}
##
## read in subsequent chunk
##
dbFetch.args$res <- dbiinfo$resultset
dbFetch.args$n <- next.nrows
if (is.null(transFUN)){
dat <- convertCharacterToFactor(do.call("dbFetch", dbFetch.args))
}else{
dat <- convertCharacterToFactor(transFUN(do.call("dbFetch", dbFetch.args), ...))
}
n <- nrow(dat)
N <- N + n
if (VERBOSE){
write.start <- proc.time()[3]
read.time <- read.time + (write.start - read.start)
cat(N, " (", n, ") dbi-read=", round(write.start-read.start, 3), "sec", sep="")
}
## Nothing to import any more
if (n<1L){
if (VERBOSE)
cat("\n")
break
}
## Recode levels if levels are specified
if(any(appendLevels))
for (i in i.fac){
lev <- unique(c(levels(x[[i]]),levels(dat[[i]]))) # we save a call to the more general appendLevels() here
levels(x[[i]]) <- lev
dat[[i]] <- recodeLevels(dat[[i]], lev)
}
##
## Append to the ffdf object
##
nff <- nrow(x)
nrow(x) <- nff + n
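## hi() builds a hybrid index covering the freshly added row range, so the
## chunk can be assigned in a single subscript operation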
i <- hi(nff+1L, nff+n)
x[i,] <- dat
if (VERBOSE){
write.stop <- proc.time()[3]
write.time <- write.time + (write.stop - write.start)
cat(" ffdf-write=", round(write.stop-write.start, 3), "sec\n", sep="")
}
if(n<next.nrows){
break
}
}
}
if (VERBOSE){
cat(" dbi-read=", round(read.time, 3), "sec ffdf-write=", round(write.time, 3), "sec TOTAL=", round(read.time+write.time, 3), "sec\n", sep="")
}
return(x)
}
#' Read data from an ODBC connection into an ffdf.
#'
#' Read data from an ODBC connection into an \code{\link[ff]{ffdf}}. This can for example be used to import
#' large datasets from Oracle, SQLite, MySQL, PostgreSQL, Hive or other SQL databases into R. \cr
#'
#' Opens up the ODBC connection using \code{RODBC::odbcConnect} or \code{RODBC::odbcDriverConnect},
#' sends the query using \code{RODBC::odbcQuery} and retrieves
#' the results in batches of next.rows rows using \code{RODBC::sqlGetResults}. Heavily borrowed from \code{\link[ff]{read.table.ffdf}}.
#'
#' @param query the SQL query to execute on the ODBC connection
#' @param odbcConnect.args a list of arguments to pass to ODBC's \code{\link[RODBC]{odbcConnect}} (like dsn, uid, pwd). See the examples.
#' @param odbcDriverConnect.args a list of arguments to pass to ODBC's \code{\link[RODBC]{odbcDriverConnect}} (like connection). Use this if you want to
#' connect using odbcDriverConnect instead of odbcConnect.
#' @param odbcQuery.args a list of arguments to pass to ODBC's \code{\link[RODBC]{odbcQuery}}, like rows_at_time. Defaults to an empty list.
#' @param sqlGetResults.args a list containing optional parameters which will be passed to \code{\link[RODBC]{sqlGetResults}}.
#' Defaults to an empty list. The max parameter will be overwritten with first.rows and next.rows when importing in batches.
#' @param x NULL or an optional ffdf object to which the read records are appended.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details and the example below.
#' @param nrows Number of rows to read from the query resultset. Default value of -1 reads in all rows.
#' @param first.rows chunk size (rows) to read for first chunk from the query resultset
#' @param next.rows chunk size (rows) to read sequentially for subsequent chunks from the query resultset. Currently, this must be specified.
#' @param levels optional specification of factor levels. A list with as names the names of the columns of the data.frame
#' fetched in the first.rows chunk, containing the levels of the factors.
#' @param appendLevels logical. A vector of permissions to expand levels for factor columns. See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param asffdf_args further arguments passed to \code{\link[ff]{as.ffdf}} (ignored if 'x' gives an ffdf object)
#' @param BATCHBYTES integer: bytes allowed for the size of the data.frame storing the result of reading one chunk.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param VERBOSE logical: TRUE to print timings for each processed chunk (default FALSE).
#' @param colClasses See documentation in \code{\link[ff]{read.table.ffdf}}
#' @param transFUN function applied to the data frame after each chunk is retrieved by \code{\link[RODBC]{sqlGetResults}}
#' @param ... optional parameters passed on to transFUN
#' @return
#' An ffdf object, unless the query returns zero records, in which case the function will return the data.frame
#' returned by \code{\link[RODBC]{sqlGetResults}}, possibly transformed by transFUN.
#' @export
#' @seealso \code{\link[ff]{read.table.ffdf}}, \code{\link{read.dbi.ffdf}}
#' @examples
#' ##
#' ## Using the sqlite database (smalldb.sqlite3) in the /inst folder of the package
#' ## set up the sqlite ODBC driver (www.stats.ox.ac.uk/pub/bdr/RODBC-manual.pdf)
#' ## and call it 'smalltestsqlitedb'
#' ##
#' \dontrun{
#' require(RODBC)
#' x <- read.odbc.ffdf(
#' query = "select * from testdata limit 10000",
#' odbcConnect.args = list(
#' dsn="smalltestsqlitedb", uid = "", pwd = "",
#' believeNRows = FALSE, rows_at_time = 1),
#' nrows = -1,
#' first.rows = 100, next.rows = 1000, VERBOSE = TRUE)
#' }
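#'
#' ## connecting through a full connection string instead of a DSN: a minimal
#' ## sketch using odbcDriverConnect.args; the driver name and database path
#' ## below are assumptions - adjust them to your local ODBC setup
#' \dontrun{
#' x <- read.odbc.ffdf(
#'  query = "select * from testdata limit 10000",
#'  odbcDriverConnect.args = list(
#'   connection = "Driver=SQLite3;Database=/path/to/smalldb.sqlite3"),
#'  first.rows = 100, next.rows = 1000, VERBOSE = TRUE)
#' }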
read.odbc.ffdf <- function(
query = NULL,
odbcConnect.args = list(dsn=NULL, uid = "", pwd = ""),
odbcDriverConnect.args = list(connection = ""),
odbcQuery.args = list(),
sqlGetResults.args = list(),
x = NULL, nrows = -1,
first.rows = NULL, next.rows = NULL, levels = NULL, appendLevels = TRUE,
asffdf_args = list(), BATCHBYTES = getOption("ffbatchbytes"), VERBOSE = FALSE, colClasses = NULL,
transFUN = NULL, ...){
odbcinfo <- list()
##
## Connect to database
##
if(!missing(odbcDriverConnect.args)){
odbcinfo$channel <- do.call('odbcDriverConnect', odbcDriverConnect.args)
}else{
odbcinfo$channel <- do.call('odbcConnect', odbcConnect.args)
}
on.exit(try(RODBC::odbcClose(odbcinfo$channel), silent = TRUE))
append <- !is.null(x)
if(append && !inherits(x, "ffdf")){
stop("only ffdf objects can be used for appending (and skipping the first.row chunk)")
}
if(VERBOSE){
read.time <- 0
write.time <- 0
}
nrows <- as.integer(nrows)
N <- 0L
if(!append){
if(VERBOSE){
cat("read.odbc.ffdf ", N+1L, "..", sep="")
read.start <- proc.time()[3]
}
if(is.null(colClasses)){
colClasses <- NA
}
## Specify the size of the first number of rows to import
if (is.null(first.rows)){
if (is.null(next.rows)){
first.rows <- 1e3L
}else{
first.rows <- as.integer(next.rows)
}
}else{
first.rows <- as.integer(first.rows)
}
if (nrows>=0L && nrows<first.rows){
first.rows <- nrows
}
if (first.rows==1){
stop("first.rows must not be 1")
}
##
## Set up arguments for odbcQuery
##
odbcQuery.args$channel <- odbcinfo$channel
odbcQuery.args$query <- query
##
## Launch the query
##
odbcinfo$resultset <- do.call('odbcQuery', odbcQuery.args)
##
## Read in the first chunk
##
sqlGetResults.args$channel <- odbcQuery.args$channel
sqlGetResults.args$max <- first.rows
if(is.null(transFUN)){
dat <- convertCharacterToFactor(do.call("sqlGetResults", sqlGetResults.args))
}else{
dat <- convertCharacterToFactor(transFUN(do.call("sqlGetResults", sqlGetResults.args), ...))
}
n <- nrow(dat)
N <- n
## Recode levels if levels are specified
if(!is.null(levels)){
cnam <- colnames(dat)
lnam <- names(levels)
if(!is.list(levels) || is.null(lnam) || any(is.na(match(lnam,cnam)))){
stop("levels must be a list with names matching column names of the first data.frame read")
}
for (i in lnam){
dat[[i]] <- recodeLevels(dat[[i]], levels[[i]])
}
}
if (VERBOSE){
write.start <- proc.time()[3]
read.time <- read.time + (write.start - read.start)
cat(N, " (", n, ") odbc-read=", round(write.start-read.start, 3), "sec", sep="")
}
##
## If there are zero rows in the query, return a data.frame instead of an ffdf object
##
if(nrow(dat) == 0){
if (VERBOSE){
cat(" query returned 0 records\n", sep="")
cat(" odbc-read=", round(read.time, 3), "sec TOTAL=", round(read.time, 3), "sec\n", sep="")
}
return(dat)
}
##
## If there are non-zero rows in the query, convert to a filebased ffdf object
##
x <- do.call("as.ffdf", c(list(dat), asffdf_args))
##
## Fix ordered factors
##
colClasses <- repnam(colClasses, colnames(x), default=NA)
i.fix <- seq.int(length.out=ncol(dat))[!is.na(match(colClasses, "ordered"))]
for (i in i.fix){
virtual(x[[i]])$ramclass <- c("ordered","factor")
}
if (VERBOSE){
write.stop <- proc.time()[3]
write.time <- write.time + (write.stop - write.start)
cat(" ffdf-write=", round(write.stop-write.start, 3), "sec\n", sep="")
}
}
##
## Fetch the other rows
##
if (append || N==first.rows){
##
## Define the number of rows to fetch in each run depending on the BATCHBYTES maximum
##
k <- ncol(x)
col.names <- colnames(x)
colClasses <- sapply(seq.int(length.out=ncol(x)), function(i)colClass(x[[i]]))
if (is.null(next.rows)){
recordsize <- sum(.rambytes[vmode(x)])
next.rows <- BATCHBYTES %/% recordsize
if (next.rows<1L){
next.rows <- 1L
warning("single record does not fit into BATCHBYTES")
}
}else{
next.rows <- as.integer(next.rows)
}
next.nrows <- next.rows
appendLevels <- repnam(appendLevels, col.names, default=TRUE)
if(any(appendLevels)){
i.fac <- seq.int(length.out=k)
if(append == TRUE){
i.fac <- i.fac[appendLevels & sapply(i.fac, function(i) is.factor(x[[i]]))]
}else{
i.fac <- i.fac[appendLevels & sapply(i.fac, function(i) is.factor(dat[[i]]))]
}
}
if(append == TRUE){
## Set up arguments for odbcQuery
odbcQuery.args$channel <- odbcinfo$channel
odbcQuery.args$query <- query
odbcinfo$resultset <- do.call('odbcQuery', odbcQuery.args)
}
##
## Loop - fetching the subsequent data in batches
##
while(TRUE){
if (nrows>=0L && N+next.rows > nrows){
next.nrows <- nrows - N
}
if (next.nrows<1L){
break
}
if (VERBOSE){
cat("read.odbc.ffdf ", N+1L,"..", sep="")
read.start <- proc.time()[3]
}
##
## read in subsequent chunk
##
sqlGetResults.args$channel <- odbcQuery.args$channel
sqlGetResults.args$max <- next.nrows
if (is.null(transFUN)){
dat <- convertCharacterToFactor(do.call("sqlGetResults", sqlGetResults.args))
}else{
dat <- convertCharacterToFactor(transFUN(do.call("sqlGetResults", sqlGetResults.args), ...))
}
n <- nrow(dat)
N <- N + n
if (VERBOSE){
write.start <- proc.time()[3]
read.time <- read.time + (write.start - read.start)
cat(N, " (", n, ") odbc-read=", round(write.start-read.start, 3), "sec", sep="")
}
## Nothing to import any more
if (n<1L){
if (VERBOSE)
cat("\n")
break
}
## Recode levels if levels are specified
if(any(appendLevels))
for (i in i.fac){
lev <- unique(c(levels(x[[i]]),levels(dat[[i]]))) # we save a call to the more general appendLevels() here
levels(x[[i]]) <- lev
dat[[i]] <- recodeLevels(dat[[i]], lev)
}
##
## Append to the ffdf object
##
nff <- nrow(x)
nrow(x) <- nff + n
i <- hi(nff+1L, nff+n)
x[i,] <- dat
if (VERBOSE){
write.stop <- proc.time()[3]
write.time <- write.time + (write.stop - write.start)
cat(" ffdf-write=", round(write.stop-write.start, 3), "sec\n", sep="")
}
if(n<next.nrows){
break
}
}
}
if (VERBOSE){
cat(" odbc-read=", round(read.time, 3), "sec ffdf-write=", round(write.time, 3), "sec TOTAL=", round(read.time+write.time, 3), "sec\n", sep="")
}
return(x)
}
#' Read data from a JDBC connection into an ffdf.
#'
#' Read data from a JDBC connection into an \code{\link[ff]{ffdf}}. This can for example be used to import
#' large datasets from Oracle, SQLite, MySQL, PostgreSQL, Hive or other SQL databases into R. \cr
#'
#' Opens up the JDBC connection using \code{RJDBC::dbConnect}, sends the query using \code{RJDBC::dbSendQuery} and \code{RJDBC::dbFetch}-es
#' the results in batches of next.rows rows. Heavily borrowed from \code{\link[ff]{read.table.ffdf}}.
#'
#' @param query the SQL query to execute on the JDBC connection
#' @param dbConnect.args a list of arguments to pass to JDBC's \code{RJDBC::dbConnect} (like drv, dbname, username, password). See the examples.
#' @param dbSendQuery.args a list containing database-specific parameters which will be passed to \code{RJDBC::dbSendQuery}.
#' Defaults to an empty list.
#' @param dbFetch.args a list containing optional database-specific parameters which will be passed to \code{RJDBC::dbFetch}.
#' Defaults to an empty list.
#' @param x NULL or an optional ffdf object to which the read records are appended.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details and the example below.
#' @param nrows Number of rows to read from the query resultset. Default value of -1 reads in all rows.
#' @param first.rows chunk size (rows) to read for first chunk from the query resultset
#' @param next.rows chunk size (rows) to read sequentially for subsequent chunks from the query resultset. Currently, this must be specified.
#' @param levels optional specification of factor levels. A list with as names the names of the columns of the data.frame
#' fetched in the first.rows chunk, containing the levels of the factors.
#' @param appendLevels logical. A vector of permissions to expand levels for factor columns. See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param asffdf_args further arguments passed to \code{\link[ff]{as.ffdf}} (ignored if 'x' gives an ffdf object)
#' @param BATCHBYTES integer: bytes allowed for the size of the data.frame storing the result of reading one chunk.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param VERBOSE logical: TRUE to print timings for each processed chunk (default FALSE).
#' @param colClasses See documentation in \code{\link[ff]{read.table.ffdf}}
#' @param transFUN function applied to the data frame after each chunk is retrieved by \code{RJDBC::dbFetch}
#' @param ... optional parameters passed on to transFUN
#' @return
#' An ffdf object, unless the query returns zero records, in which case the function will return the data.frame
#' returned by \code{RJDBC::dbFetch}, possibly transformed by transFUN.
#' @export
#' @seealso \code{\link[ff]{read.table.ffdf}}, \code{\link{read.dbi.ffdf}}
#' @examples
#' \dontrun{
#' require(ff)
#'
#' ##
#' ## Example query using data in sqlite
#' ##
#' require(RSQLite)
#' dbfile <- system.file("smalldb.sqlite3", package="ETLUtils")
#' drv <- JDBC(driverClass = "org.sqlite.JDBC", classPath = "/usr/local/lib/sqlite-jdbc-3.7.2.jar")
#' query <- "select * from testdata limit 10000"
#' x <- read.jdbc.ffdf(query = query,
#' dbConnect.args = list(drv = drv, url = sprintf("jdbc:sqlite:%s", dbfile)),
#' first.rows = 100, next.rows = 1000, VERBOSE=TRUE)
#' class(x)
#' x[1:10, ]
#' }
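#'
#' ## appending to an existing ffdf object works as in read.dbi.ffdf; a minimal
#' ## sketch reusing the connection arguments and objects from the example above
#' \dontrun{
#' combined <- read.jdbc.ffdf(query = query,
#'  dbConnect.args = list(drv = drv, url = sprintf("jdbc:sqlite:%s", dbfile)),
#'  first.rows = 100, next.rows = 1000, x = x, VERBOSE = TRUE)
#' dim(combined)
#' }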
read.jdbc.ffdf <- function(
query = NULL,
dbConnect.args = list(drv=NULL, dbname = NULL, username = "", password = ""),
dbSendQuery.args = list(),
dbFetch.args = list(),
x = NULL, nrows = -1,
first.rows = NULL, next.rows = NULL, levels = NULL, appendLevels = TRUE,
asffdf_args = list(), BATCHBYTES = getOption("ffbatchbytes"), VERBOSE = FALSE, colClasses = NULL,
transFUN = NULL, ...){
cleanupConnection <- function(x){
if("resultset" %in% names(x)){
RJDBC::dbClearResult(res=x$resultset)
}
if("channel" %in% names(x)){
RJDBC::dbDisconnect(x$channel)
}
}
jdbcinfo <- list()
##
## Connect to database
##
jdbcinfo$channel <- do.call('dbConnect', dbConnect.args)
on.exit(cleanupConnection(jdbcinfo))
append <- !is.null(x)
if(append && !inherits(x, "ffdf")){
stop("only ffdf objects can be used for appending (and skipping the first.row chunk)")
}
if(VERBOSE){
read.time <- 0
write.time <- 0
}
nrows <- as.integer(nrows)
N <- 0L
if(!append){
if(VERBOSE){
cat("read.jdbc.ffdf ", N+1L, "..", sep="")
read.start <- proc.time()[3]
}
if(is.null(colClasses)){
colClasses <- NA
}
## Specify the size of the first number of rows to import
if (is.null(first.rows)){
if (is.null(next.rows)){
first.rows <- 1e3L
}else{
first.rows <- as.integer(next.rows)
}
}else{
first.rows <- as.integer(first.rows)
}
if (nrows>=0L && nrows<first.rows){
first.rows <- nrows
}
if (first.rows==1){
stop("first.rows must not be 1")
}
##
## Set up arguments for dbSendQuery
##
dbSendQuery.args$conn <- jdbcinfo$channel
dbSendQuery.args$statement <- query
##
## Launch the query
##
jdbcinfo$resultset <- do.call('dbSendQuery', dbSendQuery.args)
##
## Read in the first chunk
##
dbFetch.args$res <- jdbcinfo$resultset
dbFetch.args$n <- first.rows
if(is.null(transFUN)){
dat <- convertCharacterToFactor(do.call("dbFetch", dbFetch.args))
}else{
dat <- convertCharacterToFactor(transFUN(do.call("dbFetch", dbFetch.args), ...))
}
n <- nrow(dat)
N <- n
## Recode levels if levels are specified
if(!is.null(levels)){
cnam <- colnames(dat)
lnam <- names(levels)
if(!is.list(levels) || is.null(lnam) || any(is.na(match(lnam,cnam)))){
stop("levels must be a list with names matching column names of the first data.frame read")
}
for (i in lnam){
dat[[i]] <- recodeLevels(dat[[i]], levels[[i]])
}
}
if (VERBOSE){
write.start <- proc.time()[3]
read.time <- read.time + (write.start - read.start)
cat(N, " (", n, ") jdbc-read=", round(write.start-read.start, 3), "sec", sep="")
}
##
## If there are zero rows in the query, return a data.frame instead of an ffdf object
##
if(nrow(dat) == 0){
if (VERBOSE){
cat(" query returned 0 records\n", sep="")
cat(" jdbc-read=", round(read.time, 3), "sec TOTAL=", round(read.time, 3), "sec\n", sep="")
}
return(dat)
}
##
## If there are non-zero rows in the query, convert to a filebased ffdf object
##
x <- do.call("as.ffdf", c(list(dat), asffdf_args))
##
## Fix ordered factors
##
colClasses <- repnam(colClasses, colnames(x), default=NA)
i.fix <- seq.int(length.out=ncol(dat))[!is.na(match(colClasses, "ordered"))]
for (i in i.fix){
virtual(x[[i]])$ramclass <- c("ordered","factor")
}
if (VERBOSE){
write.stop <- proc.time()[3]
write.time <- write.time + (write.stop - write.start)
cat(" ffdf-write=", round(write.stop-write.start, 3), "sec\n", sep="")
}
}
##
## Fetch the other rows
##
if (append || N==first.rows){
##
## Define the number of rows to fetch in each run depending on the BATCHBYTES maximum
##
k <- ncol(x)
col.names <- colnames(x)
colClasses <- sapply(seq.int(length.out=ncol(x)), function(i)colClass(x[[i]]))
if (is.null(next.rows)){
recordsize <- sum(.rambytes[vmode(x)])
next.rows <- BATCHBYTES %/% recordsize
if (next.rows<1L){
next.rows <- 1L
warning("single record does not fit into BATCHBYTES")
}
}else{
next.rows <- as.integer(next.rows)
}
next.nrows <- next.rows
appendLevels <- repnam(appendLevels, col.names, default=TRUE)
if(any(appendLevels)){
i.fac <- seq.int(length.out=k)
if(append == TRUE){
i.fac <- i.fac[appendLevels & sapply(i.fac, function(i) is.factor(x[[i]]))]
}else{
i.fac <- i.fac[appendLevels & sapply(i.fac, function(i) is.factor(dat[[i]]))]
}
}
if(append == TRUE){
## Set up arguments for dbSendQuery & launch the query
dbSendQuery.args$conn <- jdbcinfo$channel
dbSendQuery.args$statement <- query
jdbcinfo$resultset <- do.call('dbSendQuery', dbSendQuery.args)
}
##
## Loop - fetching the subsequent data in batches
##
while(TRUE){
if (nrows>=0L && N+next.rows > nrows){
next.nrows <- nrows - N
}
if (next.nrows<1L){
break
}
if (VERBOSE){
cat("read.jdbc.ffdf ", N+1L,"..", sep="")
read.start <- proc.time()[3]
}
##
## read in subsequent chunk
##
dbFetch.args$res <- jdbcinfo$resultset
dbFetch.args$n <- next.nrows
if (is.null(transFUN)){
dat <- convertCharacterToFactor(do.call("dbFetch", dbFetch.args))
}else{
dat <- convertCharacterToFactor(transFUN(do.call("dbFetch", dbFetch.args), ...))
}
n <- nrow(dat)
N <- N + n
if (VERBOSE){
write.start <- proc.time()[3]
read.time <- read.time + (write.start - read.start)
cat(N, " (", n, ") jdbc-read=", round(write.start-read.start, 3), "sec", sep="")
}
## Nothing to import any more
if (n<1L){
if (VERBOSE)
cat("\n")
break
}
## Recode levels if levels are specified
if(any(appendLevels))
for (i in i.fac){
lev <- unique(c(levels(x[[i]]),levels(dat[[i]]))) # we save a call to the more general appendLevels() here
levels(x[[i]]) <- lev
dat[[i]] <- recodeLevels(dat[[i]], lev)
}
##
## Append to the ffdf object
##
nff <- nrow(x)
nrow(x) <- nff + n
i <- hi(nff+1L, nff+n)
x[i,] <- dat
if (VERBOSE){
write.stop <- proc.time()[3]
write.time <- write.time + (write.stop - write.start)
cat(" ffdf-write=", round(write.stop-write.start, 3), "sec\n", sep="")
}
if(n<next.nrows){
break
}
}
}
if (VERBOSE){
cat(" jdbc-read=", round(read.time, 3), "sec ffdf-write=", round(write.time, 3), "sec TOTAL=", round(read.time+write.time, 3), "sec\n", sep="")
}
return(x)
}
#' Write ffdf data to a database table by using a DBI connection.
#'
#' Write \code{\link[ff]{ffdf}} data to a database table by using a DBI connection.
#' This can for example be used to store large ffdf datasets from R in
#' Oracle, SQLite, MySQL, PostgreSQL, Hive or other SQL databases. \cr
#' Note that for very large datasets, these SQL databases might provide bulk-loading tools which are faster.
#' You might consider those as an alternative to this procedure.
#'
#' Opens up the DBI connection using \code{DBI::dbConnect}, writes data to the SQL table
#' using \code{DBI::dbWriteTable} by extracting the data in batches from the \code{\link[ff]{ffdf}}
#' and appending them to the table.
#'
#' @param x the \code{\link[ff]{ffdf}} to write to the database
#' @param name character string with the name of the table to store the data in. Passed on to \code{\link[DBI]{dbWriteTable}}.
#' @param dbConnect.args a list of arguments to pass to DBI's \code{\link[DBI]{dbConnect}} (like drv, dbname, username, password). See the examples.
#' @param BATCHBYTES integer: bytes allowed for the size of the data.frame storing the result of reading one chunk.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param RECORDBYTES optional integer scalar representing the bytes needed to process a single row of the ffdf
#' @param by integer passed on to \code{\link[bit]{chunk}} indicating to write to the database in chunks of this size. Overrides
#' the behaviour of BATCHBYTES and RECORDBYTES.
#' @param VERBOSE logical: TRUE to print timings for each processed chunk (default FALSE).
#' @param ... optional parameters passed on to \code{\link[DBI]{dbWriteTable}}
#' @return
#' invisible()
#' @export
#' @seealso \code{\link[DBI]{dbWriteTable}}, \code{\link[bit]{chunk}}
#' @examples
#' require(ff)
#'
#' ##
#' ## Example query using data in sqlite
#' ##
#' require(RSQLite)
#' dbfile <- system.file("smalldb.sqlite3", package="ETLUtils")
#' drv <- dbDriver("SQLite")
#' query <- "select * from testdata limit 10000"
#' x <- read.dbi.ffdf(query = query, dbConnect.args = list(drv = drv, dbname = dbfile),
#' first.rows = 100, next.rows = 1000, VERBOSE=TRUE)
#'
#' ## copy db in package folder to temp folder as CRAN does not allow writing in package dirs
#' dbfile <- tempfile(fileext = ".sqlite3")
#' file.copy(from = system.file("smalldb.sqlite3", package="ETLUtils"), to = dbfile)
#' Sys.chmod(dbfile, mode = "777")
#' write.dbi.ffdf(x = x, name = "helloworld", row.names = FALSE, overwrite = TRUE,
#' dbConnect.args = list(drv = drv, dbname = dbfile),
#' by = 1000, VERBOSE=TRUE)
#' test <- read.dbi.ffdf(query = "select * from helloworld",
#' dbConnect.args = list(drv = drv, dbname = dbfile))
#'
#' ## clean up for CRAN
#' file.remove(dbfile)
#' \dontrun{
#' require(ROracle)
#' write.dbi.ffdf(x = x, name = "hellooracle", row.names = FALSE, overwrite = TRUE,
#' dbConnect.args = list(drv = dbDriver("Oracle"),
#' user = "YourUser", password = "YourPassword", dbname = "Mydatabase"),
#' VERBOSE=TRUE)
#' }
write.dbi.ffdf <- function(x, name,
dbConnect.args = list(drv=NULL, dbname = NULL, username = "", password = ""),
RECORDBYTES = sum(.rambytes[vmode(x)]),
BATCHBYTES = getOption("ffbatchbytes"),
by = NULL,
VERBOSE = FALSE,
...){
stopifnot(inherits(x, "ffdf"))
stopifnot(nrow(x) > 0)
cleanupConnection <- function(x){
if("channel" %in% names(x)){
DBI::dbDisconnect(x$channel)
}
}
dbiinfo <- list()
dbiinfo$channel <- do.call('dbConnect', dbConnect.args)
on.exit(cleanupConnection(dbiinfo))
chunks <- bit::chunk(x, RECORDBYTES=RECORDBYTES, BATCHBYTES=BATCHBYTES, by=by)
for(i in seq_along(chunks)){
if (VERBOSE){
cat(sprintf("%s dbWriteTable chunk %s/%s\n", Sys.time(), i, length(chunks)))
}
chunkidx <- chunks[[i]]
dbWriteTable.args <- list(...)
dbWriteTable.args$conn <- dbiinfo$channel
dbWriteTable.args$name <- name
dbWriteTable.args$value <- x[chunkidx, , drop=FALSE]
if(i > 1 && "overwrite" %in% names(dbWriteTable.args)){
dbWriteTable.args$overwrite <- FALSE
}
if(i > 1){
dbWriteTable.args$append <- TRUE
}
do.call('dbWriteTable', dbWriteTable.args)
}
invisible()
}
#' Write ffdf data to a database table by using a JDBC connection.
#'
#' Write \code{\link[ff]{ffdf}} data to a database table by using a JDBC connection.
#' This can for example be used to store large ffdf datasets from R in
#' Oracle, SQLite, MySQL, PostgreSQL, Hive or other SQL databases. \cr
#' Note that for very large datasets, these SQL databases might provide bulk-loading tools which are faster.
#' You might consider those as an alternative to this procedure.
#'
#' Opens up the JDBC connection using \code{RJDBC::dbConnect}, writes data to the SQL table
#' using \code{RJDBC::dbWriteTable} by extracting the data in batches from the \code{\link[ff]{ffdf}}
#' and appending them to the table.
#'
#' @param x the \code{\link[ff]{ffdf}} to write to the database
#' @param name character string with the name of the table to store the data in. Passed on to \code{dbWriteTable}.
#' @param dbConnect.args a list of arguments to pass to JDBC's \code{RJDBC::dbConnect} (like drv, dbname, username, password). See the examples.
#' @param BATCHBYTES integer: bytes allowed for the size of the data.frame storing the result of reading one chunk.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param RECORDBYTES optional integer scalar representing the bytes needed to process a single row of the ffdf
#' @param by integer passed on to \code{\link[bit]{chunk}} indicating to write to the database in chunks of this size. Overrides
#' the behaviour of BATCHBYTES and RECORDBYTES.
#' @param VERBOSE logical: TRUE to print timings for each processed chunk (default FALSE).
#' @param ... optional parameters passed on to \code{dbWriteTable}
#' @return
#' invisible()
#' @export
#' @seealso \code{\link[RJDBC]{JDBCConnection-methods}}, \code{\link[bit]{chunk}}
#' @examples
#' \dontrun{
#' require(ff)
#'
#' ##
#' ## Example query using data in sqlite
#' ##
#' require(RJDBC)
#' dbfile <- system.file("smalldb.sqlite3", package="ETLUtils")
#' drv <- JDBC(driverClass = "org.sqlite.JDBC", classPath = "/usr/local/lib/sqlite-jdbc-3.7.2.jar")
#' query <- "select * from testdata limit 10000"
#' x <- read.jdbc.ffdf(query = query,
#' dbConnect.args = list(drv = drv, url = sprintf("jdbc:sqlite:%s", dbfile)),
#' first.rows = 100, next.rows = 1000, VERBOSE=TRUE)
#'
#' write.jdbc.ffdf(x = x, name = "helloworld", row.names = FALSE, overwrite = TRUE,
#' dbConnect.args = list(drv = drv, url = sprintf("jdbc:sqlite:%s", dbfile)),
#' by = 1000, VERBOSE=TRUE)
#' test <- read.jdbc.ffdf(query = "select * from helloworld",
#' dbConnect.args = list(drv = drv, url = sprintf("jdbc:sqlite:%s", dbfile)))
#' }
write.jdbc.ffdf <- function(x, name,
dbConnect.args = list(drv=NULL, dbname = NULL, username = "", password = ""),
RECORDBYTES = sum(.rambytes[vmode(x)]),
BATCHBYTES = getOption("ffbatchbytes"),
by = NULL,
VERBOSE = FALSE,
...){
stopifnot(inherits(x, "ffdf"))
stopifnot(nrow(x) > 0)
cleanupConnection <- function(x){
if("channel" %in% names(x)){
RJDBC::dbDisconnect(x$channel)
}
}
dbiinfo <- list()
dbiinfo$channel <- do.call('dbConnect', dbConnect.args)
on.exit(cleanupConnection(dbiinfo))
chunks <- bit::chunk(x, RECORDBYTES=RECORDBYTES, BATCHBYTES=BATCHBYTES, by=by)
for(i in seq_along(chunks)){
if (VERBOSE){
cat(sprintf("%s dbWriteTable chunk %s/%s\n", Sys.time(), i, length(chunks)))
}
chunkidx <- chunks[[i]]
dbWriteTable.args <- list(...)
dbWriteTable.args$conn <- dbiinfo$channel
dbWriteTable.args$name <- name
dbWriteTable.args$value <- x[chunkidx, ]
if(i > 1 && "overwrite" %in% names(dbWriteTable.args)){
dbWriteTable.args$overwrite <- FALSE
}
if(i > 1){
dbWriteTable.args$append <- TRUE
}
do.call('dbWriteTable', dbWriteTable.args)
}
invisible()
}
#' Write ffdf data to a database table by using an ODBC connection.
#'
#' Write \code{\link[ff]{ffdf}} data to a database table by using an ODBC connection.
#' This can for example be used to store large ffdf datasets from R in
#' Oracle, SQLite, MySQL, PostgreSQL, Hive or other SQL databases. \cr
#' Note that for very large datasets, these SQL databases might provide bulk-loading tools which are faster.
#' You might consider those as an alternative to this procedure.
#'
#' Opens up the ODBC connection using \code{RODBC::odbcConnect}, writes data to the SQL table
#' using \code{RODBC::sqlSave} by extracting the data in batches from the \code{\link[ff]{ffdf}}
#' and appending them to the table.
#'
#' @param x the \code{\link[ff]{ffdf}} to write to the database
#' @param tablename character string with the name of the table to store the data in. Passed on to \code{\link[RODBC]{sqlSave}}.
#' @param odbcConnect.args a list of arguments to pass to ODBC's \code{\link[RODBC]{odbcConnect}} (like dsn, uid, pwd). See the examples.
#' @param BATCHBYTES integer: bytes allowed for the size of the data.frame storing the result of reading one chunk.
#' See documentation in \code{\link[ff]{read.table.ffdf}} for more details.
#' @param RECORDBYTES optional integer scalar representing the bytes needed to process a single row of the ffdf
#' @param by integer passed on to \code{\link[bit]{chunk}} indicating to write to the database in chunks of this size. Overrides
#' the behaviour of BATCHBYTES and RECORDBYTES.
#' @param VERBOSE logical: TRUE to print timings for each processed chunk (default FALSE).
#' @param ... optional parameters passed on to \code{\link[RODBC]{sqlSave}}
#' @return
#' invisible()
#' @export
#' @seealso \code{\link[RODBC]{sqlSave}}, \code{\link[bit]{chunk}}
#' @examples
#' ##
#' ## Using the sqlite database (smalldb.sqlite3) in the /inst folder of the package
#' ## set up the sqlite ODBC driver (www.stats.ox.ac.uk/pub/bdr/RODBC-manual.pdf)
#' ## and call it 'smalltestsqlitedb'
#' ##
#' \dontrun{
#' require(RODBC)
#' x <- read.odbc.ffdf(
#' query = "select * from testdata limit 10000",
#' odbcConnect.args = list(
#' dsn="smalltestsqlitedb", uid = "", pwd = "",
#' believeNRows = FALSE, rows_at_time = 1),
#' nrows = -1,
#' first.rows = 100, next.rows = 1000, VERBOSE = TRUE)
#'
#' write.odbc.ffdf(x = x, tablename = "testdata", rownames = FALSE, append = TRUE,
#' odbcConnect.args = list(
#' dsn="smalltestsqlitedb", uid = "", pwd = "",
#' believeNRows = FALSE, rows_at_time = 1),
#' by = 1000, VERBOSE=TRUE)
#' }
write.odbc.ffdf <- function(x, tablename,
odbcConnect.args = list(dsn=NULL, uid = "", pwd = ""),
RECORDBYTES = sum(.rambytes[vmode(x)]),
BATCHBYTES = getOption("ffbatchbytes"),
by = NULL,
VERBOSE = FALSE,
...){
stopifnot(inherits(x, "ffdf"))
stopifnot(nrow(x) > 0)
odbcinfo <- list()
odbcinfo$channel <- do.call('odbcConnect', odbcConnect.args)
on.exit(try(RODBC::odbcClose(odbcinfo$channel), silent = TRUE))
chunks <- bit::chunk(x, RECORDBYTES=RECORDBYTES, BATCHBYTES=BATCHBYTES, by=by)
for(i in seq_along(chunks)){
if (VERBOSE){
cat(sprintf("%s sqlSave chunk %s/%s\n", Sys.time(), i, length(chunks)))
}
chunkidx <- chunks[[i]]
sqlSave.args <- list(...)
sqlSave.args$channel <- odbcinfo$channel
sqlSave.args$tablename <- tablename
sqlSave.args$dat <- x[chunkidx, ]
if(i > 1){
sqlSave.args$append <- TRUE
}
do.call('sqlSave', sqlSave.args)
}
invisible()
}
# NOTE: this colClass implementation works only because, by accident, the class label we need
# sits in the last position of the class vector:
# for c("ordered","factor"), read.table does not want "ordered" but "factor";
# for c("POSIXt","POSIXct"), "POSIXct" is in the last position only due to a "historical error" (Gabor Grothendieck, r-help, 26.9.2009)
colClass <- function(x){
UseMethod("colClass")
}
colClass.default <- function(x){
cl <- class(x)
cl[length(cl)]
}
colClass.ff <- function(x){
if (length(x))
x <- x[1]
else
x <- x[]
NextMethod()
}
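# Helper that converts every character column of a data.frame to a factor, since
# ff cannot store character vectors; applied to each chunk fetched from the database.
# e.g. convertCharacterToFactor(data.frame(a = "txt", stringsAsFactors = FALSE))
# returns a data.frame whose column a is a factor.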
convertCharacterToFactor <- function(x){
chartofactor <- sapply(x, class)
chartofactor <- chartofactor[chartofactor == "character"]
for(convertme in names(chartofactor)){
x[[convertme]] <- factor(x[[convertme]])
}
x
}