# Copyright 2024 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Register the S3 class "tbl_impala" with the S4 class system. setOldClass()
# defines it as a virtual S4 class, which allows the S3 class name to be used
# in S4 method signatures.
#' @importFrom methods setOldClass
setOldClass("tbl_impala")
#' @export
#' @importFrom dplyr intersect
intersect.tbl_impala <- function(x, y, copy = FALSE, ...) {
  # This method never inspects its arguments: it exists only so that calling
  # intersect() on a tbl_impala fails immediately with a clear message.
  msg <- "Impala does not support intersect operations."
  stop(msg, call. = FALSE)
}
#' @export
#' @importFrom dplyr setdiff
setdiff.tbl_impala <- function(x, y, copy = FALSE, ...) {
  # This method never inspects its arguments: it exists only so that calling
  # setdiff() on a tbl_impala fails immediately with a clear message.
  msg <- "Impala does not support setdiff operations."
  stop(msg, call. = FALSE)
}
#' Force execution of an Impala query
#'
#' @name compute
#' @description \describe{ \item{\code{compute()}}{Executes the query and stores
#' the result in a new Impala table} \item{\code{collect()}}{Executes the query
#' and returns the result to R as a data frame \code{tbl}}
#' \item{\code{collapse()}}{Generates the query for later execution} }
#'
#' @param x an object with class \code{tbl_impala}
#' @param name the name for the new Impala table
#' @param temporary must be set to \code{FALSE}
#' @param unique_indexes not used
#' @param indexes not used
#' @param analyze whether to run \code{COMPUTE STATS} after adding data to the
#' new table
#' @param external whether the new table will be externally managed
#' @param overwrite whether to overwrite existing table data (currently ignored)
#' @param force whether to silently fail if the table already exists
#' @param field_terminator the delimiter to use between fields in text file
#' data. Defaults to the ASCII control-A (hex 01) character
#' @param line_terminator the line terminator. Defaults to \code{"\n"}
#' @param file_format the storage format to use. Options are \code{"TEXTFILE"}
#' (default) and \code{"PARQUET"}
#' @param ... other arguments passed on to methods
#' @note Impala does not support temporary tables. When using \code{compute()}
#' to store results in an Impala table, you must set \code{temporary = FALSE}.
#' @export
#' @importFrom assertthat assert_that
#' @importFrom assertthat is.string
#' @importFrom assertthat is.flag
#' @importFrom dbplyr op_grps
#' @importFrom dbplyr op_vars
#' @importFrom dplyr %>%
#' @importFrom dplyr compute
#' @importFrom dplyr group_by
#' @importFrom dplyr select
#' @importFrom dplyr tbl
#' @importFrom rlang !!!
#' @importFrom rlang syms
#' @importFrom utils getFromNamespace
compute.tbl_impala <- function(x,
                               name,
                               temporary = TRUE,
                               unique_indexes = NULL,
                               indexes = NULL,
                               analyze = FALSE,
                               external = FALSE,
                               overwrite = FALSE,
                               force = FALSE,
                               field_terminator = NULL,
                               line_terminator = NULL,
                               file_format = NULL,
                               ...) {
  # TBD: add params to control location and other CREATE TABLE options

  # Validate argument types up front so that bad input fails before any SQL is
  # rendered or sent to Impala.
  assert_that(
    is.string(name),
    is.flag(temporary),
    is.flag(external),
    is.flag(overwrite),
    is.flag(force),
    is.flag(analyze),
    is_string_or_null(file_format),
    is_nchar_one_string_or_null(field_terminator),
    is_nchar_one_string_or_null(line_terminator)
  )

  # The default temporary = TRUE is rejected on purpose, so the caller has to
  # opt in to creating a permanent table.
  if (temporary) {
    stop(
      "Impala does not support temporary tables. Set temporary = FALSE in compute().",
      call. = FALSE
    )
  }

  # Any columns named in indexes / unique_indexes must exist in x, even though
  # the indexes themselves are not used.
  vars <- op_vars(x)
  assert_that(all(unlist(indexes) %in% vars))
  assert_that(all(unlist(unique_indexes) %in% vars))

  # Select every column explicitly by name before rendering the query.
  x_aliased <- select(x, !!!syms(vars))
  sql <- db_sql_render(x$src$con, x_aliased$lazy_query)

  # TBD: implement db_compute.impala_connection and call it here instead of db_save_query
  name <- db_save_query(
    con = x$src$con,
    sql = sql,
    name = name,
    temporary = FALSE,
    analyze = analyze,
    external = external,
    overwrite = overwrite,
    force = force,
    field_terminator = field_terminator,
    # Forward the caller's line terminator (previously the field terminator
    # was passed here by mistake, discarding the line_terminator argument).
    line_terminator = line_terminator,
    file_format = file_format,
    ...
  )

  # Return a tbl on the newly created table, with the grouping variables of x
  # reapplied.
  tbl(x$src, name) %>%
    group_by(!!!syms(op_grps(x)))
}
#' @name collect
#' @rdname compute
#' @param n the number of rows to return
#' @param warn_incomplete whether to issue a warning if not all rows retrieved
#' @export
#' @importFrom dplyr collect
collect.tbl_impala <- function(x, ..., n = Inf, warn_incomplete = TRUE) {
  # No Impala-specific handling here: hand the call, with its arguments, to the
  # next collect() method in the S3 dispatch chain for x.
  NextMethod(generic = "collect")
}
#' @name collapse
#' @rdname compute
#' @param vars not used
#' @export
#' @importFrom dplyr collapse
collapse.tbl_impala <- function(x, vars = NULL, ...) {
  # No Impala-specific handling here: hand the call, with its arguments, to the
  # next collapse() method in the S3 dispatch chain for x.
  NextMethod(generic = "collapse")
}
#' Unnest a complex column in an Impala table
#'
#' @description \code{impala_unnest()} unnests a
#' column of type \code{ARRAY}, \code{MAP}, or \code{STRUCT}
#' in a \code{tbl_impala}. These column types are referred to
#' as complex or nested types.
#'
#' @param data an object with class \code{tbl_impala}
#' @param col the unquoted name of an \code{ARRAY},
#' \code{MAP}, or \code{STRUCT} column
#' @param ... ignored (included for compatibility)
#' @return an object with class \code{tbl_impala} with the
#' complex column unnested into two or more separate columns
#' @details \code{impala_unnest()} currently can unnest only
#' one column, can only be applied once to a \code{tbl_impala},
#' and must be applied to a \code{tbl_impala} representing an
#' Impala table or view before applying any other operations.
#' @seealso \href{https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_complex_types.html}{
#' Impala Complex Types}
#' @export
#' @importFrom dbplyr ident_q
#' @importFrom rlang enexpr
#' @importFrom rlang !!
#' @importFrom rlang :=
#' @importFrom tidyselect vars_select
#' @importFrom tidyselect vars_rename
impala_unnest <- function(data, col, ...) {
  res <- data
  if (!inherits(res, "tbl_impala")) {
    stop("data argument must be a tbl_impala", call. = FALSE)
  }
  if (!"vars" %in% names(res$lazy_query$x)) {
    stop("data argument must contain complex columns", call. = FALSE)
  }
  complex_types <- attr(res$lazy_query$x$vars, "complex_type")
  # This function rebuilds `vars` with c(), which drops the "complex_type"
  # attribute, so a missing attribute is treated as "already unnested". This
  # check must come before the all-NA check: all(is.na(NULL)) is TRUE, which
  # would otherwise mask this case and make this branch unreachable.
  if (is.null(complex_types)) {
    stop("impala_unnest() can only be applied once to a tbl_impala",
         call. = FALSE)
  }
  if (all(is.na(complex_types))) {
    stop("data argument must contain complex columns", call. = FALSE)
  }
  # The inner query must still be a plain table reference.
  if (!"x" %in% names(res$lazy_query$x) ||
      !inherits(res$lazy_query$x$x, "dbplyr_table_ident")) {
    stop("impala_unnest() must be applied to a tbl_impala before any other operations",
         call. = FALSE)
  }

  # Capture the unquoted column argument and turn it into a column name.
  col <- enexpr(col)
  colname <- as.character(col)
  if (length(colname) != 1) {
    stop("impala_unnest() can unnest only one column", call. = FALSE)
  }
  colindex <- which(res$lazy_query$x$vars == colname)
  if (length(colindex) != 1) {
    stop("Column ", colname, " not found", call. = FALSE)
  }
  coltype <- complex_types[colindex]
  tablename <- format(res$lazy_query$x$x)

  if (identical(coltype, "array") || identical(coltype, "map")) {
    # ARRAY and MAP columns are handled the same way; only the names of the
    # two generated columns differ.
    fields <- if (identical(coltype, "array")) {
      c("item", "pos")
    } else {
      c("key", "value")
    }
    quoted_tablename <- tablename # impala_escape_ident(res$src$con, tablename, "`")
    # Replace the table reference with "<table>, <table>.`<col>`" so that the
    # complex column is referenced alongside its parent table.
    res$lazy_query$x$x <- utils::getFromNamespace("as_table_ident", "dbplyr")(
      ident_q(
        paste0(quoted_tablename, ", ", quoted_tablename, ".`", colname, "`")
      )
    )
    first_name_before <- paste0(colname, ".", fields[1])
    first_name_after <- paste0(colname, "_", fields[1])
    second_name_before <- paste0(colname, ".", fields[2])
    second_name_after <- paste0(colname, "_", fields[2])
    res$lazy_query$x$vars <- c(
      setdiff(res$lazy_query$x$vars, colname),
      first_name_before,
      second_name_before
    )
    # Promote the inner query so the select() below is applied directly to it.
    res$lazy_query <- res$lazy_query$x
    # Expose "<col>.<field>" under the name "<col>_<field>".
    rename_complex_cols <- vars_rename(
      colnames(res),
      !!first_name_after := !!first_name_before,
      !!second_name_after := !!second_name_before
    )
    # Inject the vector with !! (as the STRUCT branch does) rather than
    # relying on tidyselect's deprecated lookup of external vectors by name.
    res <- select(res, !!rename_complex_cols)
  } else if (identical(coltype, "struct")) {
    # Ask Impala for the names of the struct's fields.
    sql <- paste0("DESCRIBE ", tablename, ".", colname)
    structcolnames <- dbGetQuery(res$src, sql)$name
    othercolnames <- setdiff(res$lazy_query$x$vars, colname)
    col_names_before <- c(
      othercolnames,
      paste(colname, structcolnames, sep = ".")
    )
    col_names_after <- c(
      othercolnames,
      paste(colname, structcolnames, sep = "_")
    )
    res$lazy_query$x$vars <- col_names_before
    res$lazy_query <- res$lazy_query$x
    # Named character vector: names are the new column names, values are the
    # existing "<col>.<field>" references.
    rename_complex_cols <- col_names_before
    names(rename_complex_cols) <- col_names_after
    res <- select(res, !!rename_complex_cols)
  } else {
    stop("Column ", colname, " must be of type ARRAY, MAP, or STRUCT", call. = FALSE)
  }
  res
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.