#RSparkling Demo
#This demo demonstrates the following:
#1.How to set up your R environment for RSparkling
#2.Sparklyr/Spark for data munging
#3.H2O for model building
#4.Bunding the previous two together in a Shiny Application
#Problem Statement:
#Given that your flight was delayed by 15 minutes or more, what is the likelihood your airline carrier will make up time in route?
#Some of the most signficant factors for making up time are flight distance and airline carrier.
#Dataset:
#Provided by the nycflights13 package.
#This package contains information about all flights that departed from NYC (e.g. EWR, JFK and LGA) in 2013: 336,776 flights in total.
#To help understand what causes delays, it also includes a number of other useful datasets:
#weather: hourly meterological data for each airport
#planes: construction information about each plane
#airports: airport names and locations
#airlines: translation between two letter carrier codes and names
##############################################################################################################
#Set up environment for RSparkling
#Install H2O if not already installed
#The following two commands remove any previously installed H2O packages for R.
#if ("package:h2o" %in% search()) { detach("package:h2o", unload=TRUE) }
#if ("h2o" %in% rownames(installed.packages())) { remove.packages("h2o") }
# Next, we download packages that H2O depends on.
#pkgs <- c("methods","statmod","stats","graphics","RCurl","jsonlite","tools","utils")
#for (pkg in pkgs) {
# if (! (pkg %in% rownames(installed.packages()))) { install.packages(pkg) }
#}
# Now we download, install and initialize the H2O package for R.
#install.packages("h2o", type = "source", repos = "http://h2o-release.s3.amazonaws.com/h2o/rel-turing/10/R")
#Install sparklyr if not already installed
#install.packages("sparklyr")
#The latest stable version of rsparkling can be installed as follows:
#install.packages("rsparkling")
##############################################################################################################
#If Spark needs to be installed, that can be done using the following sparklyr command:
library(sparklyr)
spark_install(version = "2.0.0")
#The call to library(rsparkling) will make the H2O functions available on the R search path and will also ensure that the dependencies
#required by the Sparkling Water package are included when we connect to Spark. Also, we need to set up a valid Sparkling Water version as a global variable.
#This will be sent to sparklyr to set up a connection to Sparkling Water.
options(rsparkling.sparklingwater.version = "2.0.0")
library(rsparkling)
sc <- spark_connect(master = "local")
#Let's inspect the H2OContext for our Spark connection:
h2o_context(sc,FALSE)
#Open Spark UI
spark_web(sc)
#We can also view the H2O Flow web UI:
h2o_flow(sc,FALSE)
##############################################################################################################
#Start model building process
# Attach packages
library(dplyr)
library(ggplot2)
library(DT)
library(leaflet)
library(geosphere)
library(readr)
library(sparklyr)
library(h2o)
library(shinythemes)
library(nycflights13)
#Upload data
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
airports_tbl <- copy_to(sc, nycflights13::airports, "airports")
airlines_tbl <- copy_to(sc, nycflights13::airlines, "airlines")
#Prepare data for modelling
model_tbl <- flights_tbl %>%
filter(!is.na(arr_delay) & !is.na(dep_delay) & !is.na(distance)) %>%
filter(dep_delay > 15 & dep_delay < 240) %>%
filter(arr_delay > -60 & arr_delay < 360) %>%
left_join(airlines_tbl, by = c("carrier" = "carrier")) %>%
mutate(gain = dep_delay - arr_delay) %>%
select(origin, dest, carrier, airline = name, distance, dep_delay, arr_delay, gain)
#Data has been processed using Sparklyr. Now we go into our H2O session
#Convert munged spark data frame to an h2o frame
df_hex <- as_h2o_frame(sc,model_tbl,name="model_hex",FALSE)
#Set up some character variables as factors, which can be used in model building
df_hex$origin = as.factor(df_hex$origin)
df_hex$dest = as.factor(df_hex$dest)
df_hex$carrier = as.factor(df_hex$carrier)
df_hex$airline = as.factor(df_hex$airline)
#Take a look at dimensions, head, tail, and summary of h2o frame
dim(df_hex)
head(df_hex)
tail(df_hex)
summary(df_hex,exact_quantiles=TRUE)
#Pick a response for the supervised problem
response <- "gain"
#Use all other columns (except for response) as predictors
predictors <- setdiff(names(df_hex), c(response))
print(predictors)
#Set up train, validation, and test set
splits <- h2o.splitFrame(
data = df_hex,
ratios = c(0.7,0.2), ## only need to specify 2 fractions, the 3rd is implied
destination_frames = c("train.hex","valid.hex", "test.hex"), seed = 1234
)
train <- splits[[1]]
valid <- splits[[2]]
test <- splits[[3]]
#Run H2O's GLM
#We only provide the required parameters, everything else is default
glm <- h2o.glm(x = predictors, y = response, training_frame = train)
#Show a detailed model summary
glm
#Get the rmse on the validation set
h2o.rmse(h2o.performance(glm, newdata = valid))
#The second model is another default glm, but trained on 80% of the data (here, we combine the training and validation splits to get more training data),
#and cross-validated using 4 folds. Note that cross-validation takes longer and is not usually done for really large datasets.
#Note:h2o.rbind makes a copy here, so it's better to use splitFrame with `ratios = c(0.8)` instead
glm <- h2o.glm(x = predictors, y = response, training_frame = h2o.rbind(train, valid), nfolds = 4, seed = 0xDECAF)
#Show a detailed summary of the cross validation metrics
#This gives you an idea of the variance between the folds
glm@model$cross_validation_metrics_summary
#Get the cross-validated RMSE by scoring the combined holdout predictions.
#(Instead of taking the average of the metrics across the folds
h2o.rmse(h2o.performance(glm, xval = TRUE))
#Make prediction on test set
preds <- h2o.predict(glm, test)
preds <- h2o.cbind(test,preds)
#Convert back to Spark to utilize dplyr backend for aggregations that will later be used for visuals (Shiny app)
pred_tbl <- as_spark_dataframe(sc,preds,strict_version_check=FALSE)
#Create scored look up data for Shiny app
lookup_tbl <- pred_tbl %>%
group_by(origin, dest, carrier, airline) %>%
summarize(
flights = n(),
distance = mean(distance),
avg_dep_delay = mean(dep_delay),
avg_arr_delay = mean(arr_delay),
avg_gain = mean(gain),
pred_gain = mean(predict)
)
#Cache the look up table
sdf_register(lookup_tbl, "lookup")
tbl_cache(sc, "lookup")
#Find distinct airport codes
carrier_origin <- c("JFK", "LGA", "EWR")
carrier_dest <- c("BOS", "DCA", "DEN", "HNL", "LAX", "SEA", "SFO", "STL")
#Shiny UI
ui <- fluidPage(theme = shinytheme("yeti"),
# Set display mode to bottom
tags$script(' var setInitialCodePosition = function()
{ setCodePosition(false, false); }; '),
# Title
titlePanel("NYCFlights13: Time Gained in Flight"),
# Create sidebar
sidebarLayout(
sidebarPanel(
radioButtons("origin", "Flight origin:",
carrier_origin, selected = "JFK"),
br(),
radioButtons("dest", "Flight destination:",
carrier_dest, selected = "SFO")
),
# Show a tabset that includes a plot, model, and table view
mainPanel(
tabsetPanel(type = "tabs",
tabPanel("Plot", plotOutput("plot")),
tabPanel("Variable Importance", plotOutput("plotvarimp")),
tabPanel("Data", DT::dataTableOutput("datatable")),
tabPanel("Map", leafletOutput("map"))
)
)
)
)
#Shiny server function
server <- function(input, output) {
#Identify origin lat and log
origin <- reactive({
req(input$origin)
filter(nycflights13::airports, faa == input$origin)
})
#Identify destination lat and log
dest <- reactive({
req(input$dest)
filter(nycflights13::airports, faa == input$dest)
})
#Create plot data
plot_data <- reactive({
req(input$origin, input$dest)
lookup_tbl %>%
filter(origin==input$origin & dest==input$dest) %>%
ungroup() %>%
select(airline, flights, distance, avg_gain, pred_gain) %>%
collect
})
#Plot observed versus predicted time gain for carriers and route
output$plot <- renderPlot({
ggplot(plot_data(), aes(factor(airline), pred_gain)) +
geom_bar(stat = "identity", fill = '#2780E3') +
geom_point(aes(factor(airline), avg_gain)) +
coord_flip() +
labs(x = "", y = "Time gained in flight (minutes)") +
labs(title = "Observed gain (point) vs Predicted gain (bar)")
})
#Output the route map
output$map <- renderLeaflet({
gcIntermediate(
select(origin(), lon, lat),
select(dest(), lon, lat),
n=100, addStartEnd=TRUE, sp=TRUE
) %>%
leaflet() %>%
addProviderTiles("CartoDB.Positron") %>%
addPolylines()
})
#Print table of observed and predicted gains by airline
output$datatable <- DT::renderDataTable(
datatable(plot_data()) %>%
formatRound(c("flights", "distance"), 0) %>%
formatRound(c("avg_gain", "pred_gain"), 1)
)
output$plotvarimp <- renderPlot({
#Plot top 20 variable importances
h2o.varimp_plot(glm,20)
})
}
#Run Shiny Application
shinyApp(ui = ui, server = server)
#Disconnect from Spark and end session
#h2o.shutdown(prompt=FALSE)
#spark_disconnect(sc)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.