Analysis pipelines for working with Spark DataFrames for one-time/batch analyses


Apache Spark can process large volumes of distributed data that are typically impossible to process on standalone R servers. This vignette describes how to define and execute Spark-only pipelines using the analysisPipelines package.

Important Note

Using Spark as an engine requires the SparkR package to be installed. SparkR is distributed natively with Apache Spark and is not distributed on CRAN. The SparkR version needs to directly map to the Spark version (hence the native distribution), and care needs to be taken to ensure that this is configured properly.
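Once SparkR is installed, a quick way to catch a version mismatch is to compare the installed package version with the Spark version you expect. The sketch below is only illustrative: `versionsMatch` is a hypothetical helper, and the expected version `"2.3.1"` is an assumption taken from the example installation path used later in this vignette.

```r
# Hypothetical helper: TRUE when two version strings denote the same release.
versionsMatch <- function(pkgVersion, sparkVersion) {
  package_version(pkgVersion) == package_version(sparkVersion)
}

# Assumed Spark version; replace with the version of your own installation.
expectedSparkVersion <- "2.3.1"

# Only query SparkR if it is installed, so the check itself never errors.
if (requireNamespace("SparkR", quietly = TRUE)) {
  installedVersion <- as.character(utils::packageVersion("SparkR"))
  if (!versionsMatch(installedVersion, expectedSparkVersion)) {
    warning("SparkR ", installedVersion,
            " does not match Spark ", expectedSparkVersion)
  }
}
```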

If you know the Spark version, SparkR can be installed from GitHub with the following command:

devtools::install_github('apache/spark@v2.x.x', subdir='R/pkg')

Alternatively, if Spark has already been installed, SparkR can be installed from the Spark distribution by running the following terminal commands:

```{bash eval = F}
$ export SPARK_HOME=/path/to/spark/directory
$ cd $SPARK_HOME/R/lib/SparkR/
$ R -e "devtools::install('.')"
```

# Initialize libraries

* Load the *analysisPipelines* and *SparkR* libraries
* Check whether the SPARK_HOME environment variable is set to the Spark installation folder. If not, define it using the `Sys.setenv()` function.

library(analysisPipelines)
library(SparkR)

## Define these variables as per the configuration of your machine. This is just an example.
sparkHome <- "/Users/naren/softwares/spark-2.3.1-bin-hadoop2.7/"
sparkMaster <- "local[1]"
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")
# Set spark home variable if not present
if (Sys.getenv("SPARK_HOME") == "") {
  Sys.setenv(SPARK_HOME = sparkHome)
}

Connect to Spark cluster

sparkRSessionCreateIfNotPresent(master = sparkMaster, sparkPackages = sparkPackages)

Read data from csv and initialize pipeline object

Spark can connect to data sources such as Hive and Kafka, and it can also read Parquet, JSON and CSV files. In this example, the built-in iris data frame stands in for data read from a CSV file.

inputDataset <- iris

# Replacing '.' in column names with '_' as SparkR is not able to deal with '.' in column names
colnames(inputDataset) <- gsub(".", "_", colnames(inputDataset), fixed = TRUE)

# Pass the renamed dataset, since the pipeline below refers to columns such as Sepal_Length
pipelineObj <- AnalysisPipeline(input = inputDataset)
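The `fixed` argument matters in the renaming step: without it, `gsub()` treats `"."` as a regular expression that matches any character. A small base R illustration, using only the iris column names (the variable names here are illustrative):

```r
originalNames <- colnames(iris)

# Literal match: only the dots are replaced.
literalNames <- gsub(".", "_", originalNames, fixed = TRUE)

# Regex match: "." matches every character, so each name
# turns into a run of underscores of the same length.
regexNames <- gsub(".", "_", originalNames)

print(literalNames)
print(regexNames[1])
```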

User-defined Spark functions

The example below defines a function that performs a simple aggregation: the mean of one column, grouped by another.

meanByGroup <- function(inputDataset, groupByColumn, colToSummarize) {
  # Group the Spark DataFrame by one column and average another within each group
  groupSummary <- SparkR::summarize(SparkR::groupBy(inputDataset, inputDataset[[groupByColumn]]),
                                    avg = SparkR::mean(inputDataset[[colToSummarize]]))
  return(groupSummary)
}

Registering user-defined functions to the pipeline object

Each user-defined function needs to be registered to the pipeline object. For non-R engines, such as Spark and Python, a suffix with the engine name is added to the function name on registration, so the suffixed name is the one to use when pipelining to an Analysis Pipeline object. The engine is added as a suffix for better readability, and a suffix (as opposed to a prefix) makes auto-completion easier.

Post registration, the function can be used to construct a pipeline. A pipeline is a set of multiple functions called in a particular sequence.

# Register user-defined functions
registerFunction("meanByGroup", "Mean By Group", engine = "spark")

# List all registered functions
getRegistry()

# Define pipeline from list of registered functions
pipelineObj %>% meanByGroup_spark(groupByColumn = "Species", colToSummarize = "Sepal_Length", storeOutput = T) %>% 
                meanByGroup_spark(groupByColumn = "Species", colToSummarize = "Petal_Length", storeOutput = T) -> pipelineObj

pipelineObj %>>% getPipeline
pipelineObj %>>% visualizePipeline

Running the pipeline and generating an output

The pipeline is run by calling the generateOutput() function. A particular output in the sequence of evaluations can be accessed by calling the getOutputById() function.

pipelineObj %>% generateOutput -> pipelineObj

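sepalLengthBySpecies <- pipelineObj %>>% getOutputById(1)
# Assumption: the stored output is a Spark DataFrame, so convert it to a local R data frame for display
sepalLengthBySpeciesDf <- SparkR::as.data.frame(sepalLengthBySpecies)
DT::datatable(head(sepalLengthBySpeciesDf), options = list(scrollX = TRUE, scrollY = TRUE))

petalLengthBySpecies <- pipelineObj %>>% getOutputById(2)
# Same assumption as above
petalLengthBySpeciesDf <- SparkR::as.data.frame(petalLengthBySpecies)
DT::datatable(head(petalLengthBySpeciesDf), options = list(scrollX = TRUE, scrollY = TRUE))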
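
As a sanity check, the same group means can be computed locally without Spark. The sketch below uses base R's `aggregate()` on a copy of iris with the renamed columns. It is an independent cross-check under the assumption that the pipeline saw the renamed dataset, not part of the pipeline itself, and the variable names are illustrative.

```r
localData <- iris
colnames(localData) <- gsub(".", "_", colnames(localData), fixed = TRUE)

# Mean of each measurement per species, computed in plain R.
localSepalMeans <- aggregate(Sepal_Length ~ Species, data = localData, FUN = mean)
localPetalMeans <- aggregate(Petal_Length ~ Species, data = localData, FUN = mean)

print(localSepalMeans)
print(localPetalMeans)
```

The averages in the two pipeline outputs should agree with these values, one row per species.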

Supplementary Note

The analysisPipelines package uses the SparkR package internally to interface with Spark. SparkR masks many common data manipulation and processing functions from base R as well as from packages such as dplyr. Therefore, qualify function calls with their namespace (for example, `SparkR::summarize()` rather than `summarize()`) so that the intended function is called.
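
The effect of masking can be demonstrated without SparkR. In the sketch below, a locally defined `nrow` acts as a hypothetical stand-in for a function attached by another package; whether SparkR masks this particular function is beside the point. The unqualified call picks up the masking definition, while the namespace-qualified call still reaches the base version.

```r
# Local definition shadowing base::nrow, standing in for a masking package.
nrow <- function(x) "masked version called"

unqualifiedResult <- nrow(iris)        # resolves to the masking definition
qualifiedResult <- base::nrow(iris)    # explicitly scoped to the base package

print(unqualifiedResult)
print(qualifiedResult)

# Remove the shadowing definition so later code sees base::nrow again.
rm(nrow)
```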


analysisPipelines documentation built on July 1, 2020, 7:09 p.m.