The rmr2 package allows R to use Hadoop MapReduce; here, however, we concentrate on SparkR. Note that SparkR was only released as a separate component of Spark in 2014, and its API is still being settled.
Before loading the SparkR package, we need to point the SPARK_HOME environment variable at our Spark installation. This can be done in our .bashrc file, or in R itself, via
```r
Sys.setenv(SPARK_HOME = "/path/to/spark/")
```
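If we are using the copy of SparkR that ships inside the Spark distribution (rather than a version installed into our usual R library), we may also need to tell R where to find it. This is a minimal sketch, assuming the standard R/lib layout inside SPARK_HOME

```r
# Assumes SparkR is bundled with the Spark distribution under R/lib
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
```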
Then we load the SparkR package
```r
library("SparkR")
```
Next we initialise the Spark cluster and create a SparkContext
```r
sc = sparkR.init(master = "local")
```
The sparkR.init function has a number of arguments. For example, additional Spark packages, such as spark-csv, can be loaded via the sparkPackages argument

```r
sc = sparkR.init(sparkPackages = "com.databricks:spark-csv_2.11:1.0.3")
```
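Other arguments control, for example, the application name and Spark properties. The sketch below assumes the Spark 1.x signature of sparkR.init; the appName and sparkEnvir argument names are assumptions, not taken from the text above

```r
# Assumed Spark 1.x arguments: appName labels the job in the Spark UI,
# sparkEnvir passes Spark properties such as executor memory
sc = sparkR.init(master = "local[2]",
                 appName = "moby_example",
                 sparkEnvir = list(spark.executor.memory = "2g"))
```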
When we finish our Spark session, we should terminate the Spark context via
```r
sparkR.stop()
```
To load a text file, we use the textFile function, passing it the Spark context sc. Note the use of the ::: operator, since textFile is not exported from the SparkR namespace

```r
moby = SparkR:::textFile(sc, "data/moby_dick.txt")
```

In any case, the moby object is an RDD object
```r
R > moby
# MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:-2
```
Suppose we want to find the lines that mention Moby in our data set. We first write a function, get_moby, that only returns TRUE or FALSE for a given line. The filterRDD function then retains any rows that are TRUE, i.e.
```r
get_moby = function(x)
  "Moby" %in% strsplit(x, split = " ")[[1]]
mobys = SparkR:::filterRDD(moby, get_moby)
```

The function is applied to every element of the RDD, in a similar way to R's apply family of functions, so we want get_moby to be efficient! If we want to know how many rows contain the word Moby, we use the count function
```r
count(mobys)
## The answer is 77
```
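Other transformations are available through the same internal API; for instance, a flatMap-style operation can split each line into words before counting them. The sketch below assumes a non-exported SparkR:::flatMap function with the same calling convention as filterRDD above

```r
# Assumed internal API: flatMap applies a function to each line and
# flattens the results, giving an RDD with one element per word
words = SparkR:::flatMap(moby, function(x) strsplit(x, split = " ")[[1]])
count(words)   # number of words in the text
```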
Like dplyr, Spark uses lazy evaluation: it only performs the computation when it is needed by an action. The textFile and filterRDD commands are run only when we use count. Both Spark and dplyr recognise that lazy evaluation is essential when working with big data; if we evaluated textFile straight away, this would use up a load of disk space. Spark's RDDs are (by default) recomputed each time you run an action on them.\sidenote{Alternatively, we could use \texttt{cache(moby)}, which is the same as \texttt{persist} with the default level of storage.} To reuse an RDD in multiple operations we can ask Spark to persist it via
```r
## There are different levels of storage
persist(mobys, "MEMORY_ONLY")
```
If you are not planning on reusing the object, don't use persist.
To summarise, every Spark session will have a similar structure: we initialise a context, load or create the data, transform it, persist for efficiency where helpful, and then apply an action such as count.
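A minimal sketch of such a session, using only the functions introduced above (the file name and get_moby function are those from the Moby Dick example), is

```r
# Initialise the context, load and transform the data,
# persist for reuse, run an action, then shut down
sc = sparkR.init(master = "local")
moby = SparkR:::textFile(sc, "data/moby_dick.txt")
mobys = SparkR:::filterRDD(moby, get_moby)   # get_moby as defined above
persist(mobys, "MEMORY_ONLY")
count(mobys)
sparkR.stop()
```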
Instead of reading data from a file, we can also create an RDD directly from an R object using the parallelize function. For example, to distribute the vector 1:100, we would use

```r
vec_sp = SparkR:::parallelize(sc, 1:100)
```
As before, we don't actually compute vec_sp until it is needed,
```r
count(vec_sp)
```
Suppose we have already initialised a Spark context. To use Spark data frames, we need to create an SQLContext, via
```r
sql_context = sparkRSQL.init(sc)
```
The SQLContext enables us to create data frames from a local R data frame,
```r
chicks_sp = createDataFrame(sql_context, chickwts)
```
\noindent or from other data sources, such as CSV files or a 'Hive table'; a sketch of reading a CSV file is given below. If we examine the newly created object, we get
```r
R > chicks_sp
# DataFrame[weight:double, feed:string]
```
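As an illustration of another data source, the spark-csv package loaded via sparkPackages earlier can be used with read.df. This sketch assumes the Spark 1.x read.df(sqlContext, path, source, ...) signature, and the file name chicks.csv is hypothetical

```r
# Hypothetical CSV file; "source" selects the spark-csv reader,
# header = "true" treats the first row as column names
chicks_csv = read.df(sql_context, "data/chicks.csv",
                     source = "com.databricks.spark.csv", header = "true")
```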
\noindent An S3 method for head is also available, so
```r
R > head(chicks_sp, 2)
#   weight      feed
# 1    179 horsebean
# 2    160 horsebean
```
gives what we would expect. We can extract columns using the dollar notation, chicks_sp$weight, or using select
```r
R > select(chicks_sp, "weight")
# DataFrame[weight:double]
```
\noindent We can subset or filter the data frame using the filter function
```r
filter(chicks_sp, chicks_sp$feed == "horsebean")
```
\noindent Using Spark data frames, we can also easily group and aggregate data frames. (Note this is similar to the dplyr syntax). For example, to count the number of chicks in each feed group, we group, and then summarise:
```r
chicks_cnt = groupBy(chicks_sp, chicks_sp$feed) %>%
  summarize(count = n(chicks_sp$feed))
```
We then use head to view the top rows
```r
head(chicks_cnt, 2)
#       feed count
# 1   casein    12
# 2 meatmeal    11
```
We can also use arrange to sort the data by the most common group
```r
arrange(chicks_cnt, desc(chicks_cnt$count))
```
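Once the aggregated result is small enough, collect brings it back into R as an ordinary data frame; a brief sketch (the chicks_local name is just illustrative)

```r
# collect returns a local R data.frame from a Spark data frame
chicks_local = collect(arrange(chicks_cnt, desc(chicks_cnt$count)))
head(chicks_local)
```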