skip_connection("spark-apply-ext")
skip_on_livy()
test_requires("dplyr")
sc <- testthat_spark_connection()
iris_tbl <- testthat_tbl("iris")
dates <- data.frame(dates = c(as.Date("2015/12/19"), as.Date(NA), as.Date("2015/12/19")))
dates_tbl <- testthat_tbl("dates")
colnas <- data.frame(c1 = c("A", "B"), c2 = c(NA, NA))
colnas_tbl <- testthat_tbl("colnas")
test_that("'spark_apply' can filter columns", {
expect_equivalent(
iris_tbl %>% spark_apply(function(e) e[1:1]) %>% collect(),
iris_tbl %>% select(Sepal_Length) %>% collect()
)
})
test_that("'spark_apply' can add columns", {
expect_equivalent(
iris_tbl %>% spark_apply(function(e) cbind(e, 1), names = c(colnames(iris_tbl), "new")) %>% collect(),
iris_tbl %>% mutate(new = 1) %>% collect()
)
})
test_that("'spark_apply' can concatenate", {
expect_equivalent(
iris_tbl %>% spark_apply(function(e) apply(e, 1, paste, collapse = " "), names = "s") %>% collect(),
iris_tbl %>% transmute(s = paste(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, Species)) %>% collect()
)
})
test_that("'spark_apply' can filter", {
expect_equivalent(
iris_tbl %>% spark_apply(function(e) e[e$Species == "setosa", ]) %>% collect(),
iris_tbl %>% filter(Species == "setosa") %>% collect()
)
})
test_that("'spark_apply' works with 'sdf_repartition'", {
id <- random_string("id")
expect_equivalent(
iris_tbl %>%
sdf_with_sequential_id(id) %>%
sdf_repartition(2L) %>%
spark_apply(function(e) e) %>%
collect() %>%
arrange(!!rlang::sym(id)),
iris_tbl %>%
sdf_with_sequential_id(id) %>%
collect()
)
})
test_that("'spark_apply' works with 'group_by' over multiple columns", {
iris_tbl_ints <- iris_tbl %>%
mutate(Petal_Width_Int = as.integer(Petal_Width))
grouped_lm <- spark_apply(
iris_tbl_ints,
function(e, species, petal_width) {
lm(Petal_Width ~ Petal_Length, e)$coefficients[["(Intercept)"]]
},
names = "Intercept",
group_by = c("Species", "Petal_Width_Int")
) %>% collect()
iris_int <- iris %>% mutate(
Petal_Width_Int = as.integer(Petal.Width),
GroupBy = paste(Species, Petal_Width_Int, sep = "|")
)
lapply(
unique(iris_int$GroupBy),
function(group_by_entry) {
parts <- strsplit(group_by_entry, "\\|")
species_test <- parts[[1]][[1]]
petal_width_test <- as.integer(parts[[1]][[2]])
expect_equal(
grouped_lm[grouped_lm$Species == species_test & grouped_lm$Petal_Width_Int == petal_width_test, ]$Intercept,
lm(Petal.Width ~ Petal.Length, iris_int[iris_int$Species == species_test & iris_int$Petal_Width_Int == petal_width_test, ])$coefficients[["(Intercept)"]]
)
}
)
})
test_that("'spark_apply' works over empty partitions", {
skip_slow("takes too long to measure coverage")
expect_equal(
sdf_len(sc, 2, repartition = 4) %>%
spark_apply(function(e) e) %>%
collect() %>%
as.data.frame(),
data.frame(id = seq_len(2))
)
})
test_that("'spark_apply' works over 'tryCatch'", {
skip_slow("takes too long to measure coverage")
expect_equal(
sdf_len(sc, 1) %>%
spark_apply(function(e) {
tryCatch(
{
stop("x")
},
error = function(e) {
100
}
)
}) %>%
pull() %>%
as.integer(),
100
)
})
test_that("'spark_apply' can filter data.frame", {
skip_slow("takes too long to measure coverage")
expect_equal(
sdf_len(sc, 10) %>%
spark_apply(function(e) as.data.frame(e[e$id > 1, ])) %>%
collect() %>%
nrow(),
9
)
})
test_that("'spark_apply' can filter using dplyr", {
skip_slow("takes too long to measure coverage")
expect_equal(
sdf_len(sc, 10) %>%
spark_apply(function(e) dplyr::filter(e, id > 1)) %>%
collect() %>%
as.data.frame(),
data.frame(id = c(2:10))
)
})
test_that("'spark_apply' can return 'NA's", {
skip_slow("takes too long to measure coverage")
expect_equal(
dates_tbl %>%
spark_apply(function(e) e) %>%
collect() %>%
nrow(),
nrow(dates)
)
})
test_that("'spark_apply' can return 'NA's for dates", {
skip_slow("takes too long to measure coverage")
expect_equal(
sdf_len(sc, 1) %>%
spark_apply(function(e) data.frame(dates = c(as.Date("2001/1/1"), NA))) %>%
collect() %>%
nrow(),
2
)
})
test_that("'spark_apply' can roundtrip dates", {
skip_slow("takes too long to measure coverage")
expect_equal(
dates_tbl %>%
spark_apply(function(e) as.Date(e[[1]], origin = "1970-01-01")) %>%
spark_apply(function(e) e) %>%
collect() %>%
pull() %>%
class(),
"Date"
)
})
test_that("'spark_apply' can roundtrip Date-Time", {
skip("Research issue with test failure (#3341)")
skip_slow("takes too long to measure coverage")
expect_equal(
dates_tbl %>%
spark_apply(function(e) as.POSIXct(e[[1]], origin = "1970-01-01")) %>%
spark_apply(function(e) e) %>%
collect() %>%
pull() %>%
class() %>%
first(),
"POSIXct"
)
})
test_that("'spark_apply' supports grouped empty results", {
skip_slow("takes too long to measure coverage")
process_data <- function(DF, exclude) {
DF <- subset(DF, select = colnames(DF)[!colnames(DF) %in% exclude])
DF[complete.cases(DF), ]
}
data <- data.frame(
grp = rep(c("A", "B", "C"), each = 5),
x1 = 1:15,
x2 = c(1:9, rep(NA, 6)),
stringsAsFactors = FALSE
)
data_spark <- sdf_copy_to(sc, data, "grp_data", memory = TRUE, overwrite = TRUE)
collected <- data_spark %>%
spark_apply(
process_data,
group_by = "grp",
columns = c("x1", "x2"),
packages = FALSE,
context = {
exclude <- "grp"
}
) %>%
collect()
expect_equivalent(
collected %>% arrange(x1),
data %>%
group_by(grp) %>%
do(process_data(., exclude = "grp")) %>%
arrange(x1)
)
})
test_that("'spark_apply' can use anonymous functions", {
skip_slow("takes too long to measure coverage")
expect_equal(
sdf_len(sc, 3) %>% spark_apply(~ .x + 1) %>% collect(),
tibble(id = c(2, 3, 4))
)
})
test_that("'spark_apply' can apply function with 'NA's column", {
skip_slow("takes too long to measure coverage")
if (spark_version(sc) < "2.0.0") skip("automatic column types supported in Spark 2.0+")
expect_equivalent(
colnas_tbl %>% mutate(c2 = as.integer(c2)) %>% spark_apply(~ class(.x[[2]])) %>% pull(),
"integer"
)
expect_equivalent(
colnas_tbl %>%
mutate(c2 = as.integer(c2)) %>%
spark_apply(~ dplyr::mutate(.x, c1 = tolower(c1))) %>%
collect(),
colnas_tbl %>%
mutate(c2 = as.integer(c2)) %>%
mutate(c1 = tolower(c1)) %>%
collect()
)
})
test_that("can infer R package dependencies", {
fn1 <- function(x) {
library(utf8)
x + 1
}
expect_true("utf8" %in% sparklyr:::infer_required_r_packages(fn1))
fn2 <- function(x) {
require(utf8)
x + 2
}
expect_true("utf8" %in% sparklyr:::infer_required_r_packages(fn2))
fn3 <- function(x) {
requireNamespace("utf8")
x + 3
}
expect_true("utf8" %in% sparklyr:::infer_required_r_packages(fn3))
fn4 <- function(x) {
library("sparklyr", quietly = FALSE)
x + 4
}
expected_deps <- tools::package_dependencies(
"sparklyr", db = installed.packages(), recursive = TRUE
)
testthat::expect_setequal(
union(expected_deps$sparklyr, c("base", "sparklyr")),
sparklyr:::infer_required_r_packages(fn4)
)
})
test_clear_cache()
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.