# lumpR/db_internals.R
# Copyright (C) 2015, 2017 Tobias Pilz, Till Francke
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# interal function for db_* functions
# adjust SQL dialects
sql_dialect <- function(con, statement) {
# SQLite
if(grepl("SQLite", odbcGetInfo(con)["DBMS_Name"], ignore.case=T)) {
# remove comments
statement <- gsub("COMMENT.'[^']*'", "",statement )
# remove engine specification
statement <- gsub("ENGINE.*", "",statement )
# AUTO_INCREMENT is not supported
statementa <- gsub("INT\\(11\\) AUTO_INCREMENT NOT NULL", "INTEGER PRIMARY KEY",statement, ignore.case = T)
if(statementa != statement){
statement <- gsub(", *PRIMARY KEY *\\([^)]*\\)","",statementa)
}
# remove ticks
statement <- gsub("`", "", statement)
# close with ';' otherwise an error occurs (at least under Linux in my case)
if(substr(statement, nchar(statement), nchar(statement)) != ";")
statement <- paste0(statement,";")
# no 'modify' supported -> workaround
if(grepl("modify", statement, ignore.case = TRUE)) {
# get table to be modified
statement <- gsub(";", "", statement)
split <- unlist(strsplit(statement, "[ ]+"))
r_tbl_mod <- grep("modify", split, ignore.case=T)
tbl_mod <- split[r_tbl_mod-1]
# fetch data from table to be modified
dat_tbl_mod <- sqlFetch(con, tbl_mod)
meta_tbl_mod <- sqlColumns(con, tbl_mod)
# drop table from db
sqlQuery(con, paste0("delete from ", tbl_mod, ";"))
sqlQuery(con, paste0("drop table ", tbl_mod, ";"))
# modify data type of data
r_col_mod <- grep(split[r_tbl_mod+1], colnames(dat_tbl_mod), ignore.case=T)
varspec <- meta_tbl_mod$TYPE_NAME
names(varspec) <- meta_tbl_mod$COLUMN_NAME
varspec[r_col_mod] <- split[r_tbl_mod+2]
# re-create table and write modified data to database
# if dat_tbl_mod has no values sqlSave throws an error but table is created successfully -> suppress error in that case; could not find a more elegant workaround
if(nrow(dat_tbl_mod) == 0) {
try(sqlSave(channel=con, tablename=tbl_mod, dat=dat_tbl_mod, varTypes=varspec, append=FALSE,
nastring = NULL, fast = TRUE, rownames = FALSE),
silent=T)
} else {
suppressWarnings(sqlSave(channel=con, tablename=tbl_mod, dat=dat_tbl_mod, varTypes=varspec, append=FALSE,
nastring = NULL, fast = TRUE, rownames = FALSE))
}
# in case of columns of type datetime (see function write_datetabs() below)
if(any(grepl("datetime", meta_tbl_mod$TYPE_NAME, ignore.case = T))) {
sqlQuery(con, paste0("delete from ", tbl_mod, ";"))
write_datetabs(con, dat=dat_tbl_mod, tab=tbl_mod, verbose=F)
}
# return NULL as statement
statement <- NULL
}
}
# MS Access
if(grepl("access", odbcGetInfo(con)["DBMS_Name"], ignore.case=T)) {
# if 'alter table' statement -> 'alter column' instead of 'modify'
if(grepl("modify", statement, ignore.case = T))
statement <- gsub("modify", "alter column", statement, ignore.case = T)
# adjust column data type syntax
statement <- gsub("INT\\([0-9]*\\)", "INT", statement)
# nvarchar (i.e. unicode characters) are not supported -> convert to varchar
statement <- gsub("NVARCHAR", "VARCHAR", statement)
# auto increment syntax
statement <- gsub("INT AUTO_INCREMENT", "AUTOINCREMENT", statement)
# no comments supported
statement <- gsub("COMMENT.'[^']*'", "",statement )
# no default values supported
statement <- gsub("DEFAULT 0", "",statement )
# remove engine specification
statement <- gsub("ENGINE.*", "",statement )
# alter primary key statement
statement <- gsub("PRIMARY KEY","CONSTRAINT pk PRIMARY KEY",statement)
# BIT instead of BOOL to represent true/false data
statement <- gsub("BOOL", "BIT", statement)
# no tinyint
statement <- gsub("TINYINT", "INT", statement)
# maximum length of a data type is 255
len <- suppressWarnings(as.integer(unlist(strsplit(statement, "\\(|\\)"))))
len <- len[which(!is.na(len))]
if(any(len)) {
len_oversize <- len[which(len > 255)]
if(any(len_oversize)) {
statement <- gsub(paste(len_oversize, collapse="|"), "255", statement)
}
}
}
return(statement)
} # EOF sql_dialect
# query with error message for easier error handling
sqlQuery2 <- function(con, statement, info="") {
res <- sqlQuery(con, statement, errors=F)
if (is.data.frame(res) || res !=-1) #regular successful query
return(res)
#DELETE on an empty table also yields "-1". Don't treat this as an error
if (res==-1 & grepl(pattern = "^delete", ignore.case = TRUE, x = statement))
return(0)
res2 <- sqlQuery(con, statement, errors = T)
if (is.na(res2[1])) return(res) #for creation statements, this may be OK, don't issue an error
tryCatch(odbcClose(con), error=function(e){})
stop(cat(paste0("Error in SQL query (", info,").\nQuery: ", statement,
"\nerror-message: ", res2[1])))
} # EOF query with error message
# write data from external file into parameter database
writedb <- function(con, file, table, overwrite, verbose) {
if(verbose) message("%")
if(verbose) message(paste0("% -> Writing data into table '", table, "' ..."))
# ensure MySQL/MariaDB uses ANSI quotation (double quotes instead of back ticks)
if(grepl("MariaDB", odbcGetInfo(con)["DBMS_Name"], ignore.case=T))
sqlQuery(con, "SET sql_mode='ANSI';")
# read data
dat <- read.table(file, header=T, sep="\t", strip.white = TRUE, blank.lines.skip = TRUE, fill=TRUE)
empty_lines = apply(is.na(dat) | dat=="", MARGIN = 1, all)
if (any(empty_lines))
{
warning(paste0("Empty lines encountered in ", file,", ignored"))
dat=dat[!empty_lines,]
}
# check structure
table_desc = sqlColumns(con, table) #get table description
cols <- table_desc$COLUMN_NAME
missing_cols = setdiff(cols, colnames(dat))
for(colname in missing_cols)
{
if (grepl(colname, pattern = "^a_") |
(table=="soils" & colname %in% c("description", "Phil_s", "Phil_a", "Hort_ini", "Hort_end", "Hort_k")) |
(table=="horizons" & colname %in% c("shrinks", "description", "soil_dens"))
)
{
if (grepl(table_desc[table_desc[,"COLUMN_NAME"]==colname, "TYPE_NAME"], pattern = "VARCHAR")) #set missing values
dat[,colname]=as.character(NA) else
dat[,colname]=as.numeric(NA)
missing_cols = missing_cols[missing_cols!=colname] #remove from list of missing columns
}
}
if (length(missing_cols)>0)
stop(paste0("File '", file, "' does not contain some required columns (", paste(missing_cols, collapse=", "), ")."))
# remove unnecessary columns if available
rm_cols <- which(!(colnames(dat) %in% cols))
if (any(rm_cols))
dat <- dat[,-rm_cols]
# delete existing values in table values
if (overwrite)
res <- sqlQuery(con, paste0("delete from ", table),errors = T) #ignore errors, because emptying empty tables in MSAccess causes errors
# write values into table; in case of an error write a more meaningful error message
tryCatch(
{
sqlSave(channel=con, tablename = table, dat=dat, verbose=F,
append=TRUE , test = FALSE, nastring = NULL, fast = TRUE, rownames = FALSE)
}, error = function(e) {
stop(paste0("An error occurred when writing into table '", table, "'. ",
"All values written until error occurrence will be kept in the database! ",
"There might be a problem with the input data structure (e.g. gaps), ",
"duplicate entries or entries that already exist in the database table. ",
"Error message of the writing function: ", e))
}
)
} # EOF writedb
# write data.frame into tables having column of type datetime
# Reason: sqlSave() does not work for columns of type datetime for DBMS SQLite (recognized column size is 3 and dates are truncated to 3 characters)
write_datetabs <- function(con, dat, tab, verbose) {
# loop over rows of data.frame
for(i in 1:nrow(dat)) {
# create statement from dat
statement <- paste0("INSERT INTO ", tab, " VALUES (",
apply(dat[i,],1,function(x) paste0("'", x, "'", collapse=", ")),
");")
# adjust SQL dialect if necessary
statement <- sql_dialect(con, statement)
# apply statement
res <- sqlQuery(con, statement, errors=F)
if (res==-1)
{
res <- sqlQuery(con, statement, errors=T)
stop(paste0("Error in SQL query execution while writing into table '",tab,"': ", res))
}
}
if(verbose) message(paste0("% -> Updated table '",tab,"'."))
} # EOF write_datetabs
# check or fix that fractions sum up to 1
# returns: if fix: dat_tbl unchanged or with updated 'fraction'
# if !fix: flawd IDs (IDs of first column where fraction does not sum up to one)
check_fix_fractions <- function(dat_tbl, fix, update_frac_impervious, verbose) {
name_tbl <- attr(dat_tbl, "table")
if(verbose) message("%")
if(verbose) message(paste0("% -> Processing table '", name_tbl, "' ..."))
# sum of 'fraction' should be 1 for every higher level class (rounding error allowed)
dat_contains_sum <- round(tapply(dat_tbl$fraction, list(parent=dat_tbl[[1]]), sum, na.rm=T), 2)
# re-calculate areal fractions if needed
dat_tbl_new <- dat_tbl
if(!any(dat_contains_sum!=1)) {
if (verbose) message("% -> Everything sums up to one")
} else {
if (verbose)
{
message(paste0("% -> There are ", length(which(dat_contains_sum!=1)), " elements not summing to 1 in their fractions:"))
message(paste0("% ", paste0(names(dat_contains_sum[dat_contains_sum!=1]), ": ", dat_contains_sum[dat_contains_sum!=1], collapse=", ")))
if (!fix)
message(paste0("% -> Check table '", name_tbl, "'", ifelse(name_tbl=="r_tc_contains_svc" & !update_frac_impervious, " and 'terrain_components' (column frac_rocky)","")," or call db_check(..., check=\"check_fix_fractions\", fix=TRUE)!"))
}
if(fix) {
if(verbose)
message("% -> Re-calculate fractions ...")
dat_contains_sum <- tapply(dat_tbl$fraction, list(parent=dat_tbl[[1]]), sum, na.rm=T)
for (s in 1:nrow(dat_tbl))
dat_tbl_new$fraction[s] <- dat_tbl$fraction[s] / dat_contains_sum[paste0(dat_tbl[[1]][s])]
attr(dat_tbl_new, "altered") <- TRUE
} # if fix
} # if any(dat_contains_sum!=1)
if(verbose) message("% -> OK")
if(fix)
return(dat_tbl_new)
else
return(as.numeric(names(dat_contains_sum[which(dat_contains_sum!=1)])))
} # EOF check_fix_fractions
# filter disaggregated areas by areal fraction threshold
filter_small_areas <- function(dat_tbl, thres, fix, verbose) {
if(verbose) message("%")
if(verbose) message(paste0("% -> Processing table '", attr(dat_tbl, "table"), "' ..."))
# sum of 'fraction' should be 1 for every higher level class (rounding error allowed)
dat_contains_sum <- round(tapply(dat_tbl$fraction, list(parent=dat_tbl[[1]]), sum, na.rm=T), 2)
if(any(dat_contains_sum != 1)) {
if(fix)
stop(paste0("Before removal of tiny areas: sum of fractions per higher level unit not always equal to one. Check table '", attr(dat_tbl, "table"), "'", ifelse(attr(dat_tbl, "table")=="r_tc_contains_svc", " and 'terrain_components' (column frac_rocky)","")," or call db_check(..., check=\"check_fix_fractions\", fix=TRUE)!"))
else
warning(paste0("Before removal of tiny areas: sum of fractions per higher level unit not always equal to one. Check table '", attr(dat_tbl, "table"), "'", ifelse(attr(dat_tbl, "table")=="r_tc_contains_svc", " and 'terrain_components' (column frac_rocky)","")," or call db_check(..., check=\"check_fix_fractions\", fix=TRUE)!"))
}
# remove datasets where fraction < area_thresh
rows_rm <- which(dat_tbl$fraction < thres & dat_tbl[,2]!=-1) #find entities below threshold, omit special case "rocky fractions"
dat_tbl_new <- dat_tbl
if(!any(rows_rm)) {
message(paste0("% -> No fraction smaller ", thres, " could be found."))
} else {
if(fix)
message(paste0("% -> There are ", length(rows_rm), " datasets going to be removed from '", attr(dat_tbl, "table"), "'"))
else
message(paste0("% -> There are ", length(rows_rm), " datasets containing fractions < threshold in '", attr(dat_tbl, "table"), "'"))
# keep datasets where entities of more than 10% of the respective parent class' area would be removed
lu_rm_sum <- tapply(dat_tbl$fraction[rows_rm], list(parent=dat_tbl[[1]][rows_rm]), sum)
if(any(lu_rm_sum > 0.1)) {
keep_lu <- which(lu_rm_sum > 0.1)
message(paste0("% -> For '", colnames(dat_tbl)[1], "' ", paste(names(lu_rm_sum)[keep_lu], collapse=", "),
" more than 10% of the area would be removed due to too many small '", colnames(dat_tbl)[2], "'. These datasets will be kept."))
rows_rm_keep <- which(dat_tbl[[1]][rows_rm] %in% names(lu_rm_sum)[keep_lu])
rows_rm <- rows_rm[-rows_rm_keep]
if (!any(rows_rm))
message(paste0("% -> For '", attr(dat_tbl, "table"), "' nothing to remove or choose smaller value for 'area_thresh' and re-run check."))
}
# remove datasets and re-calculate areal fractions
if(fix & any(rows_rm))
{
dat_tbl_new <- dat_tbl[-rows_rm,]
dat_tbl_new <- check_fix_fractions(dat_tbl=dat_tbl_new, fix=TRUE, update_frac_impervious=update_frac_impervious, verbose=FALSE)
}
} # if any fraction < area_thresh
if(verbose) message("% -> OK.")
return(dat_tbl_new)
} # EOF filter_small_areas
# helper function to connect to a database
connect_db <- function(dbname) {
suppressWarnings(con <- odbcConnect(dbname, believeNRows=F))
if (con == -1)
stop(paste0("Could not connect to database '", dbname, "'. Type 'odbcDataSources()' to see the data sources known to ODBC.",
" If you want to connect to a MS Access database make sure you are using 32 bit R."))
# use PRAGMA for SQLite to enhance speed (at the cost of less data security, for details see https://www.tutorialspoint.com/sqlite/sqlite_pragma.htm)
if(grepl("SQLite", odbcGetInfo(con)["DBMS_Name"], ignore.case=T)) {
sqlQuery(con, "PRAGMA synchronous = OFF;")
sqlQuery(con, "PRAGMA journal_mode = OFF;")
}
return(con)
} # EOF connect_db
# simple function to copy data from one database table into another (same table name)
# NOTE: ODBC::sqlCopy and ODBC::sqlCopyTable do not work properly
dbCopyTable <- function(con, tab, con_dest) {
# read data
dat <- sqlFetch(con, tab)
# check if there is a DATETIME column and use internal function write_datetabs() if so
types <- sqlColumns(con, tab)$TYPE_NAME
if(any(grepl("datetime", types, ignore.case = T)))
write_datetabs(con_dest, dat, tab, verbose = F)
else {
sqlQuery(con_dest, paste0("delete from ", tab))
sqlSave(con_dest, dat, tab, verbose=F, append=TRUE , test = FALSE,
nastring = NULL, fast = TRUE, rownames = FALSE)
}
} # EOF dbCopyTable
# function to write into table meta_info
write_metainfo <- function(con, fun, affected_tbl, affected_col, remarks, verbose) {
meta_dat <- sqlFetch(con, "meta_info")
if(any(meta_dat$pid)) {
pid_new <- max(meta_dat$pid) +1
} else {
pid_new <- 1
}
meta_out <- data.frame(pid=pid_new,
mod_date=as.POSIXct(Sys.time()),
mod_user=paste0(fun, ", v. ", installed.packages()["lumpR","Version"]),
affected_tables=paste(unique(affected_tbl), collapse=", "),
affected_columns=affected_col,
remarks=remarks)
write_datetabs(con, meta_out, tab="meta_info", verbose)
} # EOF write_metainfo
# function to read in data from selected tables
read_db_dat <- function(tbl, con, tbl_exist, update_frac_impervious) {
dat_out <- NULL
tbl_read <- tbl[which(!(tbl %in% tbl_exist))]
for(t in tbl_read) {
dat_out[[t]] <- sqlFetch(con, t)
# information about rocky fractions needed for r_tc_contains_svc, see ?db_check: remove_impervious_svc, option update_frac_impervious = FALSE
# internally mask the rocky fraction with svc_id=-1
if(t == "r_tc_contains_svc" & !update_frac_impervious) {
res <- sqlQuery(con, "select pid as tc_id, -1 as svc_id, frac_rocky as fraction from terrain_components")
# replace NA in rocky fraction by zero
res$fraction[is.na(res$fraction)] <- 0
dat_out[[t]] <- rbind(dat_out[[t]], res)
}
# meta information (attributes)
attr(dat_out[[t]], "altered") <- FALSE
attr(dat_out[[t]], "table") <- t
}
return(dat_out)
} # EOF read_db_dat
# function for writing changes into database
modify_db <- function(con, dat_tbl) {
tbl_name <- attr(dat_tbl, "table")
# define index columns for the verious tables (needed by function sqlUpdate())
tbls_keys <- list(
subbasins="pid",
r_subbas_contains_lu=c("subbas_id", "lu_id"),
landscape_units="pid",
r_lu_contains_tc=c("lu_id", "tc_id"),
terrain_components="pid",
r_tc_contains_svc=c("tc_id", "svc_id"),
soil_veg_components="pid",
vegetation="pid",
soils="pid",
horizons="pid",
particle_classes="class_id",
r_soil_contains_particles=c("soil_id", "class_id"),
rainy_season="pid",
x_seasons="pid",
reservoirs_strategic="pid",
reservoirs_small_classes="pid",
r_subbas_contains_reservoirs_small=c("subbas_id", "res_class_id")
)
# check if table has been modified, otherwise return an flag (-1)
if (!attr(dat_tbl, "altered"))
return(-1)
# delete datasets no longer in table (sqlUpdate() does not delete)
key_t <- tbls_keys[[tbl_name]]
if (is.null(key_t))
stop(paste0("Unknown table ",tbl_name,", please report to package mantainer."))
del_query <- paste0("delete from ", tbl_name, " where not ", key_t[1], " in (",
paste(unique(dat_tbl[, key_t[1]]), collapse = ", "), ")")
del_query <- sql_dialect(con, del_query)
res <- sqlQuery(con, del_query, errors=F) # throws an "error" if nothing was deleted, so don't investigate further and hope everything is fine
#table has a composite key - deleting is more complicated
if(length(key_t) > 1) {
for(k in 2:length(key_t)) {
for(i in unique(dat_tbl[,k-1])) {
dat_t <- dat_tbl[which(dat_tbl[,k-1]==i),]
del_query <- paste0("delete from ", tbl_name, " where ", key_t[k-1], "=", i,
" and not ", key_t[k], " in (", paste(dat_t[, key_t[k]], collapse = ", "), ")")
del_query <- sql_dialect(con, del_query)
res <- sqlQuery(con, del_query, errors=F) # throws an "error" if nothing was deleted, so don't investigate further and hope everything is fine
}
}
}
# update db
a=try(sqlUpdate(con, dat_tbl, tbl_name, tbls_keys[[tbl_name]]), silent = TRUE)
if (class(a)=="try-error")
stop(paste0("ERROR: Could not update table '",tbl_name,"'. Additional rows in new data?"))
return(0)
} # EOF modify_db
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.