#' for a given Java com.datastax.driver.core.ResultSet object, get the column definitions as a List.
#' Raises errors if we do not yet support the column type.
#' Currently we support: float, double, text, varchar, int, tinyint, date (assumes UTC context), timestamp (stored as POSIXct in timezone UTC)
#'
#' @param jcRes instance of com.datastax.driver.core.ResultSet
.cass_col_map <- function(jcRes) {
jcColDefs <- rJava::.jcall(jcRes, 'Lcom/datastax/driver/core/ColumnDefinitions;', 'getColumnDefinitions')
numCols <- rJava::.jcall(jcColDefs, 'I', 'size')
mappings = list()
for (i in 1:numCols) {
cname <- rJava::.jcall(jcColDefs, "S", "getName", as.integer(i-1))
ctypeObj <- rJava::.jcall(jcColDefs, 'Lcom/datastax/driver/core/DataType;','getType', as.integer(i-1))
ctype <- rJava::.jcall(ctypeObj, 'S', 'asFunctionParameterString')
map = list()
map$index = as.integer(i-1)
map$javatype = ctype
map$colname = cname
if (ctype=='float') {
map$java_get_func = 'getFloat'
map$r_cast_func = 'as.numeric'
map$jni_type = 'F'
} else if (ctype=='double') {
map$java_get_func = 'getDouble'
map$r_cast_func = 'as.numeric'
map$jni_type = 'D'
} else if (ctype=='text'||ctype=='varchar') {
map$java_get_func = 'getString'
map$r_cast_func = 'as.character'
map$jni_type = 'S'
} else if (ctype=='int') {
map$java_get_func = 'getInt'
map$r_cast_func = 'as.integer'
map$jni_type = 'I'
} else if (ctype=='tinyint') {
map$java_get_func = 'getByte'
map$r_cast_func = 'as.integer'
map$jni_type = 'B'
} else if (ctype=='date') {
map$java_get_func = 'getDate'
map$r_cast_func = 'as_date_from_cql_date' # our custom function
map$jni_type = 'Lcom/datastax/driver/core/LocalDate;'
} else if (ctype=='timestamp') {
map$java_get_func = 'getTimestamp'
map$r_cast_func = 'as_posixct_from_cql_get_timestamp'
map$jni_type = 'Ljava/util/Date;'
} else {
stop(paste0("Unsupported type of column ",cname," type ",ctype," in query "))
}
mappings[[cname]] <- map
}
return(mappings)
}
#' Perform a CQL update.
#'
#' @param jCassSess The Cassandra session, from get_cass_session
#' @param cqlQuery A string query. If performing multiple at once, use BEG
#'
#' @examples
#' cass_update(jCassSess, "insert into test_lmp_combined (dt,hr,official_pnode_id,rt_energy,da_energy) values ('2017-01-19',13,80,24.33,-34.33)")
#' cass_update(jCassSess, "BEGIN BATCH
#' insert into test_lmp_combined (dt,hr,official_pnode_id,rt_energy,da_energy) values ('2017-01-20',13,80,24.33,-34.33);
#' insert into test_lmp_combined (dt,hr,official_pnode_id,rt_energy,da_energy) values ('2017-01-20',24,80,14.44,55.55);
#' APPLY BATCH")
#' @export
cass_update <- function(jCassSess, cqlQuery) {
rJava::.jcall(jCassSess, 'Lcom/datastax/driver/core/ResultSet;', 'execute', cqlQuery)
return(T)
}
cass_query <- function(jCassSess, cqlQuery, estimatedNumRows=1000, debug=F) {
reader <- rJava::.jnew("com/tioscapital/cassandrasimple/CassandraReader",jCassSess, cqlQuery, as.integer(estimatedNumRows), debug);
numCols <- rJava::.jcall(reader, 'I', 'getNumColumns')
dfList <- list()
if (debug) { cat("... in new cass_query I have ",numCols," columns\n") }
for (i in seq.int(numCols)) {
col <- rJava::.jcall(reader, 'Lcom/tioscapital/cassandrasimple/CassandraColumn;', 'getColumn', as.integer(i-1))
colName <- rJava::.jcall(col, 'S', 'getColName')
colGetter <- rJava::.jcall(col, 'S', 'getGetMethodName')
colGetterJniType <- rJava::.jcall(col, 'S', 'getGetMethodJniType')
colGetterRConversion <- rJava::.jcall(col, 'S', 'getRConversionName')
colData <- rJava::.jcall(col, colGetterJniType, colGetter)
nullIndexes <- rJava::.jcall(col, "[I", "getNullIndexes")
if (nchar(colGetterRConversion) > 0) {
colData <- get(colGetterRConversion)(colData)
}
if (length(nullIndexes) > 0) {
nullIndexes <- nullIndexes + 1 # indexes in java are 0-based
if (debug) { cat("for ", colName, " the null indexes are ", paste0(nullIndexes,collapse=","), "\n") }
colData[nullIndexes] <- NA
}
dfList[[colName]] <- colData
}
df <- as.data.frame(dfList, stringsAsFactors=F)
return(df)
}
#' Execute a CQL query and capture the results as a data.frame. Columns will match up with what is in the Cassandra table definition and your query.
#' Follow CQL rules for queries. If you try to query on a column without a secondary index, you will get errors from the Datastax driver.
#' NULL values in the Cassandra tables are returned as NA in the data frame. We convert Cassandra data types to reasonable R equivalents.
#' We currently support the following mappings:
#' float, double, text, varchar, int, tinyint, date
#' See .cass_col_map for mapping details.
#'
#' @example stuff <- cass_query(jCassSess,"select * from test_lmp_combined where dt = '2017-01-20' and hr = 13")
#' @param jCassSess The Cassandra session, from get_cass_session
#' @param cqlQuery A string query.
#' @export
cass_query_slower <- function(jCassSess, cqlQuery) {
jcRes <- rJava::.jcall(jCassSess, 'Lcom/datastax/driver/core/ResultSet;', 'execute', cqlQuery)
results = list()
colMaps <- .cass_col_map(jcRes)
ctr <- 0
while(!rJava::.jcall(jcRes, 'Z', 'isExhausted')) {
jcRow <- rJava::.jcall(jcRes, 'Lcom/datastax/driver/core/Row;', 'one')
ctr <- ctr+1
row = list()
for (m in names(colMaps)) {
if (rJava::.jcall(jcRow, 'Z', "isNull", m)) {
val <- NA
} else {
val <- rJava::.jcall(jcRow, colMaps[[m]]$jni_type, colMaps[[m]]$java_get_func, m)
val <- get(colMaps[[m]]$r_cast_func)(val)
}
row[[m]] <- val
}
results[[ctr]] <- row
}
return(dplyr::bind_rows(results))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.