knitr::opts_chunk$set(echo=TRUE, warning=FALSE, message=FALSE, cache=TRUE, error=TRUE)
knitr::knit_hooks$set(document = function(x) {
  SparkR::sparkR.stop()
  x
})
library(dplyr)
library(SparkR)
library(SparkRext)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)

This is a fork of the excellent package SparkRext by @hoxo-m, which lets you use dplyr-style NSE calls for all of the data wrangling functions. However, with the original package it is still impossible to use these functions on a distributed Spark DataFrame and a local R data frame at the same time. This fork enables that use case, as shown below.

The motivation is that while SparkR provides a powerful interface for transforming distributed DataFrames and running machine learning algorithms, R still excels in the small-data world: data visualization, small-data aggregation, and so on.

Overview

Apache Spark is one of the hottest products in data science.
Spark 1.4.0 formally adopted the SparkR package, which enables handling Spark DataFrames from R. (See this article.)

SparkR is very useful and powerful.
One of the reasons is that SparkR DataFrames present an API similar to dplyr's.

For example:

df <- createDataFrame(sqlContext, iris)
df %>%
  select("Sepal_Length", "Species") %>%
  filter(df$Sepal_Length >= 5.5) %>%
  group_by(df$Species) %>%
  summarize(count = n(df$Sepal_Length), mean = mean(df$Sepal_Length)) %>%
  collect  

This is very cool. But I have one small complaint.

One of the reasons dplyr is so popular is that its functions adopt NSE (non-standard evaluation).

library(dplyr)
iris %>%
  select(Sepal.Length, Species) %>%
  filter(Sepal.Length >= 5.5) %>%
  group_by(Species) %>%
  summarize(count = n(), mean = mean(Sepal.Length))

It's very smart.
With NSE, you don't need to type quotation marks or the name of the DataFrame that the columns belong to.

The SparkRext package was created to bring SparkR closer to dplyr.

library(dplyr)
library(SparkR)
library(SparkRext)
df <- createDataFrame(sqlContext, iris)
df %>%
  select(Sepal_Length, Species) %>%
  filter(Sepal_Length >= 5.5) %>%
  group_by(Species) %>%
  summarize(count = n(Sepal_Length), mean = mean(Sepal_Length)) %>%
  collect  

SparkRext redefines the functions of SparkR to enable NSE inputs.
As a result, these functions can be used in the same way as in dplyr.

How to install

You can install the package from GitHub:

install.packages("devtools") # if you have not installed "devtools" package
devtools::install_github("saurfang/SparkRext")

Functions

SparkRext redefines six functions of SparkR.

In this section, these functions are explained.

Note that we only redefine the generic functions for Spark DataFrames, so they can still be used with local data frames.
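
For example, the same verb dispatches to dplyr on a local data frame and to SparkR via SparkRext on a Spark DataFrame. A minimal sketch, assuming the iris-based Spark DataFrame df created in the earlier example is still available:

iris %>% filter(Species == "setosa") %>% head   # local data frame: handled by dplyr
df %>% filter(Species == "setosa") %>% head     # Spark DataFrame: handled by SparkR/SparkRext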

For illustration, let’s prepare data.

library(dplyr)
library(nycflights13)
library(SparkR)

set.seed(123)
data <- sample_n(flights, 10000)

library(SparkRext)

df <- createDataFrame(sqlContext, data.frame(data))
df %>% head

filter()

filter() is used to extract rows that satisfy the specified conditions.

df %>% filter(month == 12, day == 31) %>% head
df %>% filter(month == 12 | day == 31) %>% head

Note that filter() of SparkR cannot accept multiple conditions at once.

select()

select() is used to extract columns specified.

df %>% select(year, month, day) %>% head

Consecutive columns can be extracted using a colon (:).

df %>% select(year:day) %>% head

You can use the minus sign - to extract columns with the exception of columns specified.

df %>% select(-year, -month, -day) %>% head

You can also extract columns by using column numbers.

df %>% select(1, 2, 3) %>% head

You can use the select utility functions in dplyr such as starts_with().

df %>% select(starts_with("arr")) %>% head

The select utility functions are starts_with(), ends_with(), contains(), matches(), num_range(), one_of() and everything().
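
The other helpers can be used in the same way; for example, a quick sketch on the flights-based df, assuming these helpers behave like starts_with() here:

df %>% select(ends_with("delay")) %>% head
df %>% select(contains("time")) %>% head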

Note that select() in SparkR cannot accept such a variety of inputs.

mutate()

mutate() is used to add new columns.

df %>% mutate(gain = arr_delay - dep_delay, speed = distance / air_time * 60) %>% head

Note that mutate() in SparkR cannot accept multiple inputs at once.
Furthermore, mutate() in SparkR cannot reuse newly added columns, as below.

df %>% mutate(gain = arr_delay - dep_delay, gain_per_hour = gain/(air_time/60)) %>% head

arrange()

arrange() is used to sort rows by columns specified.

df %>% arrange(month, day) %>% head

Rows are sorted in ascending order if you write just the column names.
If you want to sort in descending order, you can use desc().

df %>% arrange(month, desc(day)) %>% head

You can also sort by values that are transformed from columns.

df %>% arrange(abs(dep_delay)) %>% head

summarize()

summarize() is used to collapse a DataFrame to a single row.

df %>% summarize(count = n(year)) %>% collect

Typically, summarize() is used with group_by() to collapse each group to a single row.

As far as I know, you can use the following functions in summarize(): avg(), count()/n(), countDistinct()/n_distinct(), first(), last(), max(), mean(), min(), sum() and sumDistinct().

It seems that other aggregate functions are available in Scala (See docs).
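
For example, several of these aggregates can be combined in one summarize() call. A small sketch using the flights-based df prepared above:

df %>%
  summarize(count = n(year), mean_dep_delay = mean(dep_delay), max_distance = max(distance)) %>%
  collect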

Like dplyr, you can use summarise() instead of summarize().

group_by()

group_by() is used to describe how to break a DataFrame down into groups of rows.
Usually it is used with summarize() to collapse each group to a single row.

df %>% 
  group_by(tailnum) %>%
  summarize(mean_distance = mean(distance)) %>% 
  head

You can specify multiple columns.

df %>% 
  group_by(year, month, day) %>%
  summarize(count = n(year)) %>% 
  arrange(year, month, day) %>%
  head

Unlike dplyr, only summarize() can receive the results of group_by().
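
To make this concrete, here is a quick sketch; the unsupported pattern is shown only as a comment:

# group_by() followed by summarize() works on a Spark DataFrame
df %>% group_by(origin) %>% summarize(count = n(origin)) %>% head

# but, unlike dplyr, group_by() followed by filter() or mutate() is not supported
# df %>% group_by(origin) %>% mutate(...)   # hypothetical, would not work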

How to use

To install SparkR 1.4.0, the following articles may be useful.

Once you can load the SparkR package, you will also be able to use the SparkRext package.

# Preparation of data
library(dplyr)
library(nycflights13)
set.seed(123)
data <- sample_n(flights, 10000)

# Load library
library(SparkR)
library(SparkRext)

# Create Spark context and SQL context
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)

# Create DataFrame
df <- createDataFrame(sqlContext, data.frame(data))

# Play with DataFrame
result <- df %>%
  select(year:day, flight, distance) %>%
  group_by(year, month, day) %>%
  summarize(flight_mean = mean(flight), distance_mean = mean(distance)) %>%
  filter(flight_mean >= 2000, distance_mean >= 1000) %>%
  arrange(year, month, day) %>%
  collect

# Print result
head(result)
# Alternatively, take head() of the DataFrame directly without collecting the full result
df %>%
  select(year:day, flight, distance) %>%
  group_by(year, month, day) %>%
  summarize(flight_mean = mean(flight), distance_mean = mean(distance)) %>%
  filter(flight_mean >= 2000, distance_mean >= 1000) %>%
  arrange(year, month, day) %>%
  head

Interoperability between SparkR and dplyr

The appeal of SparkR is operating on a large-scale dataset with familiar R syntax. However, it would be a shame to limit ourselves to SparkR for all data manipulation. For example, R's visualization tools are very powerful and easy to use. You might want to produce a medium-sized aggregated local data.frame using SparkR and then slice and dice it further while charting.

For example, we can look at the average departure delay along a few different dimensions:

library(ggplot2)

aggDF <- df %>%
  group_by(year, month, carrier, origin, dest) %>%
  summarise(n = n(dep_delay), dep_delay = mean(dep_delay)) %>%
  collect()

aggDF %>%
  group_by(period = as.Date(paste0(year, "-", month, "-01")), carrier) %>%
  summarise(total = n(), dep_delay = weighted.mean(dep_delay, n)) %>%
  # keep period/carrier with more than 20 observations
  filter(total > 20) %>%
  ggplot(aes(period, dep_delay, color = carrier)) +
  geom_line() +
  theme_bw()

aggDF %>%
  group_by(origin, dest) %>%
  summarise(total = n(), dep_delay = weighted.mean(dep_delay, n)) %>%
  filter(total > 30) %>%
  ggplot(aes(origin, dest, fill = dep_delay)) +
  geom_tile(colour = "white") +
  scale_fill_gradient(low = "white", high = "steelblue")

As you can see, the seamless transition from a large DataFrame to a small data frame can be very powerful. Data science is not a big-data-only or small-data-only endeavor. Having the same set of functions for handling both ends of the spectrum in the same project makes for a really enjoyable experience.


