The rmr2 package allows R to use Hadoop MapReduce. Here, however, we focus on SparkR. SparkR was released as a separate component of Spark in 2014, and its developers are still in the process of settling on its API.
Before loading the SparkR package, we need to tell R where Spark is installed by setting the SPARK_HOME environment variable. This can be done in our .bashrc file, or in R itself, via

```r
Sys.setenv(SPARK_HOME = "/path/to/spark/")
```
Then we load the SparkR package

```r
library("SparkR")
```
Next we initialise the Spark cluster and create a Spark context

```r
sc = sparkR.init(master = "local")
```

The sparkR.init function has a number of arguments. For example, to load an additional Spark package we can use

```r
sc = sparkR.init(sparkPackages = "com.databricks:spark-csv_2.11:1.0.3")
```
When we finish our Spark session, we should terminate the Spark context via

```r
sparkR.stop()
```
To read a text file into Spark, we use the textFile function, passing the Spark context sc as the first argument:

```r
## Note the ::: since textFile is not exported
moby = SparkR:::textFile(sc, "data/moby_dick.txt")
```

The moby object is an RDD object

```r
R> moby
# MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:-2
```
Suppose we want to extract the rows that contain the word Moby from our data set. First we create a function get_moby that only returns TRUE or FALSE for a given row. The filterRDD function then retains any rows that are TRUE, i.e.
```r
get_moby = function(x)
  "Moby" %in% strsplit(x, split = " ")[[1]]
mobys = SparkR:::filterRDD(moby, get_moby)
```
This is similar to R's apply family of functions. Since get_moby is applied to every row, we need it to be efficient! If we want to know how many rows contain the word Moby, we use the count function

```r
count(mobys)
## The answer is 77
```
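As a plain-R sanity check of the filtering logic (no Spark required; the sample lines below are invented for illustration, not taken from the actual text):

```r
# The same predicate passed to filterRDD, applied with base R
get_moby = function(x)
  "Moby" %in% strsplit(x, split = " ")[[1]]

# Hypothetical sample lines, purely for illustration
lines = c("Call me Ishmael.",
          "Moby Dick was a whale.",
          "It was a dark and stormy night.")

keep = sapply(lines, get_moby)  # logical vector, like filterRDD
sum(keep)                       # like count(mobys); here 1
```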
Like dplyr, Spark uses lazy evaluation: it only performs the computation when it is required by an action. The textFile and filterRDD commands are run only when we call count. The authors of dplyr recognise that lazy evaluation is essential when working with big data: if textFile read in the whole file straight away, this would use up a load of disk space. Spark's RDDs are (by default) recomputed each time you run an action on them. To reuse RDDs in multiple operations, we can ask Spark to persist them via

```r
## There are different levels of storage
persist(mobys, "MEMORY_ONLY")
```

Alternatively, we could use cache(moby), which is the same as persist with the default level of storage.
If you are not planning on reusing the object, don't use persist.
To summarise, every Spark session will have a similar structure: initialise the Spark context, load and manipulate the data, persist any objects we reuse for efficiency, perform an action such as count, and finally terminate the session.
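The steps above can be sketched as a skeleton session (not runnable as-is: it assumes a local Spark installation and the data file used earlier):

```r
library("SparkR")

sc = sparkR.init(master = "local")                   # initialise the context
moby = SparkR:::textFile(sc, "data/moby_dick.txt")   # load data (lazily)
persist(moby, "MEMORY_ONLY")                         # persist for reuse
count(moby)                                          # an action triggers the computation
sparkR.stop()                                        # terminate the context
```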
We can also create RDDs from ordinary R objects using the parallelize function. For example, to create an RDD containing the vector 1:100, we would use

```r
vec_sp = SparkR:::parallelize(sc, 1:100)
```

As before, vec_sp is not actually computed until it is needed,

```r
count(vec_sp)
```
Suppose we have already initialised a Spark context. To use Spark data frames, we need to create an SQLContext, via

```r
sql_context = sparkRSQL.init(sc)
```
The SQLContext enables us to create data frames from a local R data frame,

```r
chicks_sp = createDataFrame(sql_context, chickwts)
```
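Since chickwts is one of R's built-in data sets, we can inspect the local copy before shipping it to Spark:

```r
# chickwts ships with base R: chick weights by feed supplement
dim(chickwts)     # 71 rows, 2 columns
names(chickwts)   # "weight" "feed"
```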
or from other data sources, such as CSV files or a 'Hive table'. If we examine the newly created object, we get

```r
R> chicks_sp
# DataFrame[weight:double, feed:string]
```
An S3 method for head is also available, so

```r
R> head(chicks_sp, 2)
#   weight      feed
# 1    179 horsebean
# 2    160 horsebean
```
gives what we would expect. We can extract columns using the dollar notation, chicks_sp$weight, or using select

```r
R> select(chicks_sp, "weight")
# DataFrame[weight:double]
```
We can subset or filter the data frame using the filter function

```r
filter(chicks_sp, chicks_sp$feed == "horsebean")
```
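For comparison, the equivalent operation on the local data frame uses ordinary base-R subsetting:

```r
# Base-R equivalent of the Spark filter above
horsebean = chickwts[chickwts$feed == "horsebean", ]
nrow(horsebean)   # 10 chicks received the horsebean feed
```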
Using Spark data frames, we can also easily group and aggregate data frames (note this is similar to the dplyr syntax). For example, to count the number of chicks in each feed group, we group, and then summarise:

```r
chicks_cnt = groupBy(chicks_sp, chicks_sp$feed) %>%
  summarize(count = n(chicks_sp$feed))
```
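The same group-and-count on the local copy can be done with table, a base-R sketch for comparison:

```r
# Base-R equivalent of groupBy + summarize: counts per feed group
feed_counts = as.data.frame(table(chickwts$feed))
names(feed_counts) = c("feed", "count")
feed_counts[feed_counts$feed == "casein", "count"]   # 12
```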
Then we use head to view the top rows

```r
head(chicks_cnt, 2)
#       feed count
# 1   casein    12
# 2 meatmeal    11
```
We can also arrange the data by the most common group

```r
arrange(chicks_cnt, desc(chicks_cnt$count))
```
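Again for comparison, the local counts can be ordered by decreasing frequency with base R's order:

```r
# Base-R equivalent of arrange(..., desc(count))
feed_counts = as.data.frame(table(chickwts$feed))
names(feed_counts) = c("feed", "count")
ordered = feed_counts[order(feed_counts$count, decreasing = TRUE), ]
head(ordered, 1)   # soybean is the largest group, with 14 chicks
```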