This vignette demonstrates how to use SparkR as an interface to run streaming Spark jobs through R, using the analysisPipelines package. The major use case is implementing a pipeline on SparkR DataFrames for streaming data.
Using Spark as an engine requires the SparkR package to be installed. SparkR is distributed natively with Apache Spark and is not available on CRAN. The SparkR version needs to map directly to the Spark version (hence the native distribution), and care needs to be taken to ensure that this is configured properly.
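One way to verify the match is to compare the installed package version against the Spark distribution version. A minimal sketch, assuming a local Spark binary distribution (which ships a top-level `RELEASE` file describing its version):

```r
# Sanity check: the SparkR package version should match the local Spark
# distribution version (e.g. both 2.x.x). Paths here are illustrative.
packageVersion("SparkR")  # version of the installed SparkR package

# First line of the RELEASE file, e.g. "Spark 2.3.1 built for Hadoop 2.7.3"
readLines(file.path(Sys.getenv("SPARK_HOME"), "RELEASE"), n = 1)
```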
To install SparkR from GitHub, run the following command if you know the Spark version:

```r
devtools::install_github('apache/spark@v2.x.x', subdir = 'R/pkg')
```
Alternatively, if Spark has already been installed, SparkR can be installed from the local Spark installation by running the following terminal commands:
```{bash eval = F}
$ export SPARK_HOME=/path/to/spark/directory
$ cd $SPARK_HOME/R/lib/SparkR/
$ R -e "devtools::install('.')"
```
# Initialize libraries

* Initialize the analysisPipelines and SparkR libraries
* Ensure you have a local installation of Spark and that the SparkR package is installed
* Check whether the SPARK_HOME environment variable is set to the Spark installation folder; if not, define it using the `Sys.setenv()` function

```r
knitr::opts_chunk$set(eval = FALSE)
```
```r
library(analysisPipelines)
library(SparkR)

## Define these variables as per the configuration of your machine. The below example is just illustrative.
sparkHome <- "/path/to/spark/directory/"
sparkMaster <- "local[1]"
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")

# Set the SPARK_HOME environment variable if not already present
if (Sys.getenv("SPARK_HOME") == "") {
  Sys.setenv(SPARK_HOME = sparkHome)
}
```
A Spark session is initialized using the `sparkRSessionCreateIfNotPresent` function:

```r
sparkRSessionCreateIfNotPresent(master = sparkMaster, sparkPackages = sparkPackages)
```
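Once the session is up, SparkR's `sparkR.version()` can be used to confirm which Spark version the session is running against:

```r
# Returns the Spark version of the active session, e.g. "2.3.1"
sparkR.version()
```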
This example illustrates the usage of pipelines for a streaming application. In this use case, streaming data is read from Kafka, aggregations are performed, and the output is written to the console.
Read streaming data from Kafka.
```r
## Define these variables as per the configuration of your machine. The below example is just illustrative.
kafkaBootstrapServers <- "192.168.0.256:9092,192.168.0.257:9092,192.168.0.258:9092"
consumerTopic <- "topic1"

streamObj <- read.stream(source = "kafka",
                         kafka.bootstrap.servers = kafkaBootstrapServers,
                         subscribe = consumerTopic,
                         startingOffsets = "earliest")
printSchema(streamObj)
```
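Spark's Kafka source exposes a fixed schema regardless of the message payload, with the payload itself arriving in the binary `value` column. `printSchema(streamObj)` should therefore show something like:

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```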
Users can define their own functions and use them as part of the pipeline. These functions can range from data preparation and aggregations to casting data to a suitable write-stream format.
```r
# Function to convert the JSON struct column into individual columns
convertStructToDf <- function(streamObj) {
  streamObj <- SparkR::select(streamObj, list(
    getField(streamObj$`jsontostructs(value)`, "bannerId"),
    getField(streamObj$`jsontostructs(value)`, "mobile"),
    getField(streamObj$`jsontostructs(value)`, "homeAppliance"),
    getField(streamObj$`jsontostructs(value)`, "gamingConsole"),
    getField(streamObj$`jsontostructs(value)`, "accessories"),
    getField(streamObj$`jsontostructs(value)`, "brand"),
    getField(streamObj$`jsontostructs(value)`, "previousPrice"),
    getField(streamObj$`jsontostructs(value)`, "currentPrice"),
    getField(streamObj$`jsontostructs(value)`, "discount"),
    getField(streamObj$`jsontostructs(value)`, "emi"),
    getField(streamObj$`jsontostructs(value)`, "crossSale"),
    getField(streamObj$`jsontostructs(value)`, "customerId"),
    getField(streamObj$`jsontostructs(value)`, "ts"),
    getField(streamObj$`jsontostructs(value)`, "click"),
    getField(streamObj$`jsontostructs(value)`, "conversion"),
    getField(streamObj$`jsontostructs(value)`, "age"),
    getField(streamObj$`jsontostructs(value)`, "income"),
    getField(streamObj$`jsontostructs(value)`, "maritalStatus"),
    getField(streamObj$`jsontostructs(value)`, "segment")))
  colnames(streamObj) <- c("bannerId", "mobile", "homeAppliance", "gamingConsole",
                           "accessories", "brand", "previousPrice", "currentPrice",
                           "discount", "emi", "crossSale", "customerId", "ts",
                           "click", "conversion", "age", "income", "maritalStatus",
                           "segment")
  return(streamObj)
}

# Function to cast columns as string, integer, etc.
castDfColumns <- function(streamObj) {
  streamObj <- SparkR::selectExpr(streamObj,
    "bannerId", "mobile", "homeAppliance", "gamingConsole", "accessories", "brand",
    "CAST(previousPrice as INTEGER)", "CAST(currentPrice as INTEGER)",
    "CAST(discount as INTEGER)", "emi", "crossSale", "customerId", "ts",
    "CAST(click as INTEGER)", "CAST(conversion as INTEGER)",
    "CAST(age as INTEGER)", "CAST(income as INTEGER)", "maritalStatus", "segment")
  streamObj$ts <- SparkR::to_timestamp(streamObj$ts, "yyyy-MM-dd HH:mm:ss")
  return(streamObj)
}

# Function to convert a DataFrame into Kafka key-value pairs
convertDfToKafkaKeyValuePairs <- function(streamObj, kafkaKey) {
  streamObj <- SparkR::toJSON(streamObj)
  streamObj$key <- kafkaKey
  return(streamObj)
}

# Function to summarize click-stream data
globalUiMetrics <- function(streamObj) {
  ## Aggregation query
  streamObj <- SparkR::summarize(SparkR::groupBy(streamObj, streamObj$bannerId),
                                 impressions = count(streamObj$customerId),
                                 clicks = sum(streamObj$click),
                                 conversions = sum(streamObj$conversion))
  SparkR::colnames(streamObj) <- c("banner_id", "impressions", "clicks", "conversions")
  return(streamObj)
}
```
In order to use pipelines, a pipeline object needs to be defined. Notice that streaming pipelines are defined using the `StreamingAnalysisPipeline` constructor, as in the code below.
Each user-defined function needs to be registered to the pipeline object. Once registered, the function can be used to construct a pipeline, which may chain multiple functions called in a particular sequence.
```r
# Define pipeline object
pipelineObj <- analysisPipelines::StreamingAnalysisPipeline(input = streamObj)

consumerDataSchema <- structType(structField("bannerId", "string"),
                                 structField("mobile", "string"),
                                 structField("homeAppliance", "string"),
                                 structField("gamingConsole", "string"),
                                 structField("accessories", "string"),
                                 structField("brand", "string"),
                                 structField("previousPrice", "string"),
                                 structField("currentPrice", "string"),
                                 structField("discount", "string"),
                                 structField("emi", "string"),
                                 structField("crossSale", "string"),
                                 structField("customerId", "string"),
                                 structField("ts", "string"),
                                 structField("click", "string"),
                                 structField("conversion", "string"),
                                 structField("age", "string"),
                                 structField("income", "string"),
                                 structField("maritalStatus", "string"),
                                 structField("segment", "string"))

# Register user-defined functions
registerFunction("convertStructToDf", "", functionType = "streaming",
                 engine = "spark-structured-streaming")
registerFunction("castDfColumns", "", functionType = "streaming",
                 engine = "spark-structured-streaming")
registerFunction("convertDfToKafkaKeyValuePairs", "", functionType = "streaming",
                 engine = "spark-structured-streaming")

getRegistry()

# Define pipeline
# Do data prep
pipelineObj %>% castKafkaStreamAsString_sparkSS() %>%
  convertKafkaValueFromJson_sparkSS(schema = consumerDataSchema, outAsIn = T) %>%
  convertStructToDf_sparkSS(outAsIn = T) %>%
  castDfColumns_sparkSS(outAsIn = T, storeOutput = T) -> pipelineObj

pipelineObj %>>% getPipeline
pipelineObj %>>% visualizePipeline
```
The pipeline is run by calling the `generateOutput()` function. The `output` attribute of the pipeline object contains the resultant Spark DataFrame(s).
In this example, the Spark DataFrames are converted to R data frames to help understand the result.
```r
## Run pipeline
pipelineObj %>% generateOutput() -> pipelineObj

## Write to output stream
streamObj <- pipelineObj %>>% getOutputById("4")
streamObj
```
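`getOutputById` returns a streaming SparkDataFrame, so no data actually flows until a sink is started on it. A minimal sketch of writing the result to the console using SparkR's `write.stream` (the output mode here is an assumption; it depends on whether the stream is aggregated):

```r
# Start a console sink on the resulting streaming SparkDataFrame.
# outputMode = "append" works for non-aggregated streams; aggregated streams
# typically need "complete" or "update".
query <- write.stream(streamObj, source = "console", outputMode = "append")

# Block until the streaming query terminates (e.g. on error or manual stop)
awaitTermination(query)

# ...or stop it explicitly:
# stopQuery(query)
```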
Currently, streaming pipelines have the limitation that they can execute only linear flows, a constraint imposed by Apache Spark Structured Streaming. Non-linear flows can be defined but might throw errors at runtime. Also, streaming pipelines can be implemented using only one engine, namely Apache Spark Structured Streaming.