Apache Spark can be leveraged to process large volumes of distributed data that are typically impossible to process on standalone R servers. This vignette describes how to define and execute Spark-only pipelines using the analysisPipelines package.
Using Spark as an engine requires the SparkR package to be installed. SparkR is distributed natively with Apache Spark and is not distributed on CRAN. The SparkR version needs to directly map to the Spark version (hence the native distribution), and care needs to be taken to ensure that this is configured properly.
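If Spark is already installed locally, one way to check which Spark version you have (and therefore which SparkR version to install) is sketched below. It assumes a standard Spark binary distribution, which ships a `RELEASE` file at the top of the installation directory; the variable name `sparkHomeDir` is just illustrative.

```r
# Illustrative only: standard Spark binary distributions include a RELEASE file
# that records the Spark (and Hadoop) version the distribution was built for.
sparkHomeDir <- Sys.getenv("SPARK_HOME")   # e.g. "/path/to/spark-2.3.1-bin-hadoop2.7"
if (nzchar(sparkHomeDir)) {
  readLines(file.path(sparkHomeDir, "RELEASE"))
}
```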
To install SparkR from GitHub, run the following command if you know the Spark version:
```r
devtools::install_github('apache/spark@v2.x.x', subdir='R/pkg')
```
Alternatively, if Spark has already been installed, SparkR can be installed by running the following terminal commands:
```{bash eval = F}
$ export SPARK_HOME=/path/to/spark/directory
$ cd $SPARK_HOME/R/lib/SparkR/
$ R -e "devtools::install('.')"
```
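After either installation route, a quick check from R confirms that SparkR is available and that its reported version matches the installed Spark version:

```r
# Sanity check: the SparkR package version should match the Spark installation
utils::packageVersion("SparkR")
```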
# Initialize libraries

* Load the *analysisPipelines* and *SparkR* libraries.
* Check if the `SPARK_HOME` environment variable is set to the Spark installation folder. If not, define it using the `Sys.setenv()` function.

```r
knitr::opts_chunk$set(
  eval = FALSE
)
```
```r
library(ggplot2)
library(analysisPipelines)
library(SparkR)

## Define these variables as per the configuration of your machine. This is just an example.
sparkHome <- "/Users/naren/softwares/spark-2.3.1-bin-hadoop2.7/"
sparkMaster <- "local[1]"
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")

# Set the SPARK_HOME environment variable if it is not already present
if (Sys.getenv("SPARK_HOME") == "") {
  Sys.setenv(SPARK_HOME = sparkHome)
}
```
The Spark session is initialized by calling the `sparkRSessionCreateIfNotPresent` function.

```r
sparkRSessionCreateIfNotPresent(master = sparkMaster, sparkPackages = sparkPackages)
```
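Once the session is created, a quick check such as the one below confirms that the connection is active; both functions are part of SparkR.

```r
# Verify the Spark session is up and inspect its configuration
SparkR::sparkR.version()   # version of Spark the session is running on
SparkR::sparkR.conf()      # current session configuration as a named list
```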
Spark can connect to data sources such as Hive and Kafka, and can also read Parquet, JSON, and CSV files. In this example, for simplicity, the iris dataset (a local R data frame) is used as the input.
```r
inputDataset <- iris

# Replacing '.' in column names with '_' as SparkR is not able to deal with '.' in column names
colnames(inputDataset) <- gsub(".", "_", colnames(inputDataset), fixed = T)

pipelineObj <- AnalysisPipeline(input = inputDataset)
```
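The aggregation function defined below operates on a Spark DataFrame. For interactive exploration outside of the pipeline, the local data frame can be converted explicitly with `SparkR::as.DataFrame()`; the object name `inputSparkDF` is just illustrative.

```r
# Optional, for interactive exploration: convert the local R data frame to a Spark DataFrame
inputSparkDF <- SparkR::as.DataFrame(inputDataset)
SparkR::printSchema(inputSparkDF)   # column names and Spark data types
head(inputSparkDF)                  # first few rows, collected back to R
```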
The example below defines a user-defined function that performs a simple aggregation.
```r
meanByGroup <- function(inputDataset, groupByColumn, colToSummarize) {
  groupSummary <- SparkR::summarize(
    SparkR::groupBy(inputDataset, inputDataset[[groupByColumn]]),
    avg = SparkR::mean(inputDataset[[colToSummarize]]))
  return(groupSummary)
}
```
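Called directly on a Spark DataFrame (for example, the illustrative `inputSparkDF` created earlier), the function returns an aggregated Spark DataFrame that can be brought back to R with `SparkR::collect()`:

```r
# Ad-hoc call outside the pipeline, using the illustrative inputSparkDF from above
sepalMeans <- meanByGroup(inputSparkDF,
                          groupByColumn = "Species",
                          colToSummarize = "Sepal_Length")
SparkR::collect(sepalMeans)   # pull the small aggregated result back into R
```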
Each user-defined function needs to be registered to the pipeline object. For non-R engines, such as Spark and Python, a suffix with the engine name is added to the function name on registration, so functions with this suffix need to be used when adding them to an Analysis Pipeline object. The engine name is appended as a suffix for better readability, and a suffix is used (as opposed to a prefix) to enable easier auto-completion.
Post registration, the function can be used to construct a pipeline. A pipeline is a set of multiple functions called in a particular sequence.
```r
# Register user-defined functions
registerFunction("meanByGroup", "Mean By Group", engine = "spark")

# List all registered functions
getRegistry()

# Define pipeline from list of registered functions
pipelineObj %>% meanByGroup_spark(groupByColumn = "Species",
                                  colToSummarize = "Sepal_Length",
                                  storeOutput = T) %>%
                meanByGroup_spark(groupByColumn = "Species",
                                  colToSummarize = "Petal_Length",
                                  storeOutput = T) -> pipelineObj

pipelineObj %>>% getPipeline
pipelineObj %>>% visualizePipeline
```
The pipeline is run by calling the `generateOutput()` function. A particular output in the sequence of evaluations can be accessed by calling the `getOutputById()` function.
```r
pipelineObj %>% generateOutput -> pipelineObj

sepalLengthBySpecies <- pipelineObj %>>% getOutputById(1)
sepalLengthBySpeciesDf <- as.data.frame(sepalLengthBySpecies)
DT::datatable(head(sepalLengthBySpeciesDf),
              options = list(scrollX = T, scrollY = T))

petalLengthBySpecies <- pipelineObj %>>% getOutputById(2)
petalLengthBySpeciesDf <- as.data.frame(petalLengthBySpecies)
DT::datatable(head(petalLengthBySpeciesDf),
              options = list(scrollX = T, scrollY = T))
```
The analysisPipelines package internally uses the SparkR package to interface with Spark. SparkR masks many typical data manipulation and processing functions from base R as well as from packages like dplyr. Therefore, ensure you use explicit namespace scoping (e.g. `SparkR::filter`) when calling such functions.
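For example (illustrative only; `inputSparkDF` is the Spark DataFrame from the earlier sketch):

```r
# 'filter' is masked by SparkR; explicit scoping makes the intent unambiguous
setosaSpark <- SparkR::filter(inputSparkDF, inputSparkDF$Species == "setosa")  # Spark DataFrame
setosaLocal <- subset(iris, Species == "setosa")                               # local data frame (base R)
```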