detach("package:SparkR")
suppressPackageStartupMessages(library(dplyr))
suppressPackageStartupMessages(library(nycflights13))
suppressPackageStartupMessages(library(SparkR))
library(SparkRext)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
on.exit({
message("Stopping Spark...")
sparkR.stop()
message("OK.")
})
set.seed(123)
data <- sample_n(flights, 10000)
df <- createDataFrame(sqlContext, data.frame(data))
context("filter")
test_that("one condition", {
result <- filter(df, month == 12) %>% count
expect_equal(result, 883)
})
test_that("two conditions", {
result <- filter(df, month == 12, day == 31) %>% count
expect_equal(result, 27)
})
test_that("no conditions", {
expect_equal(filter(df), df)
})
test_that("use variable", {
x <- 12
result <- filter(df, month == x) %>% count
expect_equal(result, 883)
})
test_that("use variable same name", {
x <- list(month = 12)
result <- filter(df, month == x$month) %>% count
expect_equal(result, 883)
})
test_that("with pipe", {
result <- df %>% filter(month == 12) %>% count
expect_equal(result, 883)
})
context("select")
test_that("one column", {
result <- select(df, year)
expect_equal(columns(result), "year")
})
test_that("two column", {
result <- select(df, year, day)
expect_equal(columns(result), c("year", "day"))
})
test_that("column sequence", {
result <- select(df, year:day)
expect_equal(columns(result), c("year", "month", "day"))
})
test_that("one minus column", {
result <- select(df, -year)
act <- Filter(function(col) col != "year", columns(result))
expect_equal(columns(result), act)
})
test_that("two minus columns", {
result <- select(df, -year, -day)
act <- Filter(Negate(function(col) col %in% c("year", "day")), columns(result))
expect_equal(columns(result), act)
})
test_that("minus column sequence", {
result <- select(df, -(year:day))
act <- Filter(Negate(function(col) col %in% c("year", "month", "day")), columns(result))
expect_equal(columns(result), act)
})
test_that("one number column", {
result <- select(df, 1)
expect_equal(columns(result), "year")
})
test_that("two number columns", {
result <- select(df, 1, 3)
expect_equal(columns(result), c("year", "day"))
})
test_that("number column sequence", {
result <- select(df, 1:3)
expect_equal(columns(result), c("year", "month", "day"))
})
test_that("one minus number column", {
result <- select(df, -1)
act <- Filter(function(col) col != "year", columns(result))
expect_equal(columns(result), act)
})
test_that("two minus number columns", {
result <- select(df, -1, -3)
act <- Filter(Negate(function(col) col %in% c("year", "day")), columns(result))
expect_equal(columns(result), act)
})
test_that("minus number column sequence", {
result <- select(df, -(1:3))
act <- Filter(Negate(function(col) col %in% c("year", "month", "day")), columns(result))
expect_equal(columns(result), act)
})
test_that("character input", {
result <- select(df, "year")
expect_equal(columns(result), "year")
})
test_that("with pipe", {
result <- df %>% select(year)
expect_equal(columns(result), "year")
})
context("mutate")
test_that("add one column", {
result <- mutate(df, gain = arr_delay - dep_delay)
act <- c(columns(df), "gain")
expect_equal(SparkR::columns(result), act)
})
test_that("add two columns", {
result <- mutate(df, gain = arr_delay - dep_delay, gain_per_hour = gain/(air_time/60))
act <- c(columns(df), "gain", "gain_per_hour")
expect_equal(SparkR::columns(result), act)
})
test_that("with pipe", {
result <- df %>% mutate(gain = arr_delay - dep_delay)
act <- c(columns(df), "gain")
expect_equal(SparkR::columns(result), act)
})
test_that("with pipe2", {
result <- df %>% mutate(gain = arr_delay - dep_delay, gain_per_hour = gain/(air_time/60))
act <- c(columns(df), "gain", "gain_per_hour")
expect_equal(SparkR::columns(result), act)
})
context("arrange")
test_that("one variable", {
result <- arrange(df, month) %>% head
expect_equal(result$month, rep(1, 6))
})
test_that("two variables", {
result <- arrange(df, month, day) %>% head
expect_equal(result$month, rep(1, 6))
expect_equal(result$day, rep(1, 6))
})
test_that("use desc()", {
result <- arrange(df, desc(month)) %>% head
expect_equal(result$month, rep(12, 6))
})
test_that("use other function", {
result <- arrange(filter(df, isNotNull(dep_delay)), abs(dep_delay)) %>% head
expect_equal(result$dep_delay, rep(0, 6))
})
test_that("with pipe", {
result <- df %>% arrange(month) %>% head
expect_equal(result$month, rep(1, 6))
})
context("summarize")
test_that("one column", {
result <- summarize(df, size=n(distance)) %>% collect
expect_equal(result$size, 10000)
})
test_that("two columns", {
result <- summarize(df, months=n_distinct(month), days=n_distinct(day)) %>% collect
expect_equal(result$months, 12)
expect_equal(result$days, 31)
})
test_that("first()", {
result <- summarize(df, first_distance=first(distance)) %>% collect
expect_equal(result$first_distance, 488)
})
test_that("last()", {
result <- summarize(df, last_distance=last(distance)) %>% collect
expect_equal(result$last_distance, 1400)
})
test_that("with pipe", {
result <- df %>% summarize(size=n(distance)) %>% collect
expect_equal(result$size, 10000)
})
context("group_by")
test_that("check class", {
grouped_data <- group_by(df, tailnum)
grouped_data_class <- class(grouped_data$grouped_data)
attributes(grouped_data_class) <- NULL
expect_equal(class(grouped_data), "SparkGroupedData")
expect_equal(grouped_data_class, "GroupedData")
expect_equal(grouped_data$DataFrame, df)
})
test_that("one column", {
grouped_data <- group_by(df, tailnum)
result <- summarize(grouped_data, size=n(distance)) %>% arrange(tailnum) %>% head
expect_equal(result$tailnum, c("", "N0EGMQ", "N10156", "N102UW", "N103US", "N104UW"))
expect_equal(result$size, c(80, 15, 6, 2, 1, 1))
})
test_that("three columns", {
grouped_data <- group_by(df, year, month, day)
result <- summarize(grouped_data, size=n(distance)) %>% arrange(year, month, day) %>% head
expect_equal(result$year, c(2013, 2013, 2013, 2013, 2013, 2013))
expect_equal(result$month, c(1, 1, 1, 1, 1, 1))
expect_equal(result$day, c(1, 2, 3, 4, 5, 6))
expect_equal(result$size, c(25, 29, 24, 30, 16, 35))
})
test_that("with pipe", {
result <- df %>%
group_by(tailnum) %>%
summarize(size=n(distance)) %>%
arrange(tailnum) %>%
head
expect_equal(result$tailnum, c("", "N0EGMQ", "N10156", "N102UW", "N103US", "N104UW"))
expect_equal(result$size, c(80, 15, 6, 2, 1, 1))
})
context("combination")
test_that("combination", {
result <- df %>%
select(year:day, flight, distance) %>%
group_by(year, month, day) %>%
summarize(flight_mean = SparkR::mean(flight), distance_mean = SparkR::mean(distance)) %>%
filter(flight_mean >= 2000, distance_mean >= 1000) %>%
arrange(year, month, day) %>%
head
expect_equal(result$year, c(2013, 2013, 2013, 2013, 2013, 2013))
expect_equal(result$month, c(1, 1, 2, 3, 3, 3))
expect_equal(result$day, c(13, 24, 25, 5, 8, 9))
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.