Nothing
#' @include tidyr_utils.R
NULL
#' @importFrom tidyr unnest
#' @export
unnest.tbl_spark <- function(data,
                             cols,
                             ...,
                             keep_empty = FALSE,
                             ptype = NULL,
                             names_sep = NULL,
                             names_repair = "check_unique",
                             .drop = "DEPRECATED",
                             .id = "DEPRECATED",
                             .sep = "DEPRECATED",
                             .preserve = "DEPRECATED") {
  # `tidyr::unnest()` method for Spark tables.
  #
  # Expands every selected array column into one output column per nested
  # field by generating `EXPLODE_OUTER(<col>.<field>)` Spark SQL expressions,
  # one temporary view per exploded field, and binding those views back
  # together column-wise.
  #
  # Arguments:
  #   data         A `tbl_spark`.
  #   cols         Tidy-select specification of the columns to unnest. If it
  #                selects nothing, `data` is returned unchanged.
  #   keep_empty   When exactly `FALSE`, rows in which any selected column is
  #                NULL are filtered out before exploding.
  #   ptype        Forwarded to `apply_ptype()` on the final result.
  #   names_sep    If `NULL`, output columns are named after the nested field;
  #                otherwise `<col><names_sep><field>`.
  #   names_repair Forwarded to `repair_names()` for the output column names.
  #   ..., .drop, .id, .sep, .preserve
  #                Accepted for signature compatibility; not read here.
  #
  # Returns: the unnested table, with columns in the original column order,
  # re-grouped by the original grouping variables that are not unnested and
  # still exist in the output, after `apply_ptype()`.
  #
  # Aborts (via `rlang::abort()`) if a selected column's schema type is not
  # "array".

  cols <- tidyselect::eval_select(
    rlang::enquo(cols), replicate_colnames(data)
  ) %>%
    names()
  if (length(cols) == 0) {
    return(data)
  }

  # Remember the grouping so it can be restored on the result; all work below
  # is done on an ungrouped, materialised copy.
  group_vars <- dplyr::group_vars(data)
  data <- data %>%
    dplyr::compute() %>%
    dplyr::ungroup()
  sc <- spark_connection(data)
  schema <- data %>% sdf_schema(expand_nested_cols = TRUE)

  # struct_fields[[col]] is the list of nested field names of column `col`.
  struct_fields <- list()
  for (col in cols) {
    dtype <- schema[[col]]$type
    if (!identical(as.character(dtype), "array")) {
      paste0(
        "`unnest.tbl_spark` is only supported for columns of type ",
        "`array<struct<.*>>` (i.e., a column produced by a previous ",
        "`nest.tbl_spark` operation. Column '", col, "' is of type ",
        schema[[col]]$type
      ) %>%
        rlang::abort()
    }
    nested_cols <- attributes(dtype)$element_type %>%
      lapply(function(field) field$name) %>%
      list()
    names(nested_cols) <- col
    struct_fields <- append(struct_fields, nested_cols)
  }

  # Output column names, in the original column order: every unnested column
  # is replaced in place by its nested fields.
  output_cols <- NULL
  for (col in colnames(data)) {
    if (col %in% cols) {
      for (nested_col in struct_fields[[col]]) {
        dest_col <- if (is.null(names_sep)) {
          nested_col
        } else {
          paste0(col, names_sep, nested_col)
        }
        output_cols <- append(output_cols, dest_col)
      }
    } else {
      output_cols <- append(output_cols, col)
    }
  }
  output_cols <- repair_names(output_cols, names_repair)

  # Build the SELECT expressions. `output_cols_idx` walks `output_cols` in the
  # same order in which it was built above, so each expression is aliased to
  # its (possibly repaired) output name.
  output_cols_idx <- 1
  unnest_col_sqls <- list()
  other_col_sqls <- list()
  for (col in colnames(data)) {
    if (col %in% cols) {
      struct_name <- quote_sql_name(col)
      for (nested_col in struct_fields[[col]]) {
        unnest_col_sql <- sprintf(
          "EXPLODE_OUTER(%s.%s) AS %s",
          struct_name,
          quote_sql_name(nested_col),
          quote_sql_name(output_cols[[output_cols_idx]])
        )
        output_cols_idx <- output_cols_idx + 1
        unnest_col_sqls <- append(unnest_col_sqls, unnest_col_sql)
      }
    } else {
      # The alias must be quoted just like the exploded-column aliases above:
      # an output name that is not a bare SQL identifier (e.g. one containing
      # spaces or dots) would otherwise produce an invalid SELECT.
      other_col_sqls <- append(
        other_col_sqls,
        sprintf(
          "%s AS %s",
          quote_sql_name(col),
          quote_sql_name(output_cols[[output_cols_idx]])
        )
      )
      output_cols_idx <- output_cols_idx + 1
    }
  }

  # Each exploded field is selected in its own view (see below); the
  # pass-through columns ride along with the first exploded field only.
  unnest_col_sqls[[1]] <- paste(
    append(
      list(unnest_col_sqls[[1]]),
      other_col_sqls
    ),
    collapse = ", "
  )

  out <- data
  if (identical(keep_empty, FALSE)) {
    # Drop rows where any of the columns being unnested is NULL.
    no_empty_value_sql <- paste(
      lapply(
        cols,
        function(col) {
          sprintf("ISNOTNULL(%s)", quote_sql_name(col))
        }
      ),
      collapse = " AND "
    ) %>%
      dplyr::sql()
    out <- dplyr::filter(out, no_empty_value_sql)
  }
  out <- out %>% dplyr::compute()
  out_tbl <- out %>%
    ensure_tmp_view() %>%
    quote_sql_name()

  # One temporary view per SELECT expression, all reading from `out_tbl`.
  unnested_cols <- lapply(
    unnest_col_sqls,
    function(sql) {
      tmp_view_name <- random_string("sparklyr_tmp_")
      DBI::dbGetQuery(
        sc,
        sprintf(
          "CREATE TEMPORARY VIEW %s AS (SELECT %s FROM %s)",
          quote_sql_name(tmp_view_name),
          paste(sql, collapse = " ,"),
          out_tbl
        )
      )
      dplyr::tbl(sc, tmp_view_name)
    }
  )

  # Only keep grouping variables that were not unnested and still exist.
  group_vars <- intersect(setdiff(group_vars, cols), output_cols)

  # Bind the per-field views column-wise, restore the intended column order,
  # then re-apply the grouping.
  out <- do.call(sdf_fast_bind_cols, unnested_cols) %>>%
    dplyr::select %@% lapply(output_cols, as.symbol) %>>%
    dplyr::group_by %@% lapply(group_vars, as.symbol)

  apply_ptype(out, ptype)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.