get_sdf_storage_level <- function(sdf_jobj) {
invoke(sdf_jobj, "storageLevel")
}
#' @export
#' @importFrom dbplyr sql_render
#' @importFrom dbplyr sql_build
spark_dataframe.tbl_spark <- function(x, ...) {
x[["spark_dataframe"]](
x,
function(tbl_spark) {
sc <- spark_connection(tbl_spark)
sql <- as.character(dbplyr::remote_query(tbl_spark))
hive <- hive_context(sc)
invoke(hive, "sql", sql)
}
)
}
#' @export
spark_dataframe.spark_connection <- function(x, sql = NULL, ...) {
invoke(hive_context(x), "sql", as.character(sql))
}
#' Read the Schema of a Spark DataFrame
#'
#' Read the schema of a Spark DataFrame.
#'
#' The \code{type} column returned gives the string representation of the
#' underlying Spark type for that column; for example, a vector of numeric
#' values would be returned with the type \code{"DoubleType"}. Please see the
#' \href{https://spark.apache.org/docs/latest/api/scala/index.html}{Spark Scala API Documentation}
#' for information on what types are available and exposed by Spark.
#'
#' @param expand_nested_cols Whether to expand columns containing nested arrays
#' of structs (which are usually created by \code{tidyr::nest} on a Spark data
#' frame).
#'
#' @param expand_struct_cols Whether to expand columns containing structs.
#' @return An \R \code{list}, with each \code{list} element describing the
#' \code{name} and \code{type} of a column.
#'
#' @template roxlate-ml-x
#'
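#' @examples
#' \dontrun{
#' # A minimal usage sketch; it assumes a local Spark connection and copies the
#' # built-in `iris` dataset to Spark purely for illustration.
#' library(sparklyr)
#' sc <- spark_connect(master = "local")
#' iris_tbl <- sdf_copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE)
#' sdf_schema(iris_tbl)
#' }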
#' @export
sdf_schema <- function(x,
expand_nested_cols = FALSE,
expand_struct_cols = FALSE) {
UseMethod("sdf_schema")
}
#' @export
sdf_schema.tbl_spark <- function(x,
expand_nested_cols = FALSE,
expand_struct_cols = FALSE) {
x[["schema"]](
x,
sdf_schema_impl,
expand_nested_cols = expand_nested_cols,
expand_struct_cols = expand_struct_cols
)
}
#' @export
sdf_schema.default <- function(x,
expand_nested_cols = FALSE,
expand_struct_cols = FALSE) {
sdf_schema_impl(
x,
expand_nested_cols = expand_nested_cols,
expand_struct_cols = expand_struct_cols
)
}
sdf_schema_impl <- function(x,
expand_nested_cols,
expand_struct_cols) {
process_struct_type <- function(x) {
fields <- x$fields
fields_list <- lapply(
fields,
function(field) {
name <- field$name
type <- (
if ("fields" %in% names(field$dtype)) {
process_struct_type(field$dtype)
} else if ("elementType" %in% names(field$dtype)) {
dtype <- "array"
attributes(dtype)$element_type <- process_struct_type(field$dtype$elementType)
dtype
} else {
field$dtype$repr
})
list(name = name, type = type)
}
)
names(fields_list) <- unlist(lapply(fields_list, `[[`, "name"))
fields_list
}
invoke_static(
spark_connection(x),
"sparklyr.SchemaUtils",
"sdfSchema",
spark_dataframe(x),
expand_nested_cols,
expand_struct_cols
) %>%
jsonlite::fromJSON(simplifyDataFrame = FALSE, simplifyMatrix = FALSE) %>%
process_struct_type()
}
sdf_deserialize_column <- function(column, sc) {
separator <- split_separator(sc)
if (is.character(column)) {
splat <- strsplit(column, separator$regexp, fixed = TRUE)[[1]]
splat[splat == "<NA>"] <- NA
Encoding(splat) <- "UTF-8"
return(splat)
}
column
}
#' Read a Column from a Spark DataFrame
#'
#' Read a single column from a Spark DataFrame, and return
#' the contents of that column back to \R.
#'
#' This operation is expected to preserve row order.
#'
#' @template roxlate-ml-x
#' @param column The name of a column within \code{x}.
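#'
#' @examples
#' \dontrun{
#' # Usage sketch; assumes a local connection and a copied `iris` table
#' # (column names are sanitized on copy, e.g. "Petal.Length" becomes
#' # "Petal_Length").
#' sc <- spark_connect(master = "local")
#' iris_tbl <- sdf_copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE)
#' petal_lengths <- sdf_read_column(iris_tbl, "Petal_Length")
#' }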
#' @export
sdf_read_column <- function(x, column) {
UseMethod("sdf_read_column")
}
#' @export
sdf_read_column.spark_jobj <- function(x, column) {
sdf <- spark_dataframe(x)
col_df <- invoke(sdf, "select", column, list())
col_df <- collect(col_df)
col_df[[column]]
}
#' @export
sdf_read_column.tbl_spark <- function(x, column) {
dplyr::pull(x, column)
}
#' Collect a Spark DataFrame into R.
#'
#' Collects a Spark dataframe into R.
#'
#' @param object Spark dataframe to collect
#' @param impl Which implementation to use while collecting the Spark dataframe:
#' - row-wise: fetch the entire dataframe into memory and then process it
#' row-by-row
#' - row-wise-iter: iterate through the dataframe using an RDD local iterator,
#' processing one row at a time (hence reducing the memory footprint)
#' - column-wise: fetch the entire dataframe into memory and then process it
#' column-by-column
#' NOTE: (1) this parameter does not apply to streaming or Arrow use cases;
#' (2) it affects only implementation details, not the result of `sdf_collect`,
#' and should only be set if performance profiling indicates a particular
#' choice will be significantly better than the default ("row-wise")
#' @param ... Additional options.
#'
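#' @examples
#' \dontrun{
#' # Minimal sketch; assumes a local connection. `impl` rarely needs to be
#' # changed from its default.
#' sc <- spark_connect(master = "local")
#' mtcars_tbl <- sdf_copy_to(sc, mtcars, name = "mtcars_tbl", overwrite = TRUE)
#' collected <- sdf_collect(mtcars_tbl)
#'
#' # a `callback` function may also be supplied through `...` to process rows
#' # in batches instead of returning one data frame (see the callback handling
#' # in `sdf_collect_static()` below; requires Spark 2.0+):
#' sdf_collect(mtcars_tbl, callback = function(batch) print(nrow(batch)))
#' }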
#' @export
sdf_collect <- function(object, impl = c("row-wise", "row-wise-iter", "column-wise"), ...) {
args <- list(...)
impl <- match.arg(impl)
sc <- spark_connection(object)
if (sdf_is_streaming(object)) {
sdf_collect_stream(object, ...)
} else if (arrow_enabled(sc, object) && !identical(args$arrow, FALSE)) {
arrow_collect(object, ...)
} else {
sdf_collect_static(object, impl, ...)
}
}
#' Collect Spark data serialized in RDS format into R
#'
#' Deserialize Spark data that is serialized using `spark_write_rds()` into an R
#' data frame.
#'
#' @param path Path to a local RDS file that is produced by `spark_write_rds()`
#' (RDS files stored in HDFS will need to be downloaded to the local filesystem
#' first, e.g., by running `hadoop fs -copyToLocal ...` or similar).
#'
#' @family Spark serialization routines
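#' @examples
#' \dontrun{
#' # Sketch only: the path below is illustrative; it should point to an RDS
#' # file previously written by `spark_write_rds()` and copied to the local
#' # filesystem.
#' df <- collect_from_rds("/tmp/example_output.rds")
#' }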
#' @export
collect_from_rds <- function(path) {
data <- readRDS(path)
col_names <- data[[1]]
timestamp_col_idxes <- data[[2]]
date_col_idxes <- data[[3]]
struct_col_idxes <- data[[4]]
col_data <- data[[5]]
names(col_data) <- col_names
df <- dplyr::as_tibble(col_data)
apply_conversion <- function(df, idx, fn) {
if ("list" %in% class(df[[idx]])) {
df[[idx]] <- lapply(df[[idx]], fn)
} else {
df[[idx]] <- fn(df[[idx]])
}
df
}
for (idx in timestamp_col_idxes) {
df <- df %>%
apply_conversion(
idx,
function(x) {
as.POSIXct(x, origin = "1970-01-01")
}
)
}
for (idx in date_col_idxes) {
df <- df %>%
apply_conversion(
idx,
function(x) {
as.Date(x, origin = "1970-01-01")
}
)
}
for (idx in struct_col_idxes) {
df <- df %>%
apply_conversion(
idx,
function(x) {
x %>%
lapply(
function(v) {
jsonlite::fromJSON(
v,
simplifyDataFrame = FALSE,
simplifyMatrix = FALSE
)
}
)
}
)
}
df
}
sdf_collect_data_frame <- function(sdf, collected) {
if (identical(collected, NULL)) {
return(invisible(NULL))
}
sc <- spark_connection(sdf)
# deserialize columns as needed (string columns arrive as a single
# separator-delimited string)
transformed <- lapply(collected, function(e) {
sdf_deserialize_column(e, sc)
})
# fix an issue where sometimes columns in a Spark DataFrame are empty
# in such a case, we fill those with NAs of the same type (#477)
n <- vapply(transformed, length, numeric(1))
rows <- if (length(n) > 0) max(n) else 0
fixed <- lapply(transformed, function(column) {
if (length(column) == 0) {
converter <- switch(
typeof(column),
character = as.character,
logical = as.logical,
integer = as.integer,
double = as.double,
identity
)
return(converter(rep(NA, rows)))
}
column
})
# set column names and return dataframe
colNames <- invoke(sdf, "columns")
names(fixed) <- as.character(colNames)
fixed <- as_tibble(fixed, stringsAsFactors = FALSE, optional = TRUE)
# fix booleans
schema <- sdf_schema(sdf)
for (field in schema) {
if (field$type == "BooleanType" && field$name %in% names(fixed)) {
fixed[[field$name]] <- as.logical(fixed[[field$name]])
}
}
fixed
}
# Read a Spark Dataset into R.
#' @importFrom dplyr as_tibble
sdf_collect_static <- function(object, impl, ...) {
args <- list(...)
n <- args$n
sc <- spark_connection(object)
sdf <- spark_dataframe(object)
if (!is.null(n) && !is.infinite(n)) {
n <- as.integer(n)
if (!is.na(n)) {
# If n is Inf or any value outside of integer range, then ignore it
sdf <- sdf %>% invoke("limit", n)
}
}
separator <- split_separator(sc)
# for some reason, we appear to receive invalid results when
# collecting Spark DataFrames with many columns. empirically,
# having more than 50 columns seems to trigger the buggy behavior.
# collect the data set in chunks, and then join those chunks.
# note that this issue should be resolved with Spark >2.0.0
collected <- if (spark_version(sc) > "2.0.0") {
if (!identical(args$callback, NULL)) {
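# when a callback is supplied, stream the result through a local iterator in
# batches of 'sparklyr.collect.batch' rows, invoking the callback on each batch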
batch_size <- spark_config_value(sc$config, "sparklyr.collect.batch", as.integer(10^5L))
ctx <- invoke_static(sc, "sparklyr.DFCollectionUtils", "prepareDataFrameForCollection", sdf)
sdf <- invoke(ctx, "_1")
dtypes <- invoke(ctx, "_2")
sdf_iter <- invoke(sdf, "toLocalIterator")
iter <- 1
while (invoke(sdf_iter, "hasNext")) {
raw_df <- invoke_static(
sc,
"sparklyr.Utils",
"collectIter",
invoke(sdf_iter, "underlying"),
dtypes,
batch_size,
separator$regexp
)
df <- sdf_collect_data_frame(sdf, raw_df)
cb <- args$callback
if (is.language(cb)) cb <- rlang::as_closure(cb)
if (length(formals(cb)) >= 2) cb(df, iter) else cb(df)
iter <- iter + 1
}
NULL
} else {
invoke_static(
sc,
"sparklyr.Utils",
"collect",
sdf,
separator$regexp,
impl
)
}
} else {
if (!identical(args$callback, NULL)) stop("Parameter 'callback' requires Spark 2.0+")
columns <- invoke(sdf, "columns") %>% as.character()
chunk_size <- getOption("sparklyr.collect.chunk.size", default = 50L)
chunks <- split_chunks(columns, as.integer(chunk_size))
pieces <- lapply(chunks, function(chunk) {
subset <- sdf %>% invoke("selectExpr", as.list(chunk))
invoke_static(
sc,
"sparklyr.Utils",
"collect",
subset,
separator$regexp,
impl
)
})
do.call(c, pieces)
}
sdf_collect_data_frame(sdf, collected)
}
# Split a Spark DataFrame
sdf_split <- function(object,
weights = c(0.5, 0.5),
seed = sample(.Machine$integer.max, 1)) {
jobj <- spark_dataframe(object)
invoke(jobj, "randomSplit", as.list(weights), as.integer(seed))
}
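# usage sketch (internal helper); the table, weights, and seed below are
# illustrative. randomSplit returns a list of Spark DataFrame references:
#   splits <- sdf_split(mtcars_tbl, weights = c(0.7, 0.3), seed = 42)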
#' Pivot a Spark DataFrame
#'
#' Construct a pivot table over a Spark DataFrame, using a syntax similar to
#' that from \code{reshape2::dcast}.
#'
#' @template roxlate-ml-x
#' @param formula A two-sided \R formula of the form \code{x_1 + x_2 + ... ~ y_1}.
#' The left-hand side of the formula indicates which variables are used for grouping,
#' and the right-hand side indicates which variable is used for pivoting. Currently,
#' only a single pivot column is supported.
#' @param fun.aggregate How should the grouped dataset be aggregated? Can be
#' a length-one character vector, giving the name of a Spark aggregation function
#' to be called; a named \R list mapping column names to an aggregation method;
#' or an \R function that is invoked on the grouped dataset.
#'
#' @examples
#' \dontrun{
#' library(sparklyr)
#' library(dplyr)
#'
#' sc <- spark_connect(master = "local")
#' iris_tbl <- sdf_copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE)
#'
#' # aggregating by mean
#' iris_tbl %>%
#' mutate(Petal_Width = ifelse(Petal_Width > 1.5, "High", "Low")) %>%
#' sdf_pivot(Petal_Width ~ Species,
#' fun.aggregate = list(Petal_Length = "mean")
#' )
#'
#' # aggregating all observations in a list
#' iris_tbl %>%
#' mutate(Petal_Width = ifelse(Petal_Width > 1.5, "High", "Low")) %>%
#' sdf_pivot(Petal_Width ~ Species,
#' fun.aggregate = list(Petal_Length = "collect_list")
#' )
#' }
#'
#' @export
sdf_pivot <- function(x, formula, fun.aggregate = "count") {
sdf <- spark_dataframe(x)
# parse formulas of form "abc + def ~ ghi + jkl"
deparsed <- paste(deparse(formula), collapse = " ")
splat <- strsplit(deparsed, "~", fixed = TRUE)[[1]]
if (length(splat) != 2) {
stop("expected a two-sided formula; got '", deparsed, "'")
}
grouped_cols <- trim_whitespace(strsplit(splat[[1]], "[+*]")[[1]])
pivot_cols <- trim_whitespace(strsplit(splat[[2]], "[+*]")[[1]])
# ensure no duplication of variables on each side
intersection <- intersect(grouped_cols, pivot_cols)
if (length(intersection)) {
stop("variables on both sides of forumla: ", paste(deparse(intersection), collapse = " "))
}
# ensure variables exist in dataset
nm <- as.character(invoke(sdf, "columns"))
all_cols <- c(grouped_cols, pivot_cols)
missing_cols <- setdiff(all_cols, nm)
if (length(missing_cols)) {
stop("missing variables in dataset: ", paste(deparse(missing_cols), collapse = " "))
}
# ensure pivot is length one (for now)
if (length(pivot_cols) != 1) {
stop("pivot column is not length one")
}
# generate pivoted dataset
grouped <- sdf %>%
invoke(
"%>%",
list("groupBy", grouped_cols[[1]], as.list(grouped_cols[-1])),
list("pivot", pivot_cols[[1]])
)
# perform aggregation
fun.aggregate <- fun.aggregate %||% "count"
result <- if (is.function(fun.aggregate)) {
fun.aggregate(grouped)
} else if (is.character(fun.aggregate)) {
if (length(fun.aggregate) == 1) {
invoke(grouped, fun.aggregate[[1]])
} else {
invoke(grouped, fun.aggregate[[1]], as.list(fun.aggregate[-1]))
}
} else if (is.list(fun.aggregate)) {
invoke(grouped, "agg", list2env(fun.aggregate))
} else {
stop("unsupported 'fun.aggregate' type '", class(fun.aggregate)[[1]], "'")
}
sdf_register(result)
}
#' Separate a Vector Column into Scalar Columns
#'
#' Given a vector column in a Spark DataFrame, split it into
#' \code{n} separate columns, with each output column containing
#' one element of the original vector column \code{column}.
#'
#' @template roxlate-ml-x
#' @param column The name of a (vector-typed) column.
#' @param into A specification of the columns that should be
#' generated from \code{column}. This can either be a
#' vector of column names, or an \R list mapping column
#' names to the (1-based) index at which a particular
#' vector element should be extracted.
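#'
#' @examples
#' \dontrun{
#' # Sketch only; assumes a local connection. The "features" column and the
#' # output column names below are illustrative.
#' sc <- spark_connect(master = "local")
#' iris_tbl <- sdf_copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE)
#'
#' iris_tbl %>%
#'   ft_vector_assembler(
#'     input_cols = c("Sepal_Length", "Sepal_Width"),
#'     output_col = "features"
#'   ) %>%
#'   sdf_separate_column("features", into = c("sepal_length", "sepal_width"))
#' }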
#' @export
sdf_separate_column <- function(x,
column,
into = NULL) {
column <- cast_string(column)
# extract spark dataframe reference, connection
sdf <- spark_dataframe(x)
sc <- spark_connection(x)
into_is_set <- cast_scalar_logical(!is.null(into))
# when 'into' is NULL, we auto-generate a names -> index map
if (is.null(into)) {
# determine the length of vector elements (assume all
# elements have the same length as the first) and
# generate indices
indices <- x %>%
head(1) %>%
dplyr::pull(!!rlang::sym(column)) %>%
purrr::list_flatten() %>%
length() %>%
seq_len(.)
names <- sprintf("%s_%i", column, indices)
# construct our into map
into <- as.list(indices)
names(into) <- names
}
# when 'into' is a character vector, generate a default
# names -> index map
if (is.character(into)) {
indices <- seq_along(into)
names <- into
into <- as.list(indices)
names(into) <- names
}
# extract names, indices from map
names <- names(into)
indices <- as.integer(unlist(into, use.names = FALSE)) - 1L
# call split routine
splat <- invoke_static(
sc,
"sparklyr.Utils",
"separateColumn",
sdf,
column,
as.list(names),
as.list(indices),
into_is_set
)
# and give it back
sdf_register(splat)
}