knitr::opts_chunk$set(echo=TRUE, warning=FALSE, message=FALSE, cache=TRUE, error=TRUE)
knitr::knit_hooks$set(document = function(x) { SparkR::sparkR.stop() x })
library(dplyr) library(SparkR) library(SparkRext) sc <- sparkR.init(master="local") sqlContext <- sparkRSQL.init(sc)
This is a fork of the excellent package SparkRext, by @hoxo-m, which enables users to use dplyr NSE style calls for all data wrangling functions. However it's still impossible to use these functions for distributed Spark DataFrame and local R DataFrame at the same time. This fork enables such use case as shown below.
The motivation is that while SparkR provides a powerful interface to transform distributed DataFrame and practice machine learning algorithms, R still excels in small data world such as data visualization, small data aggregation and etc.
Apache Spark is one of the hottest products in data science.
Spark 1.4.0 has formally adopted SparkR package which enables to handle Spark DataFrames on R.(See this article)
SparkR is very useful and powerful.
One of the reasons is that SparkR DataFrames present an API similar to dplyr.
For example:
df <- createDataFrame(sqlContext, iris) df %>% select("Sepal_Length", "Species") %>% filter(df$Sepal_Length >= 5.5) %>% group_by(df$Species) %>% summarize(count = n(df$Sepal_Length), mean = mean(df$Sepal_Length)) %>% collect
This is very cool. But I have a little discontent.
One of the reasons that dplyr is so much popular is the functions adopts NSE(non-standard evaluation).
library(dplyr) iris %>% select(Sepal.Length, Species) %>% filter(Sepal.Length >= 5.5) %>% group_by(Species) %>% summarize(count = n(), mean = mean(Sepal.Length))
It's very smart.
With NSE, you don't need to type quotations or names of DataFrame that the columns belong to.
The package SparkRext have been created to make SparkR be closer to dplyr.
library(dplyr) library(SparkR) library(SparkRext)
library(SparkRext) df <- createDataFrame(sqlContext, iris) df %>% select(Sepal_Length, Species) %>% filter(Sepal_Length >= 5.5) %>% group_by(Species) %>% summarize(count = n(Sepal_Length), mean = mean(Sepal_Length)) %>% collect
SparkRext redefines the functions of SparkR to enable NSE inputs.
As a result, the functions will be able to be used in the same way as dplyr.
You can install the package from there.
install.packages("devtools") # if you have not installed "devtools" package devtools::install_github("saurfang/SparkRext")
SparkRext redefines six functions on SparkR.
filter()
select()
mutate()
arrange()
summarize()
group_by()
Note we only redefine the generic functions for Spark DataFrame so these functions can still be used with local data frame.
In this section, these funcions are explained.
For illustration, let’s prepare data.
library(dplyr) library(nycflights13) library(SparkR) set.seed(123) data <- sample_n(flights, 10000) library(SparkRext) df <- createDataFrame(sqlContext, data.frame(data)) df %>% head
filter()
filter()
is used to extract rows that the conditions specified are satisfied.
df %>% filter(month == 12, day == 31) %>% head
df %>% filter(month == 12 | day == 31) %>% head
Note that filter()
of SparkR cannot accept multiple conditions at once.
select()
select()
is used to extract columns specified.
df %>% select(year, month, day) %>% head
Continuous columns can be extracted using a colon :
.
df %>% select(year:day) %>% head
You can use the minus sign -
to extract columns with the exception of columns specified.
df %>% select(-year, -month, -day) %>% head
You can also extract columns by using column numbers.
df %>% select(1, 2, 3) %>% head
You can use the select utility functions in dplyr such as starts_with()
.
df %>% select(starts_with("arr")) %>% head
All select utility functions is below.
starts_with(match, ignore.case = TRUE)
ends_with(match, ignore.case = TRUE)
contains(match, ignore.case = TRUE)
matches(match, ignore.case = TRUE)
num_range(prefix, range, width = NULL)
one_of(...)
everything()
Note that select()
of SparkR cannot accept a variety of input like this.
mutate()
mutate()
is used to add new columns.
df %>% mutate(gain = arr_delay - dep_delay, speed = distance / air_time * 60) %>% head
Note that mutate()
of SparkR cannot accept multiple input at once.
Furthermore, mutate()
of SparkR cannot also reuse columns added like below.
df %>% mutate(gain = arr_delay - dep_delay, gain_per_hour = gain/(air_time/60)) %>% head
arrange()
arrange()
is used to sort rows by columns specified.
df %>% arrange(month, day) %>% head
It will be sorted in ascending order if you write just column names.
If you want to sort in descending order, you can use desc()
.
df %>% arrange(month, desc(day)) %>% head
You can also sort by values that are transformed from columns.
df %>% arrange(abs(dep_delay)) %>% head
summarize()
summarize()
is used to collapse a DataFrame to a single row.
df %>% summarize(count = n(year)) %>% collect
Typically, summarize()
is used with group_by()
to collapse each group to a single row.
As far as I know, you can use the following functions in summarize()
.
n()
n_distinct()
approxCountDistinct()
mean()
first()
last()
It seems that other aggregate functions are available in Scala (See docs).
Like dplyr, you can use summarise()
instead of simmarize()
.
group_by()
group_by()
is used to describe how to break a DataFrame down into groups of rows.
Usually it is used with summarize()
to collapse each group to a single row.
df %>% group_by(tailnum) %>% summarize(mean_distance = mean(distance)) %>% head
You can indicate multiple colmuns.
df %>% group_by(year, month, day) %>% summarize(count = n(year)) %>% arrange(year, month, day) %>% head
Unlike dplyr, only summarize()
can receive the results of group_by()
.
To install SparkR 1.4.0, the next articles may be useful.
When you can load SparkR package, you will be also able to use SparkRext package.
# Preparation of data library(dplyr) library(nycflights13) set.seed(123) data <- sample_n(flights, 10000) # Load library library(SparkR) library(SparkRext) # Create Spark context and SQL context sc <- sparkR.init(master="local") sqlContext <- sparkRSQL.init(sc) # Create DataFrame df <- createDataFrame(sqlContext, data.frame(data)) # Play with DataFrame result <- df %>% select(year:day, flight, distance) %>% group_by(year, month, day) %>% summarize(flight_mean = mean(flight), distance_mean = mean(distance)) %>% filter(flight_mean >= 2000, distance_mean >= 1000) %>% arrange(year, month, day) %>% collect # Print result head(result)
# Play with DataFrame df %>% select(year:day, flight, distance) %>% group_by(year, month, day) %>% summarize(flight_mean = mean(flight), distance_mean = mean(distance)) %>% filter(flight_mean >= 2000, distance_mean >= 1000) %>% arrange(year, month, day) %>% head
The appeal of SparkR is operating a large scale dataset with familiar R syntax. However it would be a shame if we limit ourselves into relying on SparkR for all data manipulation. For example, R visualization is very powerful and easy to use. You might want to produce a medium size aggregated local data.frame using SparkR and proceed further slice and dice during charting.
For example, we can look average departure delay in a few different dimensions
library(ggplot2) aggDF <- df %>% group_by(year, month, carrier, origin, dest) %>% summarise(n = n(dep_delay), dep_delay = mean(dep_delay)) %>% collect() aggDF %>% group_by(period = as.Date(paste0(year, "-", month, "-01")), carrier) %>% summarise(total = n(), dep_delay = weighted.mean(dep_delay, n)) %>% # keep period/carrier with more than 20 observations filter(total > 20) %>% ggplot(aes(period, dep_delay, color = carrier)) + geom_line() + theme_bw() aggDF %>% group_by(origin, dest) %>% summarise(total = n(), dep_delay = weighted.mean(dep_delay, n)) %>% filter(total > 30) %>% ggplot(aes(origin, dest, fill = dep_delay)) + geom_tile(colour = "white") + scale_fill_gradient(low = "white", high = "steelblue")
As you can see, the seamless transition from large dataframe to small dataframe can be very powerful. Data science is a not big data or small data endeavor. Having the same set of functions that allow us handle both end of the spectrum in the same project can deliver an really enjoyable experience.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.