###############################################################
#'Get datasource connection
#'
#' This is a function to obtain connection to a
#' datasource from the envision server by providing the token
#' obtained from the envision App for the specific datasource
#'
#' @param baseUrl - CA server URL
#' @param token - Token for the datasource
#' obtained from the App
#' @param apiKey - ApiKey for the user/company
#' @export
connect.ca <- function(url, token, apiKey) {
# //////////////////////////////////////////////////////
BAConnectionData <- R6::R6Class(
"BAConnectionData",
public = list (
factTable = "",
jdbc = NULL,
columns = NULL,
engineType = NULL,
ba2DBTypes = NULL,
q = NULL,
username = "",
initialize = function(factTable,
jdbc,
columns,
ba2DBTypes,
qVar,
uname) {
self$factTable <- factTable
self$jdbc <- jdbc
self$columns <- columns
self$ba2DBTypes <- ba2DBTypes
self$q <- qVar
self$username <- uname
},
quot = function(attribute) {
tmp <- ""
tmp <- paste(self$q, attribute, self$q, sep = "")
tmp
},
finalize = function() {
if (!is.null(jdbc))
RJDBC::dbDisconnect(jdbc)
}
)
)
# //////////////////////////////////////////////////////
BAConnection <- R6::R6Class(
"BAConnection",
private = list(
conn_data = '',
createTable = function(df, colName, type) {
if (is.null(colName))
stop("Invalid column name")
#
# temp <- gsub("\"","",private$conn_data$factTable)
# temp <- unlist(strsplit(temp,split = "[.]"))
# dsName <- NULL
# schema <- NULL
# if(length(temp) > 1){
# schema <- temp[1]
# dsName <- temp[2]
# }
# else
# dsName <- temp
#
# tableName <- paste(dsName,colName, sep="_")
#
# # If module is parameterized append the userName as well _<USERNAME> to tableName
# if(carriots.analytics.isParametrised)
# tableName <- paste(tableName,private$conn_data$username, sep="_")
#
# print(tableName)
# md5table <- digest::digest(tableName,"md5",serialize = FALSE)
#
# print(md5table)
# md5table <- private$conn_data$quot(md5table)
# if(!is.null(schema))
# md5table <- paste(private$conn_data$quot(schema),md5table,sep=".")
#
temp <- gsub("\"","",carriots.analytics.fact_table_name)
temp <- unlist(strsplit(temp, split = "[.]"))
dsName <- NULL
schema <- NULL
if (length(temp) > 1) {
schema <- temp[1]
dsName <- temp[2]
}
else
dsName <- temp
md5table <- private$conn_data$quot(dsName)
if (!is.null(schema))
md5table <-
paste(private$conn_data$quot(schema), md5table, sep = ".")
private$dropIfExists(md5table)
colNames <- colnames(df)
colNames <- colNames[colNames != colName]
if (!all(colNames %in% self$getColumnNames()))
stop("Data Frame has more than one new column compared to fact table")
query <- "CREATE TABLE"
query <- paste(query, md5table, "AS")
colString <-
paste(private$conn_data$quot(colNames), ",", collapse = "")
colString <- substr(colString, 1, nchar(colString) - 1)
orgQuery <-
paste("(",
"SELECT",
colString,
"FROM",
private$conn_data$factTable,
"WHERE 1=2",
")")
query <- paste(query, orgQuery)
print(query)
RJDBC::dbSendUpdate(private$conn_data$jdbc, query)
#Add a column first
private$addColumn(md5table, name = colName, type = type)
md5table
},
addColumn = function(md5table,
name = NULL,
type = NULL,
default = NULL) {
if (is.null(name) | is.null(type))
stop("Column name or type is empty")
if (!(type %in% names(private$conn_data$ba2DBTypes)))
stop(paste("Invalid Type selected", "-", type))
alterQuery <- "ALTER TABLE"
alterQuery <-
paste(
alterQuery,
md5table,
"ADD COLUMN",
private$conn_data$quot(name),
private$conn_data$ba2DBTypes[[type]]
)
if (!is.null(default)) {
alterQuery <- paste(alterQuery, "NOT NULL DEFAULT")
if (is.numeric(default))
alterQuery <- paste(alterQuery, " (", default, ") ", sep = "")
else
alterQuery <- paste(alterQuery, " '", default, "' ", sep = "")
}
print(alterQuery)
RJDBC::dbSendUpdate(private$conn_data$jdbc, alterQuery)
},
insertData = function(tablename, dataframe) {
if (nrow(dataframe) < 1)
stop("No data available in dataframe")
query <- "INSERT INTO"
query <- paste(query, tablename)
colNames <- paste("\"", colnames(dataframe), "\"", sep = "")
query <- paste(query, "(", paste(colNames, collapse = ","), ")")
print(paste("intial--->",query))
query <- paste(query, "VALUES")
values <- ""
for (i in 1:nrow(dataframe)) {
if (i > 1)
values <- paste(values, ",")
row <- dataframe[i, ]
#string values are printed with index
#values <- paste(values,paste("(",paste(paste("'",row,"'",sep=""),collapse=","),")",sep = ""))
values <-
paste(values, paste("(", paste(apply(row, 1, function(k) {
paste(paste("'", k, "'", sep = ""), collapse = ",")
})), ")", sep = ""))
}
query <- paste(query, values)
print(query)
RJDBC::dbSendUpdate(private$conn_data$jdbc, query)
},
dropIfExists = function(tableName) {
t <- tableName
temp <- unlist(strsplit(t, split = "[.]"))
if (length(temp) > 1) {
RJDBC::dbSendUpdate(private$conn_data$jdbc, paste("set schema", temp[1]))
t <- temp[2]
}
t <- gsub("\"", "", t)
if (RJDBC::dbExistsTable(private$conn_data$jdbc, t)) {
RJDBC::dbRemoveTable(private$conn_data$jdbc,
DBI::dbQuoteIdentifier(private$conn_data$jdbc, t))
}
}
),
public = list (
dataTypes = '',
initialize = function(conn_data) {
private$conn_data <- conn_data
self$dataTypes <-
list(
STRING = "STRING",
NUMERIC = "NUMERIC",
INTEGER = "INTEGER",
DATE = "DATE",
DATETIME = "DATETIME",
TIME = "TIME"
)
},
load = function(columns = NULL) {
query <-
paste("SELECT ",
getColumns(colSelected = columns, private$conn_data),
sep = "")
query <- paste(query, "FROM", private$conn_data$factTable)
# if(!is.null(limit) & !is.na(limit) & limit > 0)
# query <- paste(query, "LIMIT",limit)
df <- RJDBC::dbGetQuery(private$conn_data$jdbc, query)
col2Label <- getColumn2Label(private$conn_data$columns)
orgNames <- names(df)
for (i in 1:length(orgNames)) {
names(df)[i] <- col2Label[names(df)[i]]
}
df
},
close = function() {
RJDBC::dbDisconnect(private$conn_data$jdbc)
},
updateDataFrame = function(df = NULL,
colName = NULL,
type = NULL) {
if (is.na(df) || missing(colName))
stop("Required parameters were missing")
if (!(colName %in% colnames(df)))
stop("Specified column doesnt exists in the data frame")
label2Col <- private$conn_data$columns
orgNames <- names(df)
for (i in 1:length(orgNames)) {
if (orgNames[i] != colName) {
label <- label2Col[orgNames[i]]
if (is.null(label))
stop("Dataframe has an unexpected column")
names(df)[i] <- label
} else {
if (exists(paste("carriots.analytics.dervived_dim_name"))) {
carriots.analytics.dervived_dim_name <- gsub("\"","",carriots.analytics.dervived_dim_name)
print(paste("dimName------>",carriots.analytics.dervived_dim_name))
names(df)[i] <- carriots.analytics.dervived_dim_name
colName <- carriots.analytics.dervived_dim_name
}
}
}
#create table
md5Table <- private$createTable(df, colName, type)
#insert in to new table
private$insertData(md5Table, df)
#Fire an event to reload the table in the app
params <-
list(dstoken = token,
dim = carriots.analytics.dervived_dim_name,
support_table = md5Table)
headerParams <- c('X-CA-apiKey' = apiKey)
res <- doHttpCall(url, "reloadext", params, headerParams)
res
},
getColumnNames = function() {
cols <- names(private$conn_data$columns)
cols
}
)
)
# //////////////////////////////////////////////////////
# Call the CA REST API to get the connection data
# Based on the engine type, get the appropriate JDBC driver
data <- getDatasourceConnection(url, token, apiKey)
jdbc <- data$jdbc
factTable <- data$ftable
columns <- data$columns
quot <- data$quot
ba2DBTypes <- getDataTypes(data$engineType)
connect_data <-
BAConnectionData$new(factTable, jdbc, columns, ba2DBTypes, quot, data$username)
conn <- BAConnection$new(connect_data)
conn
}
#################################################################################
#' Method to load the table data
#'
#' User can pass the selected column names as the data frame returned by this
#' method will have only the selected columns
#'
#' @param columns - vector of selected column names
#' @param conn - BAConnection object obtained from connect API
#'
#'@export
load.ca = function(conn = NULL, columns = NULL) {
conn$load(columns = columns)
}
#################################################################################
#' Method to update the data in the table
#'
#' Update the table with the values provided in the data frame. Below is the
#' structure of the data frame for WhereClause
#'
#' @param conn - BAConnection object obtained from connect API
#' @param dataframe - DataFrame for where Clause
#' @param colname - colname in dataframe which is to be added in to table
#' @param type - Data type of column, supports specific types available in the conn$dataTypes list
#'
#'@export
update.ca = function(conn = NULL,
dataframe = NULL,
colname = NULL,
type = NULL) {
conn$updateDataFrame(df = dataframe,
colName = colname,
type = type)
}
###############################################################################
#'Reload Datasource
#'
#' Function to refresh the datasource in envision, which reflects
#' the latest update
#'
#' @param baseUrl - Envision server URL
#' @param token - Token for the datasource
#' obtained from the App
#' @param apiKey - ApiKey for the user/company
#'
#' @export
reloadDatasource.ca <- function(baseUrl, token, apiKey) {
headerParams <- c('X-CA-apiKey' = apiKey)
queryParams <- list(dstoken = token)
res <- doHttpCall(baseUrl, "reloadext", queryParams, headerParams)
res
}
##################################################################################
# UTILITIES
##################################################################################
asc <- function(x) {
strtoi(charToRaw(x), 16L)
}
chr <- function(n) {
rawToChar(as.raw(n))
}
#supported data types - ba2DBTypes
getDataTypes <- function(engineType) {
dataTypes <-
list(
STRING = "VARCHAR(256)",
NUMERIC = "DOUBLE",
INTEGER = "INT",
DATE = "DATE",
DATETIME = "TIMESTAMP",
TIME = "TIME"
)
if (toupper(engineType) == "ORACLE") {
dataTypes$NUMERIC = "NUMBER(20,4)"
dataTypes$INTEGER = "NUMBER(20)"
}
else if (toupper(engineType) == "MYSQL") {
dataTypes$NUMERIC = "DECIMAL(20,4)"
dataTypes$TIMESTAMP = "DATETIME"
}
else if (toupper(engineType) == "REDSHIFT") {
dataTypes$NUMERIC = "DOUBLE PRECISION"
dataTypes$INTEGER = "BIGINT"
}
dataTypes
}
getColumns <- function(colSelected = NULL, conn) {
cols <- conn$columns
colString <- ""
if (!is.null(colSelected) & length(colSelected) > 0) {
length <- length(colSelected)
for (i in 1:length) {
if (i > 1 & i <= length)
colString <- paste(colString, ",")
if (!is.null(cols[[colSelected[[i]]]])) {
colString <- paste(colString, conn$quot(cols[[colSelected[[i]]]]))
} else
stop(paste("ColName:", colSelected[[i]], "doesn't exists in the table"))
i <- i + 1
}
} else
colString <- "*"
colString
}
getColumn2Label <- function(colList) {
column <- paste(colList)
labels <- names(colList)
col2Label <- list()
for (i in 1:length(labels))
col2Label[column[i]] = labels[i]
col2Label
}
#-------------------------------------------------------------
#'Get DataSource Meta data
#'
#' This is a function to obtain the meta data of a
#' datasource from the envision server by providing the secret
#' key of the datasource
#'
#' @param baseUrl - Envision server URL
#' @param token - token for the datasource
#' obtained from the App
#' @param apiKey - ApiKey for the user/company
#--------------------------------------------------------------
getDataSourceMetaData <- function(baseUrl, token, apiKey) {
headerParams <- c('X-CA-apiKey' = apiKey)
queryParams <- list(dstoken = token)
res <- doHttpCall(baseUrl, "dsExtConnect", queryParams, headerParams)
res
}
#-----------------------------------------------------------------------
#'Get DataSource Connection
#'
#' This is a function to obtain JDBC connection to a
#' datasource from the envision server by providing the secret
#' key of the datasource
#'
#' @param baseUrl - Envision server URL
#' @param token - token for the datasource
#' obtained from the App
#' @param apiKey - ApiKey for the user/company
#-----------------------------------------------------------------------
getDatasourceConnection <- function(baseUrl, token, apiKey) {
data <- getDataSourceMetaData(baseUrl, token, apiKey)
if (!is.null(data$connect_data)) {
connect_data <- data$connect_data
jdbcDetails <- getDriverDetails(connect_data)
if (is.null(jdbcDetails$driverClass) ||
is.null(jdbcDetails$driver) || is.null(jdbcDetails$connString))
stop("Unable to create JDBC connection- required info missing")
#decrypt password
passWord <- character()
encrypt <- strsplit(connect_data$password, "")[[1]]
for (i in 1:stringr::str_length(connect_data$password)) {
passWord[[i]] <- chr(asc(encrypt[[i]]) - 4)
}
decrypt <- paste(passWord, collapse = "")
passWord <- stringi::stri_reverse(decrypt)
passWord <- rawToChar(base64enc::base64decode(passWord))
#passPhrase <- digest::AES(connect_data$ftable,mode="CBC")
#passPhrase <- substr(passPhrase,0,8)
#aes <- digest::AES(passPhrase,mode="CBC")
jdbcDriver <- RJDBC::JDBC(
driverClass = jdbcDetails$driverClass,
classPath = system.file("extdata", jdbcDetails$driver, package = "envision"),
identifier.quote = jdbcDetails$quot
)
conn <-
RJDBC::dbConnect(jdbcDriver,
jdbcDetails$connString,
connect_data$username,
passWord)
ftable <- connect_data$ftable
data <- NULL
data$ftable <- ftable
data$username <- connect_data$user_login_name
data$jdbc <- conn
data$quot <- jdbcDetails$quot
data$columns <- connect_data$columns
data$engineType <- connect_data$engine_type
} else {
stop("Connect data of the datasource not available")
}
data
}
doHttpCall <-
function(baseUrl,
identifier,
queryParams,
headerParams) {
url_length <- stringr::str_length(baseUrl)
if (substr(baseUrl, url_length, url_length) == "/") {
baseUrl <- substr(baseUrl, 0, url_length - 1)
}
baseUrl <-
paste(baseUrl, "/datasource.do?action=", identifier, sep = "")
ua <-
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:33.0) Gecko/20100101 Firefox/33.0"
doc <- httr::POST(baseUrl,
httr::add_headers(.headers = headerParams),
query = queryParams,
httr::user_agent(ua))
res <- httr::content(doc, useInternalNodes = T)
data <- jsonlite::fromJSON(res)
data
}
getDriverDetails <- function(connect_data) {
if (is.null(connect_data$engine_type))
stop("Engine Type not found")
engineType <- connect_data$engine_type
if (is.null(connect_data$hostname))
stop("Host Name not found")
host_port <- connect_data$hostname
if (!is.null(connect_data$port))
host_port <- paste(host_port, connect_data$port, sep = ":")
if (is.null(connect_data$dbName))
stop("DBName not found")
jdbcDetails <- NULL
if (toupper(engineType) == "MONETDB") {
jdbcDetails$driverClass <- "nl.cwi.monetdb.jdbc.MonetDriver"
jdbcDetails$driver <- "monetdb-jdbc-2.8.jar"
jdbcDetails$connString <-
paste("jdbc:monetdb://",
host_port,
"/",
connect_data$dbName,
sep = "")
jdbcDetails$quot <- "\""
} else if (toupper(engineType) == "MYSQL") {
jdbcDetails$driverClass <- "com.mysql.jdbc.Driver"
jdbcDetails$driver <- "mysql-connector-java-5.1.21-bin"
jdbcDetails$connString <-
paste("jdbc:mysql://", host_port, "/", connect_data$dbName, sep = "")
jdbcDetails$quot <- "`"
} else if (toupper(engineType) == "POSTGRESQL" ||
toupper(engineType) == "REDSHIFT") {
jdbcDetails$driverClass <- "org.postgresql.Driver"
jdbcDetails$driver <- "postgresql-9.2-1002.jdbc4.jar"
jdbcDetails$connString <-
paste("jdbc:postgresql://",
host_port,
"/",
connect_data$dbName,
sep = "")
jdbcDetails$quot <- "\""
} else if (toupper(engineType) == "SQLSERVER") {
jdbcDetails$driverClass <-
"com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbcDetails$driver <- "sqljdbc4.jar"
jdbcDetails$connString <-
paste("jdbc:sqlserver://",
getSQLServerConnString(connect_data),
sep = "")
jdbcDetails$quot <- "\""
} else if (toupper(engineType) == "ORACLE") {
jdbcDetails$driverClass <- "oracle.jdbc.driver.OracleDriver"
jdbcDetails$driver <- "ojdbc6.jar"
jdbcDetails$connString <-
paste("jdbc:oracle:thin:@",
host_port,
"/",
connect_data$dbName,
sep = "")
jdbcDetails$quot <- "\""
} else if (toupper(engineType) == "SUNDB") {
jdbcDetails$driverClass <- "sunje.sundb.jdbc.SundbDriver"
jdbcDetails$driver <- "sundb6.jar"
jdbcDetails$connString <-
paste("jdbc:sundb://", host_port, "/", connect_data$dbName, sep = "")
jdbcDetails$quot <- "\""
} else if (toupper(engineType) == "MARIADB") {
jdbcDetails$driverClass <- "org.mariadb.jdbc.Driver"
jdbcDetails$driver <- "mariadb-java-client-1.3.3.jar"
jdbcDetails$connString <-
paste("jdbc:mariadb://",
host_port,
"/",
connect_data$dbName,
sep = "")
jdbcDetails$quot <- "\""
}
jdbcDetails
}
getSQLServerConnString <- function(connect_data) {
conn_string <-
paste(connect_data$hostname, connect_data$dbName, sep = "\\")
if (!is.null(connect_data$port))
conn_string <- paste(conn_string, connect_data$port, sep = ":")
conn_string
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.