# TODO: Add description
#
# Author: Linh Tran
# Date: Dec 11, 2015
# Email: Linh_m_tran@apple.com
###############################################################################
#' @title tdJoin
#'
#' @description
#' Takes two (or more) Teradata EDW tables using a JDBC connection
#' object via the \code{RJDBC} package and merges them together.
#'
#' @details
#' By default, the code tries to do joins starting from Table 1 going up.
#' So if, for example, three tables are provided for inner joins, then Table 1
#' and Table 2 will first be inner joined, and the resulting output will
#' then be inner joined with Table 3. If a left join is desired for the
#' three tables, then Table 2 will be left joined to Table 1 and Table 3 will
#' then be left joined with the resulting table.
#'
#' If desired, column names for each table can be provided to merge together.
#' By default, the code will try to use all columns of the tables provided.
#' All tables will be searched for duplicate column names. If any exists,
#' then copies will be renamed with a suffix of \code{_copyX} where \code{X}
#' represents the number of copies. If an index name merging by has copies
#' across the tables, then only one index name is kept.
#'
#' If a connection profile (e.g. username, password, etc.) is provided, then
#' an attempt is made to connect to Teradata. Once the query is run, the
#' connection is then closed. If a connection object (\code{conn}) is provided
#' to the function (or one is found globally), then the connection remains
#' open.
#'
#' @param tdfO Name of resulting Teradata to output.
#' @param tdf1 Name of first Teradata table to merge.
#' @param tdf2 Name of second Teradata table to merge.
#' @param index1 Name of index from first table to merge by.
#' @param index2 Name of index from second table to merge by.
#' @param col1 Name of columns from first table to merge.
#' @param col2 Name of columns from second table to merge.
#' @param joinType Type of merge to perform. Needs to be one of following: \code{inner,
#' left outer, right outer, full outer}.
#' @param ... Additional \code{tdfX} and \code{indexX} to merge,
#' where \code{X} is the count. Also can take optional connection settings.
#'
#' @return The code creates the data table on the Teradata server via the
#' \code{JDBCConnection} object. Names of each table created are returned
#' as a string vector.
#'
#' @seealso
#' \code{\link{tdConn}} for connection, \code{\link{tdDisk}} for disk usage,
#' \code{\link{tdSpool}} for spool usage, \code{\link{tdCpu}} for CPU
#' usage, and \code{\link{td}} for general queries.
#'
#' @examples
#' ## NOT RUN ##
#' ## With connection pre-established, inner join on table ##
#' # conn = tdConn(<username>, <password>)
#' # tdJoin(<outputTable>, <inputTable1>, <inputTable2>, <index1>, <index2>)
#'
#' ## inner join on table with select columns ##
#' # tdJoin(<outputTable>, <inputTable1>, <inputTable2>, <index1>, <index2>, joinType="left")
#'
#' ## left join on table ##
#' # tdJoin(<outputTable>, <inputTable1>, <inputTable2>, <index1>, <index2>, joinType="left")
#'
#' @export
tdJoin = function(tdfO, tdf1, tdf2, index1, index2, col1=NULL, col2=NULL, joinType="inner", ...) {
## CHECKS ##
args = list(...)
jtypes = c("inner", "left outer", "right outer", "full outer")
if(missing(tdf1) | missing(tdf2) | missing(index1) | missing(index2) | length(grep("tdf", args)) != length(grep("index", args))) stop("Both tables and indices (to match by) need to be specified.")
if(missing(tdfO)) stop("Output table name needs to be specified.")
if (!joinType %in% jtypes) stop(gettextf("Unknown 'joinType'. Must be one of %s", paste(jtypes, collapse = ",")))
tdf1 = toupper(tdf1)
tdf2 = toupper(tdf2)
index1 = toupper(index1)
index1 = toupper(index1)
## Connection ##
conn = tdCheckConn(list(...))
## TABLE CHECK ##
cat("Checking tables...")
if(DBI::dbExistsTable(conn, tdfO)) {
if ( attr(conn, "tmpConnection")) DBI::dbDisconnect(conn)
stop(paste0(tdfO, " already exists."))
}
## COLUMN NAMES ##
tables = c(tdf1=tdf1, tdf2=tdf2, args[grep("tdf", names(args))])
tableNames = as.data.frame(do.call("rbind", lapply(tables, function(x) unlist(strsplit(x, split=".", fixed=TRUE)))))
for(i in 1:2) tableNames[i] = as.character(tableNames[[i]])
names(tableNames) = c("DatabaseName", "TableName")
txt = gettextf(
"SELECT DatabaseName, TableName, ColumnName
FROM DBC.columns
WHERE DatabaseName in ('%s')
AND TableName in ('%s')",
paste(tableNames$DatabaseName, collapse="', '"), paste(tableNames$TableName, collapse="', '"))
tableInfo = td(txt, conn=conn)
for(i in 1:3) tableInfo[i] = toupper(sub("\\s+$", "", tableInfo[[i]], perl = TRUE))
## INDICIES ##
indicies = toupper(unlist(c(index1=index1, index2=index2, args[grep("index", names(args))])))
indexMissing = NULL
for(i in 1:nrow(tableNames)) {
tmpColumns = subset(tableInfo, toupper(DatabaseName)==toupper(tableNames$DatabaseName[i]) & toupper(TableName)==toupper(tableNames$TableName[i]))$ColumnName
if (!toupper(indicies[i]) %in% toupper(tmpColumns)) indexMissing = c(indexMissing, indicies[i])
}
if (!is.null(indexMissing)) {
if ( attr(conn, "tmpConnection")) DBI::dbDisconnect(conn)
stop(gettextf("The following indicies were not found in the tables: %s", paste(indexMissing, collapse=", ")))
}
# renames duplicate indicies
if (length(indicies)!=length(unique(indicies))) {
indiciesTmp1 = indiciesTmp2 = indicies
for(i in 2:length(indicies)) {
if(indicies[i] %in% indicies[1:(i-1)]) {
indiciesTmp1[i] = paste0(indicies[i], " ", indicies[i], "_copy", sum(indicies[1:(i-1)]==indicies[i]))
}
}
for(i in 1:nrow(tableNames)) {
tableInfo$ColumnName[toupper(tableInfo$DatabaseName)==toupper(tableNames$DatabaseName[i]) & toupper(tableInfo$TableName)==toupper(tableNames$TableName[i]) & toupper(tableInfo$ColumnName)==toupper(indicies[i])] = indiciesTmp1[i]
}
}
## VARIABLES ##
mergeVariables = NULL
for(i in 1:(length(indicies))) {
tmpVariables = subset(tableInfo, toupper(DatabaseName)==toupper(tableNames$DatabaseName[i]) & toupper(TableName)==toupper(tableNames$TableName[i]))$ColumnName
copies = grep("(_copy[0-9])$", tmpVariables)
if(length(copies)>0) {
tmpVariables = tmpVariables[-copies]
}
mergeVariables = c(mergeVariables, paste(paste0(letters[i], ".", tmpVariables, " ", tmpVariables), collapse=", "))
}
cat("good.\n")
## JOINS ##
cat("Joining tables...")
mergeStatement = NULL
for(i in 2:(length(indicies))) {
mergeStatement = paste(mergeStatement, gettextf("%s join %s %s on a.%s=%s.%s\n", joinType, paste(tableNames[i,], collapse="."), letters[i], indicies[1], letters[i], indicies[i]))
}
txt = gettextf(paste(
"create table %s
as (select %s from %s a
%s)
with data primary index (%s)"),
tdfO, paste(mergeVariables, collapse=", "), paste(tableNames[1,], collapse="."), mergeStatement, indicies[1])
res = td(txt, conn=conn)
cat("done.\n")
## Connection ##
if ( attr(conn, "tmpConnection")) DBI::dbDisconnect(conn)
invisible(tdfO)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.